# -*- coding: utf-8 -*-
#
# Pyplis is a Python library for the analysis of UV SO2 camera data
# Copyright (C) 2017 Jonas Gliss (jonasgliss@gmail.com)
#
# This program is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License a
# published by the Free Software Foundation, either version 3 of
# the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
r"""Pyplis module contains the following processing classes and methods.
1. :class:`ImgStack`: Object for storage of 3D image data
#. :class:`PixelMeanTimeSeries`: storage and post analysis of time\
series of average pixel intensities
"""
from __future__ import (absolute_import, division)
from numpy import (vstack, empty, ones, asarray, sum, dstack, float32, zeros,
poly1d, polyfit, argmin, where, logical_and, rollaxis,
delete, hstack)
from scipy.ndimage.filters import gaussian_filter1d, median_filter
from copy import deepcopy
from datetime import datetime, timedelta
from matplotlib.pyplot import subplots
from matplotlib.dates import date2num, DateFormatter
from pandas import Series, concat, DatetimeIndex
from cv2 import pyrDown, pyrUp
from os.path import join, exists, dirname, basename, isdir, abspath
from astropy.io import fits
import six
from pyplis import logger
from .image import Img
from .setupclasses import Camera
from .helpers import to_datetime, make_circular_mask
from .glob import DEFAULT_ROI
[docs]class ImgStack(object):
"""Image stack object.
The images are stacked into a 3D numpy array, note, that for large datasets
this may cause MemoryErrors. This object is for instance used to perform
a DOAS field of view search (see also :mod:`doascalib`).
It provides basic image processing functionality, for instance changing
the pyramid level, time merging with other time series data (e.g. DOAS
CD time series vector).
The most important attributes (data objects) are:
1. ``self.stack``: 3D numpy array containing stacked images. The first
axis corresponds to the time axis, allowing for easy image access,
e.g. ``self.stack[10]`` would yield the 11th image in the time series.
2. ``self.start_acq``: 1D array containing acquisition time stamps
(datetime objects)
3. ``self.texps``: 1D array conaining exposure times in s for each
image
4. ``self.add_data``: 1D array which can be used to store additional
data for each image (e.g. DOAS CD vector)
Todo
----
1. Include optical flow routine for emission rate retrieval
Parameters
----------
height : int
height of images to be stacked
width : int
width of images to be stacked
num : int
number of images to be stacked
dtype :
numerical data type (e.g. uint8, makes the necessary space smaller,
default: float32)
stack_id : str
string ID of this object ("")
img_prep : dict
additional information about the preparation state of the images
(e.g. roi, gauss pyramid level, dark corrected?, blurred?)
**stack_data
can be used to pass stack data directly
"""
[docs] def __init__(self, height=0, width=0, img_num=0, dtype=float32,
stack_id="", img_prep=None, camera=None, **stack_data):
self.stack_id = stack_id
self.dtype = dtype
self.current_index = 0
self.stack = None
self.start_acq = None
self.texps = None
self.add_data = None
self._access_mask = None
if img_prep is None:
img_prep = {"pyrlevel": 0}
self.img_prep = img_prep
self.roi_abs = DEFAULT_ROI
self._cam = Camera()
self.init_stack_array(height, width, img_num)
if "stack" in stack_data:
self.set_stack_data(**stack_data)
if isinstance(camera, Camera):
self.camera = camera
[docs] def init_stack_array(self, height=0, width=0, img_num=0):
"""Initialize the actual stack data array.
Note
----
All current data stored in :attr:`stack`, :attr:`start_acq`,
:attr:`texps`, :attr:`add_data` will be deleted.
Parameters
----------
height : int
height of images to be stacked
width : int
width of images to be stacked
num : int
number of images to be stacked
"""
try:
self.stack = empty((int(img_num), int(height), int(width))).\
astype(self.dtype)
except MemoryError:
raise MemoryError("Could not initiate empty 3D numpy array "
"(d, h, w): (%s, %s, %s)" % (img_num, height,
width))
self.start_acq = asarray([datetime(1900, 1, 1)] * img_num)
self.texps = zeros(img_num, dtype=float32)
self.add_data = zeros(img_num, dtype=float32)
self._access_mask = zeros(img_num, dtype=bool)
self.current_index = 0
@property
def last_index(self):
"""Return last index."""
return self.num_of_imgs - 1
@property
def start(self):
"""Return start time stamp of first image."""
try:
i = self.start_acq[self._access_mask][0]
add = timedelta(self.texps[self._access_mask][0] / 86400.)
return i + add
except IndexError:
raise IndexError("Stack is empty...")
except BaseException:
raise ValueError("Start acquisition time could accessed in stack")
@property
def stop(self):
"""Return start time stamp of first image."""
try:
i = self.start_acq[self._access_mask][-1]
add = timedelta(self.texps[self._access_mask][-1] / 86400.)
return i + add
except IndexError:
raise IndexError("Stack is empty...")
except BaseException:
raise ValueError("Start acquisition time could accessed in stack")
@property
def time_stamps(self):
"""Acq. time stamps of all images."""
try:
dts = ([timedelta(x / (2 * 86400.)) for x in self.texps])
return self.start_acq + asarray(dts)
except BaseException:
raise ValueError("Failed to access information about acquisition "
"time stamps and / or exposure times")
@property
def pyrlevel(self):
"""Gauss pyramid level of images in stack."""
return self.img_prep["pyrlevel"]
@property
def camera(self):
"""Camera object assigned to stack."""
return self._cam
@camera.setter
def camera(self, value):
if isinstance(value, Camera):
self._cam = value
else:
raise TypeError("Need Camera object...")
@property
def num_of_imgs(self):
"""Depth of stack."""
return self.stack.shape[0]
[docs] def check_index(self, idx=0):
if 0 <= idx <= self.last_index:
return
elif idx == self.num_of_imgs:
self._extend_stack_array()
else:
raise IndexError("Invalid index %d for inserting image in stack "
"with current depth %d" % (idx, self.num_of_imgs))
def _extend_stack_array(self):
"""Extend the first index of the stack array."""
h, w = self.shape[1:]
try:
self.stack = vstack((self.stack, empty((1, h, w))))
except MemoryError:
raise MemoryError("Cannot add more data to stack due to memory "
"overflow...")
self.start_acq = hstack((self.start_acq, [datetime(1900, 1, 1)]))
self.texps = hstack((self.texps, [0.0]))
self.add_data = hstack((self.add_data, [0.0]))
self._access_mask = hstack((self._access_mask, [False]))
[docs] def insert_img(self, pos, img_arr, start_acq=datetime(1900, 1, 1),
texp=0.0, add_data=0.0):
"""Insert an image into the stack at provided index.
Parameters
----------
pos : int
Insert position of img in stack
img_arr : array
image data (must have same dimension than ``self.stack.shape[:2]``,
can also be of type :obj:`Img`)
start_acq : datetime
acquisition time stamp of image, defaults to datetime(1900, 1, 1)
texp : float
exposure time of image (in units of s), defaults to 0.0
add_data
arbitrary additional data appended to list :attr:`add_data`
"""
try:
img_arr = img_arr.img
except BaseException:
pass
if sum(self.shape) == 0:
h, w = img_arr.shape
self.init_stack_array(height=h, width=w, img_num=1)
self.check_index(pos)
self.stack[pos] = img_arr
self.start_acq[pos] = to_datetime(start_acq)
self.texps[pos] = texp
self.add_data[pos] = add_data
self._access_mask[pos] = True
[docs] def add_img(self, img_arr, start_acq=datetime(1900, 1, 1), texp=0.0,
add_data=0.0):
"""Add image at current index position.
The image is inserted at the current index position ``current_index``
which is increased by 1 afterwards. If the latter exceeds the dimension
of the actual stack data array :attr:`stack`, the stack shape will be
extended by 1.
Parameters
----------
img_arr : array
image data (must have same dimension than ``self.stack.shape[:2]``)
start_acq : datetime
acquisition time stamp of image, defaults to datetime(1900, 1, 1)
texp : float
exposure time of image (in units of s), defaults to 0.0
add_data
arbitrary additional data appended to list :attr:`add_data`
"""
# ==============================================================================
# if self.current_index >= self.last_index:
# print self.last_index
# raise IndexError("Last stack index reached...")
# ==============================================================================
self.insert_img(self.current_index, img_arr, start_acq, texp, add_data)
self.current_index += 1
[docs] def make_circular_access_mask(self, cx, cy, radius):
"""Create a circular mask for stack.
Parameters
----------
cx : int
x position of centre
cy : nint
y position of centre
radius : int
radius
Returns
-------
array
circular mask (use e.g. like ``img[mask]`` which will return a
1D vector containing all pixel values of ``img`` that fall into
the mask)
"""
# cx, cy = self.img_prep.map_coordinates(pos_x_abs, pos_y_abs)
h, w = self.stack.shape[1:]
return make_circular_mask(h, w, cx, cy, radius)
[docs] def set_stack_data(self, stack, start_acq=None, texps=None):
"""Set the current data based on input.
Parameters
----------
stack : array
3D numpy array containing the image stack data
start_acq : :obj:`array`, optional
array containing acquisition time stamps
texps : obj:`array`, optional
array containing exposure times
"""
num = stack.shape[0]
self.stack = stack
if start_acq is None:
start_acq = asarray([datetime(1900, 1, 1)] * num)
self.start_acq = start_acq
if texps is None:
texps = zeros(num, dtype=float32)
self.texps = texps
self._access_mask = ones(num, dtype=bool)
[docs] def get_data(self):
"""Get stack data (containing of stack, acq. and exp. times).
Returns
-------
tuple
3-element tuple containing
- :obj:`array`: stack data
- :obj:`array`: acq. time stamps
- :obj:`array`: exposure times
"""
m = self._access_mask
return (self.stack[m], asarray(self.time_stamps)[m],
asarray(self.texps)[m])
[docs] def apply_mask(self, mask):
"""Convolves the stack data with a input mask along time axis.
Parameter
---------
mask : array
2D bool mask for image pixel access
Returns
-------
tuple
3-element tuple containing
- :obj:`array`: 3D numpy array containing convolved stack data
- :obj:`array`: acq. time stamps
- :obj:`array`: exposure times
"""
# mask_norm = boolMask.astype(float32)/sum(boolMask)
d = self.get_data()
# [:, :, newaxis])#, d[1], d[2])
data_conv = (d[0] * mask.astype(float32))
return (data_conv, d[1], d[2])
[docs] def get_time_series(self, pos_x=None, pos_y=None, radius=1, mask=None):
"""Get time series in a ROI.
Retrieve time series at a given pixel position *in stack
coordinates* in a circular pixel neighbourhood.
Parameters
----------
pos_x : int
x position of center pixel on detector
pos_y : int
y position of center pixel on detector
radius : float
radius of pixel disk on detector (centered around pos_x, pos_y,
default: 1)
mask : array
mask for image pixel access, default is None, if the mask is
specified and valid (i.e. same shape than images in stack) then
the other three input parameter are ignored
Returns
-------
tuple
2-element tuple containing
- :obj:`Series`: time series data
- :obj:`array`: pixel access mask used to convolve stack images
"""
d = self.get_data()
try:
data_mask, start_acq, texps = self.apply_mask(mask)
except BaseException:
if not radius > 0:
raise ValueError("Invalid input for param radius (3. pos): "
"value must be larger than 0, got %d"
% radius)
if radius == 1:
mask = zeros(self.shape[1:]).astype(bool)
mask[pos_y, pos_x] = True
s = Series(d[0][self._access_mask, pos_y, pos_x], d[1])
return s, mask
mask = self.make_circular_access_mask(pos_x, pos_y, radius)
data_mask, start_acq, texps = self.apply_mask(mask)
values = data_mask.sum((1, 2)) / float(sum(mask))
return Series(values, start_acq), mask
[docs] def merge_with_time_series(self, time_series, method="average",
**kwargs):
"""High level wrapper for data merging.
Choose from either of three methods to perform an index merging based
on time stamps of stack and of other time series data (provided on
input).
Parameters
----------
time_series : Series
time series data supposed to be merged with stack data
method : str
merge method, currently available methods are:
- average: determine new stack containing images averaged based
on start / stop time stamps of each datapoint in input
``time_series`` (requires corresponding data to be available
in input, i.e. ``time_series`` must be of type
:class:`DoasResults` of ``pydoas`` library).
- nearest: perform merging based on nearest datapoint per image
- interpolation: perform cross interpolation onto unified time
index array from stack and time series data
**kwargs
additional keyword args specifying additional merge settings (e.g.
``itp_type=quadratic`` in case ``method=interpolation`` is used)
Returns
-------
tuple
2-element tuple containing
- :obj:`ImgStack`: new stack containing merged data
- :obj:`Series`: merged time series data
"""
if not isinstance(time_series, Series):
raise TypeError("Could not merge stack data with input data: "
"wrong type: %s" % type(time_series))
if method == "average":
try:
return self._merge_tseries_average(time_series, **kwargs)
except BaseException:
logger.info("Failed to merge data using method average, trying "
"method nearest instead")
method = "nearest"
if method == "nearest":
return self._merge_tseries_nearest(time_series, **kwargs)
elif method == "interpolation":
return self._merge_tseries_cross_interpolation(time_series,
**kwargs)
else:
raise TypeError("Unkown merge type: %s. Choose from "
"[nearest, average, interpolation]")
def _merge_tseries_nearest(self, time_series):
"""Find nearest in time image for each time stamp in input series.
Find indices (and time differences) in input time series of nearest
data point for each image in this stack. Then, get rid of all indices
showing double occurences using time delta information.
"""
stack, time_stamps, texps = self.get_data()
nearest_idxs, del_ts = self.get_nearest_indices(time_series.index)
img_idxs = []
spec_idxs_final = []
del_ts_abs = []
for idx in range(min(nearest_idxs), max(nearest_idxs) + 1):
logger.info("Current tseries index %s" % idx)
matches = where(nearest_idxs == idx)[0]
if len(matches) > 0:
del_ts_temp = del_ts[matches]
spec_idxs_final.append(idx)
del_ts_abs.append(min(del_ts_temp))
img_idxs.append(matches[argmin(del_ts_temp)])
series_new = time_series[spec_idxs_final]
try:
series_new.fit_errs = time_series.fit_errs[spec_idxs_final]
except BaseException:
pass
stack_new = self.stack[img_idxs]
texps_new = asarray(self.texps[img_idxs])
start_acq_new = asarray(self.start_acq[img_idxs])
stack_obj_new = ImgStack(stack_id=self.stack_id,
img_prep=self.img_prep, stack=stack_new,
start_acq=start_acq_new, texps=texps_new)
stack_obj_new.roi_abs = self.roi_abs
stack_obj_new.add_data = series_new
return (stack_obj_new, series_new)
def _merge_tseries_cross_interpolation(self, time_series,
itp_type="linear"):
"""Merge this stack with input data using interpolation.
:param Series time_series_data: pandas Series object containing time
series data (e.g. DOAS column densities)
:param str itp_type: interpolation type (passed to
:class:`pandas.DataFrame` which does the interpolation, default is
linear)
"""
h, w = self.shape[1:]
stack, time_stamps, _ = self.get_data()
# first crop time series data based on start / stop time stamps
time_series = self.crop_other_tseries(time_series)
time_series.name = None
if not len(time_series) > 0:
raise IndexError("Time merging failed, data does not overlap")
# interpolate exposure times
s0 = Series(self.texps, time_stamps)
try:
errs = Series(time_series.fit_errs, time_series.index)
df0 = concat([s0, time_series, errs], axis=1).\
interpolate(itp_type).dropna()
except BaseException:
df0 = concat([s0, time_series], axis=1).\
interpolate(itp_type).dropna()
new_num = len(df0[0])
if not new_num >= self.num_of_imgs:
raise ValueError("Unexpected error, length of merged data "
"array does not exceed length of inital image "
"stack...")
# create new arrays for the merged stack
new_stack = empty((new_num, h, w))
new_acq_times = df0[0].index
new_texps = df0[0].values
for i in range(h):
for j in range(w):
logger.info("Stack interpolation active...: current img row (y):"
"%s (%s)" % (i, j))
# get series from stack at current pixel
series_stack = Series(stack[:, i, j], time_stamps)
# create a dataframe
df = concat([series_stack, df0[1]], axis=1).\
interpolate(itp_type).dropna()
# throw all N/A values
# df = df.dropna()
new_stack[:, i, j] = df[0].values
stack_obj = ImgStack(new_num, h, w,
stack_id=self.stack_id,
img_prep=self.img_prep)
stack_obj.roi_abs = self.roi_abs
# print new_stack.shape, new_acq_times.shape, new_texps.shape
stack_obj.set_stack_data(new_stack, new_acq_times, new_texps)
new_series = df[1]
try:
new_series.fit_errs = df0[2].values
except BaseException:
logger.info("Failed to access / process errors on time series data")
return (stack_obj, new_series)
def _merge_tseries_average(self, time_series):
"""Make new stack of averaged images based on input start / stop arrays.
The averaging is based on the start / stop time stamps (e.g. of
measured spectra) specified by two input arrays.
These arrays must have the same length.
The method loops over these arrays indices and at each iteration step
k, all images (wihin this stack) falling into the corresponding
start / stop interval are averaged and added to a new stack of averaged
images. Indices k (of the input arrays) for which
no images can be found are added to the list ``bad_indices`` (second
return parameter) and have to be removed from the corresponding data
in case, these data (e.g. DOAS SO2 CD time series) is supposed to be
compared with the averaged stack.
Parameters
----------
time_series : DoasResults
Time series containing DOAS results, including arrays
for start / stop acquisition time stamps (required for averaging)
Returns
-------
tuple
2-element tuple containing
- :class:`ImgStack`: new stack object with averaged images
- :obj:`list`: list of bad indices (where no overlap was found)
"""
try:
if not time_series.has_start_stop_acqtamps():
raise ValueError("No start / stop acquisition time stamps "
"available in input data...")
start_acq = asarray(time_series.start_acq)
stop_acq = asarray(time_series.stop_acq)
except BaseException:
raise
stack, times, texps = self.get_data()
h, w = stack.shape[1:]
num = len(start_acq)
# new_stack = empty((h, w, self.num_of_imgs))
new_acq_times = []
new_texps = []
bad_indices = []
counter = 0
for k in range(num):
i = start_acq[k]
f = stop_acq[k]
texp = (f - i).total_seconds()
cond = (times >= i) & (times < f)
if sum(cond) > 0:
# ==============================================================================
# print ("Found %s images for spectrum #%s (of %s)"
# %(sum(cond), k, num))
# ==============================================================================
im = stack[cond].mean(axis=0)
if counter == 0:
new_stack = im
else:
new_stack = dstack((new_stack, im))
new_acq_times.append(i + (f - i) / 2)
# img_avg_info.append(sum(cond))
new_texps.append(texp)
counter += 1
else:
bad_indices.append(k)
new_stack = rollaxis(new_stack, 2)
stack_obj = ImgStack(len(new_texps), h, w,
stack_id=self.stack_id,
img_prep=self.img_prep)
stack_obj.roi_abs = self.roi_abs
stack_obj.set_stack_data(new_stack, asarray(new_acq_times),
asarray(new_texps))
tseries = time_series.drop(time_series.index[bad_indices])
try:
errs = delete(time_series.fit_errs, bad_indices)
tseries.fit_errs = errs
except BaseException:
pass
return (stack_obj, tseries)
"""Helpers
"""
[docs] def crop_other_tseries(self, time_series):
"""Crops other time series object based on start / stop time stamps."""
# ==============================================================================
# start = self.start - self.total_time_period_in_seconds() * tol_borders
# stop = self.stop + self.total_time_period_in_seconds() * tol_borders
# ==============================================================================
cond = logical_and(time_series.index >= self.start,
time_series.index <= self.stop)
new = time_series[cond]
try:
new.fit_errs = new.fit_errs[cond]
except BaseException:
pass
return new
[docs] def total_time_period_in_seconds(self):
"""Return start time stamp of first image."""
return (self.stop - self.start).total_seconds()
[docs] def get_nearest_indices(self, tstamps_other):
"""Find indices of time stamps nearest to img acq. time stamps.
Parameters
----------
tstamps_other :
datetime, or datetime array of other time series for which closest
index / indices are searched
"""
idx = []
delt = []
img_stamps = self.time_stamps[self._access_mask]
for tstamp in img_stamps:
diff = [x.total_seconds() for x in abs(tstamps_other - tstamp)]
delt.append(min(diff))
idx.append(argmin(diff))
return asarray(idx), asarray(delt)
[docs] def get_nearest_img(self, time_stamp):
"""Return stack image which is nearest to input timestamp.
Searches the nearest image(s) with respect to input datetime(s)
:param (datetime, ndarray) time_stamps: the actual time stamp(s) (for
instance from another time series object)
"""
raise NotImplementedError
[docs] def has_data(self):
"""Return bool.""" # fixme: improve this doc
return bool(sum(self._access_mask))
[docs] def sum(self, *args, **kwargs):
"""Sum over all pixels of stack.
Parameters
----------
*args
non-keyword arguments passed to :func:`sum` of numpy array
**kwargs
keyword arguments passed to :func:`sum` of numpy array
Returns
-------
float
result of summation operation
"""
return self.stack.sum(*args, **kwargs)
[docs] def mean(self, *args, **kwargs):
"""Apply numpy.mean function to stack data.
:param *args: non keyword arguments passed to :func:`numpy.mean`
applied to stack data
:param **kwargs: keyword arguments passed to :func:`numpy.mean`
applied to stack data
"""
return self.stack.mean(*args, **kwargs)
[docs] def std(self, *args, **kwargs):
"""Apply numpy.std function to stack data.
:param *args: non keyword arguments passed to :func:`numpy.std`
applied to stack data
:param **kwargs: keyword arguments passed to :func:`numpy.std`
applied to stack data
"""
return self.stack.std(*args, **kwargs)
@property
def shape(self):
"""Return stack shape."""
return self.stack.shape
@property
def ndim(self):
"""Return stack dimension."""
return self.stack.ndim
"""Plots / visualisation"""
[docs] def show_img(self, index=0):
"""Show image at input index.
Parameters
----------
index : int
index of image in stack
"""
stack, ts, _ = self.get_data()
im = Img(stack[index], start_acq=ts[index], texp=self.texps[index])
im.edit_log.update(self.img_prep)
im.roi_abs = self.roi_abs
return im.show()
[docs] def pyr_down(self, steps=0):
"""Reduce the stack image size using gaussian pyramid.
Parameters
----------
steps : int
steps down in the pyramide
Returns
-------
ImgStack
new, downscaled image stack object
"""
if not steps:
return
h, w = Img(self.stack[0]).pyr_down(steps).shape
prep = deepcopy(self.img_prep)
new_stack = ImgStack(height=h, width=w, img_num=self.num_of_imgs,
stack_id=self.stack_id, img_prep=prep)
for i in range(self.shape[0]):
im = self.stack[i]
for k in range(steps):
im = pyrDown(im)
new_stack.add_img(img_arr=im, start_acq=self.start_acq[i],
texp=self.texps[i], add_data=self.add_data[i])
new_stack._format_check()
new_stack.img_prep["pyrlevel"] += steps
return new_stack
[docs] def pyr_up(self, steps):
"""Increasing the image size using gaussian pyramide.
:param int steps: steps down in the pyramide
Algorithm used: :func:`cv2.pyrUp`
"""
if not steps:
return
h, w = Img(self.stack[0]).pyr_up(steps).shape
prep = deepcopy(self.img_prep)
new_stack = ImgStack(height=h, width=w, img_num=self.num_of_imgs,
stack_id=self.stack_id, img_prep=prep)
for i in range(self.shape[0]):
im = self.stack[i]
for k in range(steps):
im = pyrUp(im)
new_stack.add_img(img_arr=im, start_acq=self.start_acq[i],
texp=self.texps[i], add_data=self.add_data[i])
new_stack._format_check()
new_stack.img_prep["pyrlevel"] -= steps
return new_stack
[docs] def to_pyrlevel(self, final_state=0):
"""Down / upscale image to a given pyramide level."""
steps = final_state - self.img_prep["pyrlevel"]
if steps > 0:
return self.pyr_down(steps)
elif steps < 0:
return self.pyr_up(-steps)
else:
return self
[docs] def duplicate(self):
"""Return deepcopy of this object."""
return deepcopy(self)
def _format_check(self):
"""Check if all relevant data arrays have the same length."""
if not all([len(x) == self.num_of_imgs for x in [self.add_data,
self.texps,
self._access_mask,
self.start_acq]]):
raise ValueError("Mismatch in array lengths of stack data, check"
"add_data, texps, start_acq, _access_mask")
[docs] def load_stack_fits(self, file_path):
"""Load stack object (fits).
Note
----
FITS stores in Big-endian and needs to be converted into little-endian
(see `this issue <https://github.com/astropy/astropy/issues/1156>`__).
We follow the suggested fix and use::
byteswap().newbyteorder()
on any loaded data array.
Parameters
----------
file_path : str
file path of stack
"""
if not exists(file_path):
raise IOError("ImgStack could not be loaded, path does not exist")
hdu = fits.open(file_path)
self.set_stack_data(hdu[0].data.byteswap().newbyteorder().
astype(self.dtype))
prep = Img().edit_log
for key, val in six.iteritems(hdu[0].header):
if key.lower() in prep.keys():
self.img_prep[key.lower()] = val
self.stack_id = hdu[0].header["stack_id"]
try:
times = hdu[1].data["start_acq"].byteswap().newbyteorder()
self.start_acq = asarray([datetime.strptime(x, "%Y%m%d%H%M%S%f")
for x in times])
except BaseException:
logger.warning("Failed to import acquisition times")
try:
self.texps = asarray(
hdu[1].data["texps"].byteswap().newbyteorder())
except BaseException:
logger.warning("Failed to import exposure times")
try:
self._access_mask = asarray(hdu[1].data["_access_mask"].
byteswap().newbyteorder())
except BaseException:
logger.warning("Failed to import data access mask")
try:
self.add_data = asarray(hdu[1].data["add_data"].byteswap().
newbyteorder())
except BaseException:
logger.warning("Failed to import data additional data")
self.roi_abs = hdu[2].data["roi_abs"].byteswap().\
newbyteorder()
self._format_check()
[docs] def save_as_fits(self, save_dir=None, save_name=None,
overwrite_existing=True):
"""Save stack as FITS file."""
self._format_check()
# returns abspath of current wkdir if None
save_dir = abspath(save_dir)
if not isdir(save_dir): # save_dir is a file path
save_name = basename(save_dir)
save_dir = dirname(save_dir)
if save_name is None:
save_name = ("pyplis_imgstack_id_%s_%s_%s_%s.fts"
% (self.stack_id,
self.start.strftime("%Y%m%d"),
self.start.strftime("%H%M"),
self.stop.strftime("%H%M")))
else:
save_name = save_name.split(".")[0] + ".fts"
logger.info("DIR: %s" % save_dir)
logger.info("Name: %s" % save_name)
hdu = fits.PrimaryHDU()
start_acq_str = [x.strftime("%Y%m%d%H%M%S%f") for x in self.start_acq]
col1 = fits.Column(name="start_acq", format="25A", array=start_acq_str)
col2 = fits.Column(name="texps", format="D", array=self.texps)
col3 = fits.Column(name="_access_mask", format="L",
array=self._access_mask)
col4 = fits.Column(name="add_data", format="D", array=self.add_data)
cols = fits.ColDefs([col1, col2, col3, col4])
arrays = fits.BinTableHDU.from_columns(cols)
col5 = fits.Column(name="roi_abs", format="I", array=self.roi_abs)
roi_abs = fits.BinTableHDU.from_columns([col5])
hdu.data = self.stack
hdu.header.update(self.img_prep)
hdu.header["stack_id"] = self.stack_id
hdu.header.append()
hdulist = fits.HDUList([hdu, arrays, roi_abs])
path = join(save_dir, save_name)
if exists(path):
logger.info("Stack already exists at %s and will be overwritten"
% path)
try:
hdulist.writeto(path, clobber=overwrite_existing)
except BaseException:
logger.warning("Failed to save stack to FITS File "
"(check previous warnings)")
"""Magic methods"""
def __str__(self):
raise NotImplementedError
def __sub__(self, other):
"""Subtract data.
:param other: data to be subtracted object (e.g. offband stack)
"""
new = self.duplicate()
try:
new.stack = self.stack - other.stack
new.stack_id = "%s - %s" % (self.stack_id, other.stack_id)
except BaseException:
new.stack = self.stack - other
new.stack_id = "%s - %s" % (self.stack_id, other)
return new
[docs]def find_registration_shift_optflow(on_img, off_img,
roi_abs=DEFAULT_ROI, **flow_settings):
"""Search average shift between two images using optical flow.
Computes optical flow between two input images and determines the
registration shift based on peaks in two histograms of the orientation
angle distribution and vector magnitued distribution of the retrieved
flow field. The histogram analysis may be reduced to a certain ROI in the
images.
The default settings used here correspond to the settings suggested by
Peters et al., Use of motion estimation algorithms for improved flux
measurements using SO2 cameras, JVGR, 2015.
Parameters
----------
on_img : Img
onband image containing (preferably fixed) objects in the scene that
can be tracked
off_img : Img
corresponding offband image (ideally recorded at the same time)
roi_abs : list
if specified, the optical flow histogram parameters are retrieved from
the flow field within this ROI (else, the whole image is used)
**flow_settings
additional keyword args specifying the optical flow computation and
post analysis settings (see
:class:`pyplis.plumespeed.FarnebackSettings` for details)
Returns
-------
tuple
2-element tuple containing
- float: shift in x-direction
- float: shift in y-direction
"""
if not on_img.shape == off_img.shape:
raise ValueError("Shape mismatch between input images")
if on_img.pyrlevel != 0:
logger.warning("Input images are at pyramid level %d and registration shift "
"will be computed for this pyramid level")
# from pyplis import OptflowFarneback
# flow = OptflowFarneback(on_img, off_img, **flow_settings)
raise NotImplementedError("Under development")
[docs]class PixelMeanTimeSeries(Series):
"""A time series of mean pixel values.
This class implements a ``pandas.Series`` object with extended
functionality representing time series data of pixel mean values in a
certain image region.
.. note::
This object is only used to store results of a mean series analysis
in a certain ROI, it does not include any algorithms for actually
calculating the series
"""
std = None
texps = None
img_prep = {}
roi_abs = None
poly_model = None
[docs] def __init__(self, data, start_acq, std=None, texps=None, roi_abs=None,
img_prep=None, **kwargs):
"""Initialize pixel mean time series.
:param ndarray data: data array
(is passed into pandas Series init -> ``self.values``)
:param ndarray start_acq: array containing acquisition time stamps
(is passed into pandas Series init -> ``self.index``)
:param ndarray std: array containing standard deviations
:param ndarray texps: array containing exposure times
:param list roi_abs: image area from which data was extracted, list of
shape: ``[x0, y0, x1, y1]``
:param dict img_prep: dictionary containing information about image
preparation settings (e.g. blurring, etc..) or other
important information which may need to be stored
:param **kwargs: additional keyword parameters which are passed to
the initiation of the :class:`pandas.Series` object
"""
super(PixelMeanTimeSeries, self).__init__(data, start_acq, **kwargs)
if img_prep is None:
img_prep = {}
try:
if len(texps) == len(data):
self.texps = texps
except BaseException:
self.texps = zeros(len(data), dtype=float32)
try:
if len(std) == len(data):
self.std = std
except BaseException:
self.std = zeros(len(data), dtype=float32)
self.img_prep = img_prep
self.roi_abs = roi_abs
for key, val in six.iteritems(kwargs):
self[key] = val
@property
def start(self):
return self.index[0]
@property
def stop(self):
return self.index[-1]
[docs] def get_data_normalised(self, texp=None):
"""Normalise the mean value to a given exposure time.
:param float texp (None): the exposure time to which all deviating
times will be normalised. If None, the values will be normalised
to the largest available exposure time
:return: A new :class:`PixelMeanTimeSeries`instance with normalised
data
"""
try:
if texp is None:
texp = self.texps.max()
facs = texp / self.texps
ts = self.texps * facs
return PixelMeanTimeSeries(self.values * facs, self.index,
self.std, ts, self.roi_abs,
self.img_prep)
except Exception as e:
logger.info("Failed to normalise data bases on exposure times:\n%s\n\n"
% repr(e))
[docs] def fit_polynomial(self, order=2):
"""Fit polynomial to data series.
:param int order: order of polynomial
:returns:
- poly1d, the fitted polynomial
"""
s = self.dropna()
num = len(s)
if num == 1:
raise ValueError("Could not fit polynomial to PixelMeanTimeSeries"
" object: only one data point available")
elif num == 2:
logger.warning("PixelMeanTimeSeries object only contains 2 data points, "
"setting polyfit order to one (default is 2)")
order = 1
x = [date2num(idx) for idx in s.index]
y = s.values
p = poly1d(polyfit(x, y, deg=order))
self.poly_model = p
return p
[docs] def includes_timestamp(self, time_stamp, ext_border_secs=0.0):
"""Check if input time stamp is included in this dataset.
:param datetime time_stamp: the time stamp to be checked
:param float ext_border_secs: extend start / stop range (default 0 s)
:return:
- bool, True / False (timestamp is within interval)
"""
i = self.start - timedelta(ext_border_secs / 86400.0)
f = self.stop + timedelta(ext_border_secs / 86400.0)
if i <= to_datetime(time_stamp) <= f:
return True
return False
[docs] def get_poly_vals(self, time_stamps, ext_border_secs=0.0):
"""Get value of polynomial at input time stamp.
:param datetime time_stamp: poly input value
"""
if not isinstance(self.poly_model, poly1d):
raise AttributeError("No polynomial available, please call"
"function fit_polynomial first")
if isinstance(time_stamps, datetime):
time_stamps = [time_stamps, ]
if not any([isinstance(time_stamps, x)
for x in [list, DatetimeIndex]]):
raise ValueError("Invalid input for time stamps, need list")
if not all([self.includes_timestamp(x, ext_border_secs)
for x in time_stamps]):
raise IndexError("At least one of the time stamps is not included "
"in this series: %s - %s"
% (self.start, self.stop))
values = []
for time_stamp in time_stamps:
values.append(self.poly_model(date2num(time_stamp)))
return asarray(values)
[docs] def estimate_noise_amplitude(self, sigma_gauss=1, median_size=3, plot=0):
"""Estimate the amplitude of the noise in the data.
Steps:
1. Determines high frequency variations by applying binomial
filter (sigma = 3) to data and subtract this from data,
resulting in a residual
2. Median filtering of residual signal to get rid of narrow peaks
(i.e. where the original data shows abrupt changes)
3. subtract both signals and determine std
..note::
Beta version: no guarantee it works for all cases
"""
# make bool array of indices considered (initally all)
y0 = median_filter(self.values, 3)
y1 = gaussian_filter1d(y0, sigma_gauss)
res0 = y0 - y1
res1 = median_filter(res0, median_size)
diff = res1 - res0
if plot:
fig, ax = subplots(2, 1)
ax[0].plot(y0, "-c", label="y0")
ax[0].plot(y1, "--xr", label="y1: Smoothed y0")
ax[0].legend(
loc='best',
fancybox=True,
framealpha=0.5,
fontsize=10)
ax[1].plot(res0, "--c", label="res0: y0 - y1")
ax[1].plot(res1, "--r", label="res1: Median(res0)")
ax[1].plot(diff, "--b", label="diff: res1 - res0")
ax[1].legend(
loc='best',
fancybox=True,
framealpha=0.5,
fontsize=10)
return diff.std()
[docs] def plot(self, include_tit=True, date_fmt=None, **kwargs):
"""Plot time series.
Parameters
----------
include_tit : bool
Include a title
date_fmt : str
Date / time formatting string for x labels, passed to
:class:`DateFormatter` instance (optional)
**kwargs
Additional keyword arguments passed to pandas Series plot method
Returns
-------
axes
matplotlib axes instance
"""
try:
self.index = self.index.to_pydatetime()
except BaseException:
pass
try:
if "style" not in kwargs:
kwargs["style"] = "--x"
ax = super(PixelMeanTimeSeries, self).plot(**kwargs)
try:
if date_fmt is not None:
ax.xaxis.set_major_formatter(DateFormatter(date_fmt))
except BaseException:
pass
if include_tit:
ax.set_title("Mean value (%s), roi_abs: %s"
% (self.name, self.roi_abs))
ax.grid()
return ax
except Exception as e:
logger.info(repr(e))
fig, ax = subplots(1, 1)
ax.text(.1, .1, "Plot of PixelMeanTimeSeries failed...")
fig.canvas.draw()
return ax
def __setitem__(self, key, value):
"""Update class item."""
logger.info("%s : %s" % (key, value))
if key in self.__dict__:
logger.info("Writing...")
self.__dict__[key] = value
def __call__(self, normalised=False):
"""Return the current data arrays (mean, std)."""
if normalised:
return self.get_data_normalised()
return self.get_data()
# ==============================================================================
# import matplotlib.animation as animation
#
# def animate_stack(img_stack):
#
# fig = figure() # make figure
#
# # make axesimage object
# # the vmin and vmax here are very important to get the color map correct
# im = imshow(sta, cmap=plt.get_cmap('jet'), vmin=0, vmax=255)
#
# # function to update figure
# def updatefig(j):
# # set the data in the axesimage object
# im.set_array(imagelist[j])
# # return the artists set
# return im,
# # kick off the animation
# animation.FuncAnimation(fig, updatefig, frames=range(20),
# interval=50, blit=True)
# plt.show()
# ==============================================================================