# -*- coding: utf-8 -*-
import lmfit
from scipy.optimize import nnls
from scipy.sparse.linalg import svds
import scipy.ndimage as nd
from scipy.stats import trim_mean
from scipy.interpolate import UnivariateSpline
import numpy as np
import scipy.stats as st
import scipy.signal as sig
from collections import namedtuple
from scipy.constants import c, physical_constants
from typing import Union, List, overload
[docs]
tup = namedtuple('tup', 'wl t data')
[docs]
def add_to_cls(cls):
def function_enum(fn):
setattr(cls, fn.__name__, staticmethod(fn))
return fn
return function_enum
[docs]
def add_tup(str_lst):
def function_enum(fn):
str_lst[0].append(fn.__name__)
str_lst[1].append(fn)
return fn
return function_enum
[docs]
def trimmed_mean(arr, axis=-1, ratio=2., use_sem=True):
arr = np.sort(arr, axis=axis)
std = np.std(arr, axis, keepdims=1)
std = np.std(st.trimboth(arr, 0.1, axis), keepdims=1)
mean = np.mean(st.trimboth(arr, 0.1, axis), keepdims=1)
# std = np.std(st.trimboth(arr, 0.1, axis), keepdims=1)
# mean = np.mean(st.trimboth(arr, 0.1, axis), keepdims=1)
idx = np.abs(arr - mean) > ratio * std
n = np.sqrt(np.sum(~idx, axis))
if not use_sem:
n = 1
arr[idx] = np.nan
mean = np.nanmean(arr, axis)
std = np.nanstd(arr, axis, ddof=1)/n
return mean, std
[docs]
def smooth_spline(x, y, s):
s = UnivariateSpline(x, y, s=s)
return s(x)
[docs]
def svd_filter(d, n=6):
u, s, v = np.linalg.svd(d, full_matrices=0)
s[n:] = 0
f = np.dot(u, np.diag(s).dot(v))
return f
[docs]
def apply_spline(t, d, s=None):
out = np.zeros_like(d)
for i in range(d.shape[1]):
out[:, i] = smooth_spline(t, d[:, i], s)
return out
[docs]
def normalize(x):
return x/abs(x).max()
[docs]
def weighted_binner(n, wl, dat, std):
"""
Given wavelengths and data it bins the data into n-wavelenths.
Returns bdata and bwl
"""
i = np.argsort(wl)
wl = wl[i]
dat = dat[:, i]
idx = np.searchsorted(wl, np.linspace(wl.min(), wl.max(), n+1))
binned = np.empty((dat.shape[0], n))
binned_std = np.empty_like(binned)
binned_wl = np.empty(n)
for i in range(n):
data = dat[:, idx[i]:idx[i+1]]
weights = 1/std[:, idx[i]:idx[i+1]]**2
binned[:, i] = np.average(data, 1, weights)
binned_std[:, i] = np.average(std[:, idx[i]:idx[i+1]], 1, weights)
binned_wl[i] = np.mean(wl[idx[i]:idx[i+1]])
return binned, binned_wl, binned_std
[docs]
def binner(n, wl, dat, func=np.mean):
"""
Given wavelengths and data it bins the data into n-wavelenths.
Returns bdata and bwl
"""
i = np.argsort(wl)
wl = wl[i]
dat = dat[:, i]
idx = np.searchsorted(wl, np.linspace(wl.min(), wl.max(), n+1))
binned = np.empty((dat.shape[0], n))
binned_wl = np.empty(n)
for i in range(n):
binned[:, i] = func(dat[:, idx[i]:idx[i+1]], 1)
binned_wl[i] = np.mean(wl[idx[i]:idx[i+1]])
return binned, binned_wl
@overload
[docs]
def fi(w: np.ndarray, x: float) -> int:
...
@overload
def fi(w: np.ndarray, x: List[float]) -> List[int]:
...
def fi(w, x):
"""
Given a value, it finds the index of the nearest value in the array.
Parameters
----------
w : np.ndarray
Array where to look.
x : float or list of floats
Value or values to look for.
Returns
-------
int or list of ints
Indicies of the nearest values.
"""
try:
len(x)
except TypeError:
x = [x]
ret = [np.argmin(np.abs(w-i)) for i in x]
if len(ret) == 1:
return ret[0]
else:
return ret
[docs]
def subtract_background(dat, t, tn, offset=0.3):
out = np.zeros_like(dat)
for i in range(dat.shape[1]):
mask = (t-tn[i]) < -offset
corr = dat[mask, i].mean()
out[:, i] = dat[:, i] - corr
return out
[docs]
def polydetrend(x, t=None, deg=3):
if t is None:
t = np.arange(x.shape[0])
p = np.polyfit(t, x, deg)
yf = np.poly1d(p)(t)
return x - yf
[docs]
def exp_detrend(y, t, start_taus=[1], use_constant=True):
m, yf = exp_fit(t, y, start_taus, use_constant=use_constant, verbose=0)
return y - yf
[docs]
def arr_polydetrend(x, t=None, deg=3):
out = np.zeros_like(x)
for i in range(x.shape[1]):
out[:, i] = polydetrend(x[:, i], t, deg)
return out
[docs]
def meaner(dat, t, llim, ulim, proportiontocut=0.0):
return trim_mean(dat[fi(t, llim):fi(t, ulim)], axis=0, proportiontocut=proportiontocut)
[docs]
def apply_sg(y, window_size, order, deriv=0):
out = np.zeros_like(y)
coeffs = sig.savgol_coeffs(window_size, order, deriv=0, use='dot')
for i in range(y.shape[1]):
out[:, i] = coeffs.dot(y[:, i])
return out
[docs]
def apply_sg_scan(y, window_size, order, deriv=0):
out = np.zeros_like(y)
c = sig.savgol_coeffs(window_size, order, deriv=0)
# for s in range(y.shape[-1]):
# for i in range(y.shape[1]):
# print c.shape
out = nd.convolve1d(y, c, 0, mode='nearest')
# out, s] = c.dot(y[:, i, 1, s])
return out
[docs]
def calc_error(args):
"""
Calculates the error from a leastsq fit infodict.
"""
p, cov, info, mesg, success = args
chisq = sum(info["fvec"] * info["fvec"])
dof = len(info["fvec"]) - len(p)
sigma = np.array([np.sqrt(cov[i, i]) * np.sqrt(chisq / dof) for i in range(len(p))])
return p, sigma
[docs]
def min_pulse_length(width_in_cm, shape='gauss'):
width_hz = width_in_cm * 3e10
if shape == 'gauss':
return (0.44 / width_hz) / 1e-15
[docs]
def wavelength2rgb(w):
"""
Converts a wavelength to a RGB color.
"""
if w >= 380 and w < 440:
R = -(w - 440.) / (440. - 350.)
G = 0.0
B = 1.0
elif w >= 440 and w < 490:
R = 0.0
G = (w - 440.) / (490. - 440.)
B = 1.0
elif w >= 490 and w < 510:
R = 0.0
G = 1.0
B = -(w - 510.) / (510. - 490.)
elif w >= 510 and w < 580:
R = (w - 510.) / (580. - 510.)
G = 1.0
B = 0.0
elif w >= 580 and w < 645:
R = 1.0
G = -(w - 645.) / (645. - 580.)
B = 0.0
elif w >= 645 and w <= 780:
R = 1.0
G = 0.0
B = 0.0
else:
R = 0.0
G = 0.0
B = 0.0
if (w > 700.):
s = .3+.7 * (780.-w)/(780.-700.)
elif (w < 420.):
s = .3+.7*(w-380.)/(420.-380.)
else:
s = 1.
R, G, B = (np.array((R, G, B))*s)**0.8
return R, G, B
[docs]
def equal_color(plots1, plots2):
if len(plots1) != len(plots2):
raise ValueError
for (plot1, plot2) in zip(plots1, plots2):
plot2.set_color(plot1.get_color())
[docs]
def find_linear_part(t):
"""
Finds the first value of an 1d-array where the difference betweeen
consecutively values varys.
"""
d = np.diff(t)
return np.argmin(np.abs(d-d[0]) < 0.00001)
[docs]
def rebin(a, new_shape):
"""
Resizes a 2d array by averaging or repeating elements,
new dimensions must be integral factors of original dimensions
Parameters
----------
a : array_like
Input array.
new_shape : tuple of int
Shape of the output array
Returns
-------
rebinned_array : ndarray
If the new shape is smaller of the input array, the data are averaged,
if the new shape is bigger array elements are repeated
See Also
--------
resize : Return a new array with the specified shape.
Examples
--------
>>> a = np.array([[0, 1], [2, 3]])
>>> b = rebin(a, (4, 6)) #upsize
>>> b
array([[0, 0, 0, 1, 1, 1],
[0, 0, 0, 1, 1, 1],
[2, 2, 2, 3, 3, 3],
[2, 2, 2, 3, 3, 3]])
>>> c = rebin(b, (2, 3)) #downsize
>>> c
array([[ 0. , 0.5, 1. ],
[ 2. , 2.5, 3. ]])
"""
M, N = a.shape
m, n = new_shape
if m < M:
return a.reshape((m, M/m, n, N/n)).mean(3).mean(1)
else:
return np.repeat(np.repeat(a, m/M, axis=0), n/N, axis=1)
[docs]
def efa(dat, n, reverse=False):
"""
Doing evolving factor analyis.
"""
if reverse:
data = dat[::-1, :]
else:
data = dat
out = np.zeros((data.shape[0], n))
for i in range(6, data.shape[0]):
sv = svds(data[:i, :], min(i, n))[1]
out[i, :] = sv
return out
[docs]
def moving_efa(dat, n, ncols, method='svd'):
out = np.zeros((dat.shape[0], n))
p = PCA()
for i in range(0, dat.shape[0]-n):
if method == 'svd':
sv = np.linalg.svd(dat[i:i+ncols, :])[1][:n]
elif method == 'pca':
p = p.fit(dat[i:i+ncols, :])
sv = p.explained_variance_ratio_[:n]
out[i, :] = sv
return out
[docs]
def pfid_tau_to_w(tau):
"""
Given the rise time of the pertuabed
free induction decay, calculate the
corresponding spectral width in cm-^1.
"""
return 1/(np.pi*3e7*tau*1e-9)
[docs]
def als(dat, n=5):
u, s, v = np.linalg.svd(dat)
u0 = u[:n]
v0 = v.T[:n]
u0 = np.random.random(u0.shape)+0.1
v0 = np.random.random(v0.shape)+0.1
res = np.linalg.lstsq(u0.T, dat)[1].sum()
res = 10000.
for i in range(15000):
if i % 2 == 0:
v0 = do_nnls(u0.T, dat)
v0 /= np.max(v0, 1)[:, None]
res_n = ((u0.T.dot(v0) - dat)**2).sum()
else:
u0, res_n, t, t = np.linalg.lstsq(v0.T, dat.T)
# r.fit(dat.T, v0.T)
# u0 = r.coef_[:]
res_n = ((u0.T.dot(v0) - dat)**2).sum()
if abs(res-res_n) < 0.001:
break
else:
print(i, res_n)
res = res_n
return u0.T, v0.T
[docs]
def spec_int(tup, r, is_wavelength=True):
wl, t, d = tup.wl, tup.t, tup.data
if is_wavelength:
wl = 1e7/wl
wl1, wl2 = 1e7 / r[0], 1e7 / r[1]
else:
wl1, wl2 = r
ix = np.argsort(wl)
wl = wl[ix]
d = d[:, ix]
idx1, idx2 = sorted([fi(wl, wl1), fi(wl, wl2)])
dat = np.trapz(d[:, idx1:idx2], wl[idx1:idx2]) / np.ptp(wl[idx1:idx2])
return dat
# import mls
[docs]
def do_nnls(A, b):
n = b.shape[1]
out = np.zeros((A.shape[1], n))
for i in range(n):
# mls.bounded_lsq(A.T, b[:,i], np.zeros((A.shape[1],1)), np.ones((A.shape[1],1))).shape
out[:, i] = nnls(A, b[:, i])[0]
return out
[docs]
def exp_fit(x, y, start_taus=[1], use_constant=True, amp_max=None, amp_min=None, weights=None, amp_penalty=0,
verbose=True, start_amps=None):
num_exp = len(start_taus)
para = lmfit.Parameters()
if use_constant:
para.add('const', y[-1])
for i in range(num_exp):
para.add('tau' + str(i), start_taus[i], min=0)
y_c = y - y[-1]
if start_amps is None:
a = y_c[fi(x, start_taus[i])]
else:
a = start_amps[i]
para.add('amp' + str(i), a)
if amp_max is not None:
para['amp' + str(i)].max = amp_max
if amp_min is not None:
para['amp' + str(i)].min = amp_min
def fit(p):
y_fit = np.zeros_like(y)
if use_constant:
y_fit += p['const'].value
for i in range(num_exp):
amp = p['amp'+str(i)].value
tau = p['tau'+str(i)].value
y_fit += amp * np.exp(-x/tau)
return y_fit
def res(p):
if weights is None:
pen = 0
for i in range(num_exp):
pen += p['amp'+str(i)].value**2
return np.hstack(((y - fit(p)), amp_penalty*pen))
else:
return (y - fit(p)) / weights
mini = lmfit.minimize(res, para)
if verbose:
lmfit.report_fit(mini)
y_fit = fit(mini.params)
return mini, y_fit
[docs]
def calc_ratios(fitter, tmin=0.35, tmax=200):
from skultrafast import zero_finding
tup = zero_finding.interpol(fitter, fitter.tn)
w, t, d = tup
i = fi(t, tmin)
i_max = fi(t, tmax)
t = t[i:i_max]
d = d[i:i_max, :]
pos = np.where(d > 0, d, 0)
neg = np.where(d < 0, d, 0)
pos = np.trapz(pos, w)
neg = np.trapz(neg, w)
return t, pos, neg, pos/neg, d.sum(1)
[docs]
def make_fi(data_to_search):
return lambda x: fi(data_to_search, x)