Source code for quickcontrol

"""
Module to import and work with files generated by QuickControl from phasetech.
"""

from functools import cached_property
from os import PathLike
from pathlib import Path
from typing import Callable, Dict, Iterable, Optional, Tuple, Any

import attr
import numpy as np
from scipy.constants import speed_of_light
from scipy.ndimage import gaussian_filter1d

from skultrafast.unit_conversions import THz2cm, cm2THz
from skultrafast.dataset import PolTRSpec, TimeResSpec
from skultrafast.twoD_dataset import TwoDim
from skultrafast.utils import poly_bg_correction, inbetween


def parse_str(s: str):
    """
    Parse entry of info file

    Parameters
    ----------
    s : str
        Value
    Returns
    -------
    obj
        Corresponding python type
    """
    if s.isnumeric():
        return int(s)
    elif set(s) - set("-.0123456789E") == set():
        # Try is a workaround for the version string
        try:
            return float(s)
        except ValueError:
            return s
    elif set(s) - set("-.0123456789E,") == set():
        return list(map(float, s.split(",")))
    elif s == "TRUE":
        return True
    elif s == "FALSE":
        return False
    else:
        return s


@attr.s(auto_attribs=True)
class QCFile:
    """
    Base class for QC files.
    """

    fname: PathLike = attr.ib()
    """Full path to info file"""

    @cached_property
    def path(self) -> Path:
        """Path to the directory containing the file"""
        return Path(self.fname).parent

    @cached_property
    def prefix(self) -> str:
        """Prefix of the file"""
        return Path(self.fname).with_suffix("").name

    @cached_property
    def info(self) -> dict[str, Any]:
        d = {}
        with (self.path / self.prefix).with_suffix(".info").open() as info_file:
            for line in info_file:
                key, val = line.split("\t")
                val = val[:-1].strip()
                d[key] = parse_str(val)
        return d


@attr.s(auto_attribs=True)
class QCBaseTimeRes(QCFile):

    @cached_property
    def wavelength(self, disp=None):
        """Wavelength data calculated from given grating and mono wavelength"""
        if disp is None:
            grating = self.info["MONO1 Grating"]
            disp_per_grating = {"30": 7.7, "75": 7.7 * 30 / 75.0}
            disp = disp_per_grating[grating.split()[2]]
        wls = (np.arange(128) - 64) * disp + self.info["MONO1 Wavelength"]
        self.wavelength = wls
        return wls

    @cached_property
    def wavenumbers(self):
        return 1e7 / self.wavelength


[docs] @attr.s(auto_attribs=True) class QC1DSpec(QCBaseTimeRes): """Helper class to load time resolved spectra measured with QuickControl""" @cached_property def par_data(self) -> np.ndarray: par_scan_files = self.path.glob(self.prefix + "*_PAR*.scan") return np.array([np.loadtxt(p)[:-1, 1:] for p in par_scan_files]) @cached_property def per_data(self) -> np.ndarray: per_scan_files = self.path.glob(self.prefix + "*_PER*.scan") return np.array([np.loadtxt(p)[:-1, 1:] for p in per_scan_files]) @cached_property def t(self) -> list[float]: t_list = np.array(self.info["Delays"]) if self.info["Delay Units"] == "fs": t_list /= 1000.0 return t_list def make_pol_ds(self, sigma=None) -> PolTRSpec: para = np.nanmean(self.par_data, axis=0) ds_para = TimeResSpec(self.wavelength, self.t, 1000 * para, disp_freq_unit="cm") perp = np.nanmean(self.per_data, axis=0) ds_perp = TimeResSpec(self.wavelength, self.t, 1000 * perp, disp_freq_unit="cm") return PolTRSpec(ds_para, ds_perp)
[docs] @attr.s(auto_attribs=True) class QC2DSpec(QCBaseTimeRes): """Helper class to load 2D-spectra measured with QuickControl""" par_data: Dict = attr.ib(init=False) """Data for parallel polarization""" per_data: Dict = attr.ib(init=False) """Data for perpendicular polarization""" par_spec: Optional[Dict] = None """Resulting 2D spectra for parallel polarization""" per_spec: Optional[Dict] = None """Resulting 2D spectra for perpendicular polarization""" probe_filter: Optional[float] = None """Size of the filter applied to the spectral axis. 'None' is no filtering""" upsampling: int = 2 """Upsamling factor of the pump-axis""" pump_freq: np.ndarray = attr.ib(init=False) """ Resulting wavenumbers of the pump axis. """ bg_correct: Optional[Tuple] = None """ If given, the size of the left and right region used to calculate the signal background which will be subtracted """ win_function: Optional[Callable] = np.hamming """ Window function used for apodization. The coded will the window-function with `2*len(t2)` and uses only the second half of the returned array. """ @cached_property def t(self) -> np.ndarray: "Waiting time delays in ps" t_list = np.array(self.info["Waiting Time Delays"]) if self.info["Waiting Time Delay Units"] == "fs": t_list /= 1000 return t_list @cached_property def t2(self) -> np.ndarray: "Interferogram delays in ps, e.g. the inter pump delay" end = self.info["Final Delay (fs)"] step = self.info["Step Size (fs)"] return np.arange(0.0, end + 1, step) / 1000.0 def _loader(self, which: str): data_dict: Dict[int, np.ndarray] = {} for t in range(len(self.t)): T = "_T%02d" % (t + 1) pol_scans = self.path.glob(self.prefix + T + f"_{which}*.scan") if not pol_scans: raise FileNotFoundError(f"No files found for {which} polarization") data = [] for s in pol_scans: d = np.loadtxt(s) self.t2 = d[1:, 0] data.append(d) if len(data) > 0: d = np.array(data) data_dict[t] = d else: self.t = self.t[: t + 1] break return data_dict def __attrs_post_init__(self): self.par_data = self._loader("PAR") self.per_data = self._loader("PER") self.pump_freq = self._calc_freqs() def switch_pol(self): self.par_data, self.per_data = self.per_data, self.par_data self.par_spec, self.per_spec = self.per_spec, self.par_spec def calc_spec(self): par_dict: Dict[int, np.ndarray] = {} perp_dict: Dict[int, np.ndarray] = {} for i, data in enumerate((self.par_data, self.per_data)): for t in data: d = np.nanmean(data[t], 0) d = d[:-1, 1:] if self.probe_filter is not None: d = gaussian_filter1d(d, self.probe_filter, -1, mode="nearest") if self.bg_correct: poly_bg_correction( self.wavelength, d, self.bg_correct[0], self.bg_correct[1] ) d[0, :] *= 0.5 if self.win_function is not None: win = self.win_function(2 * len(self.t2)) else: win = np.ones(2 * len(self.t2)) spec = np.fft.rfft( d * win[len(self.t2):, None], axis=0, n=self.upsampling * len(self.t2), ) (par_dict, perp_dict)[i][t] = spec.real return par_dict, perp_dict def _calc_freqs(self): freqs = np.fft.rfftfreq(self.upsampling * len(self.t2), self.t2[1] - self.t2[0]) om0 = self.info["Rotating Frame (Scanned)"] with np.errstate(divide="ignore"): cm = 10 / ((1 / freqs) * 1e-12 * speed_of_light) + om0 cm[0] = om0 return cm def make_ds(self) -> Dict[str, TwoDim]: par, perp = self.calc_spec() self.pump_freq = self._calc_freqs() par_arr = np.dstack(list(par.values())).T per_arr = np.dstack(list(perp.values())).T iso = (2 * per_arr + par_arr) / 3 d = { "para": TwoDim( t=self.t, pump_wn=self.pump_freq, probe_wn=self.wavenumbers, spec2d=par_arr, ), "perp": TwoDim( t=self.t, pump_wn=self.pump_freq, probe_wn=self.wavenumbers, spec2d=per_arr, ), "iso": TwoDim( t=self.t, pump_wn=self.pump_freq, probe_wn=self.wavenumbers, spec2d=iso ), } return d