Source code for skultrafast.referencing

# This file contains function to apply adavanced referencing to raw dataset

import h5py
import numpy as np
from collections import defaultdict
from scipy.ndimage import gaussian_filter1d
from attr import dataclass


[docs] def get_stats(grp: h5py.Group) -> np.ndarray: keys = list(grp.keys()) n = len(keys) dim = grp[keys[0]].shape out = np.zeros((n, *dim), dtype=np.float32) for i in range(n): out[i, :] = grp[str(i)][:].astype(np.float32) return out
[docs] def get_scans(f: h5py.File) -> dict[str, list[np.ndarray]]: out = defaultdict(list) for p in 'Probe1', 'Probe2': for i in range(len(f['frames'][p])): out[p].append(get_stats(f['frames'][p][str(i)])) return out
[docs] def get_ref_stats(f: h5py.File) -> list[np.ndarray]: out = [] for i in range(len(f['ref_data'])): out.append(get_stats(f['ref_data'][str(i)])) return out
[docs] def get_all_scans(f: h5py.File, filter_val=0.8) -> np.ndarray: data = get_scans(f) max_scan = min([arr.shape[0] for arr in data['Probe1']]) _, n_wl, num_frames = data['Probe1'][0].shape n_t2 = len(data['Probe1']) if 'ref_data' in f: has_ref = f['ref_data/0/0'].shape[1] == num_frames ref_data = get_ref_stats(f) else: has_ref = False ref_data = None lines = 3 if has_ref else 2 all_scans = np.empty((lines, max_scan, n_t2, n_wl, num_frames)) for i in range(n_t2): all_scans[0, :, i, ...] = data['Probe1'][i][:max_scan, ...] all_scans[1, :, i, ...] = data['Probe2'][i][:max_scan, ...] if has_ref: assert ref_data is not None all_scans[2, :, i, ...] = ref_data[i][:max_scan, ...] if filter_val > 0: all_scans = gaussian_filter1d(all_scans, filter_val, axis=-2) return all_scans
[docs] def build_basis(all_scans): pass
[docs] def use_edge_ref(probe_wn, all_scan, low, high, ref_scans=2): """ This function takes the whole 2D dataset and applies edge referencing to it. After referencing, the 2D-signal is calculated and returned along with the corresponding wavenumbers. Assumes 4 phase cycling steps. """ data_idx = (probe_wn < high) & (probe_wn > low) data = all_scan[:, :, :, data_idx, :] edge = all_scan[:, :, :, ~data_idx, :] data_diffs = np.diff(data, axis=-1) edge_diffs = np.diff(edge, axis=-1) num_frames = data.shape[-1] out = np.empty_like(data_diffs)[..., ::4] max_scan = data.shape[1] for k in [0, 1]: for i in range(data.shape[1]): zero0 = np.concatenate(data_diffs[k, i, :ref_scans, ...], axis=-1) zero_base0 = np.concatenate(edge_diffs[k, i, :ref_scans, ...], axis=-1) if i != max_scan - 1: zero_tmp = np.concatenate(data_diffs[k, i+1, :ref_scans, ...], axis=-1) zero_base_tmp = np.concatenate( edge_diffs[k, i+1, :ref_scans, ...], axis=-1) zero0 = np.concatenate((zero0, zero_tmp), axis=-1) zero_base0 = np.concatenate((zero_base0, zero_base_tmp), axis=-1) coeffs = np.linalg.lstsq(zero_base0.T, zero0.T, rcond=None)[0] for j in range(ref_scans, data_diffs.shape[2]): sig = data_diffs[k, i, j, ...] means = data[k, i, j, ...].mean(axis=-1) reffed = (edge_diffs[k, i, j, ...].T @ coeffs).T sig = (sig - reffed)[:, ::2] sig = (sig[:, :-1:2] + sig[:, 1::2])/means[:, None] fac = -1000/np.log(10) sig = np.log1p(sig)*fac sig[..., 0] *= 0.5 sig *= np.hamming(num_frames/2)[sig.shape[1]:] out[k, i, j, ...] = sig out2d = np.fft.rfft(out.mean(1), axis=-1, n=2*num_frames).real return out2d, probe_wn[data_idx]
[docs] def use_edge_ref_full(all_scan, probe_wn, low, high, add_higher_degree): """ This function takes the whole 2D dataset and applies edge referencing to it. After referencing, the 2D-signal is calculated and returned along with the corresponding wavenumbers. Assumes 4 phase cycling steps. Parameters ---------- all_scan : np.ndarray The 2D dataset. probe_wn : np.ndarray The wavenumbers of the dataset. low : float The lower bound of the wavenumber range to exclude. high : float The upper bound of the wavenumber range to exclude. add_higher_degree : bool Whether to add higher degree polynomials to the referencing. """ data_idx = (probe_wn < high) & (probe_wn > low) data = all_scan[:, :, :, data_idx, :] edge = all_scan[:, :, :, ~data_idx, :] data_diffs = np.diff(data, axis=-1) edge_diffs = np.diff(edge, axis=-1) num_frames = data.shape[-1] zero_a = np.concatenate(data_diffs[0, :, :2, ...], axis=-1) zero_a = np.concatenate(zero_a, axis=-1) zero_b = np.concatenate(data_diffs[1, :, :2, ...], axis=-1) zero_b = np.concatenate(zero_b, axis=-1) zero_base_a = np.concatenate(edge_diffs[0, :, :2, ...], axis=-1) zero_base_a = np.concatenate(zero_base_a, axis=-1) zero_base_b = np.concatenate(edge_diffs[1, :, :2, ...], axis=-1) zero_base_b = np.concatenate(zero_base_b, axis=-1) if add_higher_degree: zero_base_a = np.vstack((zero_base_a, zero_base_a**2)) zero_base_b = np.vstack((zero_base_b, zero_base_b**2)) print(zero_base_a.shape, zero_base_b.shape) coefs_a = np.linalg.lstsq(zero_base_a.T, zero_a.T, rcond=None)[0] coefs_b = np.linalg.lstsq(zero_base_b.T, zero_b.T, rcond=None)[0] sig_a = data_diffs.mean(1)[0] base = np.swapaxes(edge_diffs[0].mean(0), -1, -2) if add_higher_degree: base = np.dstack((base, base**2)) reffed_a = (base @ coefs_a).swapaxes(-1, -2) sig_b = data_diffs.mean(1)[1] base = np.swapaxes(edge_diffs[1].mean(0), -1, -2) if add_higher_degree: base = np.dstack((base, base**2)) reffed_b = (base @ coefs_b).swapaxes(-1, -2) sig = np.stack((sig_a, sig_b), axis=0) reffed = np.stack((reffed_a, reffed_b), axis=0) sig.shape, reffed.shape means = data[:2].mean(1).mean(-1) sig = (sig - reffed)[..., ::2] # If not using 4 phase cycling steps, change the indexing here sig = (sig[..., :-1:2] + sig[..., 1::2])/means[..., None] fac = 1000/np.log(10) sig = np.log1p(sig)*fac sig[..., 0] *= 0.5 sig *= np.hamming(num_frames/2)[sig.shape[-1]:] out2d = np.fft.rfft(sig, axis=-1, n=4*num_frames//4).real return out2d, probe_wn[data_idx]