# -*- coding: utf-8 -*-
#
# Pyplis is a Python library for the analysis of UV SO2 camera data
# Copyright (C) 2017 Jonas Gliss (jonasgliss@gmail.com)
#
# This program is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License a
# published by the Free Software Foundation, either version 3 of
# the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Pyplis module for image based correction of the signal dilution effect."""
from __future__ import (absolute_import, division)
from numpy import asarray, linspace, exp, ones, nan
from scipy.ndimage.filters import median_filter
from matplotlib.pyplot import subplots, rcParams
from collections import OrderedDict as od
from pandas import Series, DataFrame
import six
from pyplis import logger, print_log
from .utils import LineOnImage
from .image import Img
from .optimisation import dilution_corr_fit
from .model_functions import dilutioncorr_model
from .geometry import MeasGeometry
from .helpers import check_roi, isnum
from .imagelists import ImgList
from .exceptions import ImgModifiedError
LABEL_SIZE = rcParams["font.size"] + 2
[docs]class DilutionCorr(object):
r"""Class for management of dilution correction.
The class provides functionality to retrieve topographic distances from
meas geometry, to manage lines in the image used for the retrieval, to
perform the actual dilution fit (i.e. retrieval of atmospheric scattering
coefficients) and to apply the dilution correction.
This class does not store any results related to individual images.
Parameters
----------
lines : list
optional, list containing :class:`LineOnImage` objects used to
retrieve terrain distances for the dilution fit
meas_geometry : MeasGeometry
optional, measurement geometry (required for terrain distance
retrieval)
**settings :
settings for terrain distance retrieval:
- skip_pix: specify pixel step on line for which topo \
intersections are searched
- min_slope_angle: minimum slope of topography in order to be \
considered for topo distance retrieval
- topo_res_m: interpolation resolution applied to \
:class:`ElevationProfile` objects used to find intersections \
of pixel viewing direction with topography
"""
[docs] def __init__(self, lines=None, meas_geometry=None, **settings):
if lines is None:
lines = []
elif isinstance(lines, LineOnImage):
lines = [lines]
if not isinstance(lines, list):
raise TypeError("Invalid input type for parameter lines, need "
"LineOnGrid class or a python list containing "
"LineOnGrid objects")
if not isinstance(meas_geometry, MeasGeometry):
meas_geometry = MeasGeometry()
self.meas_geometry = meas_geometry
self.lines = od()
self.settings = {"skip_pix": 5,
"min_slope_angle": 5.0,
"topo_res_m": 5.0}
self._masks_lines = od()
self._dists_lines = od()
# additional retrieval points that were added manually using
# method add_retrieval_point
self._add_points = []
self._skip_pix = od()
self._geopoints = od()
self._geopoints["add_points"] = []
for line in lines:
self.lines[line.line_id] = line
self.update_settings(**settings)
@property
def line_ids(self):
"""Get IDs of all :class:`LineOnImage` objects for distance retrieval.
"""
return list(self.lines.keys())
[docs] def update_settings(self, **settings):
"""Update settings dict for topo distance retrieval."""
for k, v in six.iteritems(settings):
if k in self.settings:
self.settings[k] = v
[docs] def add_retrieval_line(self, line):
"""Add one topography retrieval line."""
if not isinstance(line, LineOnImage):
raise TypeError("Need LineOnImage object")
if line.line_id in self.line_ids:
raise KeyError("A line with ID %s is already assigned to Dilution "
"correction engine" % line.line_id)
self.lines[line.line_id] = line
[docs] def add_retrieval_point(self, pos_x_abs, pos_y_abs, dist=None):
"""Add a distinct pixel with known distance to image.
Parameters
----------
pos_x_abs : int
x-pixel position of point in image in absolute coordinate (i.e.
pyramid level 0 and not cropped)
pos_y_abs : int
y-pixel position of point in image in absolute coordinate (i.e.
pyramid level 0 and not cropped)
dist : :obj:`float`, optional
distance to feature in image in m. If None (default), the distance
will be estimated
"""
if not isnum(dist):
logger.info("Input distance for point unspecified, trying automatic "
"access")
(dist,
derr,
p) = self.meas_geometry.get_topo_distance_pix(pos_x_abs,
pos_y_abs)
self._geopoints["add_points"].append(p)
dist *= 1000.0
self._add_points.append((pos_x_abs, pos_y_abs, dist))
[docs] def det_topo_dists_all_lines(self, **settings):
"""Estimate distances to topo distances to all assigned lines.
Parameters
----------
**settings
keyword args passed to update search settings (:attr:`settings`)
and passed to
:func:`get_topo_distances_line` in :class:`MeasGeometry`
"""
for lid, line in six.iteritems(self.lines):
self.det_topo_dists_line(lid, **settings)
[docs] def det_topo_dists_line(self, line_id, **settings):
"""Estimate distances to pixels on current lines.
Retrieves distances to all :class:`LineOnImage` objects in
``self.lines`` using ``self.meas_geometry`` (i.e. camera position
and viewing direction).
Parameters
----------
line_id : str
ID of line
**settings :
additional key word args used to update search settings (passed to
:func:`get_topo_distances_line` in :class:`MeasGeometry`)
Returns
-------
array
retrieved distances
"""
if line_id not in self.lines.keys():
raise KeyError("No line with ID %s available" % line_id)
logger.info("Searching topo distances for pixels on line %s" % line_id)
self.update_settings(**settings)
l = self.lines[line_id]
res = self.meas_geometry.get_topo_distances_line(l, **self.settings)
dists = res["dists"] * 1000. # convert to m
self._geopoints[line_id] = res["geo_points"]
self._dists_lines[line_id] = dists
self._masks_lines[line_id] = res["ok"]
self._skip_pix[line_id] = self.settings["skip_pix"]
return dists
[docs] def get_radiances(self, img, line_ids=None):
"""Get radiances for dilution fit along terrain lines.
The data is only extracted along specified input lines. The terrain
distance retrieval :func:`det_topo_dists_lines_line` must have been
performed for that.
Parameters
----------
img : Img
vignetting corrected plume image from which the radiances are
extracted
line_ids : list
if desired, the data can also be accessed for specified line ids,
which have to be provided in a list. If empty (default), all lines
assigned to this class are considered
"""
if line_ids is None:
line_ids = []
if not isinstance(img, Img) or not img.edit_log["vigncorr"]:
raise ValueError("Invalid input, need Img class and Img needs to "
"be corrected for vignetting")
if img.is_cropped or img.is_resized:
raise ImgModifiedError("Image must not be cropped or rescaled")
if len(line_ids) == 0:
line_ids = self.line_ids
dists, rads = [], []
for line_id in line_ids:
if line_id in self._dists_lines:
skip = int(self._skip_pix[line_id])
l = self.lines[line_id]
mask = self._masks_lines[line_id]
dists.extend(self._dists_lines[line_id][mask])
rads.extend(l.get_line_profile(img)[::skip][mask])
else:
print_log.warning("Distances to line %s not available, please apply "
"distance retrieval first using class method "
"det_topo_dists_line")
for x, y, dist in self._add_points:
dists.append(dist)
rads.append(img.img[y, x])
return asarray(dists), asarray(rads)
[docs] def apply_dilution_fit(self, img, rad_ambient, i0_guess=None,
i0_min=0, i0_max=None, ext_guess=1e-4, ext_min=0,
ext_max=1e-3, line_ids=None, plot=True, **kwargs):
r"""Perform dilution correction fit to retrieve extinction coefficient.
Uses :func:`dilution_corr_fit` of :mod:`optimisation` which is a
bounded least square fit based on the following model function
.. math::
I_{meas}(\lambda) = I_0(\lambda)e^{-\epsilon(\lambda)d} +
I_A(\lambda)(1-e^{-\epsilon(\lambda)d})
Parameters
----------
img : Img
vignetting corrected image for radiance extraction
rad_ambient : float
ambient intensity (:math:`I_A` in model)
i0_guess : float
optional: guess value for initial intensity of topographic
features, i.e. the reflected radiation before entering scattering
medium (:math:`I_0` in model, if None, then it is set 5% of the
ambient intensity ``rad_ambient``)
i0_min : float
optional: minimum initial intensity of topographic features
i0_max : float
optional: maximum initial intensity of topographic features
ext_guess : float
guess value for atm. extinction coefficient
(:math:`\epsilon` in model)
ext_min : float
minimum value for atm. extinction coefficient
ext_max : float
maximum value for atm. extinction coefficient
line_ids : list
if desired, the data can also be accessed for specified line ids,
which have to be provided in a list. If empty (default), all lines
are considered
plot : bool
if True, the result is plotted
**kwargs :
additional keyword args passed to plotting function (e.g. to
pass an axes object)
Returns
-------
tuple
4-element tuple containing
- retrieved extinction coefficient
- retrieved initial intensity
- fit result object
- axes instance or None (dependent on :param:`plot`)
"""
if line_ids is None:
line_ids = []
dists, rads = self.get_radiances(img, line_ids)
fit_res = dilution_corr_fit(rads, dists, rad_ambient, i0_guess,
i0_min, i0_max, ext_guess,
ext_min, ext_max)
i0, ext = fit_res.x
ax = None
if plot:
ax = self.plot_fit_result(dists, rads, rad_ambient, i0, ext,
**kwargs)
return ext, i0, fit_res, ax
[docs] def get_ext_coeffs_imglist(self, lst, roi_ambient=None, apply_median=5,
**kwargs):
"""Apply dilution fit to all images in an :class:`ImgList`.
Parameters
----------
lst : ImgList
image list for which the coefficients are supposed to be retrieved
roi_ambient : list
region of interest used to estimage ambient intensity, if None
(default), usd :attr:`scale_rect` of :class:`PlumeBackgroundModel`
of the input list
apply_median : int
if > 0, then a median filter of provided width is applied to
the result time series (ext. coeffs and initial intensities)
**kwargs :
additional keyword args passed to dilution fit method
:func:`apply_dilution_fit`.
Returns
-------
DataFrame
pandas data frame containing time series of retrieved extinction
coefficients and initial intensities as well as the ambient
intensities used, access keys are:
- ``coeffs``: retrieved extinction coefficients
- ``i0``: retrieved initial intensities
- ``ia``: retrieved ambient intensities
"""
if not isinstance(lst, ImgList):
raise ValueError("Invalid input type for param lst, need ImgList")
lst.vigncorr_mode = True
if not check_roi(roi_ambient):
try:
roi_ambient = lst.bg_model.scale_rect
except BaseException:
pass
if not check_roi(roi_ambient):
raise ValueError("Input parameter roi_ambient is not a valied"
"ROI and neither is scale_rect in background "
"model of input image list...")
cfn = lst.cfn
lst.goto_img(0)
nof = lst.nof
times = lst.acq_times
coeffs = []
i0s = []
ias = []
for k in range(nof):
img = lst.current_img()
try:
ia = img.crop(roi_ambient, True).mean()
ext, i0, _, _ = self.apply_dilution_fit(img=img,
rad_ambient=ia,
plot=False,
**kwargs)
coeffs.append(ext)
i0s.append(i0)
ias.append(ia)
except BaseException:
coeffs.append(nan)
i0s.append(nan)
ias.append(nan)
lst.goto_next()
lst.goto_img(cfn)
if apply_median > 0:
coeffs = median_filter(coeffs, apply_median)
i0s = median_filter(i0s, apply_median)
ias = median_filter(ias, apply_median)
return DataFrame(dict(coeffs=coeffs, i0=i0s, ia=ias), index=times)
[docs] def correct_img(self, plume_img, ext, plume_bg_img, plume_dists,
plume_pix_mask):
"""Perform dilution correction for a plume image.
Note
-----
See :func:`correct_img` for description
Returns
-------
Img
dilution corrected image
"""
return correct_img(plume_img, ext, plume_bg_img, plume_dists,
plume_pix_mask)
[docs] def plot_fit_result(self, dists, rads, rad_ambient, i0, ext, ax=None):
"""Plot result of dilution fit."""
if ax is None:
fig, ax = subplots(1, 1)
x = linspace(0, dists.max(), 100)
ints = dilutioncorr_model(x, rad_ambient, i0, ext)
ax.plot(dists / 1000.0, rads, " x", label="Data")
ext_perkm = ext * 1000
lbl_fit = (r"Fit: $I_0$=%.1f DN, $\epsilon$ = %.4f km$^{-1}$"
% (i0, ext_perkm))
ax.plot(x / 1000.0, ints, "--c", label=lbl_fit)
ax.set_xlabel("Distance [km]", fontsize=LABEL_SIZE)
ax.set_ylabel("Radiances [DN]", fontsize=LABEL_SIZE)
ax.set_title(r"$I_A$ = %.1f" % rad_ambient, fontsize=LABEL_SIZE + 2)
ax.grid()
# ax = rotate_ytick_labels(ax, deg=45, va="center")
ax.legend(loc="best", fancybox=True, framealpha=0.5, fontsize=13)
return ax
[docs] def get_extinction_coeffs_imglist(self, imglist, ambient_roi_abs,
darkcorr=True, line_ids=None,
**fit_settings):
"""Retrieve extinction coefficients for all imags in list.
.. note::
Alpha version: not yet tested
"""
if line_ids is None:
line_ids = []
imglist.aa_mode = False
imglist.tau_mode = False
imglist.auto_reload = False
imglist.darkcorr_mode = True
if imglist.gaussian_blurring and imglist.pyrlevel == 0:
logger.info("Adding gaussian blurring of 2 for topographic radiance "
"retrieval")
imglist.gaussian_blurring = 2
if imglist.pyrlevel != list(self.lines.values())[0].pyrlevel:
raise ValueError("Mismatch in pyramid level of lines and imglist")
if len(line_ids) == 0:
line_ids = self.line_ids
imglist.vigncorr_mode = True
imglist.goto_img(0)
imglist.auto_reload = True
num = imglist.nof
i0s, exts, acq_times = ones(num) * nan, ones(num) * nan, [nan] * num
for k in range(num):
img = imglist.current_img()
rad_ambient = img.crop(ambient_roi_abs, True).mean()
ext, i0, _, _ = self.apply_dilution_fit(img, rad_ambient,
line_ids=line_ids,
plot=False,
**fit_settings)
acq_times[k] = img.meta["start_acq"]
i0s[k] = i0
exts[k] = ext
return Series(exts, acq_times), Series(i0s, acq_times)
[docs] def plot_distances_3d(self, draw_cam=1, draw_source=1, draw_plume=0,
draw_fov=0, cmap_topo="Oranges",
contour_color="#708090", contour_antialiased=True,
contour_lw=0.2, axis_off=True, line_ids=None,
**kwargs):
"""Draw 3D map of scene including geopoints of distance retrievals.
Parameters
----------
draw_cam : bool
insert camera position into map
draw_source : bool
insert source position into map
draw_plume : bool
insert plume vector into map
draw_fov : bool
insert camera FOV (az range) into map
cmap_topo : str
string specifying colormap for topography surface plot defaults to
"Oranges"
contour_color : str
string specifying color of contour lines colors of topo contour
lines (default: "#708090")
contour_antialiased : bool
apply antialiasing to surface plot of topography, defaults to False
contour_lw :
width of drawn contour lines, defaults to 0.5, use 0 if you do not
want contour lines inserted
axis_off : bool
if True, then the rendering of axes is excluded
line_ids : list
if desired, the data can also be accessed for specified line ids,
which have to be provided in a list. If empty (default), all topo
lines are drawn
Returns
-------
Map
plotted map instance (is of type Basemap)
"""
if line_ids is None:
line_ids = []
map3d = self.meas_geometry.draw_map_3d(
draw_cam, draw_source,
draw_plume, draw_fov,
cmap_topo,
contour_color=contour_color,
contour_antialiased=contour_antialiased,
contour_lw=contour_lw)
if len(line_ids) == 0:
line_ids = self.line_ids
for line_id in self.line_ids:
if line_id in self._dists_lines:
line = self.lines[line_id]
mask = self._masks_lines[line_id]
pts = self._geopoints[line_id][mask]
map3d.add_geo_points_3d(pts, color=line.color, **kwargs)
for pt in self._geopoints["add_points"]:
map3d.draw_geo_point_3d(pt, color="r")
if axis_off:
map3d.ax.set_axis_off()
return map3d
[docs]def correct_img(plume_img, ext, plume_bg_img, plume_dists, plume_pix_mask):
"""Perform dilution correction for a plume image.
Corresponds to Eq. 4 in in `Campion et al., 2015 <http://
www.sciencedirect.com/science/article/pii/S0377027315000189>`_.
Parameters
----------
plume_img : Img
vignetting corrected plume image
ext : float
atmospheric extinction coefficient
plume_bg_img : Img
vignetting corrected plume background image (can be, for instance,
retrieved using :mod:`plumebackground`)
plume_dists : :obj:`array`, :obj:`Img`, :obj:`float`
plume distance(s) in m. If input is numpy array or :class:`Img` then,
it must have the same shape as :param:`plume_img`
plume_pix_mask : ndarray
mask specifying plume pixels (only those are corrected), can also be
type :class:`Img`
Returns
-------
Img
dilution corrected image
"""
for im in [plume_img, plume_bg_img]:
if not isinstance(im, Img) or im.edit_log["vigncorr"] is False:
raise ValueError("Plume and background image need to be Img "
"objects and vignetting corrected")
try:
plume_dists = plume_dists.img
except BaseException:
pass
try:
plume_pix_mask = plume_pix_mask.img
except BaseException:
pass
dists = plume_pix_mask * plume_dists
plume_img.img = ((plume_img.img - plume_bg_img.img *
(1 - exp(-ext * dists))) / exp(-ext * dists))
plume_img.edit_log["dilcorr"] = True
return plume_img
[docs]def get_topo_dists_lines(lines, geom, img=None, skip_pix=5, topo_res_m=5.0,
min_slope_angle=5.0, plot=False, line_color="lime"):
if isinstance(lines, LineOnImage):
lines = [lines]
ax = None
map3d = None
pts, dists, mask = [], [], []
for line in lines:
l = line.to_list() # line coords as list
res = geom.get_topo_distances_line(l, skip_pix, topo_res_m,
min_slope_angle)
pts.extend(res["geo_points"])
dists.extend(res["dists"])
mask.extend(res["ok"])
pts, dists = asarray(pts), asarray(dists) * 1000.
if plot:
if isinstance(img, Img):
ax = img.show()
h, w = img.img.shape
for line in lines:
line.plot_line_on_grid(ax=ax, color=line_color, marker="")
ax.set_xlim([0, w - 1])
ax.set_ylim([h - 1, 0])
map3d = geom.draw_map_3d(0, 0, 0, 0, cmap_topo="gray")
# insert camera position into 3D map
map3d.add_geo_points_3d(pts, color=line_color)
geom.cam_pos.plot_3d(map=map3d, add_name=True, dz_text=40)
map3d.ax.set_axis_off()
return dists, asarray(mask), map3d, ax
[docs]def get_extinction_coeff(rads, dists, rad_ambient, plot=True, **kwargs):
"""Perform dilution correction fit to retrieve extinction coefficient.
:param ndarray rads: radiances retrieved for topographic features
:param ndarray dists: distances corresponding to ``rads``
:param rad_ambient: ambient sky intensity
:param bool plot: if True, the result is plotted
:param **kwargs: additional keyword arguments for fit settings (passed
to :func:`dilution_corr_fit` of module :mod:`optimisation`)
"""
fit_res = dilution_corr_fit(rads, dists, rad_ambient, **kwargs)
i0, ext = fit_res.x
ax = None
if plot:
x = linspace(0, dists.max(), 100)
ints = dilutioncorr_model(x, rad_ambient, i0, ext)
fig, ax = subplots(1, 1)
ax.plot(dists, rads, " x", label="Data")
lbl_fit = r"Fit result: $I_0$=%.1f DN, $\epsilon$ = %.2e" % (i0, ext)
ax.plot(x, ints, "--c", label=lbl_fit)
ax.set_xlabel("Distance [m]")
ax.set_ylabel("Radiances [DN]")
ax.legend(loc="best", fancybox=True, framealpha=0.5, fontsize=12)
return ext, i0, fit_res, ax