Source code for pyplis.doascalib

"""Pyplis module for DOAS calibration including FOV search engines."""
from __future__ import (absolute_import, division)
from numpy import (min, arange, asarray, zeros, column_stack,
                   ones, nan, float64)
from scipy.stats.stats import pearsonr
from scipy.sparse.linalg import lsmr

from datetime import datetime
from pandas import Series
from copy import deepcopy
from import fits
from traceback import format_exc

import six

from pyplis import logger
from matplotlib.pyplot import subplots
from matplotlib.patches import Circle, Ellipse
from import RdBu
from matplotlib.dates import DateFormatter

from .glob import SPECIES_ID
from .helpers import (shifted_color_map, mesh_from_img, get_img_maximum,
                      sub_img_to_detector_coords, map_coordinates_sub_img,
                      exponent, rotate_xtick_labels)

from .optimisation import gauss_fit_2d, GAUSS_2D_PARAM_INFO
from .image import Img
from .inout import get_camera_info
from .setupclasses import Camera
from .calib_base import CalibData
from .helpers import make_circular_mask

[docs]class DoasCalibData(CalibData): """Class containing DOAS calibration data. Parameters ---------- tau_vec : ndarray tau data vector for calibration data cd_vec : ndarray DOAS-CD data vector for calibration data cd_vec_err : ndarray Fit errors of DOAS-CDs time_stamps : ndarray array with datetime objects containing time stamps (e.g. start acquisition) of calibration data calib_fun : function optimisation function used for fitting of calibration data calib_coeffs : ;obj:`list`, optional optimisation parameters for calibration curve. senscorr_mask : :obj:`ndarray`or :obj:`Img`, optional sensitivity correction mask that was normalised relative to the pixel position where the calibration data was retrieved (i.e. position of DOAS FOV in case of DOAS calibration data, or image pixel position, where cell calibration data was retrieved) calib_id : str calibration ID (e.g. "aa", "tau_on", "tau_off") camera : Camera camera object (not necessarily required). A camera can be assigned in order to convert the FOV extend from pixel coordinates into decimal degrees fov : DoasFOV information about position and shape of the FOV of the DOAS within the camera images """
[docs] def __init__(self, tau_vec=None, cd_vec=None, cd_vec_err=None, time_stamps=None, calib_fun=None, calib_coeffs=None, senscorr_mask=None, polyorder=1, calib_id="", camera=None, fov=None): super(DoasCalibData, self).__init__(tau_vec, cd_vec, cd_vec_err, time_stamps, calib_fun, calib_coeffs, senscorr_mask, polyorder, calib_id, camera) if tau_vec is None: tau_vec = [] if cd_vec is None: cd_vec = [] if cd_vec_err is None: cd_vec_err = [] if time_stamps is None: time_stamps = [] if calib_coeffs is None: calib_coeffs = [] self.type = "doas" if not isinstance(fov, DoasFOV): fov = DoasFOV(camera) self.fov = fov
[docs] def save_as_fits(self, save_dir=None, save_name=None, overwrite_existing=True): """Save calibration data as FITS file. Parameters ---------- save_dir : str save directory, if None, the current working directory is used save_name : str filename of the FITS file (if None, use pyplis default naming) """ # hdulist containing calibration data and senscorr_mask hdulist = self._prep_fits_save() # add DOAS FOV information (if applicable) hdulist.extend(self.fov.prep_hdulist()) # returns abspath of current wkdir if None hdulist.writeto(self._prep_fits_savepath(save_dir, save_name), clobber=overwrite_existing)
[docs] def load_from_fits(self, file_path): """Load stack object (fits). Parameters ---------- file_path : str file path of calibration data """ # loads senscorr_mask and calibration data (tau and cd vectors, # timestamps) hdu = super(DoasCalibData, self).load_from_fits(file_path) self.fov.import_from_hdulist(hdu, first_idx=2) hdu.close()
[docs] def plot_data_tseries_overlay(self, date_fmt=None, ax=None): """Plot overlay of tau and DOAS time series.""" if ax is None: fig, ax = subplots(1, 1) s1 = self.tau_tseries s2 = self.cd_tseries p1 = ax.plot(s1.index.to_pydatetime(), s1.values, "--xb", label=r"$\tau$") ax.set_ylabel("tau") ax2 = ax.twinx() p2 = ax2.plot(s2.index.to_pydatetime(), s2.values, "--xr", label="DOAS CDs") ax2.set_ylabel(r"$S_{%s}$ [cm$^{-2}$]" % SPECIES_ID) ax.set_title("Time series overlay DOAS calib data") try: if date_fmt is not None: ax.xaxis.set_major_formatter(DateFormatter(date_fmt)) except BaseException: pass ps = p1 + p2 labs = [l.get_label() for l in ps] ax.legend(ps, labs, loc="best", fancybox=True, framealpha=0.5) ax.grid() rotate_xtick_labels(ax) return (ax, ax2)
[docs]class DoasFOV(object): """Class for storage of FOV information."""
[docs] def __init__(self, camera=None): self.search_settings = {} self.img_prep = {} self.roi_abs = None self.img_shape_orig = None = None self.start_search = datetime(1900, 1, 1) self.stop_search = datetime(1900, 1, 1) self.corr_img = None self.fov_mask_rel = None self.result_pearson = {"cx_rel": nan, "cy_rel": nan, "rad_rel": nan, "corr_curve": None} self.result_ifr = {"popt": None, "pcov": None} if isinstance(camera, Camera): = camera self.img_shape_orig = (camera.pixnum_y, camera.pixnum_x)
@property def method(self): """Return search method.""" try: return self.search_settings["method"] except KeyError: raise ValueError("No information about FOV search available") @property def pyrlevel(self): """Return pyramide level at which FOV search was performed.""" try: return self.img_prep["pyrlevel"] except KeyError: raise KeyError("Image preparation data is not available: %s" % format_exc()) @property def cx_rel(self): """Return center x coordinate of FOV (in relative coords).""" if self.method == "ifr": return self.result_ifr["popt"][1] else: return self.result_pearson["cx_rel"] @property def cy_rel(self): """Return center x coordinate of FOV (in relative coords).""" if self.method == "ifr": return self.result_ifr["popt"][2] else: return self.result_pearson["cy_rel"] @property def radius_rel(self): """Return radius of FOV (in relative coords). :raises: TypeError if method == "ifr" """ if self.method == "ifr": raise TypeError("Invalid value: method IFR does not have FOV " "parameter radius, call self.popt for relevant " "parameters") return self.result_pearson["rad_rel"] @property def popt(self): """Return super gauss optimisation parameters (in relative coords). :raises: TypeError if method == "pearson" """ if self.method == "pearson": raise TypeError("Invalid value: method pearson does not have " "FOV shape parameters, call self.radius to " "retrieve disk radius") return self.result_ifr["popt"] @property def x_abs(self): return self.pos_abs[0] @property def y_abs(self): return self.pos_abs[1] @property def sigma_x_abs(self): if self.method == "pearson": return self.radius_rel * 2**self.pyrlevel return self.popt[3] * 2**self.pyrlevel @property def sigma_y_abs(self): if self.method == "pearson": return self.radius_rel * 2**self.pyrlevel return (self.popt[3] / self.popt[4]) * 2 ** self.pyrlevel @property def pos_abs(self): """Return center coordinates of FOV (in absolute detector coords).""" return self.pixel_position_center(True) def _max_extend_rel(self): """Return maximum pixel extend of FOV. For method pearson this is the radius (trivial), for an elliptical super gauss (i.e. method IFR) this is the longer axis """ if self.method == "pearson": return self.radius_rel else: return max([self.popt[3], self.popt[3] / self.popt[4]])
[docs] def pixel_extend(self, abs_coords=True): """Return pixel extend of FOV on image. :param bool abs_coords: return value in absolute or relative coordinates (considering pyrlevel and roi) """ ext_rel = self._max_extend_rel() if not abs_coords: return ext_rel return ext_rel * 2**self.pyrlevel
[docs] def pixel_position_center(self, abs_coords=False): """Return pixel position of center of FOV. :param bool abs_coords: return position in absolute or relative coordinates (considering pyrlevel and roi) :return: - tuple, ``(cx, cy)`` """ try: cx, cy = self.cx_rel, self.cy_rel except BaseException: logger.warning("Could not access information about FOV position") if not abs_coords: return (cx, cy) return map_coordinates_sub_img(cx, cy, self.roi_abs, self.pyrlevel, inverse=True)
[docs] def fov_mask_abs(self, img_shape_orig=(), cam_id=""): """Convert the FOV mask to absolute detector coordinates. The shape of the FOV mask (and the represented pixel coordinates) depends on the image preparation settings of the :class:`ImgStack` object which was used to identify the FOV. Parameters ---------- img_shape_orig : tuple image shape of original image data (can be extracted from an unedited image) cam_id : str string ID of pyplis default camera (e.g. "ecII") """ if not len(img_shape_orig) == 2: try: info = get_camera_info(cam_id) img_shape_orig = (int(info["pixnum_y"]), int(info["pixnum_x"])) except BaseException: raise IOError("Image shape could not be retrieved...") mask = self.fov_mask_rel.astype(float64) return sub_img_to_detector_coords(mask, img_shape_orig, self.pyrlevel, self.roi_abs).astype(bool)
# ============================================================================== # # def fov_mask(self, abs_coords = False): # """Returns FOV mask for data access # # :param bool abs_coords: if False, mask is created in stack # coordinates (i.e. corresponding to ROI and pyrlevel of stack). # If True, the FOV parameters are converted into absolute # detector coordinates such that they can be used for original # images. # # """ # raise NotImplementedError # =============================================================================
[docs] def import_from_hdulist(self, hdu, first_idx=0): """Import FOV information from FITS HDU list. Parameters ---------- hdu : HDUList HDU list containing a list of HDUs created using :func:`prep_hdulist` starting at index :param:`first_idx` (e.g. first_idx==2 if the method :func:`save_as_fits` from the :class:`DoasCalibData` class is used, since the first 2 indices are used for saving the acutal calibration data) first_idx : int index specifying the first entry of the FOV info in the provided HDU list """ i = first_idx try: self.fov_mask_rel = hdu[i].data.byteswap().newbyteorder() except BaseException:"(Warning loading DOAS calib data): FOV mask not " "available") prep_keys = Img().edit_log.keys() search_keys = DoasFOVEngine()._settings.keys() for key, val in six.iteritems(hdu[i].header): k = key.lower() if k in prep_keys: self.img_prep[k] = val elif k in search_keys: self.search_settings[k] = val elif k in self.result_pearson.keys(): self.result_pearson[k] = val try: self.corr_img = Img(hdu[i + 1].data.byteswap().newbyteorder()) except BaseException:"(Warning loading DOAS calib data): FOV search correlation " "image not available") self.roi_abs = hdu[i + 2].data["roi"].byteswap().newbyteorder() try: self.result_ifr["popt"] =\ hdu[i + 3].data["ifr_res"].byteswap().newbyteorder() except BaseException:"Failed to import array containing IFR optimisation " " results from FOV search")
[docs] def prep_hdulist(self): """Prepare and return :class:`HDUList` object for saving as FITS.""" fov_mask = fits.ImageHDU(self.fov_mask_rel) fov_mask.header.update(self.img_prep) fov_mask.header.update(self.search_settings) ifr_res = [] if self.method == "pearson": rd = self.result_pearson try: fov_mask.header.update(cx_rel=rd["cx_rel"], cy_rel=rd["cy_rel"], rad_rel=rd["rad_rel"]) except BaseException: logger.warning("Position of FOV (pearson method) not available") elif self.method == "ifr": ifr_res = self.result_ifr["popt"] try: hdu_cim = fits.ImageHDU(data=self.corr_img.img) except BaseException: hdu_cim = fits.ImageHDU() logger.warning("FOV search correlation image not available") roi = fits.BinTableHDU.from_columns([fits.Column(name="roi", format="I", array=self.roi_abs)]) col_ifr = fits.Column(name="ifr_res", format="D", array=ifr_res) res_ifr = fits.BinTableHDU.from_columns([col_ifr]) return fits.HDUList([fov_mask, hdu_cim, roi, res_ifr])
[docs] def save_as_fits(self, **kwargs): """Save the fov as fits file. Saves this object as DoasCalibData:: d = DoasCalibData(fov = self) d.save_as_fits(**kwargs) """ d = DoasCalibData(fov=self) d.save_as_fits(**kwargs)
def __str__(self): s = "DoasFOV information\n------------------------\n" s += "\nImg stack preparation settings\n............................\n" for k, v in six.iteritems(self.img_prep): s += "%s: %s\n" % (k, v) s += "\nFOV search settings\n............................\n" for k, v in six.iteritems(self.search_settings): s += "%s: %s\n" % (k, v) if self.method == "ifr": s += "\nIFR search results \n.........................\n" s += "\nSuper gauss fit optimised params\n" popt = self.popt for k in range(len(popt)): s += "%s: %.3f\n" % (GAUSS_2D_PARAM_INFO[k], popt[k]) elif self.method == "pearson": s += "\nPearson search results \n.......................\n" for k, v in six.iteritems(self.result_pearson): if not k == "corr_curve": s += "%s: %s\n" % (k, v) return s
[docs] def plot(self, ax=None): """Draw the current FOV position into the current correlation img.""" if ax is None: fig, ax = subplots(1, 1, figsize=(12, 8)) else: fig = ax.figure img = self.corr_img.img vmin, vmax = img.min(), img.max() cmap = shifted_color_map(vmin, vmax, cmap=RdBu) h, w = img.shape disp = ax.imshow(img, vmin=vmin, vmax=vmax, cmap=cmap) cb = fig.colorbar(disp, ax=ax, shrink=0.9) cx, cy = self.pixel_position_center(1) if self.method == "ifr": popt = self.popt cb.set_label(r"FOV fraction [$10^{-2}$ pixel$^{-1}$]") xgrid, ygrid = mesh_from_img(img) if len(popt) == 7: ell = Ellipse(xy=(popt[1], popt[2]), width=popt[3], height=popt[3] / popt[4], color="k", lw=2, fc="lime", alpha=.5) else: ell = Ellipse(xy=(popt[1], popt[2]), width=popt[3], height=popt[3] / popt[4], angle=popt[7], color="k", lw=2, fc="lime", alpha=.5) ax.add_artist(ell) ax.axhline(self.cy_rel, ls="--", color="k") ax.axvline(self.cx_rel, ls="--", color="k") ax.get_xaxis().set_ticks([0, self.cx_rel, w]) ax.get_yaxis().set_ticks([0, self.cy_rel, h]) # ax.set_axis_off() ax.set_title(r"Corr img (IFR), pos abs (x,y): (%d, %d), " "lambda=%.1e" % (cx, cy, self.search_settings["ifrlbda"])) elif self.method == "pearson": cb.set_label(r"Pearson corr. coeff.") ax.autoscale(False) c = Circle((self.cx_rel, self.cy_rel), self.radius_rel, ec="k", lw=2, fc="lime", alpha=.5) ax.add_artist(c) ax.set_title("Corr img (pearson), pos abs (x,y): (%d, %d)" % (cx, cy)) ax.get_xaxis().set_ticks([0, self.cx_rel, w]) ax.get_yaxis().set_ticks([0, self.cy_rel, h]) ax.axhline(self.cy_rel, ls="--", color="k") ax.axvline(self.cx_rel, ls="--", color="k") ax.set_xlabel("Pixel row") ax.set_ylabel("Pixel column") return ax
[docs]class DoasFOVEngine(object): """Engine to perform DOAS FOV search."""
[docs] def __init__(self, img_stack=None, doas_series=None, method="pearson", **settings): self._settings = {"method": "pearson", "maxrad": 80, "ifrlbda": 1e-6, # lambda val IFR "g2dasym": True, # elliptic FOV "g2dsuper": True, # super gauss fit (IFR) "g2dcrop": True, "g2dtilt": False, "blur": 4, "mergeopt": "average"} self.data_merged = False self.img_stack = img_stack self.doas_series = doas_series self.calib_data = DoasCalibData() # includes DoasFOV class self.update_search_settings(**settings) self.method = method
@property def maxrad(self): """For Pearson method: maximum expected disk radius of FOV. Note ---- this radius is considered independent of the current pyramid level of the image stack, hence, if it is set 20 and the pyramid level of the stack is 2, then, the FOV disk radius (in detector coords) may be 80. """ return self._settings["maxrad"] @maxrad.setter def maxrad(self, val):"Updating seeting for maximum radius of FOV, new value: %s" % val) self._settings["maxrad"] = int(val) @property def ifrlbda(self): """For IFR method: allow asymmetric 2d gauss fit.""" return self._settings["ifrlbda"] @ifrlbda.setter def ifrlbda(self, val): self._settings["ifrlbda"] = val @property def g2dasym(self): """For IFR method: allow asymmetric 2d gauss fit.""" return self._settings["g2dasym"] @g2dasym.setter def g2dasym(self, val): if val not in [True, False]: raise ValueError("Invalid input value: require boolean") self._settings["g2dasym"] = val @property def g2dsuper(self): """For IFR method: use supergauss parametrisation.""" return self._settings["g2dsuper"] @g2dsuper.setter def g2dsuper(self, val): if val not in [True, False]: raise ValueError("Invalid input value: require boolean") self._settings["g2dsuper"] = val @property def g2dcrop(self): """For IFR method: crop gaussian FOV parametrisation at sigma.""" return self._settings["g2dcrop"] @g2dcrop.setter def g2dcrop(self, val): if val not in [True, False]: raise ValueError("Invalid input value: require boolean") self._settings["g2dcrop"] = val @property def g2dtilt(self): """For IFR method: allow supergauss-fit to be tilted.""" return self._settings["g2dtilt"] @g2dtilt.setter def g2dtilt(self, val): if val not in [True, False]: raise ValueError("Invalid input value: require boolean") self._settings["g2dtilt"] = val @property def blur(self): """Sigma of gaussian blurring filter applied to correlation image. The filter is applied to the correlation image before finding the position of the maximum correlation. This is only relevant for method IFR, since this method parameterises the FOV by fitting a 2D Gaussian to the correlation image. Defaults to 4. """ return self._settings["blur"] @blur.setter def blur(self, val): self._settings["blur"] = val @property def mergeopt(self): """Option for temporal merging of stack and DOAS vector. Choose from ``average, nearest, interpolation`` """ return self._settings["mergeopt"] @mergeopt.setter def mergeopt(self, val): if val not in ["average", "nearest", "interpolation"]: raise ValueError("Invalid method: choose from average, " "nearest or interpolation") self._settings["mergeopt"] = val @property def method(self): """Return method used for FOV search (e.g. pearson, ifr).""" return self._settings["method"] @method.setter def method(self, val): if val not in ["pearson", "ifr"]: raise ValueError("Invalid method: choose from pearson or ifr") self._settings["method"] = val
[docs] def update_search_settings(self, **settings): """Update current search settings. :param **settings: keyword args to be updated (only valid keys will be updated) """ for k, v in six.iteritems(settings): if k in self._settings:"Updating FOV search setting %s, new value: %s" % (k, v)) self._settings[k] = v
@property def doas_data_vec(self): """Return DOAS CD vector (values of ``self.doas_series``).""" return self.doas_series.values @property def method(self): """Return current FOV search method.""" return self._settings["method"] @method.setter def method(self, value): """Return current FOV search method.""" if value not in ["ifr", "pearson"]: raise ValueError("Invalid search method: choose from ifr or" " pearson") self._settings["method"] = value
[docs] def merge_data(self, merge_type=None): """Merge stack data and DOAS vector in time. Wrapper for :func:`merge_with_time_series` of :class:`ImgStack` :param str merge_type: choose between ``average, interpolation, nearest`` Note ---- Current data (i.e. ``self.img_stack`` and ``self.doas_series``) will be overwritten if merging succeeds. Parameters ---------- merge_type : :obj:`str`, optional, one of the available merge types, see :attr:`mergeopt` for valid options Raises ------ RuntimeError if merging of data fails """ if self.data_merged:"Data merging unncessary, img stack and DOAS vector are " "already merged in time") return if merge_type is None: merge_type = self._settings["mergeopt"] new_stack, new_doas_series = self.img_stack.merge_with_time_series( self.doas_series, method=merge_type) if len(new_doas_series) == new_stack.shape[0]: self.img_stack = new_stack self.doas_series = new_doas_series self._settings["mergeopt"] = merge_type self.data_merged = True return raise RuntimeError("Temporal merging of image and DOAS data failed...")
[docs] def det_correlation_image(self, search_type="pearson", **kwargs): """Determine correlation image. Determines correlation image either using IFR or Pearson method. Results are written into ``self.calib_data.fov`` (:class:`DoasFOV`) :param str search_type: updates current search type, available types ``["pearson", "ifr"]`` """ if not self.img_stack.shape[0] == len(self.doas_series): raise ValueError("DOAS correlation image object could not be " "determined: inconsistent array lengths, please " "perform timemerging first") self.update_search_settings(method=search_type, **kwargs) if search_type == "pearson": corr_img, _ = self._det_correlation_image_pearson( **self._settings) elif search_type == "ifr": corr_img, _ = self._det_correlation_image_ifr_lsmr( **self._settings) else: raise ValueError("Invalid search type %s: choose from " "pearson or ifr" % search_type) corr_img = Img(corr_img, pyrlevel=self.img_stack.img_prep["pyrlevel"]) corr_img.add_gaussian_blurring(self._settings["blur"]) # corr_img.pyr_up(self.img_stack.img_prep["pyrlevel"]) self.calib_data.fov.corr_img = corr_img self.calib_data.fov.img_prep = self.img_stack.img_prep self.calib_data.fov.roi_abs = self.img_stack.roi_abs self.calib_data.fov.start_search = self.img_stack.start self.calib_data.fov.stop_search = self.img_stack.stop try: if self.img_stack.img_prep["is_aa"]: cid = "AA" else: raise Exception except: cid = self.img_stack.stack_id self.calib_data.calib_id = cid return corr_img
def _det_correlation_image_pearson(self, **kwargs): """Determine correlation image based on pearson correlation. :returns: - correlation image (pix wise value of pearson corr coeff) """ h, w = self.img_stack.shape[1:] corr_img = zeros((h, w), dtype=float64) corr_img_err = zeros((h, w), dtype=float64) cd_vec = self.doas_series.values exp = int(10**exponent(h) / 4.0) for i in range(h): try: if i % exp == 0:"FOV search: current img row (y): " + str(i)) except BaseException: pass for j in range(w): # get series from stack at current pixel corr_img[i, j], corr_img_err[i, j] = pearsonr( self.img_stack.stack[:, i, j], cd_vec) self._settings["method"] = "pearson" return corr_img, corr_img_err def _det_correlation_image_ifr_lsmr(self, ifrlbda=1e-6, **kwargs): """Apply LSMR algorithm to identify the FOV. :param float ifrlbda: tolerance parameter lambda """ # some input data size checking (m,) = self.doas_data_vec.shape (m2, ny, nx) = self.img_stack.shape if m != m2: raise ValueError("Inconsistent array lengths, please perform time " "merging of image stack and doas vector first") # construct H-matrix through reshaping image stack # h_matrix = transpose(self.img_stack.stack, (2,0,1)).reshape(m, nx*ny) h_matrix = self.img_stack.stack.reshape(m, nx * ny) # and one-vector h_vec = ones((m, 1), dtype=h_matrix.dtype) # and stacking in the end h = column_stack((h_vec, h_matrix)) # solve using LSMR regularisation a = lsmr(h, self.doas_data_vec, atol=ifrlbda, btol=ifrlbda) c = a[0] # separate offset and image lsmr_offset = c[0] lsmr_image = c[1:].reshape(ny, nx) / max(c[1:]) # THIS NORMALISATION IS NEW # lsmr_image = lsmr_image / abs(lsmr_image).max() self._settings["method"] = "ifr" self._settings["ifrlbda"] = ifrlbda return lsmr_image, lsmr_offset
[docs] def get_fov_shape(self, **settings): """Find shape of FOV based on correlation image. Search pixel coordinate of highest correlation in ``self.calib_data.fov.corr_img`` (using :func:`get_img_maximum`) and based on that finds FOV shape either using disk approach (if ``self.method == 'pearson'``) calling :func:`fov_radius_search` or using 2D Gauss fit (if ``self.method == 'ifr'``) calling :func:`fov_gauss_fit`. Results are written into ``self.calib_data.fov`` (:class:`DoasFOV` object) :param **settings: update current settings (keyword args passed to :func:`update_search_settings`) """ if not isinstance(self.calib_data.fov.corr_img, Img): raise Exception("Could not access correlation image") if self.method == "pearson": cy, cx = get_img_maximum(self.calib_data.fov.corr_img.img)"Start radius search in stack around x/y: %s/%s" % (cx, cy)) (radius, corr_curve, tau_vec, cd_vec, fov_mask) = self.fov_radius_search(cx, cy) if not radius > 0: raise ValueError("Pearson FOV search failed") self.calib_data.fov.result_pearson["cx_rel"] = cx self.calib_data.fov.result_pearson["cy_rel"] = cy self.calib_data.fov.result_pearson["rad_rel"] = radius self.calib_data.fov.result_pearson["corr_curve"] = corr_curve self.calib_data.fov.fov_mask_rel = fov_mask self.calib_data.tau_vec = tau_vec.astype(float64) self.calib_data.cd_vec = cd_vec.astype(float64) try: self.calib_data.cd_vec_err = self.doas_series.fit_errs except BaseException: pass self.calib_data.time_stamps = self.img_stack.time_stamps return elif self.method == "ifr": # the fit is performed in absolute dectector coordinates # corr_img_abs = Img(self.fov.corr_img.img).pyr_up(pyrlevel).img popt, pcov, fov_mask = self._fov_gauss_fit( self.calib_data.fov.corr_img, **self._settings) tau_vec = self.convolve_stack_fov(fov_mask) self.calib_data.fov.result_ifr["popt"] = popt self.calib_data.fov.result_ifr["pcov"] = pcov self.calib_data.fov.fov_mask_rel = fov_mask self.calib_data.tau_vec = tau_vec self.calib_data.cd_vec = self.doas_data_vec try: self.calib_data.cd_vec_err = self.doas_series.fit_errs except BaseException: pass self.calib_data.time_stamps = self.img_stack.time_stamps else: raise ValueError("Invalid search method...")
# define IFR model function (Super-Gaussian) def _fov_gauss_fit(self, corr_img, g2dasym=True, g2dsuper=True, g2dcrop=True, g2dtilt=False, blur=4, **kwargs): """Apply 2D gauss fit to correlation image. Parameters ---------- corr_img : Img correlation image g2dasym : bool allow for assymetric shape (sigmax != sigmay), True g2dsuper: bool allow for supergauss fit, True g2dcrop : bool if True, set outside (1/e amplitude) datapoints = 0, True g2dtilt : bool allow gauss to be tilted with respect to x/y axis blur : int width of gaussian smoothing kernel convolved with correlation image in order to identify position of maximum Returns ------- tuple 3-element tuple containing - array (popt): optimised multi-gauss parameters - 2d array (pcov): estimated covariance of popt - 2d array: correlation image """ img = corr_img.img h, w = img.shape xgrid, ygrid = mesh_from_img(img) # apply maximum of filtered image to initialise 2D gaussian fit (cy, cx) = get_img_maximum(img) maxrad = self.maxrad * 2**(-corr_img.pyrlevel) mask = make_circular_mask(h, w, cx, cy, maxrad).astype(float) img = img * mask # constrain fit, if requested (popt, pcov, fov_mask) = gauss_fit_2d(img, cx, cy, g2dasym, g2d_super_gauss=g2dsuper, g2d_crop=g2dcrop, g2d_tilt=g2dtilt, **kwargs) return (popt, pcov, fov_mask) # function convolving the image stack with the obtained FOV distribution
[docs] def convolve_stack_fov(self, fov_mask): """Normalize fov image and convolve stack. :returns: - stack time series vector within FOV """ # normalize fov_mask normsum = fov_mask.sum() fov_mask_norm = fov_mask / normsum # convolve with image stack # stack_data_conv = transpose(self.stac, (2,0,1)) * fov_fitted_norm stack_data_conv = self.img_stack.stack * fov_mask_norm return stack_data_conv.sum((1, 2))
