# -*- coding: utf-8 -*-
#
# Pyplis is a Python library for the analysis of UV SO2 camera data
# Copyright (C) 2017 Jonas Gliss (jonasgliss@gmail.com)
#
# This program is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of
# the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Module containing all sorts of I/O-routines (e.g. test data access)."""
from __future__ import (absolute_import, division)
from os.path import join, basename, exists, isfile, abspath, expanduser
from os import listdir, mkdir, remove, walk
from re import split

from collections import OrderedDict as od
try:
    from progressbar import (ProgressBar, Percentage, Bar,
                             RotatingMarker, ETA, FileTransferSpeed)
    PGBAR_AVAILABLE = True
except Exception:
    PGBAR_AVAILABLE = False
from zipfile import ZipFile, ZIP_DEFLATED

try:
    from urllib.request import urlopen, urlretrieve
except ImportError:
    from urllib2 import urlopen
    from urllib import urlretrieve

from pyplis import logger, print_log
from tempfile import mktemp, gettempdir
from shutil import copy2
import six


def data_search_dirs():
    """Get basic search directories for package data files.

    Data files are searched for in ``~/my_pyplis``, the package data
    directory ``./data`` and, if set, in the ``PYPLIS_DATADIR`` environment
    variable.
    """
    from pyplis import __dir__
    import os
    usr_dir = expanduser(join('~', 'my_pyplis'))
    if not exists(usr_dir):
        mkdir(usr_dir)
    try:
        env = os.environ["PYPLIS_DATADIR"]
        return (usr_dir, join(__dir__, "data"), env)
    except KeyError:
        return (usr_dir, join(__dir__, "data"))
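
# A minimal usage sketch: the returned tuple has two entries (user directory
# and package data directory) or three, if the PYPLIS_DATADIR environment
# variable is set.
#
# >>> from pyplis.inout import data_search_dirs
# >>> dirs = data_search_dirs()
# >>> dirs[0]  # user directory, e.g. '/home/<user>/my_pyplis'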
def zip_example_scripts(repo_base):
    """Create a zipped copy of the example scripts for the current version.

    The zip file is stored in the ``scripts/old_versions`` folder of the
    repository.
    """
    from pyplis import __version__ as v
    vstr = ".".join(v.split(".")[:3])
    logger.info("Adding zipped version of pyplis example scripts for "
                "version %s" % vstr)
    scripts_dir = join(repo_base, "scripts")
    if not exists(scripts_dir):
        raise IOError("Cannot create zipped version of scripts, folder %s "
                      "does not exist" % scripts_dir)
    save_dir = join(scripts_dir, "old_versions")
    if not exists(save_dir):
        raise IOError("Cannot create zipped version of scripts, folder %s "
                      "does not exist" % save_dir)
    name = "scripts-%s.zip" % vstr
    zipf = ZipFile(join(save_dir, name), 'w', ZIP_DEFLATED)
    for fname in listdir(scripts_dir):
        if fname.endswith("py"):
            zipf.write(join(scripts_dir, fname))
    zipf.close()
def get_all_files_in_dir(directory, file_type=None, include_sub_dirs=False):
    """Find all files in a certain directory.

    Parameters
    ----------
    directory : str
        path to directory
    file_type : :obj:`str`, optional
        specify file type (e.g. "png", "fts"). If unspecified, then all
        files are considered
    include_sub_dirs : bool
        if True, also all files from all sub-directories are extracted

    Returns
    -------
    list
        sorted list containing paths of all files detected
    """
    p = directory
    if p is None or not exists(p):
        logger.warning('Error: path %s does not exist' % p)
        return []
    use_all_types = not isinstance(file_type, str)
    all_paths = []
    if include_sub_dirs:
        logger.info("Include files from subdirectories")
        if use_all_types:
            logger.info("Using all file types")
            for path, subdirs, files in walk(p):
                for filename in files:
                    all_paths.append(join(path, filename))
        else:
            logger.info("Using only %s files" % file_type)
            for path, subdirs, files in walk(p):
                for filename in files:
                    if filename.endswith(file_type):
                        all_paths.append(join(path, filename))
    else:
        logger.info("Exclude files from subdirectories")
        if use_all_types:
            logger.info("Using all file types")
            all_paths = [join(p, f) for f in listdir(p)
                         if isfile(join(p, f))]
        else:
            logger.info("Using only %s files" % file_type)
            all_paths = [join(p, f) for f in listdir(p)
                         if isfile(join(p, f)) and f.endswith(file_type)]
    all_paths.sort()
    return all_paths
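
# A minimal usage sketch (the directory path is hypothetical, for
# illustration only):
#
# >>> from pyplis.inout import get_all_files_in_dir
# >>> fts_files = get_all_files_in_dir("/data/ecII_images", file_type="fts",
# ...                                  include_sub_dirs=True)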
def create_temporary_copy(path):
    """Create a copy of the input file in the system temp directory."""
    temp_dir = gettempdir()
    temp_path = join(temp_dir, basename(path))
    copy2(path, temp_path)
    return temp_path
def download_test_data(save_path=None):
    """Download pyplis test data.

    :param save_path: location where the data is supposed to be stored

    Code for the progress bar was "stolen" `here <http://stackoverflow.com/
    questions/11143767/how-to-make-a-download-with-progress-bar-in-python>`_
    (last access date: 11/01/2017)
    """
    from pyplis import URL_TESTDATA
    url = URL_TESTDATA

    dirs = data_search_dirs()
    where = dirs[0]
    fp = join(where, "_paths.txt")
    if not exists(fp):
        where = dirs[1]
        fp = join(where, "_paths.txt")
    if save_path is None or not exists(save_path):
        save_path = where
        logger.info("Save path unspecified")
    else:
        with open(fp, "a") as f:
            f.write("\n" + save_path + "\n")
            logger.info("Adding new path for test data location in "
                        "file _paths.txt: %s" % save_path)
    print_log.info("installing test data at %s" % save_path)

    filename = mktemp('.zip')

    if PGBAR_AVAILABLE:
        widgets = ['Downloading pyplis test data: ', Percentage(), ' ',
                   Bar(marker=RotatingMarker()), ' ',
                   ETA(), ' ', FileTransferSpeed()]

        pbar = ProgressBar(widgets=widgets)

        def dl_progress(count, block_size, total_size):
            if pbar.maxval is None:
                pbar.maxval = total_size
                pbar.start()
            pbar.update(min(count * block_size, total_size))

        urlretrieve(url, filename, reporthook=dl_progress)
        pbar.finish()
    else:
        print_log.info("Downloading Pyplis testdata (this can take a while, "
                       "install the progressbar package if you want to "
                       "receive download info)")
        urlretrieve(url, filename)
    thefile = ZipFile(filename)

    print_log.info("Extracting data at: %s (this may take a while)"
                   % save_path)
    thefile.extractall(save_path)
    thefile.close()
    remove(filename)
    print_log.info("Download successfully finished, deleted temporary data "
                   "file at: %s" % filename)
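
# A minimal usage sketch (the target folder is hypothetical and must already
# exist, otherwise the data is stored in the default search directory):
#
# >>> from pyplis.inout import download_test_data
# >>> download_test_data("/data/pyplis_testdata")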
def find_test_data():
    """Search location of test data folder."""
    dirs = data_search_dirs()
    folder_name = "pyplis_etna_testdata"
    for data_path in dirs:
        if folder_name in listdir(data_path):
            print_log.info("Found test data at location: %s" % data_path)
            return join(data_path, folder_name)
        try:
            with open(join(data_path, "_paths.txt"), "r") as f:
                lines = f.readlines()
            for line in lines:
                p = line.split("\n")[0]
                if exists(p) and folder_name in listdir(p):
                    print_log.info("Found test data at default location: %s"
                                   % p)
                    return join(p, folder_name)
        except Exception:
            pass
    raise IOError("pyplis test data could not be found, please download "
                  "testdata first, using method "
                  "pyplis.inout.download_test_data, or "
                  "specify the local path where the test data is stored "
                  "using pyplis.inout.set_test_data_path")
def all_test_data_paths():
    """Return list of all search paths for test data."""
    dirs = data_search_dirs()
    paths = list(dirs)
    for data_path in dirs:
        fp = join(data_path, "_paths.txt")
        if exists(fp):
            with open(fp, "r") as f:
                lines = f.readlines()
            for line in lines:
                p = line.split("\n")[0].lower()
                if exists(p):
                    paths.append(p)
    return paths
def set_test_data_path(save_path):
    """Set local path where test data is stored."""
    if save_path.lower() in all_test_data_paths():
        logger.info("Path is already in search tree")
        return
    dirs = data_search_dirs()
    fp = join(dirs[0], "_paths.txt")
    if not exists(fp):
        fp = join(dirs[1], "_paths.txt")
    save_path = abspath(save_path)
    if not exists(save_path):
        raise IOError("Could not set test data path: specified location "
                      "does not exist: %s" % save_path)
    with open(fp, "a") as f:
        f.write("\n" + save_path + "\n")
        print_log.info("Adding new path for test data location in "
                       "file _paths.txt: %s" % save_path)
    if "pyplis_etna_testdata" not in listdir(save_path):
        logger.warning("Test data folder (name: pyplis_etna_testdata) "
                       "could not be found at specified location, please "
                       "download test data, unzip and save at: %s"
                       % save_path)
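
# A minimal usage sketch (hypothetical path; the folder is expected to
# contain the extracted "pyplis_etna_testdata" directory):
#
# >>> from pyplis.inout import set_test_data_path, find_test_data
# >>> set_test_data_path("/data/pyplis_testdata")
# >>> find_test_data()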
def _load_cam_info(cam_id, filepath):
    """Load camera info from a specific cam_info file."""
    dat = od()
    if cam_id is None:
        return dat
    with open(filepath, 'rb') as f:
        filters = []
        darkinfo = []
        io_opts = {}
        found = 0
        for ll in f:
            line = ll.decode('utf-8').rstrip()
            if not line:
                continue
            if "END" in line and found:
                dat["default_filters"] = filters
                dat["dark_info"] = darkinfo
                dat["io_opts"] = io_opts
                return dat
            spl = line.split(":")
            if len(spl) == 1:
                continue
            if found:
                if line[0] == "#":
                    continue
                k = spl[0].strip()
                if k == "dark_info":
                    l = [x.strip() for x in spl[1].split("#")[0].split(',')]
                    darkinfo.append(l)
                elif k == "filter":
                    l = [x.strip() for x in spl[1].split("#")[0].split(',')]
                    filters.append(l)
                elif k == "io_opts":
                    l = [x.strip() for x in
                         split("=|,", spl[1].split("#")[0])]
                    keys, vals = l[::2], l[1::2]
                    if len(keys) == len(vals):
                        for i in range(len(keys)):
                            io_opts[keys[i]] = bool(int(vals[i]))
                elif k == "reg_shift_off":
                    try:
                        l = [float(x.strip()) for x in
                             spl[1].split("#")[0].split(',')]
                        dat["reg_shift_off"] = l
                    except Exception:
                        pass
                else:
                    data_str = spl[1].split("#")[0].strip()
                    if any([data_str == x for x in ["''", '""']]):
                        data_str = ""
                    dat[k] = data_str
            if spl[0] == "cam_ids":
                l = [x.strip() for x in spl[1].split("#")[0].split(',')]
                if cam_id in l:
                    found = 1
                    dat["cam_ids"] = l
    raise IOError("Camera info for cam_id %s could not be found" % cam_id)
def get_camera_info(cam_id):
    """Try to access camera information from file "cam_info.txt" (package data).

    :param str cam_id: string ID of camera (e.g. "ecII")
    """
    dirs = data_search_dirs()
    try:
        return _load_cam_info(cam_id, join(dirs[0], "cam_info.txt"))
    except Exception:
        return _load_cam_info(cam_id, join(dirs[1], "cam_info.txt"))
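
# A minimal usage sketch ("ecII" is one of the default camera types shipped
# with the package data, cf. the __main__ block at the end of this module):
#
# >>> from pyplis.inout import get_camera_info
# >>> info = get_camera_info("ecII")
# >>> sorted(info.keys())  # all default specs available for this camera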
def save_new_default_camera(info_dict):
    """Save new default camera to data file *cam_info.txt*.

    :param dict info_dict: dictionary containing camera default information

    Only valid keys will be added to the database file.
    """
    dirs = data_search_dirs()
    cam_file = join(dirs[0], "cam_info.txt")
    if not exists(cam_file):
        cam_file = join(dirs[1], "cam_info.txt")
    keys = get_camera_info("ecII").keys()
    for key in keys:
        logger.info("%s (in input: %s)" % (key, key in info_dict))
    if "cam_id" not in info_dict:
        raise KeyError("Missing specification of cam_id")
    try:
        cam_ids = info_dict["cam_ids"]
    except KeyError:
        info_dict["cam_ids"] = [info_dict["cam_id"]]
        cam_ids = [info_dict["cam_id"]]
    if not all([x in info_dict.keys() for x in keys]):
        raise KeyError("Input dictionary does not include all required keys "
                       "for creating a new default camera type, required "
                       "keys are %s" % keys)
    ids = get_all_valid_cam_ids()
    if any([x in ids for x in info_dict["cam_ids"]]):
        raise KeyError("Cam ID conflict: one of the provided IDs already "
                       "exists in database...")
    cam_file_temp = create_temporary_copy(cam_file)
    with open(cam_file_temp, "a") as info_file:
        info_file.write("\n\nNEWCAM\ncam_ids:")
        cam_ids = [str(x) for x in cam_ids]
        info_file.write(",".join(cam_ids))
        info_file.write("\n")
        for k, v in six.iteritems(info_dict):
            if k in keys:
                if k == "default_filters":
                    for finfo in v:
                        info_file.write("filter:")
                        finfo = [str(x) for x in finfo]
                        info_file.write(",".join(finfo))
                        info_file.write("\n")
                elif k == "dark_info":
                    for finfo in v:
                        info_file.write("dark_info:")
                        finfo = [str(x) for x in finfo]
                        info_file.write(",".join(finfo))
                        info_file.write("\n")
                elif k == "io_opts":
                    s = "io_opts:"
                    for opt, val in six.iteritems(v):
                        s += "%s=%d," % (opt, val)
                    s = s[:-1] + "\n"
                    info_file.write(s)
                elif k == "reg_shift_off":
                    info_file.write("%s:%.2f,%.2f\n" % (k, v[0], v[1]))
                elif k == "cam_ids":
                    pass
                else:
                    info_file.write("%s:%s\n" % (k, v))
        info_file.write("ENDCAM")
    # Writing ended without errors: replace database file "cam_info.txt"
    # with the temporary file and delete the temporary file
    copy2(cam_file_temp, cam_file)
    remove(cam_file_temp)
    print_log.info("Successfully added new default camera %s to database "
                   "at %s" % (info_dict["cam_id"], cam_file))
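
# A minimal usage sketch: since the required keys are taken from the "ecII"
# default camera, a safe way to build a valid input dictionary is to start
# from an existing one (the new ID "mycam" is hypothetical):
#
# >>> from pyplis.inout import get_camera_info, save_new_default_camera
# >>> info = dict(get_camera_info("ecII"))
# >>> info["cam_id"] = "mycam"
# >>> info["cam_ids"] = ["mycam"]
# >>> save_new_default_camera(info)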
def save_default_source(info_dict):
    """Add a new default source to file "my_sources.txt"."""
    if not all(k in info_dict for k in ("name", "lon", "lat", "altitude")):
        raise ValueError("Cannot save source information, require at least "
                         "name, lon, lat and altitude")
    dirs = data_search_dirs()
    path = join(dirs[0], "my_sources.txt")
    if not exists(path):
        path = join(dirs[1], "my_sources.txt")
    if info_dict["name"] in get_source_ids():
        raise NameError("A source with name %s already exists in database"
                        % info_dict["name"])
    source_file_temp = create_temporary_copy(path)
    with open(source_file_temp, "a") as info_file:
        info_file.write("\n\nsource_ids:%s\n" % info_dict["name"])
        for k, v in six.iteritems(info_dict):
            info_file.write("%s:%s\n" % (k, v))
        info_file.write("END")
    # Writing ended without errors: replace database file "my_sources.txt"
    # with the temporary file and delete the temporary file
    copy2(source_file_temp, path)
    remove(source_file_temp)
    print_log.info("Successfully added new default source %s to database "
                   "file at %s" % (info_dict["name"], path))
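
# A minimal usage sketch (source name and coordinates are hypothetical;
# "name", "lon", "lat" and "altitude" are the minimum required keys):
#
# >>> from pyplis.inout import save_default_source
# >>> save_default_source(dict(name="my_volcano", lon=15.00, lat=37.75,
# ...                          altitude=3300))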
def get_all_valid_cam_ids():
    """Load all valid camera string ids.

    Reads info from file cam_info.txt which is part of package data.
    """
    from pyplis import _LIBDIR
    ids = []
    with open(join(_LIBDIR, "data", "cam_info.txt"), "rb") as f:
        for line in f:
            spl = line.decode("ISO-8859-1").split(":")
            if spl[0].strip().lower() == "cam_ids":
                ids.extend([x.strip()
                            for x in spl[1].split("#")[0].split(',')])
    return ids
def get_cam_ids():
    """Load all default camera string ids.

    Reads info from file cam_info.txt which is part of package data.
    """
    dirs = data_search_dirs()
    ids = []
    for path in dirs:
        try:
            with open(join(path, "cam_info.txt")) as f:
                for line in f:
                    spl = line.split(":")
                    if spl[0].strip().lower() == "cam_id":
                        sid = spl[1].split("#")[0].strip()
                        if sid not in ids:
                            ids.append(sid)
        except IOError:
            pass
    return ids
def get_source_ids():
    """Get all existing source IDs.

    Reads info from file my_sources.txt which is part of package data.
    """
    dirs = data_search_dirs()
    ids = []
    for path in dirs:
        try:
            with open(join(path, "my_sources.txt")) as f:
                for line in f:
                    spl = line.split(":")
                    if spl[0].strip().lower() == "name":
                        sid = spl[1].split("#")[0].strip()
                        if sid not in ids:
                            ids.append(sid)
        except IOError:
            pass
    return ids
def get_source_info(source_id, try_online=True):
    """Try to access source information from file "my_sources.txt".

    File is part of package data.

    :param str source_id: string ID of source (e.g. Etna)
    :param bool try_online: if True and local access fails, try to find
        source ID in online database
    """
    from pyplis import _LIBDIR
    dat = od()
    if source_id == "":
        return dat
    found = 0
    with open(join(_LIBDIR, "data", "my_sources.txt")) as f:
        for line in f:
            if "END" in line and found:
                return od([(source_id, dat)])
            spl = line.split(":")
            if found:
                if not any([line[0] == x for x in ["#", "\n"]]):
                    k = spl[0].strip()
                    data_str = spl[1].split("#")[0].strip()
                    dat[k] = data_str
            if spl[0] == "source_ids":
                if source_id in [x.strip()
                                 for x in spl[1].split("#")[0].split(',')]:
                    found = 1
    print_log.warning("Source info for source %s could not be found"
                      % source_id)
    if try_online:
        try:
            return get_source_info_online(source_id)
        except BaseException:
            pass
    return od()
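
# A minimal usage sketch (uses the source ID "Etna" from the docstring; set
# try_online=False to restrict the lookup to the local database file):
#
# >>> from pyplis.inout import get_source_info
# >>> res = get_source_info("Etna", try_online=False)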
def get_source_info_online(source_id):
    """Try to load source info from online database (@ www.ngdc.noaa.gov).

    :param str source_id: ID of source
    """
    name = source_id.lower()
    url = ("http://www.ngdc.noaa.gov/nndc/struts/results?type_0=Like&query_0="
           "&op_8=eq&v_8=&type_10=EXACT&query_10=None+Selected&le_2=&ge_3="
           "&le_3=&ge_2=&op_5=eq&v_5=&op_6=eq&v_6=&op_7=eq&v_7=&t=102557&s=5"
           "&d=5")
    logger.info("Trying to access volcano data from URL:")
    logger.info(url)
    # urlopen returns a file-like object that can be iterated line by line
    data = urlopen(url)
    res = od()
    in_row = 0
    in_data = 0
    lc = 0
    col_num = 10
    first_volcano_name = "Abu"  # this needs to be identical
    ids = ["name", "country", "region", "lat", "lon", "altitude", "type",
           "status", "last_eruption"]
    types = [str, str, str, float, float, float, str, str, str]
    for line in data:
        # urlopen yields bytes in Python 3 (assumption: page is UTF-8)
        if isinstance(line, bytes):
            line = line.decode("utf-8", errors="ignore")
        lc += 1
        if (first_volcano_name in line and
                line.split(">")[1].split("</td")[0].strip() ==
                first_volcano_name):
            in_data, c = 1, 0
        if in_data:
            if c % col_num == 0 and name in line.lower():
                logger.info("FOUND candidate, line: %d" % lc)
                spl = line.split(">")[1].split("</td")[0].strip().lower()
                if name in spl:
                    logger.info("FOUND MATCH: %s" % spl)
                    in_row, cc = 1, 0
                    cid = spl
                    res[cid] = od()
            if in_row:
                spl = line.split(">")[1].split("</td")[0].strip()
                res[cid][ids[cc]] = types[cc](spl)
                cc += 1
            if in_row and cc == 9:
                logger.info("End of data row reached for %s" % cid)
                cc, in_row = 0, 0
            c += 1
    return res
def get_icon(name, color=None):
    """Try to find icon in lib icon folder.

    :param str name: name of icon (i.e. filename is <name>.png)
    :param color: color of the icon ("r", "k", "g"), defaults to None

    Returns icon image filepath if valid
    """
    from pyplis import _LIBDIR
    subfolders = ["axialis", "myIcons"]
    for subf in subfolders:
        base_path = join(_LIBDIR, "data", "icons", subf)
        if color is not None:
            base_path = join(base_path, color)
        for file in listdir(base_path):
            fname = basename(file).split(".")[0]
            if fname == name:
                return join(base_path, file)
    logger.warning("Failed to load icon at: " + _LIBDIR)
    return False
if __name__ == '__main__':
    i1 = get_camera_info('ecII')
    i2 = get_camera_info('usgs')