Source code for pyplis.calib_base

# -*- coding: utf-8 -*-
#
# Pyplis is a Python library for the analysis of UV SO2 camera data
# Copyright (C) 2017 Jonas Gliss (jonasgliss@gmail.com)
#
# This program is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License a
# published by the Free Software Foundation, either version 3 of
# the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Pyplis module containing the :class:`CalibData`.

This is the base class for storing calibration data, fitting calibration
curves, and corresponding I/O routines (e.g storage as FITS or text file).
"""
from __future__ import (absolute_import, division)
from pyplis import logger
from numpy import (min, asarray, zeros, linspace, ones, float64, isnan,
                   ndarray, argmax, inf)
from inspect import getargspec
from scipy.optimize import curve_fit

from datetime import datetime
from pandas import Series
from astropy.io import fits
from os.path import join, exists, isdir, abspath, basename, dirname


from matplotlib.pyplot import subplots

from .glob import SPECIES_ID
from .processing import ImgStack
from .helpers import exponent, isnum

from .model_functions import CalibFuns
from .image import Img
from .setupclasses import Camera
import six


[docs]class CalibData(object): """Base class representing calibration data and optimisation parameters. The default calibration curve is a polynomial of first order. Calibration data is represneted by two arrays ``cd_vec`` and ``tau_vec`` and optionally, a vector containing errors in the column densities ``cd_vec_err`` (note that errors in the optical densities are not supported). Furthermore, an array of ``time_stamps`` can be provided and If you want to use a custom calibration function you can provide the function using :param:`calib_fun`. Parameters ---------- tau_vec : ndarray tau data vector for calibration data cd_vec : ndarray DOAS-CD data vector for calibration data cd_vec_err : ndarray Fit errors of DOAS-CDs time_stamps : ndarray array with datetime objects containing time stamps (e.g. start acquisition) of calibration data calib_fun : function optimisation function used for fitting of calibration data calib_coeffs : ;obj:`list`, optional optimisation parameters for calibration curve. senscorr_mask : :obj:`Img`, optional sensitivity correction mask that was normalised relative to the pixel position where the calibration data was retrieved (i.e. position of DOAS FOV in case of DOAS calibration data, or image pixel position, where cell calibration data was retrieved) calib_id : str calibration ID (e.g. "aa", "tau_on", "tau_off") camera : Camera camera object (not necessarily required). A camera can be assigned in order to convert the FOV extend from pixel coordinates into decimal degrees """
[docs] def __init__(self, tau_vec=None, cd_vec=None, cd_vec_err=None, time_stamps=None, calib_fun=None, calib_coeffs=None, senscorr_mask=None, polyorder=1, calib_id="", camera=None): # type of calibration performed (e.g. "doas", "cell") if tau_vec is None: tau_vec = [] if cd_vec is None: cd_vec = [] if cd_vec_err is None: cd_vec_err = [] if time_stamps is None: time_stamps = [] if calib_coeffs is None: calib_coeffs = [] self.type = "base" # ID specifying image OD type (e.g. "on", "off", "aa") self.calib_id = calib_id # tau data vector within FOV self.tau_vec = asarray(tau_vec).astype(float64) self._calib_funs = CalibFuns() num = len(tau_vec) # CDs data vector if not len(cd_vec) == len(tau_vec): raise ValueError("Length mismatch between tau_vec and cd_vec") self.cd_vec = asarray(cd_vec).astype(float64) if not len(cd_vec_err) == len(cd_vec): cd_vec_err = zeros(len(cd_vec)) self.cd_vec_err = asarray(cd_vec_err).astype(float64) try: num = len(tau_vec) if not len(time_stamps) == num: raise AttributeError elif not isinstance(time_stamps[0], datetime): raise ValueError except BaseException: time_stamps = asarray([datetime(1900, 1, 1)] * num) self.time_stamps = time_stamps if camera is None: camera = Camera() self.camera = camera self._senscorr_mask = None if senscorr_mask is None: try: senscorr_mask = ones((self.camera.pixnum_y, self.camera.pixnum_x)) except BaseException: logger.warning("Could not retrieve image dimensions from camera " "(probably since no camera was provided on input). " "Initiating attribute senscorr_mask with ones and " "shape=(10, 10)") senscorr_mask = Img(ones((10, 10))) self.senscorr_mask = senscorr_mask self.fit_weighted = True # irrelevant if custom fit function is provided self.poly_through_origin = False self._calib_fun = None self._calib_coeffs = None self._cov = None self.residual = None self._polyorder = None try: self.calib_fun = calib_fun except BaseException: pass try: self.calib_coeffs = calib_coeffs except BaseException: pass self.polyorder = polyorder
[docs] def num_optargs_fun(self, fun): """Return number of optimisation args of a function.""" return len(getargspec(fun).args) - 1
@property def senscorr_mask(self): """Get current sensitivity correction mask (:class:`Img` instance).""" return self._senscorr_mask @senscorr_mask.setter def senscorr_mask(self, val): if not isinstance(val, Img): raise TypeError("Need Img object...") self._senscorr_mask = val @property def calib_coeffs(self): """List containing calibration coefficients for :attr:`calib_fun`.""" return self._calib_coeffs @calib_coeffs.setter def calib_coeffs(self, val): try: iter(val) except BaseException: raise TypeError("Input is not iterable, need list, tuple or " "similar, containing optimisation coefficients") req_num_args = self.num_optargs_fun(self.calib_fun) if not len(val) == req_num_args: raise AttributeError("Number of provided coefficients does not " "match the number of optimisation params " "in current optimisation function. " "Please check and update class attribute " "calib_fun first...") if self._calib_coeffs is not None and len(self._calib_coeffs) > 0: logger.warning("Resetting calibration coefficients manually. " "This may introduce analysis errors. It is " "recommended to use the method fit_calib_data " "instead") self._calib_coeffs = val @property def calib_fun(self): """Mathematical function used for retrieval of calibration curve. Note ---- The function can be defined on class initiation and may be updated using the setter method. If not explicitely specified, a polynomial is used with order :attr:`polyorder`. """ if not callable(self._calib_fun): return self._calib_funs.get_poly(self.polyorder, self.poly_through_origin) return self._calib_fun @calib_fun.setter def calib_fun(self, val): if not callable(val): raise ValueError("Need a callable object (e.g. lambda function)") args = getargspec(val).args logger.info("Setting optimisation function in CalibData class. " "Argspec: %s" % args) self._calib_fun = val @property def start(self): """Start time of calibration data (datetime).""" try: return self.time_stamps[0] except BaseException: raise ValueError("Start time could not be accessed") @property def stop(self): """Stop time of calibration data (datetime).""" try: return self.time_stamps[-1] except BaseException: raise ValueError("Start time could not be accessed") @property def polyorder(self): """Get current order of fit polynomial.""" return self._polyorder @polyorder.setter def polyorder(self, val): allowed = self._calib_funs.available_poly_orders( self.poly_through_origin) if val not in allowed: raise ValueError("Invalid value for polyorder: %.1f. " "Choose from %s" % (val, allowed)) self._polyorder = val @property def cov(self): """Get covariance matrix of calibration polynomial.""" if not isinstance(self._cov, ndarray): self.fit_calib_data() return self._cov @cov.setter def cov(self, value): raise IOError("Covariance matrix of calibration data cannot " "be set manually, please call function " "fit_calib_data") @property def y_offset(self): """Y-axis offset of calib curve.""" return self.calib_fun(0, *self.calib_coeffs) @property def cd_tseries(self): """Pandas Series object of doas data.""" return Series(self.cd_vec, self.time_stamps) @property def tau_tseries(self): """Pandas Series object of tau data.""" return Series(self.tau_vec, self.time_stamps) @property def tau_range(self): """Range of tau values extended by 5%. Returns ------- tuple 2-element tuple, containing - float, tau_min: lower end of tau range - float, tau_max: upper end of tau range """ tau = self.tau_vec taumin, taumax = tau.min(), tau.max() if taumin > 0: taumin = 0 add = (taumax - taumin) * 0.05 return taumin - add, taumax + add @property def cd_range(self): """Range of DOAS cd values extended by 5%.""" cds = self.cd_vec cdmin, cdmax = cds.min(), cds.max() if cdmin > 0: cdmin = 0 add = (cdmax - cdmin) * 0.05 return (cdmin - add, cdmax + add)
[docs] def has_calib_data(self): """Check if calibration data is available.""" if not all([len(x) > 0 for x in [self.cd_vec, self.tau_vec]]): return False if not len(self.tau_vec) == len(self.cd_vec): return False return True
def _check_bounds(self, fun, bounds): """Check fit boundaries and sets inits default if necessary.""" sd = False nargs = self.num_optargs_fun(fun) try: if bounds is None: sd = True elif not len(bounds) == 2: sd = True elif not (len(x) == nargs for x in bounds): sd = True except BaseException: sd = True if sd: logger.info("Input bounds invalid, initiating default") bounds = (-ones(nargs) * inf, ones(nargs) * inf) return bounds
[docs] def fit_calib_data(self, calib_fun=None, guess=None, polyorder=None, weighted=True, weights_how="abs", through_origin=False, param_bounds=None, normalise_cds=False, plot=False): """Fit calibration polynomial to current data. The calibration data is fitted using a least squares optimisation. Be careful with cusomised optimisation functions that are not linear in all their optimisation parameters (especially, with using input argument :arg:`normalise_cds`). Parameters ---------- calib_fun : :obj:`function`, optional if specified, the current calibration function is updated guess : :obj:`list`, optional initial guess for optimisation (is only considerd) polyorder : :obj:`int`, optional if specified, the current polyorder is updated (only relevant for polynomial optimisation functions, i.e. if no custom calibration function has been provided) weighted : bool performs weighted fit based on DOAS errors in ``cd_vec_err`` (if available), defaults to True weights_how : str use "rel" if relative errors are supposed to be used (i.e. w=CD_sigma / CD) or "abs" if absolute error is supposed to be used (i.e. w=CD_sigma). through_origin : bool only relevant for polynomial fits (i.e. if no custom fit function has been provided). If True, the polynomial fit is forced to cross the coordinate origin param_bounds : tuple 2-element tuple containing two lists (or tuples) specifying lower (param_borders[0]) and upper (param_borders[1]) borders for the fit parameters. If unspecified (None), the borders will automatically be set to -/+ infinity normalise_cds : bool if True, the CD vector is normalised by its exponential magnitude before applying the fit. plot : bool If True, the calibration curve and the polynomial are plotted Returns ------- list list containing optimised parameters """ if weights_how not in ["rel", "abs"]: raise IOError("Invalid input for parameter weights_how:" "Use rel for relative errors or abs for absolute" "errors for calculation of weights") # is used in method calib_fun and only considered if a polynomial is # fitted (i.e. not considered if custom calibration function is # specified self.poly_through_origin = through_origin if not self.has_calib_data(): raise ValueError("Calibration data is not available") if isnum(polyorder): self.polyorder = polyorder try: self.calib_fun = calib_fun except BaseException: pass fun = self.calib_fun if sum(isnan(self.tau_vec)) + sum(isnan(self.cd_vec)) > 0: raise ValueError("Encountered nans in data") exp = exponent(self.cd_vec.max()) yerr = ones(len(self.cd_vec)) yerr_abs = True if weighted: if not len(self.cd_vec) == len(self.cd_vec_err): logger.warning("Could not perform weighted calibration fit: " "Length mismatch between CD data vector " "and corresponding error vector") elif sum(self.cd_vec_err) == 0: logger.warning("Could not perform weighted calibration fit: " "Values of DOAS fit errors are 0. Do you have pydoas " "installed?") else: try: if weights_how == "abs": yerr = (self.cd_vec_err / 10**exp if normalise_cds else self.cd_vec_err) else: yerr = self.cd_vec_err / self.cd_vec yerr_abs = False # ws = ws / max(ws) except BaseException: logger.warning("Failed to calculate weights") tau_vals = self.tau_vec if normalise_cds and callable(self._calib_fun): raise ValueError("Cannot use option normalise_cds with custom " "fit functions (only works with polynomials") cds = self.cd_vec / 10**exp if normalise_cds else self.cd_vec numargs = self.num_optargs_fun(fun) if guess is not None: if not len(guess) == numargs: raise ValueError("Number of entries in initial guess does " "not match the number of optimisation args " "of the current fit function") elif not callable(self._calib_fun): # calibration function is poly guess = ones(numargs) idx = argmax(cds) slope = cds[idx] / tau_vals[idx] if through_origin: guess[-1] = slope else: guess[-1] = min(cds) guess[-2] = slope bounds = self._check_bounds(fun, param_bounds) popt, cov = curve_fit(f=fun, xdata=tau_vals.astype(float64), ydata=cds.astype(float64), p0=guess, bounds=bounds, sigma=yerr.astype(float64), absolute_sigma=yerr_abs) self._calib_coeffs = popt * 10**exp if normalise_cds else popt self._cov = cov * 10**(2 * exp) if normalise_cds else cov self.residual = (self.calib_fun(self.tau_vec, *self.calib_coeffs) - self.cd_vec) if plot: self.plot() return self.calib_coeffs
def _prep_fits_save(self): """Prepare FITS HDU list for storing calibration data. Returns ------- HDUList hdu list containing sensitivity correction mask and table with calib data """ prim_hdu = fits.PrimaryHDU() prim_hdu.header["type"] = self.type prim_hdu.header["calib_id"] = self.calib_id prim_hdu.header.update(self.senscorr_mask.edit_log) try: mask = self.senscorr_mask.img except: mask = self.senscorr_mask prim_hdu.data = mask if not len(self.cd_vec) == len(self.tau_vec): raise ValueError("Could not save calibration data, mismatch in " " lengths of data arrays") if not len(self.time_stamps) == len(self.cd_vec): self.time_stamps = asarray([datetime(1900, 1, 1)] * len(self.cd_vec)) tstamps = [x.strftime("%Y%m%d%H%M%S%f") for x in self.time_stamps] col1 = fits.Column(name="time_stamps", format="25A", array=tstamps) col2 = fits.Column(name="tau_vec", format="D", array=self.tau_vec) col3 = fits.Column(name="cd_vec", format="D", array=self.cd_vec) if not len(self.cd_vec_err) == len(self.cd_vec): self.cd_vec_err = zeros(len(self.cd_vec)) col4 = fits.Column( name="cd_vec_err", format="D", array=self.cd_vec_err) cols = fits.ColDefs([col1, col2, col3, col4]) arrays = fits.BinTableHDU.from_columns(cols) return fits.HDUList([prim_hdu, arrays]) def _prep_fits_savepath(self, save_dir=None, save_name=None): save_dir = abspath(save_dir) if not isdir(save_dir): # save_dir is a file path save_name = basename(save_dir) save_dir = dirname(save_dir) if save_name is None: save_name = ("calib_type_%s_id_%s_%s_%s_%s.fts" % (self.type, self.calib_id, self.start.strftime("%Y%m%d"), self.start.strftime("%H%M"), self.stop.strftime("%H%M"))) else: save_name = save_name.split(".")[0] + ".fts" return join(save_dir, save_name)
[docs] def save_as_fits(self, save_dir=None, save_name=None, overwrite_existing=True): """Save calibration data as FITS file. Save all relevant information in an HDU list as FITS. The first HDU (:class:`PrimaryHDU`) contains the sensitivity correction mask (:attr:`senscorr_mask`) and the second HDU is of type :class:`BinTableHDU` and contains the calibration data, which contains the following 4 columns in the specified order: 1. :attr:`time_stamps` (as strings, format: %Y%m%d%H%M%S%f) 2. :attr:`tau_vec` 3. :attr:`cd_vec` 4. :attr:`cd_vec_err` Parameters ---------- save_dir : str save directory, if None, the current working directory is used save_name : str filename of the FITS file (if None, use pyplis default naming) overwrite_existing : bool if True, an existing calibration file with the same name will be overwritten """ hdulist = self._prep_fits_save() # returns abspath of current wkdir if None hdulist.writeto(self._prep_fits_savepath(save_dir, save_name), clobber=overwrite_existing)
[docs] def to_csv(self): """Store calibration data as tab delimited text file.""" raise NotImplementedError("Coming soon...")
[docs] def load_from_fits(self, file_path): """Load stack object (fits). Parameters ---------- file_path : str file path of calibration data Returns ------- HDUList opened HDU object (e.g. to access potential further data in a function that is calling this method) """ if not exists(file_path): raise IOError("CalibData could not be loaded, " "path does not exist") hdu = fits.open(file_path) self.senscorr_mask = Img(hdu[0].data) self.calib_id = hdu[0].header["calib_id"] self.type = hdu[0].header["type"] for key, val in six.iteritems(hdu[0].header): k = key.lower() if k in self.senscorr_mask.edit_log: self.senscorr_mask.edit_log[k] = val if self.senscorr_mask.is_cropped: logger.warning("Imported sensitivity correction mask is flagged as cropped " "and might not work on uncropped images") ctable = hdu[1] try: times = ctable.data["time_stamps"].byteswap().newbyteorder() self.time_stamps = [datetime.strptime(x, "%Y%m%d%H%M%S%f") for x in times] except BaseException: logger.warning("Failed to import vector containing calib time stamps from " "FITS") try: self.tau_vec = ctable.data["tau_vec"].byteswap().newbyteorder() except BaseException: logger.warning("Failed to import calibration tau vector from FITS") try: self.cd_vec = ctable.data["cd_vec"].byteswap().newbyteorder() except BaseException: logger.warning("Failed to import CD vector from FITS") try: self.cd_vec_err = ctable.data["cd_vec_err"].byteswap( ).newbyteorder() except BaseException: logger.warning("Failed to import CD uncertainty vector from FITS") if self.type == "base": hdu.close() hdu = None return hdu
[docs] def plot(self, add_label_str="", ax=None, **kwargs): """Plot calibration data and fit result. Parameters ---------- add_label_str : str additional string added to label of plots for legend ax : matplotlib axes object, if None, a new one is created """ if "color" not in kwargs: kwargs["color"] = "b" if ax is None: fig, ax = subplots(1, 1, figsize=(10, 8)) taumin, taumax = self.tau_range x = linspace(taumin, taumax, 100) cds = self.cd_vec ax.plot(self.tau_vec, cds, ls="", marker=".", label="Data %s" % add_label_str, **kwargs) try: ax.errorbar(self.tau_vec, cds, yerr=self.cd_vec_err, marker="None", ls=" ", c="#b3b3b3") except BaseException: logger.warning("No CD errors available") try: cds_calib = self.calib_fun(x, *self.calib_coeffs) ax.plot(x, cds_calib, ls="-", marker="", label="Fit result", **kwargs) except TypeError: logger.info("Calibration poly probably not fitted") ax.set_title("Calibration data, ID: %s" % self.calib_id) ax.set_ylabel(r"$S_{%s}$ [cm$^{-2}$]" % SPECIES_ID) ax.set_xlabel(r"$\tau_{%s}$" % self.calib_id) ax.grid() ax.legend(loc='best', fancybox=True, framealpha=0.7) return ax
[docs] def plot_calib_fun(self, add_label_str="", shift_yoffset=False, ax=None, **kwargs): """Plot calibration fit result. Parameters ---------- add_label_str : str additional string added to label of plots for legend shift_yoffset : bool if True, the data is plotted without y-offset ax : matplotlib axes object, if None, a new one is created """ if "color" not in kwargs: kwargs["color"] = "b" if ax is None: fig, ax = subplots(1, 1, figsize=(10, 8)) taumin, taumax = self.tau_range x = linspace(taumin, taumax, 100) if self.calib_coeffs is None: logger.warning("Calibration function not yet fitted, applying default fit" "(1. order polynomial)") self.fit_calib_data() cds_poly = self.calib_fun(x, *self.calib_coeffs) if shift_yoffset: try: cds_poly -= self.y_offset except BaseException: logger.warning("Failed to subtract y offset") try: ax.plot(x, cds_poly, ls="-", marker="", label="Fit result %s" % add_label_str, **kwargs) except TypeError: logger.info("Calibration poly probably not fitted") ax.grid() ax.legend(loc='best', fancybox=True, framealpha=0.7) return ax
[docs] def err(self): """Return standard deviation of fit residual.""" if self.residual is None: raise ValueError("Fit residual is not available, please call " "fit_calib_data first") elif len(self.residual) < 10: ValueError("Standard deviation of residual of calibration data is " " not rerpresentative for less than 10 calibration " "points") return self.residual.std()
[docs] def calibrate(self, value): """Apply calibration to input. Parameters ---------- value optical density (can be float, n-dimensional numpy array, :class:`Img`, :class:`ImgStack`) Returns ------- calibrated input """ # make sure calibration data and fit result are available try: self.calib_fun(0, *self.calib_coeffs) except BaseException: self.fit_calib_data() if isinstance(value, Img): vals = (self.calib_fun(value.img, *self.calib_coeffs) - self.y_offset) value.img = vals value.edit_log["gascalib"] = True return value elif isinstance(value, ImgStack): vals = (self.calib_fun(value.stack, *self.calib_coeffs) - self.y_offset) value.stack = vals value.img_prep["gascalib"] = True return value return (self.calib_fun(value, *self.calib_coeffs) - self.y_offset)
def __call__(self, value, **kwargs): """Define call function to apply calibration. Parameters ---------- value optical density, can be float or n-dimensional numpy array :param float value: tau or AA value :return: corresponding column density """ return self.calibrate(value)