Source code for skultrafast.fitter

# -*- coding: utf-8 -*-

from typing import Callable, Tuple, cast

import lmfit
import numpy as np
import scipy.linalg as linalg

from . import dv, zero_finding
from .base_functions import _fold_exp, _fold_exp_and_coh

[docs] posv = linalg.get_lapack_funcs( ('posv' ))
[docs] LinAlgError = np.linalg.LinAlgError
[docs] def direct_solve(a, b): c, x, info = posv(a, b, lower=False, overwrite_a=True, overwrite_b=False) return x
[docs] alpha = 0.001
[docs] def solve_mat(A, b_mat, method='ridge'): """ Returns the solution for the least squares problem |Ax - b_i|^2. """ if method == 'fast': #return linalg.solve(A.T.dot(A), A.T.dot(b_mat), sym_pos=True) return direct_solve(A.T.dot(A), A.T.dot(b_mat)) elif method == 'ridge': X = np.dot(A.T, A) X.flat[::A.shape[1] + 1] += alpha Xy = np.dot(A.T, b_mat) #return linalg.solve(X, Xy, sym_pos=True, overwrite_a=True) return direct_solve(X, Xy) elif method == 'qr': cq, r = linalg.qr_multiply(A, b_mat) return linalg.solve_triangular(r, cq) elif method == 'cho': c, l = linalg.cho_factor(A.T.dot(A)) return linalg.cho_solve((c, l), A.T.dot(b_mat)) elif method == 'lstsq': return np.linalg.lstsq(A, b_mat)[0] elif method == 'lasso': import sklearn.linear_model as lm s = lm.Lasso(fit_intercept=False) s.alpha = alpha s.fit(A, b_mat) return s.coef_.T elif method == 'enet': import sklearn.linear_model as lm s = lm.ElasticNet(fit_intercept=False, l1_ratio=0.2) s.alpha = alpha s.fit(A, b_mat) return s.coef_.T else: raise ValueError('Unknow lsq method, use ridge, qr, fast or lasso')
[docs] class Fitter(object): """ The fit object, takes all the need data and allows to fit it. There a two different methods to fit the data. The fast one assumes, that the data has no dispersion, so the base vectors are the same for each channel. It is recommended to first work with the fast version. Note that the fast version is able to handle dispersion by using linear interpolation to transform the data to dispersion free data. The slower version calculates the base vector for each channel, in which the dispersion is integrated. The slower methods using the prefix full. Parameters ---------- wl : ndarray(M) Array containing the wavelength-coordinates. t : ndarray(N) Array containing the time-coordinates. data : ndarry(N,M) The 2d-data to fit. model_coh : bool If the model contains coherent artifacts at the time zero, defaults to False. model_disp : int Degree of the polynomial which models the dispersion. If 1, only a offset is modeled, which is very fast. """ def __init__(self, tup, model_coh=False, model_disp=1): wl, t, data = tup self.t = t self.wl = wl self.data = data self.verbose = False self.model_coh = model_coh self.model_disp = model_disp self.lsq_method = 'ridge' self.num_exponentials = -1 self.weights = None if model_disp > 1: self.org = data[:] self.disp_x = (wl - np.min(wl)) / (wl.max() - wl.min()) self.used_disp = np.zeros(model_disp)
[docs] def make_model(self, para): """ Calculates the model for given parameters. After calling, the DAS is at self.c, the model at self.model. If the dispersion is modeled, it is done via linear interpolation. This way, the base- vectors and their decomposition are only calculated once. Parameters ---------- para : ndarray(N) para has the following form: [p_0, ..., p_M, w, tau_1, ..., tau_N] Where p are the coefficients of the dispersion polynomial, w is the width of the system response and tau are the decay times. M is equal to self.model_disp. """ self.last_para = np.asarray(para) if self._chk_for_disp_change(para): # Only calculate interpolated data if necessary: self.tn = np.poly1d(para[:self.model_disp])(self.disp_x) tup = dv.tup(self.wl, self.t, self.org) self.data = zero_finding.interpol(tup, self.tn)[2] self.used_disp[:] = para[:self.model_disp] self.num_exponentials = self.last_para.size - self.model_disp - 1 if self.model_disp <= 1: self._build_xvec(para) self.x_vec = np.nan_to_num(self.x_vec) self.c = solve_mat(self.x_vec, self.data, self.lsq_method) self.model = np.dot(self.x_vec, self.c) self.c = self.c.T
[docs] def _chk_for_disp_change(self, para): if self.model_disp > 1: if np.any(para[:self.model_disp] != self.used_disp): return True return False
[docs] def _build_xvec(self, para): """ Build the base (the folded functions) for given parameters. """ para = np.array(para) if self.verbose: print(para) try: idx = (para != self._last) except AttributeError: #self._l idx = [True] * len(para) if self.model_disp == 1: x0, w, taus = para[0], para[1], para[2:] tau_idx = idx[2:] else: x0, w, taus = 0., para[0], para[1:] tau_idx = idx[1:] if any(idx[:2]) or self.model_disp or True: if self.model_coh: x_vec = np.zeros((self.t.size, self.num_exponentials + 3)) #print(taus) a, b = _fold_exp_and_coh(self.t[:, None], w, x0, taus) #print(a.shape, b.shape) x_vec[:, -3:] = b[..., 0, :] x_vec[:, :-3] = a[..., 0, :] else: x_vec = _fold_exp(self.t[:, None], w, x0, taus).squeeze() self.x_vec = np.nan_to_num(x_vec) #self.x_vec /= np.max(self.x_vec, 0) self._last = para.copy() else: self.x_vec[:, tau_idx] = _fold_exp(self.t, w, x0, taus[tau_idx]).T
[docs] def res(self, para): """ Return the residuals for given parameters using the same basevector for each channel. See make_model for para format. """ self.make_model(para) self.residuals = (self.model - self.data) if self.weights is not None: self.residuals *= self.weights return self.residuals.ravel()
[docs] def full_res(self, para): """ Return the residuals for given parameter modelling each channel for it own. """ self.make_full_model(para) self.residuals = (self.model - self.data) if self.weights is not None: self.residuals *= self.weights return self.residuals.ravel()
[docs] def make_full_model(self, para): """ Calculates the model for given parameters. After calling, the DAS is at self.c, the model at self.model. Parameters ---------- para : ndarray(N) para has the following form: [p_0, ..., p_M, w, tau_1, ..., tau_N] Where p are the coefficients of the dispersion polynomial, w is the width of the system response and tau are the decay times. M is equal to self.model_disp. """ para = np.asarray(para) self._check_num_expontials(para) try: m_disp = self.model_disp is_disp_changed = (para[:m_disp] != self.last_para[:m_disp]).any() except AttributeError: is_disp_changed = True self.last_para = para if self.model_disp != 0 and is_disp_changed or True: self.tn = np.poly1d(para[:self.model_disp])(self.disp_x) self.t_mat = self.t[:, None] - self.tn[None, :] self._build_xmat(para[self.model_disp:], is_disp_changed) for i in range(self.data.shape[1]): A = self.xmat[:, i, :] self.c[i, :] = solve_mat(A, self.data[:, i], self.lsq_method) self.model = np.dot(self.xmat, self.c)
[docs] def _build_xmat(self, para, is_disp_changed): """ Builds the basevector for every channel. The vectors are save self.xmat. """ para = np.array(para) try: idx = (para != self._last) except AttributeError: idx = [True] * len(para) w = para[0] taus = para[1:] x0 = 0. #Only calculate what is necessary. if idx[0] or is_disp_changed or True: exps, coh = _fold_exp_and_coh(self.t_mat, w, x0, taus) if self.model_coh: #print('test') self.xmat[:, :, -3:] = coh num_exp = self.num_exponentials self.xmat[:, :, :num_exp] = exps elif any(idx): self.xmat[:, :, idx[1:]] = _fold_exp(self.t_mat, w, x0, taus[idx[1:]]) #self.xmat = np.nan_to_num(self.xmat) self._last = para
[docs] def _check_num_expontials(self, para): """ Check if num_exp changed and allocate space as necessary. """ new_num_exp = para.size - self.model_disp - 1 if new_num_exp != self.num_exponentials: self.num_exponentials = new_num_exp if self.model_disp: new_num_exp += 3 n, m = self.data.shape self.xmat = np.empty((n, m, new_num_exp)) self.c = np.zeros((self.data.shape[1], self.xmat.shape[-1])) self.model = np.empty_like(self.data)
[docs] def res_sum(self, para): """Returns the squared sum of the residuals for given parameters""" return np.sum(self.res(para)**2)
[docs] def start_lmfit(self, x0, fixed_names=[], lower_bound=0.3, fix_long=True, fix_disp=False, full_model=1): p = lmfit.Parameters() for i in range(self.model_disp): p.add('p' + str(i), x0[i]) if fix_disp: p['p' + str(i)].vary = False x0 = x0[self.model_disp:] p.add('w', x0[0], min=0) num_exp = len(x0) - 1 for i, tau in enumerate(x0[1:]): name = 't' + str(i) # # p.add(name, tau, vary=True) if name not in fixed_names: p[name].min = lower_bound else: p[name].vary = False for i in fixed_names: p[i].vary = False if fix_long: p['t' + str(num_exp - 1)].vary = False def res(p): x = [k.value for k in p.values()] return self.res(x) def full_res(p): x = [k.value for k in p.values()] return self.full_res(x) fun = full_res if full_model else res return lmfit.Minimizer(fun, p)