"""
Module to import and work with files generated by QuickControl from phasetech.
"""
from functools import cached_property
from os import PathLike
from pathlib import Path
from typing import Callable, Dict, Iterable, Optional, Tuple, Any
import attr
import numpy as np
from scipy.constants import speed_of_light
from scipy.ndimage import gaussian_filter1d
from skultrafast.unit_conversions import THz2cm, cm2THz
from skultrafast.dataset import PolTRSpec, TimeResSpec
from skultrafast.twoD_dataset import TwoDim
from skultrafast.utils import poly_bg_correction, inbetween
def parse_str(s: str):
"""
Parse entry of info file
Parameters
----------
s : str
Value
Returns
-------
obj
Corresponding python type
"""
if s.isnumeric():
return int(s)
elif set(s) - set("-.0123456789E") == set():
# Try is a workaround for the version string
try:
return float(s)
except ValueError:
return s
elif set(s) - set("-.0123456789E,") == set():
return list(map(float, s.split(",")))
elif s == "TRUE":
return True
elif s == "FALSE":
return False
else:
return s
@attr.s(auto_attribs=True)
class QCFile:
"""
Base class for QC files.
"""
fname: PathLike = attr.ib()
"""Full path to info file"""
@cached_property
def path(self) -> Path:
"""Path to the directory containing the file"""
return Path(self.fname).parent
@cached_property
def prefix(self) -> str:
"""Prefix of the file"""
return Path(self.fname).with_suffix("").name
@cached_property
def info(self) -> dict[str, Any]:
d = {}
with (self.path / self.prefix).with_suffix(".info").open() as info_file:
for line in info_file:
key, val = line.split("\t")
val = val[:-1].strip()
d[key] = parse_str(val)
return d
@attr.s(auto_attribs=True)
class QCBaseTimeRes(QCFile):
@cached_property
def wavelength(self, disp=None):
"""Wavelength data calculated from given grating and mono wavelength"""
if disp is None:
grating = self.info["MONO1 Grating"]
disp_per_grating = {"30": 7.7, "75": 7.7 * 30 / 75.0}
disp = disp_per_grating[grating.split()[2]]
wls = (np.arange(128) - 64) * disp + self.info["MONO1 Wavelength"]
self.wavelength = wls
return wls
@cached_property
def wavenumbers(self):
return 1e7 / self.wavelength
[docs]
@attr.s(auto_attribs=True)
class QC1DSpec(QCBaseTimeRes):
"""Helper class to load time resolved spectra measured with QuickControl"""
@cached_property
def par_data(self) -> np.ndarray:
par_scan_files = self.path.glob(self.prefix + "*_PAR*.scan")
return np.array([np.loadtxt(p)[:-1, 1:] for p in par_scan_files])
@cached_property
def per_data(self) -> np.ndarray:
per_scan_files = self.path.glob(self.prefix + "*_PER*.scan")
return np.array([np.loadtxt(p)[:-1, 1:] for p in per_scan_files])
@cached_property
def t(self) -> list[float]:
t_list = np.array(self.info["Delays"])
if self.info["Delay Units"] == "fs":
t_list /= 1000.0
return t_list
def make_pol_ds(self, sigma=None) -> PolTRSpec:
para = np.nanmean(self.par_data, axis=0)
ds_para = TimeResSpec(self.wavelength, self.t, 1000 * para, disp_freq_unit="cm")
perp = np.nanmean(self.per_data, axis=0)
ds_perp = TimeResSpec(self.wavelength, self.t, 1000 * perp, disp_freq_unit="cm")
return PolTRSpec(ds_para, ds_perp)
[docs]
@attr.s(auto_attribs=True)
class QC2DSpec(QCBaseTimeRes):
"""Helper class to load 2D-spectra measured with QuickControl"""
par_data: Dict = attr.ib(init=False)
"""Data for parallel polarization"""
per_data: Dict = attr.ib(init=False)
"""Data for perpendicular polarization"""
par_spec: Optional[Dict] = None
"""Resulting 2D spectra for parallel polarization"""
per_spec: Optional[Dict] = None
"""Resulting 2D spectra for perpendicular polarization"""
probe_filter: Optional[float] = None
"""Size of the filter applied to the spectral axis. 'None' is no filtering"""
upsampling: int = 2
"""Upsamling factor of the pump-axis"""
pump_freq: np.ndarray = attr.ib(init=False)
"""
Resulting wavenumbers of the pump axis.
"""
bg_correct: Optional[Tuple] = None
"""
If given, the size of the left and right region used to calculate the signal background which
will be subtracted
"""
win_function: Optional[Callable] = np.hamming
"""
Window function used for apodization. The coded will the window-function with
`2*len(t2)` and uses only the second half of the returned array.
"""
@cached_property
def t(self) -> np.ndarray:
"Waiting time delays in ps"
t_list = np.array(self.info["Waiting Time Delays"])
if self.info["Waiting Time Delay Units"] == "fs":
t_list /= 1000
return t_list
@cached_property
def t2(self) -> np.ndarray:
"Interferogram delays in ps, e.g. the inter pump delay"
end = self.info["Final Delay (fs)"]
step = self.info["Step Size (fs)"]
return np.arange(0.0, end + 1, step) / 1000.0
def _loader(self, which: str):
data_dict: Dict[int, np.ndarray] = {}
for t in range(len(self.t)):
T = "_T%02d" % (t + 1)
pol_scans = self.path.glob(self.prefix + T + f"_{which}*.scan")
if not pol_scans:
raise FileNotFoundError(f"No files found for {which} polarization")
data = []
for s in pol_scans:
d = np.loadtxt(s)
self.t2 = d[1:, 0]
data.append(d)
if len(data) > 0:
d = np.array(data)
data_dict[t] = d
else:
self.t = self.t[: t + 1]
break
return data_dict
def __attrs_post_init__(self):
self.par_data = self._loader("PAR")
self.per_data = self._loader("PER")
self.pump_freq = self._calc_freqs()
def switch_pol(self):
self.par_data, self.per_data = self.per_data, self.par_data
self.par_spec, self.per_spec = self.per_spec, self.par_spec
def calc_spec(self):
par_dict: Dict[int, np.ndarray] = {}
perp_dict: Dict[int, np.ndarray] = {}
for i, data in enumerate((self.par_data, self.per_data)):
for t in data:
d = np.nanmean(data[t], 0)
d = d[:-1, 1:]
if self.probe_filter is not None:
d = gaussian_filter1d(d, self.probe_filter, -1, mode="nearest")
if self.bg_correct:
poly_bg_correction(
self.wavelength, d, self.bg_correct[0], self.bg_correct[1]
)
d[0, :] *= 0.5
if self.win_function is not None:
win = self.win_function(2 * len(self.t2))
else:
win = np.ones(2 * len(self.t2))
spec = np.fft.rfft(
d * win[len(self.t2):, None],
axis=0,
n=self.upsampling * len(self.t2),
)
(par_dict, perp_dict)[i][t] = spec.real
return par_dict, perp_dict
def _calc_freqs(self):
freqs = np.fft.rfftfreq(self.upsampling * len(self.t2), self.t2[1] - self.t2[0])
om0 = self.info["Rotating Frame (Scanned)"]
with np.errstate(divide="ignore"):
cm = 10 / ((1 / freqs) * 1e-12 * speed_of_light) + om0
cm[0] = om0
return cm
def make_ds(self) -> Dict[str, TwoDim]:
par, perp = self.calc_spec()
self.pump_freq = self._calc_freqs()
par_arr = np.dstack(list(par.values())).T
per_arr = np.dstack(list(perp.values())).T
iso = (2 * per_arr + par_arr) / 3
d = {
"para": TwoDim(
t=self.t,
pump_wn=self.pump_freq,
probe_wn=self.wavenumbers,
spec2d=par_arr,
),
"perp": TwoDim(
t=self.t,
pump_wn=self.pump_freq,
probe_wn=self.wavenumbers,
spec2d=per_arr,
),
"iso": TwoDim(
t=self.t, pump_wn=self.pump_freq, probe_wn=self.wavenumbers, spec2d=iso
),
}
return d