Source code for pyplis.cellcalib

# -*- coding: utf-8 -*-
#
# Pyplis is a Python library for the analysis of UV SO2 camera data
# Copyright (C) 2017 Jonas Gliss (jonasgliss@gmail.com)
#
# This program is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License a
# published by the Free Software Foundation, either version 3 of
# the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Pyplis module containing features related to cell calibration."""
from __future__ import (absolute_import, division)
import six
from pyplis import logger, print_log
from matplotlib.pyplot import subplots

from numpy import (float, log, arange, linspace, isnan, diff, mean, argmin,
                   asarray)
from matplotlib.pyplot import Figure
from matplotlib.cm import get_cmap
from datetime import timedelta
from os.path import exists
from collections import OrderedDict as od
from .dataset import Dataset
from .setupclasses import MeasSetup
from .processing import ImgStack, PixelMeanTimeSeries
from .imagelists import ImgList, CellImgList
from .exceptions import CellSearchError, ImgMetaError
from .image import Img
from .helpers import subimg_shape, isnum
from .optimisation import PolySurfaceFit
from .calib_base import CalibData
from .glob import SPECIES_ID


[docs]class CellSearchInfo(object): """Class for for storage cell search from automatic cell search engine. Parameters ---------- filter_id : str string ID of filter / Image type add_id : str additional identifier (e.g. "bg", "cell1") y_max : float a reference intensity """
[docs] def __init__(self, filter_id, add_id, y_max): self.filter_id = filter_id self.add_id = add_id # self.abbr = None self.y_max = y_max self.mean_vals = [] self.mean_vals_err = [] self.file_paths = [] self.start_acq = [] self.texps = [] self.img_list = None
@property def start(self): """Get time stamp of first detected image. Returns ------- datetime First time stamp, i.e. ``start_acq[0]`` Raises ------ IndexError if ``start_acq`` is empty """ return self.start_acq[0] @property def stop(self): """Get time stamp of last detected image. Returns ------- datetime Last time stamp, i.e. ``self.start_acq[-1]`` Raises ------ IndexError if ``start_acq`` is empty """ return self.start_acq[-1] @property def tot_num(self): """Get total number of detected images. Returns ------- int length of ``self.mean_vals`` """ return len(self.mean_vals)
[docs] def from_img_list(self, img_list): """Fill values using all images from a specific image list. Note ---- Old beta version, not tested, currently not in use Parameters ---------- img_list : ImgList image list from which pixel mean value time series is supposed to be determined """ if not isinstance(img_list, ImgList): raise TypeError("Wrong input type") dat = img_list.get_mean_value() self.file_paths = img_list.files self.mean_vals = asarray(dat.values) self.mean_vals_err = asarray(dat.std) self.start_acq = asarray(dat.index) self.y_max = max(dat.values)
@property def mean_err(self): """Return average std of mean value time series. Returns ------- float Error of mean value """ return mean(self.mean_vals_err) @property def mid_point_val(self): """Get mean value of middle image of the time series. Returns ------- float Mean intensity of middle image """ num = len(self.mean_vals) if num < 1: raise Exception("No data available in CellSearchInfo") elif num == 1: return self.mean_vals[0] elif num == 2: return mean(self.mean_vals) else: mid_index = int(num / 2) return self.mean_vals[mid_index]
[docs] def point_ok(self, idx): """Check data point at given index. Checks if intensity value at given index is within acceptance intensity range with respect to the middle intensity value of the time series Parameters ---------- idx : int index of datapoint Returns ------- bool True, if ok, False if not """ try: val = self.mean_vals[idx] if abs(self.mid_point_val - val) < self.mean_err: return True return False except IndexError as e: logger.warning(repr(e)) return False except BaseException: raise
[docs] def create_image_list(self, camera): """Create image list containing all valid cell images. Creates a :class:`CellImgList` which includes all detected data points that fulfill condition :func:`point_ok`. Note ---- If successful, the list is assigned to :attr:`img_list` Parameters ---------- camera : Camera the camera used Returns ------- CellImgList image list containing all (valid) cell images """ lst = CellImgList(list_id=self.filter_id, cell_id=self.add_id, camera=camera) paths = self.file_paths for idx in range(len(self.mean_vals)): if self.point_ok(idx): lst.files.append(paths[idx]) lst.init_filelist() self.img_list = lst if lst.nof < 1: raise CellSearchError("No suitable %s images found on creation of " "image list for cell %s" % (self.filter_id, self.add_id)) logger.info("Succesfully created image list %s for cell with ID %s from " "cell search results" % (self.filter_id, self.add_id)) return lst
@property def offs(self): """Get array containing offset values. The offset values are determined from :attr:`mean_vals`` with respect to :attr:`y_max`. Returns ------- ndarray array containing offset values for each intensity in :attr:`mean_vals` """ return self.y_max - self.mean_vals
[docs]class CellAutoSearchResults(object): """Helper class collecting results from auto-cell detection algorithm. This object is included in :class:`CellCalibEngine` object and will be filled with :class:`CellSearchInfo` objects if the cell autodetection is used (:func:`find_cells`) Note ---- This class is normally not intended to be used directly Attributes ---------- cell_info : OrderedDict Ordered dictionary containing dictionaries for all filter_ids for which cell search was performed (e.g. on, off). These dictionaries contain :class:`CellSearchInfo` ordered based on the acq. time of the detected cell dip bg_info : OrderedDict Ordered dictionary containing :class:`CellSearchInfo` objects that include detected background images for each filter (e.g. on off) rest_info : OrderedDict Ordered dictionary containing all images for each filter (e.g. on off) that could not be identified as Cell or BG image """
[docs] def __init__(self): self.cell_info = od() self.bg_info = od() self.rest_info = od()
[docs] def add_cell_search_result(self, filter_id, cell_info, bg_info, rest_info): """Add a collection of :class:`CellSearchInfo` objects. Parameters ---------- filter_id : str image type ID (e.g. on, off) cell_info : dictlike dictonary containing :class:`CellSearchInfo` objects containing information about images belonging to one cell bg_info : CellSearchInfo object containing information about detected background images for image type specified by ``filter_id`` rest_info : CellSearchInfo object containing information about images that could not be assigned to a detected cell nor to the background images """ self.cell_info[filter_id] = od() for cell_id, res in six.iteritems(cell_info): if isinstance(res, CellSearchInfo): self.cell_info[filter_id][cell_id] = res if isinstance(bg_info, CellSearchInfo): self.bg_info[filter_id] = bg_info if isinstance(rest_info, CellSearchInfo): self.rest_info[filter_id] = rest_info
[docs]class CellCalibData(CalibData): """Class representing cell calibration data. This class inherits from the :class:`CalibData` base class. Parameters ---------- tau_vec : ndarray tau data vector for calibration data cd_vec : ndarray DOAS-CD data vector for calibration data cd_vec_err : ndarray Fit errors of DOAS-CDs time_stamps : ndarray array with datetime objects containing time stamps (e.g. start acquisition) of calibration data calib_fun : function optimisation function used for fitting of calibration data calib_coeffs : ;obj:`list`, optional optimisation parameters for calibration curve. senscorr_mask : :obj:`ndarray`or :obj:`Img`, optional sensitivity correction mask that was normalised relative to the pixel position where the calibration data was retrieved (i.e. position of DOAS FOV in case of DOAS calibration data, or image pixel position, where cell calibration data was retrieved) calib_id : str calibration ID (e.g. "aa", "tau_on", "tau_off") camera : Camera camera object (not necessarily required). A camera can be assigned in order to convert the FOV extend from pixel coordinates into decimal degrees pos_x_abs : int x-position of image pixel for which the data was retrieved pos_y_abs : int y-position of image pixel for which the data was retrieved """
[docs] def __init__(self, tau_vec=None, cd_vec=None, cd_vec_err=None, time_stamps=None, calib_fun=None, calib_coeffs=None, senscorr_mask=None, polyorder=1, calib_id="", camera=None, pos_x_abs=None, pos_y_abs=None): super(CellCalibData, self).__init__(tau_vec, cd_vec, cd_vec_err, time_stamps, calib_fun, calib_coeffs, senscorr_mask, polyorder, calib_id, camera) if tau_vec is None: tau_vec = [] if cd_vec is None: cd_vec = [] if cd_vec_err is None: cd_vec_err = [] if time_stamps is None: time_stamps = [] if calib_coeffs is None: calib_coeffs = [] self.type = "cell" self.pos_x_abs = pos_x_abs self.pos_y_abs = pos_y_abs
def _prep_fits_save(self): """Prepare FITS HDU list for storing calibration data. Note ---- See baseclass :class:`CalibData` for more detailed information Returns ------- HDUList hdu list containing sensitivity correction mask and table with calib data """ hdu = super(CellCalibData, self)._prep_fits_save() hdu[0].header["pos_x_abs"] = self.pos_x_abs hdu[0].header["pos_y_abs"] = self.pos_y_abs return hdu
[docs] def load_from_fits(self, file_path): """Load calibration data from FITS file. Note ---- See methods of base class :func:`pyplis.calib_base.CalibData.save_as_fits` and :func:`pyplis.calib_base.CalibData.load_from_fits` for more details Parameter --------- file_path : str path of FITS file """ hdu = super(CellCalibData, self).load_from_fits(file_path) self.pos_x_abs = hdu[0].header["pos_x_abs"] self.pos_y_abs = hdu[0].header["pos_y_abs"] hdu.close()
[docs]class CellCalibEngine(Dataset): """Class for performing automatic cell calibration. This class is designed to define datasets related to time windows, where cell calibration was performed, i.e. the camera pointing into a gas (and cloud) free area of the sky with a number of calibration cells are put in front of the lense consecutively (ideally, the cells should cover the whole FOV of the camera in order to be able to retrieve calibration polynomials for each image pixel individually). Individual time windows for each cell are extracted by analysing the time series of pixel mean intensities for all images that fall into the start / stop interval. Cells can be identified by dips of decreased intensities in the time series. The individual cells are then assigned automatically based on the depth of each dip (in the on band) and the column densities of the cells used (the latter need to be provided). Is initialised as :class:`pyplis.Datasets.Dataset` object, i.e. normal setup is like plume data using a :class:`MeasSetup` object (make sure that ``cell_info_dict`` is set in the setup class). Parameters ---------- setup : MeasSetup see :class:`Dataset` for details init : bool if True, the image lists are initiated and filled (if possible) """
[docs] def __init__(self, setup=None, init=True): logger.info('\n') logger.info("INIT CALIB DATASET OBJECT") logger.info('\n') super(CellCalibEngine, self).__init__(setup, lst_type=CellImgList, init=init) self.type = "cell_calib" self.cell_search_performed = False self._cell_info_auto_search = od() if isinstance(self.setup, MeasSetup): self.set_cell_info_dict_autosearch(self.setup.cell_info_dict) self.cell_lists = od() self.bg_lists = od() self.search_results = CellAutoSearchResults() self.pix_mean_tseries = od() self.bg_tseries = od() self.tau_stacks = od() self.calib_data = od() logger.info('\n') logger.info("FILELISTS IN CALIB DATASET OBJECT INITIALISED") logger.info('\n')
@property def cell_lists_ready(self): """Call :func:`check_all_lists``.""" return self.check_all_lists()
[docs] def set_cell_images(self, img_paths, cell_gas_cd, cell_id, filter_id, img_import_method=None): """Add cell images corresponding to one cell and image type. Creates :class:`CellImgList` containing input cell images and adds them calling :func:`add_cell_img_list` Parameters ---------- img_paths : list list containing image file paths (can also be a single image path) cell_gas_cd : float column amount of gas in cell cell_id : str string identification of cell filter_id : str filter ID for images (e.g. "on", "off") """ try: # input is not a list but a valid path to (hopefully) an image if exists(img_paths): img_paths = [img_paths, ] except BaseException: pass paths = [p for p in img_paths if exists(p)] if not len(paths) > 0: raise TypeError("No valid filepaths could be identified") lst = CellImgList(files=paths, list_id=filter_id, camera=self.camera, cell_id=cell_id, gas_cd=cell_gas_cd) self.add_cell_img_list(lst)
[docs] def set_bg_images(self, img_paths, filter_id): """Set background images for a certain filter type. Creates :class:`CellImgList` containing input background images and adds them calling :func:`add_bg_img_list` Parameters ---------- img_paths : list list containing image file paths of bg images (can also be a single image file path) filter_id : str image type (e.g. "on", "off") """ try: # input is not a list but a valid path to (hopefully) an image if exists(img_paths): img_paths = [img_paths, ] except BaseException: pass paths = [p for p in img_paths if exists(p)] if not len(paths) > 0: raise TypeError("No valid filepaths could be identified") lst = CellImgList(files=paths, list_id=filter_id, camera=self.camera) self.add_bg_img_list(lst)
[docs] def add_cell_img_list(self, lst): """Add a cell image list for calibration. Parameters ---------- lst : CellImgList if, valid, the list is added to :attr:`cell_lists` using it's ID (``lst.list_id``) as first key and ``lst.cell_id`` as second, e.g. ``self.cell_lists["on"]["a53"]`` """ if not isinstance(lst, CellImgList): raise TypeError("Error adding cell image list, need CellImgList " "object, got %s" % type(lst)) elif not lst.nof > 0: raise IOError("No files available in cell ImgList %s, %s" % (lst.list_id, lst.cell_id)) elif any([lst.gas_cd == x for x in [0, None]]) or isnan(lst.gas_cd): raise ValueError("Error adding cell image list, invalid value " "encountered for attribute gas_cd: %s" % lst.gas_cd) if lst.list_id not in self.cell_lists: self.cell_lists[lst.list_id] = od() self.cell_lists[lst.list_id][lst.cell_id] = lst
[docs] def add_bg_img_list(self, lst): """Add an image list containing background images for calibration. Parameters ---------- lst : CellImgList if valid input, the list is added to dictionary ``self.bg_lists`` using ``lst.list_id`` as key """ if not isinstance(lst, CellImgList): raise TypeError("Error adding bg image list, background image " "list needs to be initiated as CellImgList in " "CellCalibEngine, got %s" % type(lst)) elif not lst.nof > 0: raise IOError("No files available in bg ImgList %s, %s" % (lst.list_id, lst.cell_id)) self.bg_lists[lst.list_id] = lst
[docs] def det_bg_mean_pix_timeseries(self, filter_id): """Determine (or get) pixel mean values of background image list. Gets the average pixel intenisty (considering the whole image) for all images in specified background image list and stores it within a :class:`PixelMeanTimeSeries` object. The latter is then stored in :attr:`bg_tseries` and can be used to interpolate background intensities for cell image time stamps (this might be important for large SZA measurements where the background radiance changes fastly, cf. :func:`prepare_tau_calib`). Parameters ---------- filter_id : str ID of background image list (must be valid key of dict :attr:`bg_lists` Returns ------- PixelMeanTimeSeries time series object containing background mean intensities """ ts = self.bg_lists[filter_id].get_mean_value() ts.fit_polynomial() ts.img_prep.update(self.bg_lists[filter_id].current_img().edit_log) self.bg_tseries[filter_id] = ts return ts
[docs] def find_cells(self, filter_id="on", threshold=0.10, accept_last_in_dip=False): """Autodetection of cell images and bg images using mean value series. This algorithm tries to separate individual cell images and background images by analysing the 1st derivative of the mean pixel intensity of each image in the time span specified in this object (``self.start``, ``self.stop``). The separation of the individual cell images is performed by identifying dips in the mean intensity evolution and assignment of all image files belonging to each dip. Parameters ---------- filter_id : str filter ID (e.g. on, off, uses :func:`get_list` with ``filter_id`` as input) threshold : float threshold in percent by which intensity decreases are identified accept_last_in_dip : bool if True, also the last image in one of the Cell intensity dips is considered a valid cell image (by default, the first and the last images of a dip are not considered) """ logger.info('\n') logger.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++") logger.info("++++SEARCHING CELL TIME WINDOWS ", filter_id, " ++++++++++++++") logger.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++") logger.info('\n') l = self.get_list(filter_id) ts = l.get_mean_value() ts.name = filter_id x, y, yerr, texps = ts.index, ts.values, ts.std, ts.texps # this will be overwritten in the loop to find the BG image with the # lowest standard deviation, which is then set as current bg image # yerrCurrentBG = 9999 ydiff = diff(y) # 1st derivative (finite differences) y_max = max(y) bg_info = CellSearchInfo(filter_id, "bg", y_max) rest = CellSearchInfo(filter_id, "rest", y_max) cell_info = od() # will be filled with CellSearchInfo objects # in the loop cell_count = 0 # counter for number of cells detected on_cell = 0 # flag which is set when cell entry_cond is fulfilled for k in range(len(y) - 2): # Look for dip in intensity => candidate for calib cell time stamp # Define entry and leave acceptance condition for detection of time # window entry_cond = ((1 - abs(y[k + 1] / y_max)) > threshold and abs(ydiff[k]) / y_max < threshold and abs(ydiff[k - 1]) / y_max > threshold) leave_cond = ((1 - abs(y[k] / y_max)) > threshold and abs(ydiff[k]) / y_max > threshold and abs(ydiff[k - 1]) / y_max < threshold) # Condition for background image candidates bg_cond = ((1 - abs(y[k] / y_max)) < threshold and abs(ydiff[k]) / y_max < threshold and abs(ydiff[k - 1]) / y_max < threshold) if not accept_last_in_dip: # adapt entry and leave condition for cell time window" entry_cond = (entry_cond and abs(ydiff[k + 1]) / y_max < threshold) leave_cond = ((1 - abs(y[k] / y_max)) > threshold and abs(ydiff[k]) / y_max < threshold and abs(ydiff[k - 1]) / y_max < threshold and abs(ydiff[k + 1]) / y_max > threshold) if entry_cond: if on_cell: ts.plot() raise Exception("Fatal: found cell dip within cell dip" "plotted time series...") logger.info("Found cell at %s, %s" % (k, x[k])) on_cell = 1 cell_count += 1 cell_id = "Cell%s" % cell_count result = CellSearchInfo(filter_id, cell_id, y_max) # Look for increase in intensity => candidate for removal of calib # cell elif leave_cond and on_cell: # and onFilter: logger.info("Reached end of cell DIP at %s, %s" % (k, x[k])) # result.stop = x[k] on_cell = 0 result.mean_vals.append(y[k]) result.mean_vals_err.append(yerr[k]) result.file_paths.append(l.files[k]) result.start_acq.append(x[k]) result.texps.append(texps[k]) cell_info[result.add_id] = result # onFilter=0 elif bg_cond: logger.info("Found BG candidate at %s, %s" % (k, x[k])) bg_info.mean_vals.append(y[k]) bg_info.mean_vals_err.append(yerr[k]) bg_info.file_paths.append(l.files[k]) bg_info.start_acq.append(x[k]) bg_info.texps.append(texps[k]) else: if on_cell: result.mean_vals.append(y[k]) result.mean_vals_err.append(yerr[k]) result.file_paths.append(l.files[k]) result.start_acq.append(x[k]) result.texps.append(texps[k]) else: rest.mean_vals.append(y[k]) rest.mean_vals_err.append(yerr[k]) rest.file_paths.append(l.files[k]) rest.start_acq.append(x[k]) rest.texps.append(texps[k]) k += 1 if not len(self._cell_info_auto_search) == len(cell_info): raise CellSearchError("Number of detected cells (%s) is " "different from number of cells " "specified in cellSpecInfo (%s) " % (len(self._cell_info_auto_search), len(cell_info))) # Create new image lists from search results for background images # and one list for each cell that was detected bg_info.create_image_list(self.camera) bg_info.img_list.update_cell_info("bg", 0.0, 0.0) self.assign_dark_offset_lists(into_list=bg_info.img_list) for cell_id, info in six.iteritems(cell_info): info.create_image_list(self.camera) self.assign_dark_offset_lists(into_list=info.img_list) self.search_results.add_cell_search_result(filter_id, cell_info, bg_info, rest) self.pix_mean_tseries["%s_auto_search" % filter_id] = ts
def _assign_calib_specs(self, filter_id=None): """Assign the gas CD amounts to search results for all filter lists. This function assigns gas CD amounts (stored in ``self._cell_info_auto_search`` to the individual cells detected using the automatic cell search routine (:func:`find_cells`) and which are stored in ``self.search_results``. The latter object is of type :class:`CellAutoSearchResults` and contains image lists for all *filter_ids* for which the search was performed (e.g. "on", "off"). These are seperated by detected background images (``self.search_results.bg_info``) and individual cells (``self.search_results.cell_info``). The gas amounts (in ``self._cell_info_auto_search``) are assigned based on the magnitude of the corrseponding intensity decrease of the cell (in the pixel mean time series). Parameters ---------- filter_id : str ID of filter which is supposed used for assignment (e.g. "on"). Uses default on-band filter if input is unspecified (None) or if the imagelist for this filter key does not exist Note ---- 1. In order for this to work, the automatic cell search must have been performed 2. This function does not change class attributes which are actually used for calibration. The latter are stored in ``self.cell_lists`` and ``self.bg_lists`` and have to be assigned specifically """ # check input list ID and set default if invalid if filter_id not in self.lists_access_info: filter_id = self.filters.default_key_on # the info about columns in the cells cell_info = self._cell_info_auto_search # init temporary dicts (will be filled below) offs_dict = {} cell_cd_dict = {} # the results of the cell search res = self.search_results.cell_info for val in res[filter_id].values(): offs_dict[val.add_id] = val.offs.mean() # read the gas column amounts for key, val in six.iteritems(cell_info): cell_cd_dict[key] = val[0] # sort the dicionaries by column amount or intensity decrease s0 = sorted(offs_dict, key=offs_dict.get) s1 = sorted(cell_cd_dict, key=cell_cd_dict.get) logger.info("Cell search keys sorted by depth of Dip: %s" % s0) logger.info("Cell amounts sorted by depth of Dip: %s" % s1) filter_ids = res.keys() for k in range(len(s0)): cell_id = s1[k] gas_cd, gas_cd_err = cell_info[s1[k]][0], cell_info[s1[k]][1] logger.info("Search key: %s\nDel I: %s\nCell abbr: %s\nGasCol %s +/- %s" % (s0[k], offs_dict[s0[k]], cell_id, gas_cd, gas_cd_err)) # now add gas column to corresponding list in search result object for filter_id in filter_ids: res[filter_id][s0[k]].img_list.update_cell_info(cell_id, gas_cd, gas_cd_err)
[docs] def add_search_results(self): """Add results from automatic cell detection to calibration. This method analyses ``self.search_results`` for valid cell image lists (i.e. lists that contain images and have the gas column assigned) """ # Add all cell image lists that were found for each filter for filter_id, cell_dict in six.iteritems( self.search_results.cell_info): for cell_id, cell in six.iteritems(cell_dict): lst = cell.img_list if lst.has_files() and lst.gas_cd > 0: self.add_cell_img_list(lst) # Add all BG image lists found for each filter for filter_id, info in six.iteritems(self.search_results.bg_info): self.add_bg_img_list(info.img_list) # set background images closest to first detected cell list for each # filter self.set_bg_closest()
[docs] def set_bg_closest(self, cell_id=None): """Set the current background image closest to one of the cells. Parameters ---------- cell_id : str cell ID supposed to be used, if None, then the first cell list in :attr:`cell_lists` is used """ if cell_id is None: cell_id = list( self.cell_lists[self.filters.default_key_on].keys())[0] for filter_id, lst in six.iteritems(self.bg_lists): self.cell_lists[filter_id][cell_id].link_imglist(lst)
[docs] def find_and_assign_cells_all_filter_lists(self, threshold=0.10): """High level function for automatic cell and BG image search. This method basically calls the following functions: 1. :func:`find_cells` (for all filter IDs, e.g. on/off) #. :func:`_assign_calib_specs` #. :func:`add_search_results` #. :func:`check_all_lists` and sets flag ``cell_search_performed=True``. Parameters ---------- threshold : float percentage threshold for identification of regions of decreased intensity in time series """ for filter_id in self.filters.filters.keys(): try: self.find_cells(filter_id, threshold, False) except BaseException: self.find_cells(filter_id, threshold, True) self._assign_calib_specs() self.add_search_results() self.check_all_lists() self.cell_search_performed = True
[docs] def bg_img_available(self, filter_id): """Check if a background image is available. Parameters ---------- filter_id : str filter ID of image list (e.g. on / off) """ try: if isinstance(self.bg_lists[filter_id], Img): return True raise Exception except BaseException: self.check_all_lists(filter_id) if isinstance(self.bg_lists[filter_id], Img): return True return False
[docs] def check_image_list(self, lst): """Check if image list contains files and has images ready (loaded). Parameters ---------- lst : ImgList image list object Raises ------ IndexError If list does not contain images Exception If images cannot be loaded in list (unexpected error) or if ``lst.gas_cd`` is not a float """ if not lst.nof > 0: raise IndexError("Error, image list %s does not contain images" % lst.list_id) if not isinstance(lst.current_img(), Img): if not lst.load(): raise Exception("Unexpected error...") # raises Exception is gas column is not a number float(lst.gas_cd)
[docs] def check_all_lists(self): """Check if all image lists are ready for analysis. Returns ------- bool True (if it makes it to the return statement) """ filter_ids = list(self.cell_lists.keys()) cell_ids = self.cell_lists[filter_ids[0]].keys() # get number of cells for first filter ID first_cell_num = len(self.cell_lists[filter_ids[0]]) for filter_id in filter_ids: if not len(self.cell_lists[filter_id]) == first_cell_num: raise Exception("Mismatch in number of cells in " "self.cell_lists between filter list %s and %s" % (filter_ids[0], filter_id)) for cell_id in cell_ids: self.check_image_list(self.cell_lists[filter_id][cell_id]) if filter_id not in self.bg_lists: raise KeyError("Error: BG image data (list) for filter ID %s " "is not available" % filter_id) else: self.check_image_list(self.bg_lists[filter_id]) return True
[docs] def check_cell_info_dict_autosearch(self, cell_info_dict): """Check if dict including cell gas column info is right format. Parameters ---------- cell_info_dict : dict keys: cell ids (e.g. "a57"), values: list of gas column density and uncertainty in cm-2, format: ``[value, error]`` Raises ------ Exception If any of the specs in ``cell_info_dict`` is invalid """ for key, val in six.iteritems(cell_info_dict): if (not isinstance(key, str) and not isinstance(key, six.string_types)): raise KeyError("Invalid key: %s" % key) if not isinstance(val, list): raise ValueError("Invalid cell column specification, need " "list containing [value, uncertainty] of gas " "column with id %s, got %s" % (key, val)) else: if len(val) != 2: raise ValueError("Invalid cell column specification, need " "list containing [value, uncertainty] of " "gas column with " "id %s, got %s" % (key, val)) for k in range(len(val)): if not isinstance(val[k], (six.integer_types, float, complex)): raise ValueError("Invalid data type for cell gas " "column specification %s" % val)
[docs] def set_cell_info_dict_autosearch(self, cell_info_dict): """Set attribute ``self._cell_info_auto_search`` (dictionary). Parameters ---------- cell_info_dict : dict dictionary containing cell information """ self.check_cell_info_dict_autosearch(cell_info_dict) self._cell_info_auto_search = cell_info_dict
def _prep_tau_stack(self, filter_id="on", darkcorr=True, blurring=2): """Prepare a stack containing cell tau images of a certain type. The number of images in the stack corresponds to the number of cells that are available in :attr:`cell_lists`. The images in the stack are optical density (OD) images for each of the cells, determined based on the corresponding sky background that is derived from the background image list. Parameters ---------- filter_id : str filter ID for which the stack is computed (must be a valid list ID in :attr:`cell_lists` as well as :attr:`bg_list`) darkcorr : bool if True, the images will be dark corrected before the OD image """ if filter_id not in self.bg_lists: raise AttributeError("No background images for filter ID %s " "available in cell calibration engine" % filter_id) elif filter_id not in self.cell_lists: raise AttributeError("No cell images for filter ID %s " "available in cell calibration engine" % filter_id) bg_list = self.bg_lists[filter_id] bg_list.update_img_prep(blurring=blurring) bg_list.darkcorr_mode = darkcorr bg_img = bg_list.current_img() bg_mean = bg_img.img.mean() h, w = subimg_shape(bg_list.current_img().img.shape) num = len(self.cell_lists[filter_id]) tau_stack = ImgStack(h, w, num, stack_id=filter_id) # TEMPORARY SOLUTION tau_stack.add_data_err = [] try: bg_mean_tseries = self.det_bg_mean_pix_timeseries(filter_id) except BaseException: pass for cell_id, lst in six.iteritems(self.cell_lists[filter_id]): lst.update_img_prep(blurring=blurring) lst.darkcorr_mode = darkcorr cell_img = lst.current_img() try: bg_mean_now = \ bg_mean_tseries.get_poly_vals(cell_img.meta["start_acq"]) offset = bg_mean - bg_mean_now except BaseException: print_log.warning("Warning in tau image stack calculation for filter " " %s: Time series data for background list (background " "poly) is not available. Calculating tau image for cell " " image %s, %s based on unchanged background image " " recorded at %s" % (filter_id, cell_id, cell_img.meta["start_acq"], bg_img.meta["start_acq"])) offset = 0.0 bg_img = bg_img - offset tau_img = cell_img.duplicate() if bg_img.edit_log["darkcorr"] != cell_img.edit_log["darkcorr"]: raise ImgMetaError("Fatal: cannot determine tau stack, bg " "image and cell image have different " "darkcorr modes") tau_img.img = log(bg_img.img / cell_img.img) # tau_img.to_pyrlevel(pyrlevel) tau_stack.add_img(tau_img.img, start_acq=cell_img.meta["start_acq"], texp=cell_img.meta["texp"], add_data=lst.gas_cd) tau_stack.add_data_err.append(lst.gas_cd_err) tau_stack.img_prep.update(tau_img.edit_log) self.tau_stacks[filter_id] = tau_stack return tau_stack
[docs] def prep_tau_stacks(self, on_id="on", off_id="off", darkcorr=True, blurring=2): """Prepare image stacks for on, off and AA calibration data. Parameters ---------- on_id : str ID of onband filter off_id : str ID of offband filter darkcorr : bool Use dark corrected images blurring : int sigma of Gaussian blurring kernel pyrlevel : int pyramid level of calibration stack """ on_stack = self._prep_tau_stack(on_id, darkcorr, blurring) off_stack = self._prep_tau_stack(off_id, darkcorr, blurring) self.tau_stacks["aa"] = on_stack - off_stack
[docs] def prepare_calib_data(self, pos_x_abs=None, pos_y_abs=None, radius_abs=1, on_id="on", off_id="off", darkcorr=True, blurring=1, **kwargs): """Prepare calib data for onband, offband and AA. This function creates 3 :class:`CellCalibData` objects for each OD type (on, off and from that, AA). If not differently specified using the input parameters ``pos_x_abs`` and ``pos_y_abs`` the corresponding cell optical densities are retrieved at the image center coordinate. The 3 :class:`CellCalibData` instances for each type (on, off, AA) can be accessed via the :attr:`calib_data` attribute of this class. Parameters ---------- pos_x_abs : :obj:`int`, optional x-position for which the calibration data is retrieved pos_y_abs : :obj:`int`, optional y-position for which the calibration data is retrieved radius_abs : int radius specifying the disk size around ``pos_x_abs`` and ``pos_y_abs`` used to retrieve the cell-ODs on_id : str ID of onband filter used to determine calib curve off_id : str ID of offband filter used for calibration darkcorr : bool perform dark correction before determining cell tau images blurring : int apply gaussian blurring to cell tau images pyrlevel : int downscale factor (Gauss pyramid) """ self.check_all_lists() self.prep_tau_stacks(on_id, off_id, darkcorr, blurring) for calib_id, stack in six.iteritems(self.tau_stacks): if any([x is None for x in [pos_x_abs, pos_y_abs]]): logger.warning("Using image center coordinates for retrieval of cell " "calibration polynomial") h, w = stack.shape[1:] pos_x_abs, pos_y_abs = int(w / 2.0), int(h / 2.0) tau_series = stack.get_time_series(pos_x=pos_x_abs, pos_y=pos_y_abs, radius=radius_abs)[0] c = CellCalibData(tau_vec=tau_series.values, cd_vec=stack.add_data, cd_vec_err=stack.add_data_err, time_stamps=tau_series.index, pos_x_abs=pos_x_abs, pos_y_abs=pos_y_abs, calib_id=calib_id) try: c.fit_calib_data() except: print_log.warning("Failed to fit calibration data for calib_id %s" % calib_id) self.calib_data[calib_id] = c
[docs] def get_sensitivity_corr_mask(self, calib_id="aa", pos_x_abs=None, pos_y_abs=None, radius_abs=1, cell_cd_closest=0, surface_fit_pyrlevel=2): """Get sensitivity correction mask. Prepares a sensitivity correction mask to correct for filter transmission shifts. These shifts result in increasing optical densities towards the image edges for a given gas column density. The mask is determined for original image resolution, i.e. pyramid level 0 and for a specific cell optical density image (aa, tau_on, tau_off). The latter is normalised with respect to the input pixel position (e.g. center position of DOAS FOV or pixel position where cell calibration data was retrieved). Plume AA (or tau_on, tau_off) images can then be corrected for sensitivity variations by division with the mask. If DOAS calibration is used, the calibration function can then be used for all image pixels. If only cell calibration is used, the mask is normalised with respect to the image center, the corresponding cell calibration polynomial should then be retrieved in the center coordinate which is the default calibration position when using creating calibration data if not explicitely specified. You may then calibrate a given aa image (``aa_img``) as follows with using a :class:`CellCalibData` object (denoted with ``cellcalib``):: mask = cellcalib.get_sensitivity_corr_mask() aa_corr = aa_img.duplicate() aa_corr.img = aa_img.img / mask #this is retrieved in the image center if not other specified gas_cd_img = cellcalib(aa_corr) gas_cd_img.show() Parameters ---------- calib_id : str the mask is determined from the corresponding calib data (e.g. "on", "off", "aa") pos_x_abs : int x-pixel position of normalisation mask, if None the image center position is used (which is also the default pixel used to retrieve the vector of calibration optical densities from the cell OD images) pos_y_abs : int y-pixel position of normalisation mask, if None the image center position is used (which is also the default pixel used to retrieve the vector of calibration optical densities from the cell OD images) radius_abs : int radius specifying the disk size around ``pos_x_abs`` and ``pos_y_abs`` used to normalise the mask (i.e. uses average OD of cell image in this OD) filter_id : str mask is determined from the corresponding calib data (e.g. "on", "off", "aa") cell_cd_closest : float use the cell which is closest to the provided column density surface_fit_pyrlevel : int additional downscaling factor for 2D polynomial surface fit Raises ------ ValueError if the corresponding :class:`ImgStack` is cropped, from which the cell OD image is supposed to be retrieved Returns ------- Img the sensitivity correction mask Note ---- This function was only tested for AA images and not for on / off cell tau images """ if calib_id not in self.tau_stacks.keys(): raise ValueError("Tau image is not available for calib ID %s" % calib_id) stack = self.tau_stacks[calib_id] # make sure the stack is at pyramid level 0 stack = stack.to_pyrlevel(0) try: if stack.img_prep["crop"]: raise ValueError("Stack is cropped: sensitivity mask can only" "be determined for uncropped images") except: pass idx = argmin(abs(stack.add_data - cell_cd_closest)) cell_img, cell_cd_closest = stack.stack[idx], stack.add_data[idx] try: pos_x_abs = int(pos_x_abs) pos_y_abs = int(pos_y_abs) if not isnum(pos_x_abs) * isnum(pos_y_abs) == 1: raise ValueError except: print_log.warning("Using image center coordinate for normalisation position " "of sensitivity correction mask") h, w = stack.shape[1:] pos_x_abs, pos_y_abs = int(w / 2.0), int(h / 2.0) fov_mask = stack.make_circular_access_mask(pos_x_abs, pos_y_abs, radius_abs) try: cell_img = PolySurfaceFit(cell_img, pyrlevel=surface_fit_pyrlevel).model except: print_log.warning("2D polyfit failed while determination of sensitivity " "correction mask, using original cell tau image for mask " "determination") mean = (cell_img * fov_mask).sum() / fov_mask.sum() mask = Img(cell_img / mean) if calib_id in self.calib_data: c = self.calib_data[calib_id] if c.pos_x_abs == pos_x_abs and c.pos_y_abs == pos_y_abs: logger.info("Assigning sensitivity correction for calibration ID " "%s to corresponding CellCalibData object" % calib_id) c.senscorr_mask = mask return mask
""" Redefinitions from base class (:class:`Dataset`) """
[docs] def get_list(self, list_id, cell_id=None): """Expand functionality of this method from :class:`Dataset`. Parameters ---------- list_id : str filter ID of list (e.g. on, off). If parameter ``cell_id`` is None, then this function returns the initial Dataset list (containing all images, not the ones separated by cells / background). cell_id : str if input is specified (type str) and valid (available cell img list), then the corresponding list is returned which only contains images from this cell. The string "bg" might be used to access the background image list of the filter specified with parameter ``list_id`` Returns ------- ImgList the actual list object """ if cell_id is not None and isinstance(cell_id, str): if cell_id in self.cell_lists[list_id].keys(): return self.cell_lists[list_id][cell_id] elif cell_id == "bg": return self.bg_lists[list_id] return super(CellCalibEngine, self).get_list(list_id)
""" Plotting etc """
[docs] def plot_cell_search_result(self, filter_id="on", for_app=False, include_tit=True, cell_cmap="Oranges", ax=None): """High level plotting function for results from auto-cell search. Parameters ---------- filter_id : str filter ID (e.g. "on", "off") for_app : bool currently irrelevant (default is False) include_tit : bool if True, include default title cell_cmap : str string specifying matplotlib colormap used to plot cell time windows ax : matplotlib axes object Returns ------- axes matplotlib axes object """ try: cmap = get_cmap(cell_cmap) except BaseException: logger.warning("Invalid input for cell_cmap, using Oranges") cmap = get_cmap("Oranges") # get stored time series (was automatically saved in # :func:`find_cells`) ts_all = self.pix_mean_tseries[("%s_auto_search" % filter_id)] # get cell search results res = self.search_results if filter_id not in res.cell_info or \ len(res.cell_info[filter_id]) < 1: logger.info("Error plotting cell search results: no results found...") return 0 if for_app: fig = Figure(figsize=(14, 8)) ax = fig.add_subplot(111) else: if ax is None: fig, ax = subplots(1, 1, figsize=(14, 8)) info = res.cell_info[filter_id] num = len(info) nums = [int(255.0 / k) for k in range(1, num + 3)] ts_all.plot(include_tit=include_tit, ax=ax, ls="--", c=cmap(nums[0]), label="Avg. pix intensities (%s)" % filter_id) ts = ts_all.index dt = timedelta( 0, (ts[-1] - ts[0]).total_seconds() / (len(ts_all) * 10)) k = 2 for cell in info.values(): lbl = (r"Cell %s: $S_{%s}$=%.2e cm$^{-2}$" % (cell.img_list.cell_id, SPECIES_ID, cell.img_list.gas_cd)) p = ax.plot(cell.start_acq, cell.mean_vals, ' o', color=cmap(nums[k]), ms=8, label=lbl, markeredgecolor="None", markeredgewidth=1) c = p[0].get_color() ax.fill_betweenx(arange(0, ts_all.max() * 1.05, 1), cell.start - dt, cell.stop + dt, facecolor=c, alpha=0.15, edgecolor=c) k += 1 if filter_id in res.bg_info.keys(): bg_info = res.bg_info[filter_id] c = cmap(nums[1]) ax.plot(bg_info.start_acq, bg_info.mean_vals, ' o', color=c, ms=10, markerfacecolor="None", markeredgecolor=c, mew=2, label='BG image candidates') ts = PixelMeanTimeSeries(bg_info.mean_vals, bg_info.start_acq) ts.fit_polynomial(2) bg_poly_vals = ts.get_poly_vals(bg_info.start_acq, ext_border_secs=30) ax.plot(bg_info.start_acq, bg_poly_vals, '-', color=c, lw=2, ls="--", label='Fitted BG polynomial') cfn = bg_info.img_list.cfn ax.plot(bg_info.start_acq[cfn], bg_info.mean_vals[cfn], marker="+", color=cmap(nums[0]), ms=12, mew=2, label='Current BG image') ax.legend(loc="best", fancybox=True, framealpha=0.5) ax.set_ylabel(r"$\mu_{pix}$") return ax
[docs] def plot_calib_curve(self, calib_id, **kwargs): """Plot calibration curve. Parameters ---------- filter_id : str image type ID (e.g. "aa") **kwargs : additional keyword arguments for plot passed to :func:`plot` of corresponding :class:`CellCalibData` object Returns ------- axes matplotlib axes object """ return self.calib_data[calib_id].plot(**kwargs)
[docs] def plot_all_calib_curves(self, ax=None, **kwargs): """Plot all available calibration curves in a certain pixel region. Parameters ---------- ax : axes matplotlib axes instance **kwargs : additional keyword arguments passed to :func:`get_calibration_polynomial` of corresponding :class:`CellCalibData` objects Returns ------- axes matplotlib axes object """ if ax is None: fig, ax = subplots(1, 1) tau_max = -10 y_min = 1e20 for calib_id, calib in six.iteritems(self.calib_data): tau = calib.tau_vec gas_cd, gas_cd_errs = calib.cd_vec, calib.cd_vec_err fun, coeffs = calib.calib_fun, calib.calib_coeffs if coeffs is None: raise ValueError("Calibration coefficients not available " "for calib_id %s. Please call fit_calib_data " "first" % calib_id) taus = linspace(0, tau.max() * 1.2, 100) # plot data points pl = ax.plot(tau, gas_cd, " ^", label="Data %s (pix" % calib_id) # try adding error bars ax.errorbar(tau, gas_cd, gas_cd_errs, linestyle="none", color="#6E6E6E") ax.plot(taus, fun(taus, *coeffs), "--", color=pl[0].get_color(), label="Fit result") tm = tau.max() if tm > tau_max: tau_max = tm offs = fun(0, *coeffs) if offs < y_min: y_min = offs ax.set_ylabel(r"$S_{%s}$ [cm$^{-2}$]" % SPECIES_ID) ax.set_xlabel(r"$\tau$") ax.set_ylim([y_min - gas_cd.min() * 0.1, gas_cd.max() * 1.05]) ax.set_xlim([0, tau_max * 1.05]) ax.grid() ax.legend(loc="best", fancybox=True, framealpha=0.5) return ax
def __call__(self, value, calib_id="aa", **kwargs): """Apply calibration to input value (i.e. convert into gas CD). Parameters ---------- value : float tau or AA value calib_id : str ID of calibration data supposed to be used **kwargs : additional keyword arguments to extract calibration information (e.g. pos_x_abs, pos_y_abs, radius_abs) Returns ------- float corresponding column density """ return self.calib_data[calib_id](value, **kwargs)