Source code for multipac_testbench.multipactor_test.loader

"""Define functions to prepare data for :class:`.MultipactorTest`."""

import logging
import math
from pathlib import Path
from typing import Literal

import numpy as np
import pandas as pd
from numpy.typing import NDArray

#: How consecutive-same power points should be treated.
#:
#: - ``"keep_all"``: keep all data (default)
#: - ``"trim"``: remove trailing points
#: - ``"average"``: average the data on the same point
#: - ``"first"``: only consider first point (least conditionned)
#: - ``"last"``: only consider last point (most conditionned)
#: - ``"max"``: retain maximum value
#: - ``"min"``: retain minimum value
#:
TRIGGER_POLICIES = Literal[
    "keep_all", "trim", "average", "first", "last", "max", "min"
]


[docs] def load( filepath: Path, sep: str = "\t", trigger_policy: TRIGGER_POLICIES = "keep_all", index_col: str = "Sample index", remove_metadata_columns: bool = False, **kwargs, ) -> tuple[pd.DataFrame, list[str]]: """Load the LabViewer file. If ``trigger_policy`` is set, perform operations to select the desired trigger. These operations do not preserve original sample indexes. Parameters ---------- filepath : LabViewer file to be loaded. sep : Column separator. trigger_policy : How consecutive measures at the same power should be treated. index_col : Name of the column holding indexes. remove_metadata_columns : Remove the rightmost columns holding metadata. kwargs : Other kwargs passed to :func:`._load_file`. Returns ------- pandas.DataFrame Holds data. list[str] The comments, without their comment character, line by line. If loading a ``XLSX``, an empty list is returned. """ data, commented_lines = _load_file( filepath.resolve(), sep=sep, index_col=index_col, **kwargs ) if remove_metadata_columns: data = data.select_dtypes(include=["float", "int"]) filtered = _apply_trigger_filtering( trigger_policy, data, dbm_column="NI9205_dBm" ).reset_index(drop=True) filtered.index.name = index_col printer = logging.info if trigger_policy in ("average", "keep_all"): printer(f"Applied {trigger_policy = } on {filepath}") return filtered, commented_lines fraction = 100 * len(filtered) / len(data) if trigger_policy == "trim": if fraction < 90.0 and trigger_policy: printer = logging.warning elif fraction < 50.0: printer = logging.error printer(f"After {trigger_policy = }, kept {fraction:.2f}% of {filepath}") return filtered, commented_lines
[docs] def _load_file( filepath: Path, index_col: str = "Sample index", comment: str = "#", **kwargs, ) -> tuple[pd.DataFrame, list[str]]: """Load the data file. .. todo:: Allow for ``TXT`` or ``XLSX`` input files. Parameters ---------- filepath : File to load. index_col : Name of the index column. comment : Comment character. kwargs : Other keyword arguments passed to the loading function. Holds ``"sep"`` key-value pair, which is removed if loading a ``XLSX``. Returns ------- pandas.DataFrame Holds data. list[str] The comments, without their comment character, line by line. If loading a ``XLSX``, an empty list is returned. """ ext = filepath.suffix if ext == ".csv": pandas_reader = pd.read_csv elif ext == ".xlsx": pandas_reader = pd.read_excel if "sep" in kwargs: del kwargs["sep"] else: logging.error(f"{filepath} extension not supported.") raise RuntimeError try: data = pandas_reader( (filepath), index_col=index_col, comment=comment, **kwargs ) except Exception as e: logging.error( f"There was a mismatch in the number of columns in {filepath}" ". Check that the number of column header match the number of " "columns, that the trailing comments in the first lines " "were removed, and that you set the appropriate column separator." ) logging.exception(e) raise e if ext == ".xlsx": return data, [] commented_lines = [] with open(filepath) as f: for line in f: if line.startswith(comment): commented_lines.append(line.removeprefix(comment).rstrip("\n")) else: break return data, commented_lines
[docs] def _apply_trigger_filtering( trigger_policy: TRIGGER_POLICIES, data: pd.DataFrame, dbm_column: str = "NI9205_dBm", tol: float = 1e-10, ) -> pd.DataFrame: """Apply desired trigger policy. Original indexes are not preserved. """ if trigger_policy == "keep_all": return data if dbm_column not in data.columns: logging.error( f"{dbm_column = } not found in the results file. Mandatory for " "edition of trigger." ) return data power = data[dbm_column].to_numpy() labels = _group_consecutive_equal_power(power, tol) grouped = data.groupby(labels, sort=False) if trigger_policy == "trim": unique_labels = np.unique(labels) mask = (labels != unique_labels[0]) & (labels != unique_labels[-1]) trimmed = data[mask] return trimmed if trigger_policy == "average": return grouped.mean(numeric_only=True) if trigger_policy == "first": return grouped.nth(0) if trigger_policy == "last": return grouped.nth(-1) logging.error(f"{trigger_policy = } not understood. Not doing anything.") return data
[docs] def _group_consecutive_equal_power( power: NDArray, tol: float = 1e-10 ) -> NDArray[np.int32]: """Gather measurements with the same power (consecutive). Parameters ---------- power : The input power array. tol : Tolerance for comparing equality. Returns ------- An array of group labels of the same length as ``power``. """ labels = [0] group = 0 for i in range(1, len(power)): if not math.isclose(power[i], power[i - 1], abs_tol=tol): group += 1 labels.append(group) return np.array(labels)
[docs] def save( filepath: Path, data: pd.DataFrame, info: str | None = None, verbose=True, **kwargs, ) -> None: """Save the dataframe as a new LabViewer results file.""" save_meth = data.to_csv if filepath.suffix == ".xlsx": save_meth = data.to_excel if "sep" in kwargs: del kwargs["sep"] if info is None: info = "new file" if verbose: logging.info(f"Saving {info} to {filepath}") try: save_meth(filepath, **kwargs) except OSError as e: logging.error(f"Could not save file:\n{e}")