# -*- coding: utf-8 -*-
#
# Pyplis is a Python library for the analysis of UV SO2 camera data
# Copyright (C) 2017 Jonas Gliss (jonasgliss@gmail.com)
#
# This program is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License a
# published by the Free Software Foundation, either version 3 of
# the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Pyplis module containing features related to plume background analysis."""
from __future__ import (absolute_import, division)
from numpy import (polyfit, poly1d, linspace, logical_and, log, argmin,
gradient, nan, ndarray, arange, ones, finfo, asarray)
from matplotlib.patches import Rectangle
from matplotlib.pyplot import figure, subplots, setp
import matplotlib.colors as colors
from collections import OrderedDict as od
from scipy.ndimage.filters import gaussian_filter
import six
from pyplis import logger, print_log
from .image import Img
from .utils import LineOnImage
from .optimisation import PolySurfaceFit
from .helpers import shifted_color_map, _roi_coordinates
from .plumespeed import find_movement
[docs]class PlumeBackgroundModel(object):
"""Class for plume background modelling and tau image determination.
Parameters
----------
bg_raw : Img
sky background radiance raw image data
plume_init : Img
initial plume image data (is used to estimate default clear sky areas
for bg modelling)
**kwargs :
additional class attributes (e.g. for modelling, valid keys
are all keys in self.__dict__.keys())
"""
[docs] def __init__(self, bg_raw=None, plume_init=None,
init_surf_fit_mask=True, **kwargs):
if isinstance(bg_raw, ndarray):
bg_raw = Img(bg_raw)
if isinstance(plume_init, ndarray):
plume_init = Img(plume_init)
self.last_tau_img = None
self._last_surffit = None
#: Correction mode
self._mode = 0
#: settings for poly surface fit (corr mode: 0)
self._surface_fit_mask = None
self.surface_fit_pyrlevel = 4
self.surface_fit_polyorder = 2
#: Rectangle for scaline of background image
#: corr modes: 1 - 6
self.scale_rect = None
#: Rectangle for linear based correction of vertical gradient
#: corr modes: 2, 4
self.ygrad_rect = None
#: Settings for quadratic correction of vertical gradient (along line)
#: corr modes: 3, 5, 6
self.ygrad_line_colnum = None # detector column of vertical line
self.ygrad_line_polyorder = 2
self.ygrad_line_startrow = 0 # start row for profile fit
self.ygrad_line_stoprow = None # stop row for profile fit
self.ygrad_line_mask = None # mask specifying rows for profile fit
#: Rectangle for linear based correction of horizontal gradient
#: (applied before ygradient correction is performed)
#: corr modes: 4, 5
self.xgrad_rect = None
#: Settings for quadratic correction of horizontal gradient
#: (along line) corr modes: 6
self.xgrad_line_rownum = None
self.xgrad_line_polyorder = 2
self.xgrad_line_startcol = 0
self.xgrad_line_stopcol = None
self.xgrad_line_mask = None
# initialisations
self.update(**kwargs)
if isinstance(plume_init, Img):
self.set_missing_ref_areas(plume_init)
self._init_bgsurf_mask(plume_init)
if isinstance(bg_raw, Img):
self.mode = 1
self.last_tau_img = self.get_tau_image(plume_init, bg_raw)
self.last_settings = self.settings_dict()
@property
def all_modes(self):
"""List containing valid modelling modes."""
return list(self.mode_info_dict.keys())
@property
def mode(self):
"""Return current modelling mode."""
return self._mode
@mode.setter
def mode(self, val):
if val not in self.all_modes:
raise ValueError("Invalid mode %d, choose from %s"
% (val, self.all_modes))
self._mode = val
@property
def CORR_MODE(self):
"""Return current background modelling mode."""
raise AttributeError("Deprecated attribute name, please use mode")
@CORR_MODE.setter
def CORR_MODE(self, val):
raise AttributeError("Deprecated attribute name, please use mode")
@property
def surface_fit_mask(self):
"""Mask for retrieval mode 0: fit 2D polynomial."""
return self._surface_fit_mask
@surface_fit_mask.setter
def surface_fit_mask(self, val):
if isinstance(val, Img):
val = val.img
self._surface_fit_mask = val
[docs] def check_settings(self):
"""Check if any of the modelling settings is not specified."""
for value in self.__dict__.values():
if value is None:
return False
return True
[docs] def mean_in_rects(self, img):
"""Determine ``(mean, min, max)`` intensity in reference rectangles.
Parameters
----------
img : array
image data array (can also be :class:`Img`)
Returns
-------
tuple
2-element tuple containing mean value and error
"""
try:
img = img.img
except BaseException:
pass
a = []
a.append(_mean_in_rect(img, self.scale_rect)[0])
a.append(_mean_in_rect(img, self.ygrad_rect)[0])
a.append(_mean_in_rect(img, self.xgrad_rect)[0])
a = asarray(a)
return (a.mean(), a.min(), a.max())
[docs] def update(self, **kwargs):
"""Update class attributes.
:param **kwargs:
"""
for k, v in six.iteritems(kwargs):
self.__setitem__(k, v)
[docs] def guess_missing_settings(self, plume_img):
"""Call and return :func:`set_missing_ref_areas`.
Note
----
This is the previous name of the method
:func:`set_missing_ref_areas`
"""
print_log.warning("Please use new name of method: set_missing_ref_areas",
DeprecationWarning)
self.set_missing_ref_areas(plume_img)
[docs] def set_missing_ref_areas(self, plume_img):
"""Find and set missing default sky reference areas for modelling.
Based on the input plume image, the clear sky reference areas for sky
radiance image based tau modelling are estimated, i.e.:
1. The rectangle areas for scaling and linear gradient corrections:
``self.scale_rect, self.ygrad_rect, self.xgrad_rect``
2. Coordinates of horizontal and vertical profile lines
for quadratic gradient corrections:
``self.ygrad_line_colnum, self.xgrad_line_rownum``
(i.e. positions and start / stop pixel coordinates)
The estimation is performed based on a brightness analysis for left and
right image area.
Parameters
----------
plume_img : Img
exemplary plume image (should be representative for a whole
dataset)
"""
if not isinstance(plume_img, Img):
raise TypeError("Invalid, input type: need Img object...")
plume = plume_img.img
if self.check_settings():
return
if self.surface_fit_mask is None:
self.surface_fit_mask = ones(plume.shape)
h, w = plume.shape
res = find_sky_reference_areas(plume)
if self.ygrad_line_colnum is None:
self.ygrad_line_colnum = res["ygrad_line_colnum"]
self.ygrad_line_stoprow = res["ygrad_line_stoprow"]
self.ygrad_line_startrow = res["ygrad_line_startrow"]
if self.xgrad_line_rownum is None:
self.xgrad_line_rownum = res["xgrad_line_rownum"]
self.xgrad_line_startcol = res["xgrad_line_startcol"]
self.xgrad_line_stopcol = res["xgrad_line_stopcol"]
if not self._check_rect(self.scale_rect, plume):
self.scale_rect = res["scale_rect"]
if not self._check_rect(self.ygrad_rect, plume):
self.ygrad_rect = res["ygrad_rect"]
if not self._check_rect(self.xgrad_rect, plume):
self.xgrad_rect = res["xgrad_rect"]
[docs] def calc_sky_background_mask(self, plume_img, next_img=None,
lower_thresh=None,
apply_movement_search=True,
**settings_movement_search):
"""Retrieve and set background mask for 2D poly surface fit.
Wrapper for method :func:`find_sky_background`
Calculates mask specifying sky radiance pixels for modelling mode
0 (plume background retrieval directly from plume image without
an additional I0-image, using a 2D polynomial surface fit). The
mask is stored in :attr:`surface_fit_mask`.
Parameters
----------
plume_img : Img
the plume image for which the sky background pixels are supposed
to be detected
next_img : :obj:`Img`, optional
second image used to compute the optical flow in :param:`plume_img`
lower_thresh : :obj:`float`, optional
lower intensity threshold. If provided, this value is used,
else, the minimum value is derived from the minimum intensity
in the plume image within the current 3 sky reference
rectangles
**settings_movement_search
additional keyword arguments passed to :func:`find_movement`.
Note that these may include settings for the optical flow
calculation which are further passed to the
initiation of the :class:`FarnebackSettings` class
Returns
-------
array
2D-numpy boolean numpy array specifying sky background pixels
"""
mask = find_sky_background(plume_img, next_img,
self.settings_dict(),
lower_thresh,
apply_movement_search,
**settings_movement_search)
self.surface_fit_mask = mask
return mask
def _init_bgsurf_mask(self, plume):
logger.info("Initiating BG surface mask in PlumeBackgroundModel")
mask = ones(plume.shape)
self.surface_fit_mask = mask
return mask
[docs] def bg_from_poly_surface_fit(self, plume, mask=None, polyorder=2,
pyrlevel=4):
"""Apply poly surface fit to plume image for bg retrieval.
Parameters
----------
plume : Img
plume image
mask : ndarray
mask specifying gas free areas (if None, use all pixels). Note that
the mask needs to be computed within the same ROI and at the same
pyrlevel as the input plume image
polyorder : int
order of polynomial used for fit, defaults to 4
pyrlevel : int
pyramid level in which fit is performed
(e.g. 4 => image size for fit is reduced by factor 2^4 = 16). Note
that the
:return tuple: 1st entry: fitted background image
second: ``PolySurfaceFit`` object
Returns
-------
ndarray
fitted sky background
Note
----
The :class:`PolySurfaceFit` object used to retrieve the background
is stored in the :attr:`_last_surffit`.
"""
if not isinstance(plume, Img):
raise TypeError("Need instance of pyplis Img class")
if mask is None:
mask = self.surface_fit_mask
if not isinstance(mask, ndarray):
try:
mask = mask.img
if not mask.shape == plume.shape:
raise AttributeError("Shape mismatch between mask and "
"plume image")
except:
mask = self._init_bgsurf_mask(plume)
pyrlevel_rel = pyrlevel - plume.pyrlevel
if pyrlevel_rel < 0:
print_log.warning("Pyramid level of input image (%d) is larger than desired "
"pyramid level for computation of surface fit (%d). Using "
"the current pyrlevel %d of input image" % (plume.pyrlevel,
pyrlevel))
pyrlevel_rel = 0
# update settings from input keyword arg
fit = PolySurfaceFit(plume.img, mask,
polyorder=polyorder,
pyrlevel=pyrlevel_rel)
if not fit.model.shape == plume.shape:
raise ValueError("Mismatch in shape between input plume image and "
"fit result of PolySurfaceFit. Check pyramid "
"level of input image")
self._last_surffit = fit
return fit.model
[docs] def get_tau_image(self, plume_img, bg_img=None,
check_state=True, **kwargs):
"""Determine current tau image for input plume image.
Parameters
----------
plume_img : Img
plume image in intensity space
bg_img : :obj:`Img`, optional
sky radiance image (for ``self.CORR_MODE = 1 - 6``)
check_state : bool
if True and current mode != 0, it is checked whether the input
images (plume and bg) have the same darkcorrection and vignetting
state
**kwargs :
additional keyword arguments for updating current settings
(valid input keywords (strings): mode, ygrad_rect,
ygrad_line_colnum, ygrad_line_startrow, ygrad_line_stoprow
Returns
-------
Img
plume tau image
Raises
------
AttributeError
if input image is already a tau or AA image or if input plume
image and the current background image have different states
with regard to vignetting or dark correction.
"""
if not isinstance(plume_img, Img):
raise TypeError("Invalid, input type: need Img object...")
# update current settings
for k, v in six.iteritems(kwargs):
self.__setitem__(k, v)
if not plume_img.is_darkcorr:
logger.warning("plume image is not corrected for dark current")
if plume_img.is_tau:
raise AttributeError("Input image is already tau image")
tau = None
if self.mode == 0: # no sky radiance image, poly surface fit
bg = self.bg_from_poly_surface_fit(plume_img,
self.surface_fit_mask,
self.surface_fit_polyorder,
self.surface_fit_pyrlevel)
r = bg / plume_img.img
# make sure no 0 values or neg. numbers are in the image
r[r <= 0] = finfo(float).eps
tau = log(r)
else:
if check_state:
self._check_img_states(plume_img, bg_img)
r = bg_img.img / plume_img.img
# make sure no 0 values or neg. numbers are in the image
r[r <= 0] = finfo(float).eps
tau = log(r)
if self.mode != 99:
tau = self.correct_tau_curvature_ref_areas(tau)
tau_img = Img(tau, **plume_img.meta)
tau_img.edit_log.update(plume_img.edit_log)
tau_img.meta["bit_depth"] = nan
tau_img.edit_log["is_tau"] = True
self.last_tau_img = tau_img
return tau_img
[docs] def get_aa_image(self, plume_on, plume_off, bg_on=None, bg_off=None,
check_state=True, **kwargs):
"""Retrieve apparent absorbance image from on and off imgs.
Determines an initial AA image based on input plume and background
images and
Parameters
----------
plume_on : Img
on-band plume image
plume_off : Img
off-band plume image
bg_on : :obj:`Img`, optional
on-band sky radiance image (for ``self.CORR_MODE = 1 - 6``)
bg_off : :obj:`Img`, optional
off-band sky radiance image (for ``self.CORR_MODE = 1 - 6``)
check_state : bool
if True and current mode != 0, it is checked whether the input
images (plume and bg) have the same darkcorrection and vignetting
state
**kwargs :
additional keyword arguments for updating current settings
(valid input keywords (strings), e.g. ``surface_fit_mask`` if
``mode == 0``
Returns
-------
Img
plume AA image
"""
if not isinstance(plume_on, Img) or not isinstance(plume_off, Img):
raise TypeError("Need Img objects for background modelling")
for k, v in six.iteritems(kwargs):
self.__setitem__(k, v)
if self.mode == 0:
mask = self.surface_fit_mask
po = self.surface_fit_polyorder
pyr = self.surface_fit_pyrlevel
bg_on = self.bg_from_poly_surface_fit(plume_on, mask, po, pyr)
bg_off = self.bg_from_poly_surface_fit(plume_off, mask, po, pyr)
r_on = bg_on / plume_on.img
# make sure no 0 values or neg. numbers are in the image
r_on[r_on <= 0] = finfo(float).eps
r_off = bg_off / plume_off.img
# make sure no 0 values or neg. numbers are in the image
r_off[r_off <= 0] = finfo(float).eps
aa = log(r_on) - log(r_off)
else:
if check_state:
self._check_img_states(plume_on, plume_off, bg_on, bg_off)
r1 = bg_on.img / plume_on.img
r1[r1 <= 0] = finfo(float).eps
r2 = bg_off.img / plume_off.img
r2[r2 <= 0] = finfo(float).eps
aa = log(r1) - log(r2)
if self.mode != 99:
aa = self.correct_tau_curvature_ref_areas(aa)
aa_img = Img(aa, **plume_on.meta)
aa_img.edit_log.update(plume_on.edit_log)
aa_img.meta["bit_depth"] = nan
aa_img.edit_log["is_tau"] = True
aa_img.edit_log["is_aa"] = True
self.last_tau_img = aa_img
return aa_img
[docs] def correct_tau_curvature_ref_areas(self, tau_init):
"""Scale and correct curvature in initial tau image.
The method used is depends on the current ``CORR_MODE``. This method
only applies for correction modes 1-6.
Parameters
----------
tau_init : :obj:`array`, :obj:`Img`
inital tau image
Returns
-------
array
modelled tau image
"""
mode = self.mode
if not 1 <= mode <= 6:
raise ValueError("This method only works for background model"
"modes (param CORR_MODE) 1-6")
try:
tau_init = tau_init.img
except BaseException:
pass
if mode == 1:
return scale_tau_img(tau_init, self.scale_rect)
elif mode == 2:
return corr_tau_curvature_vert_two_rects(tau_init,
self.scale_rect,
self.ygrad_rect)
elif mode == 3:
return corr_tau_curvature_vert_line(tau_init,
self.ygrad_line_colnum,
self.ygrad_line_startrow,
self.ygrad_line_stoprow,
self.ygrad_line_mask,
self.ygrad_line_polyorder)[0]
elif mode == 4:
tau_init = corr_tau_curvature_vert_two_rects(tau_init,
self.scale_rect,
self.ygrad_rect)
return corr_tau_curvature_hor_two_rects(tau_init,
self.scale_rect,
self.xgrad_rect)
elif mode == 5:
tau_init = corr_tau_curvature_vert_line(tau_init,
self.ygrad_line_colnum,
self.ygrad_line_startrow,
self.ygrad_line_stoprow,
self.ygrad_line_mask,
self.ygrad_line_polyorder
)[0]
return corr_tau_curvature_hor_two_rects(tau_init,
self.scale_rect,
self.xgrad_rect)
elif mode == 6:
tau_init = corr_tau_curvature_vert_line(tau_init,
self.ygrad_line_colnum,
self.ygrad_line_startrow,
self.ygrad_line_stoprow,
self.ygrad_line_mask,
self.ygrad_line_polyorder
)[0]
return corr_tau_curvature_hor_line(tau_init,
self.xgrad_line_rownum,
self.xgrad_line_startcol,
self.xgrad_line_stopcol,
self.xgrad_line_mask,
self.xgrad_line_polyorder)[0]
"""Plotting"""
[docs] def plot_sky_reference_areas(self, plume):
"""Plot the current sky ref areas into a plume image."""
d = self.settings_dict()
try:
return plot_sky_reference_areas(plume, d)
except:
raise ValueError("Could not plot sky reference areas. Please check"
" if all relevant sky reference areas are set and"
" if not, set them manually or use class method"
" set_missing_ref_areas")
[docs] def plot_tau_result(self, tau_img=None, tau_min=None, tau_max=None,
edit_profile_labels=True, legend_loc=3,
figheight=8, add_mode_info=False,
fsize_legend=12, fsize_labels=16,
**add_lines):
"""Plot current tau image including all reference areas.
Parameters
----------
tau_img : Img
the tau image to be displayed
tau_min : :obj:`float`, optional
lower tau boundary to be displayed
tau_max : :obj:`float`, optional
upper tau boundary for colormap
edit_profile_labels : bool
beta version of smart layout for axis labels from profile
subplots
legend_loc : int
number ID for specifying legend position
figheight : int
figure height in inches (dpi=matplotlib default)
add_mode_info : bool
if True, information about the used correction mode is
included in the plot
**kwargs:
additional lines to be plotted, e.g.::
pcs = [300, 400, 500, 600]
"""
tau = tau_img
if not isinstance(tau, Img):
tau = self.last_tau_img
if not isinstance(tau, Img):
raise AttributeError("No tau image available in background model")
tau = tau.duplicate().to_pyrlevel(0)
tmin = tau_min
tmax = tau_max
if tau_max is None:
tau_max = tau.max()
if tau_min is None:
tau_min = - tau_max
cmap = shifted_color_map(tau_min, tau_max)
ax = []
figheight = 8
# margins (left, bottom, top, inner)
lm, bm, tm, im = 0.10, 0.10, 0.02, 0.01
tau_frac = 0.7
h0, w0 = tau.shape
R = w0 / float(h0)
d_panels = 1 - tm - bm - im - tau_frac
fig = figure(figsize=(R * figheight, figheight))
ax.append(fig.add_axes([lm, bm,
tau_frac, tau_frac]))
ax.append(fig.add_axes([lm + tau_frac + im, bm,
d_panels, tau_frac]))
ax.append(fig.add_axes([lm, bm + tau_frac + im,
tau_frac, d_panels]))
if self.mode == 0:
ax.append(fig.add_axes([lm + tau_frac + im,
bm + tau_frac + im,
d_panels, d_panels]))
palette = colors.ListedColormap(['white', 'lime'])
norm = colors.BoundaryNorm([0, .5, 1], palette.N)
ax[3].imshow(self.surface_fit_mask, cmap=palette, norm=norm,
alpha=.7)
ax[3].set_xticklabels([])
ax[3].set_yticklabels([])
ax[0].imshow(tau.img, cmap=cmap, vmin=tau_min, vmax=tau_max)
ax[0].plot([self.ygrad_line_colnum, self.ygrad_line_colnum],
[0, h0], "-b", label="vert profile")
ax[0].plot([0, w0], [self.xgrad_line_rownum, self.xgrad_line_rownum],
"-c", label="hor profile")
for k, l in six.iteritems(add_lines):
try:
x0, y0, x1, y1 = l.to_list()
c = l.color
except:
x0, y0, x1, y1 = l
c = "g"
ax[0].plot([x0, x1], [y0, y1], "-", lw=2, c=c, label=k)
ax[0].set_xlim([0, w0 - 1])
ax[0].set_ylim([h0 - 1, 0])
xs, ys, ws, hs = _roi_coordinates(self.scale_rect)
ax[0].add_patch(Rectangle((xs, ys), ws, hs, ec="lime", fc="lime",
label="scale_rect", alpha=0.3))
xs, ys, ws, hs = _roi_coordinates(self.ygrad_rect)
ax[0].add_patch(Rectangle((xs, ys), ws, hs, ec="b", fc="b",
label="ygrad_rect", alpha=0.3))
xs, ys, ws, hs = _roi_coordinates(self.xgrad_rect)
ax[0].add_patch(Rectangle((xs, ys), ws, hs, ec="c", fc="c",
label="xgrad_rect", alpha=0.3))
ax[2].set_xticklabels([])
ax[1].set_yticklabels([])
# plot vertical profile
lvert = LineOnImage(self.ygrad_line_colnum, 0, self.ygrad_line_colnum,
h0 - 1, line_id="vert")
p_vert = lvert.get_line_profile(tau.img)
ax[1].plot(p_vert, arange(0, len(p_vert), 1), "-b",
label="vert profile")
ax[1].yaxis.tick_right()
ax[1].set_ylim([h0 - 1, 0])
setp(ax[1].xaxis.get_majorticklabels(), rotation=15)
ax[1].yaxis.tick_right()
# plot horizontal profile
line_hor = LineOnImage(0, self.xgrad_line_rownum, w0 - 1,
self.xgrad_line_rownum, line_id="hor")
p_hor = line_hor.get_line_profile(tau.img)
ax[2].plot(arange(0, len(p_hor), 1), p_hor, "-c",
label="hor profile")
# ax[2].get_yaxis().set_ticks(horYLabels)
# ax[2].set_ylim([-.05,.25])
ax[2].set_xlim([0, w0 - 1])
# subplots_adjust(wspace=0.02, hspace=0.02)
ax[2].axhline(0, ls="--", color="k")
ax[1].axvline(0, ls="--", color="k")
if edit_profile_labels:
low, high = tmin, tmax
if low is None:
low = p_vert.min()
if high is None:
high = p_vert.max()
_range = high - low
lbls = [0]
if high > 0 and high / _range > 0.2:
lbls.append(high - _range * .1)
if low < 0 and abs(low) / high > 0.5:
lbls.insert(0, low + _range * .1)
ax[1].get_xaxis().set_ticks(lbls)
lbl_str = ["%.2f" % lbl for lbl in lbls]
ax[1].set_xlim([low, high])
ax[1].set_xticklabels(lbl_str)
low, high = tmin, tmax
if low is None:
low = p_hor.min()
if high is None:
high = p_hor.max()
_range = high - low
lbls = [0]
if high > 0 and high / _range > 0.2:
lbls.append(high - _range * .1)
if low < 0 and abs(low) / high > 0.5:
lbls.insert(0, low + _range * .1)
ax[2].get_yaxis().set_ticks(lbls)
lbl_str = ["%.2f" % lbl for lbl in lbls]
ax[2].set_ylim([low, high])
ax[2].set_yticklabels(lbl_str)
ax[1].set_xlabel(r"$\tau$", fontsize=fsize_labels)
ax[2].set_ylabel(r"$\tau$", fontsize=fsize_labels)
if add_mode_info:
ax[0].set_xlabel("CORR_MODE: %s" % self.CORR_MODE,
fontsize=fsize_labels)
ax[0].legend(loc=legend_loc, fancybox=True, framealpha=0.7,
fontsize=fsize_legend)
return fig
"""Helpers"""
[docs] def settings_dict(self):
"""Write current sky reference areas and masks into dictionary."""
d = {}
d["mode"] = self.mode
d["surface_fit_mask"] = self.surface_fit_mask
d["surface_fit_pyrlevel"] = self.surface_fit_pyrlevel
d["surface_fit_polyorder"] = self.surface_fit_polyorder
d["scale_rect"] = self.scale_rect
d["ygrad_rect"] = self.ygrad_rect
d["ygrad_line_colnum"] = self.ygrad_line_colnum
d["ygrad_line_polyorder"] = self.ygrad_line_polyorder
d["ygrad_line_startrow"] = self.ygrad_line_startrow
d["ygrad_line_stoprow"] = self.ygrad_line_stoprow
d["ygrad_line_mask"] = self.ygrad_line_mask
d["xgrad_rect"] = self.xgrad_rect
d["xgrad_line_rownum"] = self.xgrad_line_rownum
d["xgrad_line_polyorder"] = self.xgrad_line_polyorder
d["xgrad_line_stopcol"] = self.xgrad_line_stopcol
d["xgrad_line_startcol"] = self.xgrad_line_startcol
d["xgrad_line_mask"] = self.xgrad_line_mask
return d
@property
def mode_info_dict(self):
"""Return information on available bg modelling modes."""
return od([[0, "No additional BG image: poly surface fit using plume"
" image pixels specified with mask"],
[1, "Scaling of bg image in rect scale_rect"],
[2, "Scaling (mode 1, scale_rect) and linear y gradient "
"correction using rects scale_rect and ygrad_rect"],
[3, "Scaling (mode 1, scale_rect) and quadratic y "
"gradient correction using vertical profile line"],
[4, "Like 2, including linear x gradient correction using "
"rect xgrad_rect"],
[5, "Like 3, including linear x gradient correction using "
"rect xgrad_rect"],
[6, "Like 3, including quadratic x gradient correction "
"using horizontal profile line"],
[99, "USE AS IS: no background modelling performed"]])
[docs] def mode_info(self, mode_num):
"""Return short information about one of the available modelling modes.
Parameters
----------
mode_num : int
the background modelling mode for which information is required
Returns
-------
str
short description of mode
"""
try:
return self.mode_info_dict[mode_num]
except KeyError:
return "Mode does not exist"
[docs] def print_mode_info(self):
"""Print information about the different correction modes."""
print_log.info("Available modes for automatic plume background retrieval")
for k, v in six.iteritems(self.mode_info_dict):
print_log.info("Mode %s: %s" % (k, v))
def _check_rect(self, rect, img):
"""Check if rect is not None and if it is within image borders.
:param list r: rectangular area ``[x0, y0, x1, y1]``
:param ndarray img: exemplary image
:return bool:
"""
if rect is None:
return False
h, w = img.shape
if rect[0] < 0 or rect[1] < 0 or rect[2] >= w or rect[3] >= h:
return False
return True
def _check_img_states(self, img, *more_images):
"""Check if other images have the same edit state relative to image."""
if not all([isinstance(x, Img) for x in more_images]):
raise TypeError("All provided images need to be of type Img")
if not all([x.is_vigncorr == img.is_vigncorr for x in more_images]):
raise AttributeError("Cannot model tau image: mismatch with "
"regard to vignetting correction state "
"between plume image(s) and sky radiance "
"image(s)")
elif not all([x.is_darkcorr == img.is_darkcorr for x in more_images]):
raise AttributeError("Cannot model tau image: mismatch with "
"regard to dark correction state "
"between plume image(s) and sky radiance "
"image(s)")
def __setitem__(self, key, value):
"""Update class item."""
if key in self.__dict__:
logger.info("Updating %s in background model" % key)
self.__dict__[key] = value
elif key == "mode":
"Updating %s in background model" % key
self.mode = value
elif key == "surface_fit_mask":
self.surface_fit_mask = value
elif key == "CORR_MODE":
logger.warning("Got input key CORR_MODE which is out-dated in versions 0.10+"
". Updated background modelling mode accordingly")
self.mode = value
def __call__(self, plume, bg, **kwargs):
return self.get_tau_image(plume, bg, **kwargs)
def _mean_in_rect(img_array, rect=None):
"""Get mean and standard deviation of pixels within rectangle.
:param ndarray imgarray: the image data
:param rect: rectanglular area ``[x0, y0, x1, y1]` where x0 < x1, y0 < y1
"""
if rect is None:
sub = img_array
else:
sub = img_array[rect[1]: rect[3], rect[0]: rect[2]]
return sub.mean(), sub.std()
[docs]def scale_tau_img(tau, rect):
"""Scale tau image such that it fulfills tau==0 in reference area."""
avg, _ = _mean_in_rect(tau, rect)
return tau - avg
[docs]def scale_bg_img(bg, plume, rect):
"""Normalise background image to plume image intensity in input rect.
Parameters
----------
bg : ndarray
background image
plume : ndarray
plume image
rect : list
list containing rectangle coordinates
Returns
-------
ndarray
the scaled background image
"""
# bg, plume = [x.img for x in [bg, plume] if isinstance(x, Img)]
mean_bg, _ = _mean_in_rect(bg, rect)
mean_img, _ = _mean_in_rect(plume, rect)
del_rad = mean_img / float(mean_bg)
return bg * del_rad
[docs]def corr_tau_curvature_vert_two_rects(tau0, r0, r1):
"""Apply vertical linear background curvature correction to tau img.
Retrieves pixel mean value from two rectangular areas and determines
linear offset function based on the vertical positions of the rectangle
center coordinates. The corresponding offset for each image row is then
subtracted from the input tau image
:param (ndarray, Img) tau0: inital tau image
:param list r0: 1st rectanglular area ``[x0, y0, x1, y1]`
:param list r1: 2nd rectanglular area ``[x0, y0, x1, y1]`
:return ndarray: modified tau image
"""
try:
tau0 = tau0.img
except BaseException:
pass
y0, y1 = 0.5 * (r0[1] + r0[3]), 0.5 * (r1[1] + r1[3])
max_y = tau0.shape[0]
mean_r0, _ = _mean_in_rect(tau0, r0)
mean_r1, _ = _mean_in_rect(tau0, r1)
slope = float(mean_r0 - mean_r1) / float(y0 - y1)
offs = mean_r1 - slope * y1
ygrid = linspace(0, max_y - 1, max_y, dtype=int)
poly_vals = offs + slope * ygrid
tau_mod = (tau0.T - poly_vals).T
return tau_mod # , vert_poly
[docs]def corr_tau_curvature_hor_two_rects(tau0, r0, r1):
"""Apply horizonzal linear background curvature correction to tau img.
Retrieves pixel mean values from two rectangular areas and determines
linear offset function based on the horizontal positions of the
rectangle center coordinates. The corresponding offset for each image
row is then subtracted from the input tau image.
:param (ndarray, Img) tau0: inital tau image
:param list r0: 1st rectanglular area ``[x0, y0, x1, y1]`
:param list r1: 2nd rectanglular area ``[x0, y0, x1, y1]`
:return ndarray: modified tau image
"""
try:
tau0 = tau0.img
except BaseException:
pass
x0, x1 = 0.5 * (r0[0] + r0[2]), 0.5 * (r1[0] + r1[2])
max_x = tau0.shape[1]
mean_r0, _ = _mean_in_rect(tau0, r0)
mean_r1, _ = _mean_in_rect(tau0, r1)
slope = float(mean_r0 - mean_r1) / float(x0 - x1)
offs = mean_r1 - slope * x1
xgrid = linspace(0, max_x - 1, max_x, dtype=int)
poly_vals = offs + slope * xgrid
tau_mod = tau0 - poly_vals
return tau_mod # , vert_poly
[docs]def corr_tau_curvature_vert_line(tau0, pos_x, start_y=0, stop_y=None,
row_mask=None, polyorder=2):
"""Correct vertical tau curvature using selected row indices of vertical line.
:param (ndarray, Img) tau0: inital tau image
:param int pos_x: x position of line (column number)
:param int start_y: first considered vertical index for fit (0)
:param int stop_y: last considered vertical index for fit (is set
to last row number if unspecified)
:param ndarray row_mask: boolean mask specifying considered row indices
(if valid, params start_y, stop_y are not considered)
:param int polyorder: order of polynomial to fit curvature
return tuple: 1st entry: modified tau image, second: fitted polynomial
"""
try:
tau0 = tau0.img
except BaseException:
pass
max_y = tau0.shape[0]
line_vert = LineOnImage(pos_x, 0, pos_x, max_y)
vert_profile = line_vert.get_line_profile(tau0)
if stop_y is None:
stop_y = max_y
ygrid = linspace(0, max_y - 1, max_y, dtype=int)
try:
if len(row_mask) == max_y:
mask = row_mask
except BaseException:
mask = logical_and(ygrid >= start_y, ygrid <= stop_y)
vert_poly = poly1d(polyfit(ygrid[mask], vert_profile[mask], polyorder))
poly_vals = vert_poly(ygrid)
tau_mod = (tau0.T - poly_vals).T
return (tau_mod, vert_poly)
[docs]def corr_tau_curvature_hor_line(tau0, pos_y, start_x=0, stop_x=None,
col_mask=None, polyorder=2):
# fixme: this doc is propably wrong
"""Correct vertical tau curvature using selected row indices of vertical line.
:param (ndarray, Img) tau0: inital tau image
:param int pos_y: y position of line (row number)
:param int start_x: first considered horizontal index for fit (0)
:param int stop_y: last considered horizontal index for fit (is
set to last col number if unspecified)
:param ndarray col_mask: boolean mask specifying considered column
indices (if valid, params start_x, stop_x are not considered)
:param int polyorder: order of polynomial to fit curvature
return tuple: 1st entry: modified tau image, second: fitted polynomial
"""
try:
tau0 = tau0.img
except BaseException:
pass
max_x = tau0.shape[1]
line_hor = LineOnImage(0, pos_y, max_x, pos_y)
hor_profile = line_hor.get_line_profile(tau0)
if stop_x is None:
stop_x = max_x
xgrid = linspace(0, max_x - 1, max_x, dtype=int)
try:
if len(col_mask) == max_x:
mask = col_mask
except BaseException:
mask = logical_and(xgrid >= start_x, xgrid <= stop_x)
hor_poly = poly1d(polyfit(xgrid[mask], hor_profile[mask], polyorder))
poly_vals = hor_poly(xgrid)
tau_mod = tau0 - poly_vals
return (tau_mod, hor_poly)
[docs]def find_sky_reference_areas(plume_img, sigma_blur=2, plot=False):
"""Take an input plume image and identify suitable sky reference areas."""
try:
plume = plume_img.img
except BaseException:
plume = plume_img
plume = gaussian_filter(plume, sigma_blur)
h, w = plume.shape
results = {}
vert_mag, hor_mag = int(h * 0.005) + 1, int(w * 0.005) + 1
# estimate mean intensity in left image part (without flank pixels)
y0_left = argmin(gradient(plume[vert_mag: h - vert_mag, hor_mag]))
avg_left = plume[vert_mag: y0_left - vert_mag, hor_mag:hor_mag * 2].mean()
# estimate mean intensity in right image part (without flank pixels
grad = gradient(plume[vert_mag: h - vert_mag, w - hor_mag])
y0_right = argmin(grad)
avg_right = plume[vert_mag: y0_right - vert_mag,
w - 2 * hor_mag: w - hor_mag].mean()
results["xgrad_line_rownum"] = vert_mag
# brighter on the right image side (assume this is clear sky)
if avg_right > avg_left:
results["ygrad_line_colnum"] = w - hor_mag
results["ygrad_line_stoprow"] = int(y0_right - 0.2 * vert_mag)
results["xgrad_line_startcol"] = int(w / 2.0)
results["xgrad_line_stopcol"] = int(w - 1)
results["scale_rect"] = [int(w - 5 * hor_mag), int(vert_mag),
int(w - hor_mag), int(5 * vert_mag)]
else:
results["ygrad_line_colnum"] = 1
results["ygrad_line_stoprow"] = int(y0_left - 2 * vert_mag)
results["xgrad_line_startcol"] = hor_mag
results["xgrad_line_stopcol"] = int(w / 2.0)
results["scale_rect"] = [int(hor_mag), int(vert_mag),
int(5 * hor_mag), int(5 * vert_mag)]
results["ygrad_line_startrow"] = 1
x0, y0, x1, y1 = results["scale_rect"]
ymax = results["ygrad_line_stoprow"]
results["ygrad_rect"] = [x0, int(ymax - 8 * hor_mag),
x1, int(ymax - 4 * hor_mag)]
results["xgrad_rect"] = [int(w / 2.0 - 2 * hor_mag), y0,
int(w / 2.0 + 2 * hor_mag), y1]
if plot:
plot_sky_reference_areas(plume, results)
return results
[docs]def plot_sky_reference_areas(plume_img, settings_dict, ax=None):
"""Plot provided sky reference areas into a plume image.
:param (ndarray, Img) plume_img: plume image data
:param dict settings_dict: dictionary containing settings (e.g. retrieved
using :func:`find_sky_reference_areas`)
"""
try:
plume = plume_img.img
except BaseException:
plume = plume_img
if ax is None:
fig, ax = subplots(1, 1)
r = settings_dict
h0, w0 = plume.shape[:2]
ax.imshow(plume, cmap="gray")
ax.plot([r["ygrad_line_colnum"], r["ygrad_line_colnum"]],
[r["ygrad_line_startrow"], r["ygrad_line_stoprow"]],
"-", c="lime", label="vert profile")
ax.plot([r["xgrad_line_startcol"], r["xgrad_line_stopcol"]],
[r["xgrad_line_rownum"], r["xgrad_line_rownum"]],
"-", c="lime", label="hor profile")
ax.set_xlim([0, w0 - 1])
ax.set_ylim([h0 - 1, 0])
xs, ys, ws, hs = _roi_coordinates(r["scale_rect"])
ax.add_patch(Rectangle((xs, ys), ws, hs, ec="lime", fc="lime", alpha=0.3,
label="scale_rect"))
xs, ys, ws, hs = _roi_coordinates(r["ygrad_rect"])
ax.add_patch(Rectangle((xs, ys), ws, hs, ec="b", fc="b", alpha=0.3,
label="ygrad_rect"))
xs, ys, ws, hs = _roi_coordinates(r["xgrad_rect"])
ax.add_patch(Rectangle((xs, ys), ws, hs, ec="c", fc="c", alpha=0.3,
label="xgrad_rect"))
ax.legend(loc="best", fancybox=True, framealpha=0.5, fontsize=10)
return ax
[docs]def find_sky_background(plume_img, next_img=None,
bgmodel_settings_dict=None,
lower_thresh=None,
apply_movement_search=True,
**settings_movement_search):
"""Prepare mask for background fit based on analysis of current image.
The mask is determined by applying an intensity threshold to a plume
image based on the intensities in 3 sky reference rectangles of an
instance of the :class:`PlumeBackgroundModel` object. If not specified
on input from an exisiting instance of the class (using
:param:`bgmodel_settings_dict`), the 3 required reference areas are
calculated automatically using :func:`find_sky_reference_areas`.
Alternatively, the lower threshold can be provided on input using
:param:`lower_thresh`.
Furthermore, pixels showing movement between the input image
:param:`plume_img`and a second provided image (i.e. :param:`next_img`)
can be excluded. The detection of movement between the two frames is
performed using the movement detection algorithm :func:`find_movement`
(see :mod:`plumespeed`).
Note
----
1. This is a Beta version
2. The method requires images in radiances space (i.e. plume and
terrain pixels appear darker than the sky background).
3. The input plume image should not contain clouds. If it does, it is
highly recommended to make use of the movement detection algorithm
Parameters
----------
plume_img : Img
the plume image for which the sky background pixels are supposed
to be detected
next_img : :obj:`Img`, optional
second image used to compute the optical flow in :param:`plume_img`
bgmodel_settings_dict : dict
dictionary containing information about sky reference areas (e.g.
created using :func:`settings_dict` from an existing instance of
the :class:`PlumeBackgroundModel`). If not specified, the required
reference rectangles (``scale_rect``, ``ygrad_rect``,
``xgrad_rect``) are determined automatically using
:func:`find_sky_reference_areas`.
lower_thresh : :obj:`float`, optional
lower intensity threshold. If provided, this value is used rather
than the value derived from the 3 sky reference rectangles (see
:param:`bgmodel_settings_dict`)
**settings_movement_search
additional keyword arguments passed to :func:`find_movement`.
Note that these may include settings for the optical flow
calculation which are further passed to the
initiation of the :class:`FarnebackSettings` class
Returns
-------
array
2D-numpy boolean numpy array specifying sky background pixels
"""
if bgmodel_settings_dict is None:
bgmodel_settings_dict = {}
if not isinstance(plume_img, Img):
raise ValueError("Invalid input for parameter plume_img: need"
"Img object, got %s" % type(plume_img))
if plume_img.is_tau:
raise AttributeError("Input plume_img is an optical density image. "
"This method only works for images in "
"intensity mode where pixel values are gray "
"values!")
if lower_thresh is None:
bg_model = PlumeBackgroundModel(**bgmodel_settings_dict)
bg_model.set_missing_ref_areas(plume_img)
mean, low, high = bg_model.mean_in_rects(plume_img)
lower_thresh = mean * 0.9
mask = (plume_img.img > lower_thresh)
if apply_movement_search:
if not isinstance(next_img, Img):
raise ValueError("Invalid input for parameter next_img: need"
"Img object, got %s" % type(next_img))
no_movement = ~find_movement(plume_img, next_img,
**settings_movement_search)
mask = mask * no_movement
return mask