Source code for multipac_testbench.util.post_treaters

"""Define various smoothing functions for measured data."""

import logging
import math
from typing import Literal

import numpy as np
from numpy.typing import NDArray
from scipy.ndimage import minimum_filter1d, uniform_filter1d

#: Available modes for the running mean routine.
CONVOLUTION_MODES_T = Literal[
    "reflect", "constant", "nearest", "mirror", "wrap"
]
DEPRECATED_CONVOLUTION_MODES_T = Literal["full", "same", "valid"]
#: Deprecated modes for the running mean routine.
DEPRECATED_CONVOLUTION_MODES = ("full", "same", "valid")


[docs] def running_mean( input_data: NDArray[np.float64], n_mean: int, mode: CONVOLUTION_MODES_T | DEPRECATED_CONVOLUTION_MODES_T = "nearest", **kwargs, ) -> NDArray[np.float64]: """Compute the running mean. .. deprecated:: 1.9.0 The modes listed in :data:`DEPRECATED_CONVOLUTION_MODES` will call the :func:`numpy.convolve` function. Prefer the :data:`CONVOLUTION_MODES_T`, which will call :func:`scipy.ndimage.uniform_filter1d`. See Also -------- :func:`numpy.convolve` Parameters ---------- input_data : Data to smooth of shape ``N``. n_mean : Number of points on which running mean is ran. mode : Convolution modes. Refer to :func:`scipy.ndimage.uniform_filter1d` documentation, which is the actual function that is called. Returns ------- Smoothed data. """ if mode in DEPRECATED_CONVOLUTION_MODES: logging.warning( "Using deprecated mode, based on np.convolve. Prefer the scipy." "ndimage.uniform_filter1d function, which is more robust on the " "edges." ) return np.convolve( input_data, np.ones(n_mean) / n_mean, mode=mode ).astype(np.float64) return uniform_filter1d(input_data, size=n_mean, mode=mode)
[docs] def lower_envelope( input_data: NDArray[np.float64], envelope_window: int, mode: CONVOLUTION_MODES_T = "nearest", ) -> NDArray[np.float64]: """Compute the lower envelope, i.e. the signal minimum without the bumps. .. todo:: Illustration in documentation. .. todo:: May also use :func:`scipy.ndimage.percentile_filter`. May be more robust with noisy data or occasional downward spikes. To test. Parameters ---------- input_data : Data to smooth of shape ``N``. envelope_window : Number of points on which lower envelope is taken. mode : Convolution modes. Refer to :func:`scipy.ndimage.minimum_filter1d` documentation, which is the actual function that is called. Returns ------- Smoothed data. """ return minimum_filter1d(input_data, size=envelope_window, mode=mode)
[docs] def average_y_for_nearby_x_within_distance( y_values: NDArray[np.float64], x_values: NDArray[np.float64], tol: float = 1e-6, max_index_distance: int = 1, keep_shape: bool = True, ) -> NDArray[np.float64]: """Average ``y_values`` measured at nearly identical ``x_values`` This function groups values in ``y_values`` that correspond to ``x_values`` which are numerically close (within ``tol``) and occur within ``max_index_distance`` of each other. These grouped ``y_values`` are averaged, and the result is returned either in a shape-preserving format or as a compact array, depending on ``keep_shape``. Parameters ---------- y_values : The dependent variable values to average. x_values : The independent variable values used to group corresponding ``y_values``. tol : Maximum absolute difference under which ``x_values`` are considered equal. max_index_distance : Maximum index separation allowed when grouping similar ``x_values``. Prevents averaging across distant, unrelated measurements. keep_shape : If ``True``, the returned array has the same shape as the input, with only the first element of each group containing the average and others filled with ``np.nan``. If ``False`` (not recommended), returns a compact array with only the averaged values. Returns ------- The averaged ``y_values``, either shape-preserving or compact. Raises ------ ValueError If ``x_values`` and ``y_values`` do not have the same shape. Examples -------- >>> x = np.array([100.0, 100.0, 200.0, 200.0]) >>> y = np.array([1.0, 3.0, 10.0, 14.0]) >>> average_y_for_nearby_x_within_distance(y, x) array([2.0, nan, 12.0, nan]) >>> average_y_for_nearby_x_within_distance(y, x, keep_shape=False) array([2.0, 12.0]) """ if y_values.shape != x_values.shape: raise ValueError("x_data and y_data must have the same shape.") averaged = np.full_like(y_values, np.nan) used = np.zeros_like(x_values, dtype=bool) n = len(x_values) for i in range(n): if used[i]: continue xi = x_values[i] group_indices = [i] for j in range(i + 1, n): if used[j]: continue if ( math.isclose(x_values[j], xi, abs_tol=tol) and (j - group_indices[-1]) <= max_index_distance ): group_indices.append(j) if len(group_indices) > 1: avg = np.mean(y_values[group_indices]) if keep_shape: averaged[group_indices[0]] = avg else: averaged[group_indices[0]] = avg elif keep_shape: averaged[i] = y_values[i] for idx in group_indices: used[idx] = True if not keep_shape: averaged = averaged[~np.isnan(averaged)] return averaged
[docs] def drop_x_where_y_is_nan( x_values: NDArray[np.float64], y_values: NDArray[np.float64] ) -> NDArray[np.float64]: """Return ``x_values`` without indexes where ``y_values`` is ``np.nan``. This can be used in for :class:`.RPA` when some current data is dropped (``keep_shape = False``) but we still want the same shape for potential. """ indexes = ~np.isnan(y_values) return x_values[indexes]
[docs] def replace_data_under_threshold( input_data: NDArray[np.float64], threshold: float, replace_value: float, min_consecutive: int = 1, ) -> NDArray[np.float64]: """Replace data where ``min_consecutive`` values are below ``threshold``. Data is replaced by ``replace_value``. Parameters ---------- input_data : Data to filter. threshold : Threshold under which data is considered noise. replace_value : Value to replace the data with. min_consecutive : Minimum number of consecutive values below the threshold required for replacement. Returns ------- Modified data array. """ data = input_data.copy() mask = data < threshold i = 0 while i < len(mask): if mask[i]: start = i while i < len(mask) and mask[i]: i += 1 end = i if end - start >= min_consecutive: data[start:end] = replace_value else: i += 1 return data
[docs] def return_constant( input_data: NDArray[np.float64], constant: float ) -> NDArray[np.float64]: """Always return same value.""" return np.full_like(input_data, constant)
[docs] def get_data_above_noise( data: NDArray, noise_level: float | None = None, level: float | None = None, ) -> NDArray: """Detect signal above noise, return it. Parameters ---------- data : Array of data in pulsed shape, such as power pulse or synch. noise_level : Absolute level of noise. If not provided, we use ``level`` to compute it. level : Noise percentile wrt ``data``, in :unit:`\\%`. Returns ------- NDArray Only the widest pulse detected in the signal. """ if noise_level is None: if level is None: raise ValueError("You must provide ``noise_level`` or ``level``.") noise_level = np.percentile(np.abs(data), level) above = data > noise_level # Detect rising and falling edges padded = np.concatenate(([0], above.astype(int), [0])) diff = np.diff(padded) starts = np.where(diff == 1)[0] ends = np.where(diff == -1)[0] if len(starts) == 0: logging.warning("No active pulse found above the noise floor.") # Take the longest one best = int(np.argmax(ends - starts)) return data[int(starts[best]) : int(ends[best])]