# -*- coding: utf-8 -*-
#
# Pyplis is a Python library for the analysis of UV SO2 camera data
# Copyright (C) 2017 Jonas Gliss (jonasgliss@gmail.com)
#
# This program is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of
# the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Assorted data import functionality.
The :class:`Dataset` object is doing all sorts of stuff related to the
general data import setup, for instance the automated separation of image
files by their type (e.g. on-band, off-band, dark, offset) using
information from a file naming convention specified within a
:class:`Camera` object. For more information how to customise your data
import see :mod:`pyplis.setupclasses` or read `this little introductory
tutorial
<http://pyplis.readthedocs.io/en/latest/tutorials.html#primer-on-data-import>`_
"""
from __future__ import (absolute_import, division)
from os.path import exists, join, isfile, isdir
from os import listdir, walk
from datetime import datetime, date
from numpy import inf
from matplotlib.pyplot import subplots, FuncFormatter, tight_layout, Line2D
from matplotlib.patches import Rectangle
from copy import deepcopy
from traceback import format_exc
from collections import OrderedDict as od
from pyplis import logger, print_log
from .imagelists import ImgList, DarkImgList
from .image import Img
from .helpers import shifted_color_map
from .setupclasses import MeasSetup, Source, Camera
from .exceptions import ImgMetaError
import six
[docs]class Dataset(object):
"""Class for data import management.
Default input is a :class:`pyplis.setupclasses.MeasSetup` object, which
specifies the camera used (e.g. file naming convention, detector specifics)
the measurement geometry and information about the source and
meteorological wind direction, start / stop time stamps and the image
base directory.
Attributes
----------
setup : MeasSetup
class containing measurement setup information
lst_type : object
default type of ImgList objects (cf. :class:`CellCalibEngine`)
lists_access_info : OrderedDict
dictionary filled on data import based on camera specifications.
Used to map API IDs of filters and dark / offset information
(e.g. "on", "off", "dark0") onto internal list keys. It is normally
not required to use this dictionary or apply changes to it.
For EC2 camera standard this will look like::
>>> self.lists_access_info
OrderedDict([('on', ['F01', 'F01']),
('off', ['F02', 'F02']),
('offset0', ['D0L', 'D0L']),
('dark0', ['D1L', 'D1L']),
('offset1', ['D0H', 'D0H']),
('dark1', ['D1H', 'D1H'])])
Parameters
----------
input
Usable ``input`` includes :class:`MeasSetup` instance or a valid
image directory. ``input`` is passed to :func:`load_input`.
lst_type : object
default type of image list objects (e.g. :class:`ImgList`,
:class:`CellImgList`), defaults to :class:`ImgList`.
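init : bool
if True (and ``input`` could be loaded), the image lists are created and
filled on initialisation, otherwise only the default lists are created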
"""
def __init__(self, input=None, lst_type=ImgList, init=1):
    """Set up the dataset from ``input`` and create its image lists.

    Parameters
    ----------
    input
        forwarded to :func:`load_input`
    lst_type : object
        type used to create the image lists, defaults to :class:`ImgList`
    init : bool
        if true and :func:`load_input` reported success, the lists are
        created and filled via :func:`init_image_lists`, otherwise only
        :func:`create_lists_default` is called

    """
    self.setup = None
    self.lst_type = lst_type
    self._lists_intern = od()
    self.lists_access_info = od()

    input_loaded = self.load_input(input)
    if not (init and input_loaded):
        self.create_lists_default()
    else:
        self.init_image_lists()
    logger.info("DATASET INITIALISED")
@property
def camera(self):
    """Get / set the camera object stored in ``self.setup``."""
    setup = self.setup
    return setup.camera

@camera.setter
def camera(self, val):
    setup = self.setup
    setup.camera = val
@property
def source(self):
    """Get / set the source object stored in ``self.setup``."""
    setup = self.setup
    return setup.source

@source.setter
def source(self, val):
    setup = self.setup
    setup.source = val
@property
def cam_id(self):
    """Return the ``cam_id`` of the camera stored in ``self.setup``."""
    cam = self.setup.camera
    return cam.cam_id
@property
def base_dir(self):
    """Get / set the image base directory stored in ``self.setup``.

    On assignment, a path that does not exist is ignored; an existing
    path is stored and :func:`init_image_lists` is called.
    """
    setup = self.setup
    return setup.base_dir

@base_dir.setter
def base_dir(self, val):
    if not exists(val):
        return
    self.setup.base_dir = val
    self.init_image_lists()
@property
def USE_ALL_FILES(self):
    """Get / set the USE_ALL_FILES boolean of ``self.setup``.

    Setting the option only stores the value and logs a reminder; the
    image lists are not re-initialised.
    """
    setup = self.setup
    return setup.USE_ALL_FILES

@USE_ALL_FILES.setter
def USE_ALL_FILES(self, val):
    setup = self.setup
    setup.USE_ALL_FILES = val
    reminder = ("Option USE_ALL_FILES was updated in Dataset, please call class"
                " method ``init_image_lists`` in order to apply the changes")
    print_log.info(reminder)
@property
def USE_ALL_FILE_TYPES(self):
    """Get / set the USE_ALL_FILE_TYPES option of ``self.setup``.

    Setting the option only stores the value and logs a reminder; the
    image lists are not re-initialised.
    """
    setup = self.setup
    return setup.USE_ALL_FILE_TYPES

@USE_ALL_FILE_TYPES.setter
def USE_ALL_FILE_TYPES(self, val):
    setup = self.setup
    setup.USE_ALL_FILE_TYPES = val
    reminder = ("Option USE_ALL_FILE_TYPES was updated in Dataset, please call "
                "class method ``init_image_lists`` in order "
                "to apply the changes")
    print_log.info(reminder)
@property
def INCLUDE_SUB_DIRS(self):
    """Get / set the sub directory inclusion option of ``self.setup``.

    Setting the option only stores the value and logs a reminder; the
    image lists are not re-initialised.
    """
    setup = self.setup
    return setup.INCLUDE_SUB_DIRS

@INCLUDE_SUB_DIRS.setter
def INCLUDE_SUB_DIRS(self, val):
    setup = self.setup
    setup.INCLUDE_SUB_DIRS = val
    reminder = ("Option INCLUDE_SUB_DIRS was updated in Dataset, please call "
                "class method ``init_image_lists`` to apply the changes")
    print_log.info(reminder)
@property
def LINK_OFF_TO_ON(self):
    """Get / set the LINK_OFF_TO_ON I/O option of ``self.setup``.

    Setting the option only stores the value and logs a reminder; the
    image lists are not re-initialised.
    """
    return self.setup.LINK_OFF_TO_ON

@LINK_OFF_TO_ON.setter
def LINK_OFF_TO_ON(self, val):
    self.setup.LINK_OFF_TO_ON = val
    # the message previously named INCLUDE_SUB_DIRS (copy-paste error)
    print_log.info("Option LINK_OFF_TO_ON was updated in Dataset, please call "
                   "class method ``init_image_lists`` in order "
                   "to apply the changes")
@property
def start(self):
    """Get / set the start time stamp stored in ``self.setup``.

    Setting the value only stores it and logs a reminder; the image
    lists are not re-initialised.
    """
    setup = self.setup
    return setup.start

@start.setter
def start(self, val):
    setup = self.setup
    setup.start = val
    reminder = ("Start time stamp was updated in Dataset, please call class "
                "method ``init_image_lists`` in order to apply the changes")
    print_log.info(reminder)
@property
def stop(self):
    """Get / set the stop time stamp stored in ``self.setup``.

    Setting the value only stores it and logs a reminder; the image
    lists are not re-initialised.
    """
    setup = self.setup
    return setup.stop

@stop.setter
def stop(self, val):
    setup = self.setup
    setup.stop = val
    reminder = ("Stop time stamp was updated in Dataset, please call class "
                "method ``init_image_lists`` in order to apply the changes")
    print_log.info(reminder)
@property
def file_type(self):
    """Return the ``file_type`` of the camera stored in ``self.setup``."""
    cam = self.setup.camera
    return cam.file_type
@property
def meas_geometry(self):
    """Return the measurement geometry stored in ``self.setup``."""
    setup = self.setup
    return setup.meas_geometry
@property
def filters(self):
    """Return the ``filter_setup`` of the camera in ``self.setup``."""
    cam = self.setup.camera
    return cam.filter_setup
@property
def filter_acronyms(self):
    """Return a dictionary mapping each filter ID onto its acronym."""
    return {filter_id: filt.acronym
            for filter_id, filt in self.filters.filters.items()}
@property
def num_of_filters(self):
    """Return the number of entries in ``self.filters.filters``."""
    filter_keys = self.filters.filters.keys()
    return len(filter_keys)
@property
def _fname_access_flags(self):
    """Return the ``_fname_access_flags`` attribute of ``self.camera``."""
    cam = self.camera
    return cam._fname_access_flags
@property
def rects(self):
    """Return the rectangle collection ``self.setup.forms.rects``.

    Assignment stores one rectangle in the collection and expects a
    ``(name, value)`` pair, e.g. ``ds.rects = ("roi", [x0, y0, x1, y1])``.
    """
    return self.setup.forms.rects

@rects.setter
def rects(self, name, value=None):
    """Store one rectangle in the collection held by ``self.setup``.

    A property setter receives exactly one argument on attribute
    assignment, so the former ``(self, name, value)`` signature could not
    be used via ``ds.rects = ...``. The two-argument form is kept for
    direct calls of the setter function.
    """
    if value is None:
        # invoked via attribute assignment: unpack the (name, value) pair
        name, value = name
    self.setup.forms.rects[name] = value
@property
def lines(self):
    """Return the line collection ``self.setup.forms.lines``.

    Assignment stores one line in the collection and expects a
    ``(name, value)`` pair, e.g. ``ds.lines = ("pcs", [x0, y0, x1, y1])``.
    """
    return self.setup.forms.lines

@lines.setter
def lines(self, name, value=None):
    """Store one line in the collection held by ``self.setup``.

    A property setter receives exactly one argument on attribute
    assignment, so the former ``(self, name, value)`` signature could not
    be used via ``ds.lines = ...``. The two-argument form is kept for
    direct calls of the setter function.
    """
    if value is None:
        # invoked via attribute assignment: unpack the (name, value) pair
        name, value = name
    self.setup.forms.lines[name] = value
def set_setup(self, stp):
    """Assign a new measurement setup to this dataset.

    Parameters
    ----------
    stp : MeasSetup
        the new setup, anything that is not a :class:`MeasSetup` instance
        is rejected

    Returns
    -------
    int
        1 if the setup was assigned, 0 if the input was rejected

    """
    if not isinstance(stp, MeasSetup):
        return 0
    logger.info("Updating measurement setup in Dataset")
    self.setup = stp
    return 1
def init_image_lists(self):
    """Create the image lists and fill them with files.

    Calls :func:`create_lists_cam` followed by :func:`fill_image_lists`
    and returns the result of the latter.
    """
    self.create_lists_cam()
    filled = self.fill_image_lists()
    return filled
def create_lists_default(self):
    """Create one empty image list per filter in ``self.filters``.

    ``self._lists_intern`` is reset and refilled (nested by measurement
    type acronym and filter acronym) and ``self.lists_access_info`` gets
    one entry per filter ID.
    """
    self._lists_intern = od()
    for filter_key, filt in self.filters.filters.items():
        img_list = self.lst_type(list_id=filter_key, list_type=filt.type,
                                 camera=self.camera,
                                 geometry=self.meas_geometry)
        img_list.filter = filt
        group = self._lists_intern.setdefault(filt.meas_type_acro, od())
        group[filt.acronym] = img_list
        self.lists_access_info[filt.id] = [filt.meas_type_acro,
                                           filt.acronym]
def create_lists_cam(self):
    """Create empty filter and dark / offset image lists.

    ``self._lists_intern`` is reset, then one list per filter and one
    :class:`DarkImgList` per entry of ``self.camera.dark_info`` is
    created and registered in ``self.lists_access_info``.

    Returns
    -------
    int or None
        0 if the camera provides no dark info (only the filter lists are
        created in that case), else None

    """
    self._lists_intern = od()
    for filter_key, filt in self.filters.filters.items():
        img_list = self.lst_type(list_id=filter_key, list_type=filt.type,
                                 camera=self.camera,
                                 geometry=self.meas_geometry)
        img_list.filter = filt
        group = self._lists_intern.setdefault(filt.meas_type_acro, od())
        group[filt.acronym] = img_list
        self.lists_access_info[filt.id] = [filt.meas_type_acro,
                                           filt.acronym]

    if not bool(self.camera.dark_info):
        print_log.warning(
            "Warning: dark image lists could not be initiated, no "
            "dark image file information available in self.camera")
        return 0

    for dark_item in self.camera.dark_info:
        dark_list = DarkImgList(list_id=dark_item.id,
                                list_type=dark_item.type,
                                read_gain=dark_item.read_gain,
                                camera=self.camera)
        dark_list.filter = dark_item
        group = self._lists_intern.setdefault(dark_item.meas_type_acro,
                                              od())
        group[dark_item.acronym] = dark_list
        self.lists_access_info[dark_item.id] = [dark_item.meas_type_acro,
                                                dark_item.acronym]
def fill_image_lists(self):
    """Import all images and fill image list objects.

    Searches all valid file paths, optionally restricts them to the time
    interval of the setup and distributes them over the image lists
    using the meta information encoded in the file names.

    Returns
    -------
    bool
        False if the base directory does not exist or no files were
        found, else True

    """
    # NOTE(review): the block nesting was reconstructed from syntax
    # (source indentation was lost) - confirm against the repository
    warnings = []
    cam = self.camera

    #: check if image filetype is specified and if not, set option to use
    #: all file types
    self._check_file_type()
    if self.base_dir is None or not exists(self.base_dir):
        print_log.warning("Warning: image base directory does not exist, "
                          "method init_image_lists aborted in Dataset")
        return False
    #: load all file paths
    paths = self.get_all_filepaths()
    # paths now includes all valid paths dependent on whether file_type is
    # specified or not and whether also subdirectories were considered
    if not bool(paths):
        print_log.warning("Warning: lists could not be initiated, no valid "
                          "files found method init_image_lists aborted in "
                          "Dataset")
        return False
    # check which image meta information can be accessed from first file in
    # list (updates ``_fname_access_flags`` in :class:`Camera`)
    self.check_filename_info_access(paths[0])

    # get the current meta access flags
    flags = cam._fname_access_flags
    if self.USE_ALL_FILES and flags["start_acq"]:
        # take all files in the basefolder (i.e. set start and stop date
        # the first and last date of the files in the folder)
        self.setup.start = cam.get_img_meta_from_filename(paths[0])[0]
        self.setup.stop = cam.get_img_meta_from_filename(paths[-1])[0]

    #: Set option to use all files in case acquisition time stamps cannot
    #: be accessed from filename
    if not flags["start_acq"]:
        print_log.warning("Acquisition time access from filename not "
                          "possible, using all files")
        self.setup.options["USE_ALL_FILES"] = True

    #: Separate the current list based on specified time stamps
    if not self.setup.options["USE_ALL_FILES"]:
        paths_temp = self.extract_files_time_ival(paths)
        if not bool(paths_temp):
            # check if any files were found in specified t-window
            warnings.append("No images found in specified time interval "
                            "%s - %s, mode was changed to: "
                            "USE_ALL_FILES=True"
                            % (self.start, self.stop))
            self.setup.options["USE_ALL_FILES"] = True
        else:
            paths = paths_temp

    if self.setup.ON_OFF_SAME_FILE:
        logger.warning("Option ON_OFF_SAME_FILE is active: using same file paths "
                       "in default on and offband list. Please note that no "
                       "further file separation is applied (e.g. separation of "
                       "dark images)")
        # the function add_files ads the file paths to the list and loads
        # the current and next images (at index 0 and 1)
        self.img_lists[self.filters.default_key_on].add_files(paths)
        self.img_lists[self.filters.default_key_off].add_files(paths)
    else:
        if not (flags["filter_id"] and flags["meas_type"]):
            #: it is not possible to separate different image types (on,
            #: off, dark..) from filename, thus all are loaded into on
            #: image list
            warnings.append("Images can not be separated by type/meas_type"
                            " (e.g. on, off, dark, offset...) from "
                            "filename info, loading "
                            "all files into on-band list")
            self.setup.options["SEPARATE_FILTERS"] = False
            i = self.lists_access_info[self.filters.default_key_on]
            self._lists_intern[i[0]][i[1]].add_files(paths)
            for msg in warnings:
                logger.warning(msg)
            return True

        #: now perform separation by meastype and filter
        for p in paths:
            try:
                _, filter_id, meas_type, _, _ = \
                    cam.get_img_meta_from_filename(p)
                self._lists_intern[meas_type][filter_id].files.append(p)
            except Exception:
                # file does not match any list: skip it, but do not
                # swallow KeyboardInterrupt / SystemExit
                logger.warning("File %s could not be added..." % p)

        for sub_dict in self._lists_intern.values():
            for lst in sub_dict.values():
                lst.init_filelist()
        for lst in self.img_lists_with_data.values():
            lst.load()
        self.assign_dark_offset_lists()

    if self.LINK_OFF_TO_ON:
        # best effort: a failed link must not abort the import, but it
        # should not go unnoticed either
        try:
            off_list = self.get_list(self.filters.default_key_off)
            self.get_list(self.filters.default_key_on). \
                link_imglist(off_list)
        except Exception:
            logger.warning("Failed to link off-band list to on-band "
                           "list:\n%s" % format_exc())

    if self.setup.REG_SHIFT_OFF:
        for lst in self.img_lists_with_data.values():
            if lst.list_type == "off":
                lst.shift_mode = True

    for msg in warnings:
        logger.warning(msg)
    return True
def get_all_filepaths(self):
    """Collect all valid image file paths below the base directory.

    Returns
    -------
    list
        sorted list of file paths. If the option to use all file types
        is active, all files found are included, else only files whose
        name ends with ``self.file_type``. An empty list is returned if
        the base directory does not exist.

    """
    logger.info("\nSEARCHING VALID FILE PATHS IN\n%s\n" % self.base_dir)

    root = self.base_dir
    suffix = self.file_type
    if not isinstance(suffix, str):
        logger.warning("file_type not specified in Dataset..."
                       "Using all files and file_types")
        self.setup.options["USE_ALL_FILES"] = True
        self.setup.options["USE_ALL_FILE_TYPES"] = True

    if root is None or not exists(root):
        logger.warning('Error: path %s does not exist' % root)
        return []

    recursive = self.INCLUDE_SUB_DIRS
    if recursive:
        logger.info("Image search includes files from subdirectories")
    else:
        logger.info("Image search is only performed in specified directory "
                    "and does not include subdirectories")

    any_type = self.USE_ALL_FILE_TYPES
    if any_type:
        logger.info("Using all file types")
    else:
        logger.info("Using only %s files" % suffix)

    if recursive:
        candidates = ((folder, name)
                      for folder, _, names in walk(root)
                      for name in names)
    else:
        candidates = ((root, name) for name in listdir(root)
                      if isfile(join(root, name)))

    found = sorted(join(folder, name) for folder, name in candidates
                   if any_type or name.endswith(suffix))
    logger.info("Total number of files found %s" % len(found))
    return found
def check_filename_info_access(self, filepath):
    """Check which meta information can be read from a file name.

    Calls ``get_img_meta_from_filename`` of the camera for the input
    path and logs each item of the fifth entry of its result as a
    warning.

    Parameters
    ----------
    filepath : str
        path of an example image

    Returns
    -------
    dict
        ``self.camera._fname_access_flags``

    """
    meta = self.camera.get_img_meta_from_filename(filepath)
    for message in meta[4]:
        logger.warning(message)
    return self.camera._fname_access_flags
def change_img_base_dir(self, img_dir):
    """Set or update the image base directory in ``self.setup``.

    Unlike assigning to the ``base_dir`` property, this does not
    re-initialise the image lists.

    Parameters
    ----------
    img_dir : str
        new image base directory

    Returns
    -------
    int or None
        0 if ``img_dir`` does not exist (nothing is changed), else None

    """
    if not exists(img_dir):
        msg = ("Could not update base_dir, input path %s does not "
               "exist" % img_dir)
        logger.warning(msg)
        # ``self.warnings`` is not created in ``__init__``; unconditional
        # access raised AttributeError here. Only record the message if
        # such a list exists on the instance.
        recorded = getattr(self, "warnings", None)
        if isinstance(recorded, list):
            recorded.append(msg)
        return 0
    self.setup.base_dir = img_dir
def _check_file_type(self):
    """Update ``USE_ALL_FILE_TYPES`` based on the camera file type.

    The option is set False in ``self.setup`` if ``self.camera.file_type``
    is a string, else True.
    """
    cam = self.camera
    self.setup.USE_ALL_FILE_TYPES = not isinstance(cam.file_type, str)
def _find_files_ival_time_only(self, all_paths):
    """Select the files acquired within the time-of-day window.

    Only the time (not the date) of ``self.start``, ``self.stop`` and of
    the acquisition time stamps is compared; both bounds are inclusive.

    Parameters
    ----------
    all_paths : list
        list of image file paths

    Returns
    -------
    list
        the matching paths in input order

    """
    t_first, t_last = self.start.time(), self.stop.time()
    read_meta = self.camera.get_img_meta_from_filename
    return [path for path in all_paths
            if t_first <= read_meta(path)[0].time() <= t_last]
def _find_files_ival_datetime(self, all_paths):
    """Select the files acquired between ``self.start`` and ``self.stop``.

    The full time stamps (date and time) are compared, both bounds are
    inclusive. See :func:`_find_files_ival_time_only` for a variant that
    ignores the date. The outcome is logged.

    Parameters
    ----------
    all_paths : list
        list of image file paths

    Returns
    -------
    list
        the matching paths in input order

    """
    read_meta = self.camera.get_img_meta_from_filename
    t_first, t_last = self.start, self.stop
    matches = [path for path in all_paths
               if t_first <= read_meta(path)[0] <= t_last]
    if matches:
        logger.info("%s files of type were found in specified time interval %s "
                    "- %s" % (len(matches), self.start, self.stop))
    else:
        logger.warning("Error: no files could be found in specified time "
                       "interval %s - %s" % (self.start, self.stop))
    return matches
def find_closest_img(self, filename, in_list, acronym, meas_type_acro):
    """Find the file in ``in_list`` that is closest in time to ``filename``.

    Parameters
    ----------
    filename : str
        reference image file
    in_list : list
        file paths to be searched
    acronym : str
        filter acronym a candidate must have (second entry of the file
        name meta information)
    meas_type_acro : str
        measurement type acronym a candidate must have (third entry of
        the file name meta information)

    Returns
    -------
    str
        the matching path with the smallest absolute time difference
        (the first one in case of ties)

    Raises
    ------
    Exception
        if no file matches

    """
    read_meta = self.camera.get_img_meta_from_filename
    ref_time = read_meta(filename)[0]

    best_diff = inf
    best_idx = -1
    for idx, candidate in enumerate(in_list):
        acq_time, filter_acro, meas_type, _, _ = read_meta(candidate)
        if filter_acro != acronym:
            continue
        diff = abs(acq_time - ref_time).total_seconds()
        if diff < best_diff and meas_type_acro == meas_type:
            best_diff = diff
            best_idx = idx

    if best_idx == -1 or best_diff == inf:
        raise Exception("Error in func find_closest_img: no match")
    return in_list[best_idx]
def all_lists(self):
    """Return a flat list of all image lists in ``self._lists_intern``.

    The nested dictionary is traversed in order (outer keys first, then
    the keys of each sub dictionary).
    """
    return [lst
            for sub_dict in self._lists_intern.values()
            for lst in sub_dict.values()]
@property
def dark_ids(self):
    """Return the IDs of all entries in ``self.camera.dark_info``."""
    return [dark_item.id for dark_item in self.camera.dark_info]
def assign_dark_offset_lists(self, into_list=None):
    """Assign dark and offset lists to the image lists.

    Links dark and offset lists into the filter image lists (via
    ``link_dark_offset_lists``) so that they are available for dark and
    offset correction.

    Parameters
    ----------
    into_list : ImgList, optional
        if specified, all dark / offset lists containing data are linked
        into this list only and the method returns

    Returns
    -------
    bool
        always True

    """
    # NOTE(review): the block nesting below was reconstructed from syntax
    # (source indentation was lost) - confirm against the repository
    if isinstance(into_list, ImgList):
        into_list.link_dark_offset_lists(
            *list(self.dark_lists_with_data.values()))
        return True

    # ==============================================================================
    # no_dark_ids = self.check_dark_lists()
    # if len(no_dark_ids) > 0:
    # self.find_master_darks(no_dark_ids)
    # ==============================================================================

    # loop over all image lists ...
    for filter_id, lst in six.iteritems(self.img_lists):
        # ... that contain data
        if lst.nof > 0:
            # dark / offset lists matching this image list
            lists = od()
            if self.camera.meas_type_pos != self.camera.filter_id_pos:
                for dark_mtype in self.camera.dark_meas_type_acros:
                    for dark_acro in self.camera.dark_acros:
                        # lookups that fail (e.g. missing keys) are
                        # skipped silently
                        try:
                            if (lst.filter.acronym in dark_acro and
                                    lst.filter.meas_type_acro == dark_mtype):
                                dark_lst = self. \
                                    _lists_intern[dark_mtype][dark_acro]
                                if isinstance(dark_lst, DarkImgList):
                                    lists[dark_lst.list_id] = dark_lst
                                    logger.info("Found dark list match for "
                                                "image list %s, dark ID: %s"
                                                % (lst.list_id,
                                                   dark_lst.list_id))
                        except BaseException:
                            pass
                for offs_mtype in self.camera.offset_meas_type_acros:
                    for offset_acro in self.camera.offset_acros:
                        try:
                            if lst.filter.acronym in offset_acro:
                                offs_lst = self. \
                                    _lists_intern[offs_mtype][offset_acro]
                                if isinstance(offs_lst, DarkImgList):
                                    lists[offs_lst.list_id] = offs_lst
                                    logger.info("Found offset list match for "
                                                "image list %s: dark ID: %s"
                                                % (lst.list_id,
                                                   offs_lst.list_id))
                        except BaseException:
                            pass
            # no specific match found: fall back to all dark / offset lists
            if not lists:
                lists = self.dark_lists
            # keys of empty lists for which no master dark could be found;
            # collected first, removed after the iteration
            rm = []
            for key, dark_list in six.iteritems(lists):
                if dark_list.nof < 1:
                    try:
                        self.find_master_dark(dark_list)
                    except BaseException:
                        rm.append(key)
            for key in rm:
                del lists[key]
            # now lists only contains dark and offset lists with data
            for l in lists.values():
                if not isinstance(l.loaded_images["this"], Img):
                    l.load()
            if not bool(lists):
                logger.warning("Failed to assign dark / offset lists to image "
                               "list %s, no dark images could be found" % filter_id)
            else:
                logger.info("Assigning dark/offset lists %s to image list %s\n"
                            % (list(lists.keys()), filter_id))
                lst.link_dark_offset_lists(*list(lists.values()))
    return True
def get_all_dark_offset_lists(self):
    """Return an ordered dictionary of all dark and offset image lists.

    Keys are the IDs in ``self.dark_ids``, the lists are looked up in
    ``self._lists_intern`` via ``self.lists_access_info``.
    """
    access = self.lists_access_info
    intern = self._lists_intern
    return od((dark_id, intern[access[dark_id][0]][access[dark_id][1]])
              for dark_id in self.dark_ids)
@property
def dark_lists(self):
    """Return the result of :func:`get_all_dark_offset_lists`."""
    all_dark = self.get_all_dark_offset_lists()
    return all_dark
@property
def dark_lists_with_data(self):
    """Return all dark / offset lists with at least one file (``nof > 0``)."""
    return od((dark_id, lst) for dark_id, lst in self.dark_lists.items()
              if lst.nof > 0)
@property
def filter_ids(self):
    """Return the keys (filter IDs) of ``self.filters.filters``."""
    filter_dict = self.filters.filters
    return filter_dict.keys()
def get_all_image_lists(self):
    """Return an ordered dictionary of all filter image lists.

    Keys are the IDs in ``self.filter_ids`` (dark and offset lists are
    not included), the lists are looked up in ``self._lists_intern`` via
    ``self.lists_access_info``.
    """
    access = self.lists_access_info
    intern = self._lists_intern
    return od((filter_id, intern[access[filter_id][0]][access[filter_id][1]])
              for filter_id in self.filter_ids)
@property
def img_lists(self):
    """Return the result of :func:`get_all_image_lists`."""
    all_img = self.get_all_image_lists()
    return all_img
@property
def img_lists_with_data(self):
    """Return all filter image lists with at least one file (``nof > 0``)."""
    return od((key, lst) for key, lst in self.img_lists.items()
              if lst.nof > 0)
def check_dark_lists(self):
    """Return the ``list_id`` of every dark / offset list without files."""
    return [lst.list_id for lst in self.dark_lists.values()
            if not lst.nof > 0]
def find_master_dark(self, dark_list):
    """Search a master dark image for one dark image list.

    The image closest in time to a reference image (the middle file of
    the default on-band list or, if that list contains no data, the
    middle file of all files found) is appended to ``dark_list``.

    Parameters
    ----------
    dark_list : DarkImgList
        dark (or offset) list for which an image is searched

    Returns
    -------
    DarkImgList
        the updated input list

    Raises
    ------
    ImgMetaError
        if filter ID and measurement type cannot be accessed from the
        file names
    IndexError
        if no reference image is available (no files found at all)
    Exception
        if no matching dark image could be found

    """
    dark_id = dark_list.list_id
    logger.info("\nSearching master dark image for dark list %s" % dark_id)
    flags = self.camera._fname_access_flags
    if not (flags["filter_id"] and flags["meas_type"]):
        #: it is not possible to separate different image types (on, off,
        #: dark..) from filename, thus dark or offset images can not be
        #: searched
        raise ImgMetaError("Image identification via file name is not "
                           "possible in Dataset")

    all_files = self.get_all_filepaths()
    on_list = self.get_list(self.filters.default_key_on)
    if on_list.data_available:
        f_name = on_list.files[int(on_list.nof / 2)]
    elif all_files:
        f_name = all_files[len(all_files) // 2]
    else:
        # same exception type as the former out-of-range access, but
        # with a message that explains the failure
        raise IndexError("No image files available to search a master "
                         "dark image for ID %s" % dark_id)

    meas_type_acro, acronym = self.lists_access_info[dark_id]
    try:
        p = self.find_closest_img(f_name, all_files, acronym,
                                  meas_type_acro)
        dark_list.files.append(p)
        dark_list.init_filelist()
    except Exception as exc:
        logger.info("Failed to find dark image for ID %s\n" % dark_id)
        # keep the generic exception type callers rely on, but preserve
        # the reason of the failure
        raise Exception("Failed to find dark image for ID %s (%s)"
                        % (dark_id, exc))
    logger.info("Found dark image for ID %s\n" % dark_id)
    return dark_list
def find_master_darks(self, dark_ids=None):
    """Search master dark images for dark image lists without files.

    For each considered dark (or offset) list that contains no files,
    the image closest in time to a reference image (the middle file of
    the default on-band list or, if that list contains no data, the
    middle file of all files found) is appended.

    Parameters
    ----------
    dark_ids : list, optional
        IDs of the dark lists to be considered, if unspecified or empty,
        all dark lists are considered

    Returns
    -------
    list
        IDs of the lists for which no image could be found (empty if
        file names do not provide filter ID and measurement type)

    """
    if dark_ids is None:
        dark_ids = []
    logger.info("\nCHECKING DARK IMAGE LISTS IN DATASET")
    flags = self.camera._fname_access_flags
    if not (flags["filter_id"] and flags["meas_type"]):
        #: it is not possible to separate different image types (on, off,
        #: dark..) from filename, thus dark or offset images can not be
        #: searched
        return []

    all_files = self.get_all_filepaths()
    on_list = self.get_list(self.filters.default_key_on)
    if on_list.data_available:
        f_name = on_list.files[int(on_list.nof / 2)]
    elif all_files:
        f_name = all_files[len(all_files) // 2]
    else:
        # no reference image: every empty dark list is reported as failed
        # below (instead of raising IndexError here)
        f_name = None

    # the property builds a new dictionary on each access: evaluate once
    dark_lists = self.dark_lists
    if not dark_ids:
        dark_ids = list(dark_lists.keys())

    failed_ids = []
    for dark_id in dark_ids:
        lst = dark_lists[dark_id]
        if lst.nof > 0:
            continue
        meas_type_acro, acronym = self.lists_access_info[dark_id]
        logger.info("\nSearching master dark image for\nID: %s\nacronym: %s"
                    "\nmeas_type_acro: %s" % (dark_id, acronym,
                                              meas_type_acro))
        if f_name is None:
            logger.info("Failed to find dark image for ID %s\n" % dark_id)
            failed_ids.append(dark_id)
            continue
        try:
            p = self.find_closest_img(f_name, all_files, acronym,
                                      meas_type_acro)
            lst.files.append(p)
            lst.init_filelist()
            logger.info("Found dark image for ID %s\n" % dark_id)
        except Exception:
            logger.info("Failed to find dark image for ID %s\n" % dark_id)
            failed_ids.append(dark_id)
    return failed_ids
def check_image_access_dark_lists(self):
    """Check whether every dark / offset list has data available.

    Returns
    -------
    bool
        False if ``data_available`` is false for any list in
        ``self.dark_lists``, else True

    """
    return all(lst.data_available for lst in self.dark_lists.values())
"""Helpers"""
def images_available(self, filter_id):
    """Check if an image list contains images.

    Parameters
    ----------
    filter_id : str
        ID of the image list

    Returns
    -------
    int
        1 if the list exists and contains at least one file, else 0

    """
    try:
        return 1 if self.get_list(filter_id).nof > 0 else 0
    except Exception:
        # e.g. unknown list ID; KeyboardInterrupt / SystemExit are no
        # longer swallowed
        return 0
def current_image(self, filter_id):
    """Get the current image of an image list.

    Parameters
    ----------
    filter_id : str
        ID of the image list

    Returns
    -------
    object
        result of ``current_img()`` of the list, 0 if the image could
        not be retrieved

    """
    try:
        return self.get_list(filter_id).current_img()
    except Exception:
        # e.g. unknown list ID; KeyboardInterrupt / SystemExit are no
        # longer swallowed
        return 0
def get_list(self, list_id):
    """Get one image list by its ID.

    Parameters
    ----------
    list_id : str
        ID of the image list (e.g. "on")

    Returns
    -------
    object
        the image list (a warning is logged if it contains no files)

    Raises
    ------
    KeyError
        if the ID is not registered in ``self.lists_access_info``

    """
    access = self.lists_access_info
    if list_id not in access:
        raise KeyError("%s ImgList could not be found..." % list_id)
    outer_key, inner_key = access[list_id][0], access[list_id][1]
    found = self._lists_intern[outer_key][inner_key]
    if not found.nof > 0:
        logger.warning("Image list %s does not contain any images" % list_id)
    return found
def get_current_img_prep_dict(self, list_id=None):
    """Return the ``img_prep`` attribute of one image list.

    Parameters
    ----------
    list_id : str, optional
        ID of the image list, defaults to ``self.filters.default_key_on``

    """
    target_id = self.filters.default_key_on if list_id is None else list_id
    return self.get_list(target_id).img_prep
def load_images(self):
    """Call ``load`` on every image list that contains files.

    Note
    ----
    A warning is logged for each list that contains no files

    """
    for img_list in self.all_lists():
        if not img_list.nof > 0:
            logger.warning("No images available in list %s" % img_list.list_id)
            continue
        img_list.load()
def update_image_prep_settings(self, **settings):
    """Forward image preparation settings to all filter image lists.

    Parameters
    ----------
    **settings
        passed to ``update_img_prep_settings`` of each list in
        ``self.img_lists``; the return value of each call is logged

    """
    for list_id, img_list in self.img_lists.items():
        logger.info("Checking changes in list %s: " % list_id)
        updated = img_list.update_img_prep_settings(**settings)
        logger.info("list %s updated (0 / 1): %s" % (list_id, updated))
def update_times(self, start, stop):
    """Update start and stop time stamps in ``self.setup``.

    After the assignment, ``check_timestamps`` of the setup is called.

    Parameters
    ----------
    start : datetime
        new start time
    stop : datetime
        new stop time

    Raises
    ------
    TypeError
        if one of the inputs is not a datetime object

    """
    valid = all(isinstance(stamp, datetime) for stamp in (start, stop))
    if not valid:
        raise TypeError("Times could not be changed in Dataset, "
                        "wrong input type: %s, %s (need datetime)"
                        % (type(start), type(stop)))
    setup = self.setup
    setup.start = start
    setup.stop = stop
    setup.check_timestamps()
def duplicate(self):
    """Return a deep copy of this Dataset object."""
    # copy first, so that success is only logged if the copy was created
    duplicated = deepcopy(self)
    logger.info('Dataset successfully duplicated')
    return duplicated
"""GUI stuff
"""
# ==============================================================================
# def open_in_gui(self):
# """Open this dataset in GUI application"""
# try:
# import pyplis.gui as gui
# app=QApplication(argv)
#
# #win = DispTwoImages.DispTwoImagesWidget(fileListRight=fileList)
# win = gui.MainApp.MainApp(self)
# win.show()
# app.exec_() #run main loop
# except:
# print ("Error: could not open pyplis GUI")
# raise
# ==============================================================================
"""
Plotting etc.
"""
def show_current_img(self, filter_id, add_forms=False):
    """Plot the current image of one image list.

    Parameters
    ----------
    filter_id : str
        ID of the image list (e.g. "on")
    add_forms : bool
        if True, the lines and rectangles stored in ``self.lines`` and
        ``self.rects`` are drawn into the image and added to a legend

    Returns
    -------
    object
        result of ``show_img()`` of the current image

    """
    ax = self.current_image(filter_id).show_img()
    if add_forms:
        handles = []
        for k, v in self.lines._forms.items():
            l = Line2D([v[0], v[2]], [v[1], v[3]], color="#00ff00",
                       label=k)
            handles.append(l)
            ax.add_artist(l)
        for k, v in self.rects._forms.items():
            w, h = v[2] - v[0], v[3] - v[1]
            r = Rectangle((v[0], v[1]), w, h, ec="#00ff00", fc="none",
                          label=k)
            ax.add_patch(r)
            handles.append(r)
        legend = ax.legend(handles=handles, loc='best', fancybox=True,
                           framealpha=0.5, fontsize=10)
        # ``Legend.draggable`` was removed from matplotlib in favour of
        # ``set_draggable``; fall back to the old API if required
        try:
            legend.set_draggable(True)
        except AttributeError:
            legend.draggable()
    return ax
# ax.draw()
def plot_mean_value(self, filter_id, yerr=1, rect=None):
    """Plot the pixel mean value of one image list.

    Parameters
    ----------
    filter_id : str
        ID of the image list
    yerr
        passed to ``plot_mean_value`` of the list
    rect
        passed to ``plot_mean_value`` of the list

    """
    target = self.get_list(filter_id)
    target.plot_mean_value(yerr=yerr, rect=rect)
def draw_map_2d(self, *args, **kwargs):
    """Forward all arguments to ``self.meas_geometry.draw_map_2d``."""
    geometry = self.meas_geometry
    return geometry.draw_map_2d(*args, **kwargs)
def draw_map_3d(self, *args, **kwargs):
    """Forward all arguments to ``self.meas_geometry.draw_map_3d``."""
    geometry = self.meas_geometry
    return geometry.draw_map_3d(*args, **kwargs)
def print_list_info(self):
    """Log an overview of all filter and dark / offset image lists."""
    parts = ["info about image lists in dataset\n-------------------------\n\n"
             "Scene image lists:\n------------------------\n"]
    parts.extend("ID: %s, type: %s, %s images\n"
                 % (lst.list_id, lst.list_type, lst.nof)
                 for lst in self.img_lists.values())
    parts.append("\nDark image lists:\n------------------------\n")
    parts.extend("ID: %s, type: %s, read_gain: %s, %s images\n"
                 % (lst.list_id, lst.list_type, lst.read_gain, lst.nof)
                 for lst in self.dark_lists.values())
    logger.info("".join(parts))
"""
THE FOLLOWING STUFF WAS COPIED FROM OLD PLUMEDATA OBJECT
"""
def connect_meas_geometry(self):
    """Pass the current measurement geometry to all filter image lists.

    Nothing is done if ``self.meas_geometry`` is None.
    """
    geometry = self.meas_geometry
    if geometry is None:
        return
    for filter_id in self.filters.filters.keys():
        self.get_list(filter_id).set_meas_geometry(geometry)
def plot_tau_preview(self, on_id="on", off_id="off", pyrlevel=2):
    """Plot a preview of current tau_on, tau_off and AA images.

    The first row shows tau on and tau off, the second row shows the AA
    image twice: once in the common value range of all three images and
    once in its own value range.

    Parameters
    ----------
    on_id : str
        ID of the on-band list
    off_id : str
        ID of the off-band list
    pyrlevel : int
        passed to ``pyr_down`` of the images to reduce their size

    Returns
    -------
    object
        the 2x2 array of axes, or 0 if the preview could not be plotted

    """
    lists = {}
    # tau mode of each list before the preview (restored afterwards)
    tm = {on_id: 1,
          off_id: 1}

    def fmt(num, pos=None):
        # FuncFormatter calls its function with (value, position)
        return '{:.1e}'.format(num)

    def restore_tau_modes():
        for list_id, img_list in lists.items():
            img_list.activate_tau_mode(tm[list_id])

    for list_id in [on_id, off_id]:
        try:
            l = self.get_list(list_id)
            if not l.bg_model.ready_2_go():
                logger.info("Tau preview could not be plotted, bg model is not "
                            " ready for filter: %s" % list_id)
                # undo tau mode changes applied to previous lists
                restore_tau_modes()
                return 0
            lists[list_id] = l
            if not l.tau_mode:
                tm[list_id] = 0
                l.activate_tau_mode()
        except Exception:
            logger.info(format_exc())
            restore_tau_modes()
            return 0

    try:
        fig, axes = subplots(2, 2, figsize=(16, 10))
        tau_on = lists[on_id].current_img().pyr_down(pyrlevel)
        t_on_str = lists[on_id].current_time_str()
        tau_off = lists[off_id].current_img().pyr_down(pyrlevel)
        t_off_str = lists[off_id].current_time_str()
        aa = tau_on - tau_off  # AA image object

        # common value range of all three images (the lower bound was
        # erroneously computed with max before)
        tau_max = max([tau_on.img.max(), tau_off.img.max(), aa.img.max()])
        tau_min = min([tau_on.img.min(), tau_off.img.min(), aa.img.min()])
        # make a color map for the index range
        cmap = shifted_color_map(tau_min, tau_max)

        im = axes[0, 0].imshow(tau_on.img, cmap=cmap,
                               vmin=tau_min, vmax=tau_max)
        fig.colorbar(im, ax=axes[0, 0], format=FuncFormatter(fmt))
        axes[0, 0].set_title("tau on: %s" % t_on_str)

        im = axes[0, 1].imshow(tau_off.img, cmap=cmap,
                               vmin=tau_min, vmax=tau_max)
        fig.colorbar(im, ax=axes[0, 1], format=FuncFormatter(fmt))
        axes[0, 1].set_title("tau off: %s" % t_off_str)

        im = axes[1, 0].imshow(aa.img, cmap=cmap,
                               vmin=tau_min, vmax=tau_max)
        fig.colorbar(im, ax=axes[1, 0], format=FuncFormatter(fmt))
        axes[1, 0].set_title("AA (vals scaled)")

        cmap = shifted_color_map(aa.img.min(), aa.img.max())
        im = axes[1, 1].imshow(aa.img, cmap=cmap)
        fig.colorbar(im, ax=axes[1, 1], format=FuncFormatter(fmt))
        axes[1, 1].set_title("AA")
        tight_layout()
    finally:
        # reset the lists to the tau mode they had before the preview
        restore_tau_modes()
    return axes
def __getitem__(self, key):
    """Get one item by name.

    Searches ``self.setup.__dict__`` first, then ``self.__dict__``.

    Parameters
    ----------
    key : str
        name of the item

    Returns
    -------
    object
        the value of the first match, None if ``key`` is found in
        neither dictionary

    """
    for namespace in (self.setup.__dict__, self.__dict__):
        if key in namespace:
            return namespace[key]
def __setitem__(self, key, val):
    """Overwrite the value of an existing item.

    Searches ``self.setup.__dict__`` first, then ``self.__dict__`` and
    overwrites the first match. Nothing happens if ``key`` is found in
    neither dictionary.

    Parameters
    ----------
    key : str
        name of the item (e.g. base_dir)
    val
        the replacement

    """
    for namespace in (self.setup.__dict__, self.__dict__):
        if key in namespace:
            namespace[key] = val
            return