Source code for skultrafast.quickcontrol

"""
Module to import and work with files generated by QuickControl from phasetech.
"""

from functools import cached_property
from os import PathLike
from pathlib import Path
from typing import Callable, Dict, Iterable, Optional, Tuple, Any

import attr
import numpy as np
from scipy.constants import speed_of_light
from scipy.ndimage import gaussian_filter1d

from skultrafast.unit_conversions import THz2cm, cm2THz
from skultrafast.dataset import PolTRSpec, TimeResSpec
from skultrafast.twoD_dataset import TwoDim
from skultrafast.utils import poly_bg_correction, inbetween


[docs] def parse_str(s: str): """ Parse entry of info file Parameters ---------- s : str Value Returns ------- obj Corresponding python type """ if s.isnumeric(): return int(s) elif set(s) - set("-.0123456789E") == set(): # Try is a workaround for the version string try: return float(s) except ValueError: return s elif set(s) - set("-.0123456789E,") == set(): return list(map(float, s.split(","))) elif s == "TRUE": return True elif s == "FALSE": return False else: return s
@attr.s(auto_attribs=True)
[docs] class QCFile: """ Base class for QC files. """
[docs] fname: PathLike = attr.ib()
"""Full path to info file""" @cached_property
[docs] def path(self) -> Path: """Path to the directory containing the file""" return Path(self.fname).parent
@cached_property
[docs] def prefix(self) -> str: """Prefix of the file""" return Path(self.fname).with_suffix("").name
@cached_property
[docs] def info(self) -> dict[str, Any]: d = {} with (self.path / self.prefix).with_suffix(".info").open() as info_file: for line in info_file: key, val = line.split("\t") val = val[:-1].strip() d[key] = parse_str(val) return d
@attr.s(auto_attribs=True)
[docs] class QCBaseTimeRes(QCFile): @cached_property
[docs] def wavelength(self, disp=None): """Wavelength data calculated from given grating and mono wavelength""" if disp is None: grating = self.info["MONO1 Grating"] disp_per_grating = {"30": 7.7, "75": 7.7 * 30 / 75.0} disp = disp_per_grating[grating.split()[2]] wls = (np.arange(128) - 64) * disp + self.info["MONO1 Wavelength"] self.wavelength = wls return wls
@cached_property
[docs] def wavenumbers(self): return 1e7 / self.wavelength
@attr.s(auto_attribs=True)
[docs] class QC1DSpec(QCBaseTimeRes): """Helper class to load time resolved spectra measured with QuickControl""" @cached_property
[docs] def par_data(self) -> np.ndarray: par_scan_files = self.path.glob(self.prefix + "*_PAR*.scan") return np.array([np.loadtxt(p)[:-1, 1:] for p in par_scan_files])
@cached_property
[docs] def per_data(self) -> np.ndarray: per_scan_files = self.path.glob(self.prefix + "*_PER*.scan") return np.array([np.loadtxt(p)[:-1, 1:] for p in per_scan_files])
@cached_property
[docs] def t(self) -> list[float]: t_list = np.array(self.info["Delays"]) if self.info["Delay Units"] == "fs": t_list /= 1000.0 return t_list
[docs] def make_pol_ds(self, sigma=None) -> PolTRSpec: para = np.nanmean(self.par_data, axis=0) ds_para = TimeResSpec(self.wavelength, self.t, 1000 * para, disp_freq_unit="cm") perp = np.nanmean(self.per_data, axis=0) ds_perp = TimeResSpec(self.wavelength, self.t, 1000 * perp, disp_freq_unit="cm") return PolTRSpec(ds_para, ds_perp)
@attr.s(auto_attribs=True)
[docs] class QC2DSpec(QCBaseTimeRes): """Helper class to load 2D-spectra measured with QuickControl"""
[docs] par_data: Dict = attr.ib(init=False)
"""Data for parallel polarization"""
[docs] per_data: Dict = attr.ib(init=False)
"""Data for perpendicular polarization"""
[docs] par_spec: Optional[Dict] = None
"""Resulting 2D spectra for parallel polarization"""
[docs] per_spec: Optional[Dict] = None
"""Resulting 2D spectra for perpendicular polarization"""
[docs] probe_filter: Optional[float] = None
"""Size of the filter applied to the spectral axis. 'None' is no filtering"""
[docs] upsampling: int = 2
"""Upsamling factor of the pump-axis"""
[docs] pump_freq: np.ndarray = attr.ib(init=False)
""" Resulting wavenumbers of the pump axis. """
[docs] bg_correct: Optional[Tuple] = None
""" If given, the size of the left and right region used to calculate the signal background which will be subtracted """
[docs] win_function: Optional[Callable] = np.hamming
""" Window function used for apodization. The coded will the window-function with `2*len(t2)` and uses only the second half of the returned array. """ @cached_property
[docs] def t(self) -> np.ndarray: "Waiting time delays in ps" t_list = np.array(self.info["Waiting Time Delays"]) if self.info["Waiting Time Delay Units"] == "fs": t_list /= 1000 return t_list
@cached_property
[docs] def t2(self) -> np.ndarray: "Interferogram delays in ps, e.g. the inter pump delay" end = self.info["Final Delay (fs)"] step = self.info["Step Size (fs)"] return np.arange(0.0, end + 1, step) / 1000.0
[docs] def _loader(self, which: str): data_dict: Dict[int, np.ndarray] = {} for t in range(len(self.t)): T = "_T%02d" % (t + 1) pol_scans = self.path.glob(self.prefix + T + f"_{which}*.scan") if not pol_scans: raise FileNotFoundError(f"No files found for {which} polarization") data = [] for s in pol_scans: d = np.loadtxt(s) self.t2 = d[1:, 0] data.append(d) if len(data) > 0: d = np.array(data) data_dict[t] = d else: self.t = self.t[: t + 1] break return data_dict
[docs] def __attrs_post_init__(self): self.par_data = self._loader("PAR") self.per_data = self._loader("PER") self.pump_freq = self._calc_freqs()
[docs] def switch_pol(self): self.par_data, self.per_data = self.per_data, self.par_data self.par_spec, self.per_spec = self.per_spec, self.par_spec
[docs] def calc_spec(self): par_dict: Dict[int, np.ndarray] = {} perp_dict: Dict[int, np.ndarray] = {} for i, data in enumerate((self.par_data, self.per_data)): for t in data: d = np.nanmean(data[t], 0) d = d[:-1, 1:] if self.probe_filter is not None: d = gaussian_filter1d(d, self.probe_filter, -1, mode="nearest") if self.bg_correct: poly_bg_correction( self.wavelength, d, self.bg_correct[0], self.bg_correct[1] ) d[0, :] *= 0.5 if self.win_function is not None: win = self.win_function(2 * len(self.t2)) else: win = np.ones(2 * len(self.t2)) spec = np.fft.rfft( d * win[len(self.t2):, None], axis=0, n=self.upsampling * len(self.t2), ) (par_dict, perp_dict)[i][t] = spec.real return par_dict, perp_dict
[docs] def _calc_freqs(self): freqs = np.fft.rfftfreq(self.upsampling * len(self.t2), self.t2[1] - self.t2[0]) om0 = self.info["Rotating Frame (Scanned)"] with np.errstate(divide="ignore"): cm = 10 / ((1 / freqs) * 1e-12 * speed_of_light) + om0 cm[0] = om0 return cm
[docs] def make_ds(self) -> Dict[str, TwoDim]: par, perp = self.calc_spec() self.pump_freq = self._calc_freqs() par_arr = np.dstack(list(par.values())).T per_arr = np.dstack(list(perp.values())).T iso = (2 * per_arr + par_arr) / 3 d = { "para": TwoDim( t=self.t, pump_wn=self.pump_freq, probe_wn=self.wavenumbers, spec2d=par_arr, ), "perp": TwoDim( t=self.t, pump_wn=self.pump_freq, probe_wn=self.wavenumbers, spec2d=per_arr, ), "iso": TwoDim( t=self.t, pump_wn=self.pump_freq, probe_wn=self.wavenumbers, spec2d=iso ), } return d