""" Created on 2018-04-09 @author Carl Schaffer @mail carl.schaffer@leibniz-kis.de class for retrieving run information for a set of GrisFitsFiles If this file is called directly, it will attempt to create a GrisRun instance for target passed as the first commandline argument. If no argument is passed, it will attempt to process a dummy folder. """ import datetime import re from glob import glob # get filenames in unix style from os.path import join, dirname, abspath, isdir, basename import astropy.io.fits as fitsio import matplotlib.pyplot as plt import numpy as np from tqdm import tqdm from kis_tools.util.locplot import make_loc_plot from . import GrisFitsFile from .util import get_observers from ..generic import gris_fields from ..util.util import reglob, gris_run_number, get_filter class GrisRun(object): """Class to handle run-wise gris data Args: gris_fits_files: list of gris files to form one run Returns: ris_run: GrisRun instance """ # Patterns for files: _l0pattern = r"(-\d\d){0,1}" _l1pattern = r"(?:-\d\d){0,1}c[cm]" # List of keywords to extract from header, values are passed # through self.query pars_native = { "BITPIX": "BITPIX", "NAXIS": "NAXIS", "BTYPE": "BTYPE", "DATE-OBS": "date", "TELESCOP": "TELESCOP", "CAMERA": "CAMERA", "WAVELENG": ["WAVELENG", "AWAVLNTH"], "STATES": "states", "AOSYSTEM": "AOSYSTEM", "AO_LOCK": "AO_LOCK", "NSUMEXP": "NSUMEXP", "SERIES": ["NSERIES", "NMAPS"], "IOBS": "IOBS", "RSUN_ARC": "RSUN_ARC", "STEPS": "NSTEPS", "STEPSIZE": "STEPSIZE", "EXPTIME": "XPOSURE", "TARGET": "OBS_TRGT", "OBS_MODE": "obs_mode", } # list of keywords where a range is deduced from the input files, values are passed through GrisFitsFile.query pars_minmax = { "UT": ["DATE-BEG"], "FRIEDR0": "ATMOS_R0", "AZIMUT": "AZIMUT", "ELEVATIO": "ELEV_ANG", "SLITPOSX": "SLITPOSX", "SLITPOSY": "SLITPOSY", } # Format string for storing revision dates in DB date_formatstring = "%Y-%m-%d %H:%M:%S" def __init__(self, gris_fits_files): # check inputs if not gris_fits_files: raise ValueError("Empty list of files passed to GrisRun") # Setup data containers self.properties = {} self.files_broken = [] self.errors = [] self.is_parsed = False # check inputs: if not isinstance(gris_fits_files, list): print("Invalid input to GrisRun!") raise TypeError # check if inputs are already instances of GrisFitsFile, if # not, generate instances elif isinstance(gris_fits_files[0], str): gris_fits_files = [ GrisFitsFile.GrisFitsFile(gff) for gff in gris_fits_files ] # store files sorted by filename self.files = sorted(gris_fits_files, key=lambda x: x.filename) # Extract run-wise parameters from first file of the run self.runnumber = self.files[0].runnumber self.properties["RUNNUMBER"] = self.runnumber self.date = self.files[0].date self.path = abspath(join(dirname(self.files[0].path), "..")) self.properties["DATE-OBS"] = self.date self.maps = list( set([gris_fitsfile.mapnumber for gris_fitsfile in self.files]) ) self.map_lengths = {} self.get_map_lengths() # Check data levels: self.check_data_levels() # Look for preview images and log files self.get_previews() def parse(self): """Performs all information gathering that requires opening individual fits files instead of only using info from their filenames Args: Returns: """ if self.is_parsed: return self.is_parsed progress_bar = tqdm( sorted(self.files, key=lambda x: x.filename, reverse=True) ) for fits_file in progress_bar: progress_bar.set_description( f"Parsing {basename(fits_file.filename)}" ) fits_file.parse() if fits_file.errors: # Get errors from file,move file to broken list for error in fits_file.errors: self.errors.append(error) self.files_broken.append( self.files.pop(self.files.index(fits_file)) ) if not self.files: self.errors.append( "ERROR in %s: No fits files for run!" % (str(self)) ) return False # Extract observation properites from files pass # Extract run-wise parameters from files files = self.files file = files[0] xmin, xmax, ymin, ymax = self.calculate_bounding_box() observation_description = dict( OBS_NAME=self.obs_name, DATASET_NAME=f"gris_{self.date.strftime('%Y%m%d')}_{self.runnumber:03d}", DATAPRODUCT_TYPE="cube", CALIB_LEVEL=1, OBS_COLLECTION="gris_observations", DATE_BEG=min(self.query("obs_time")), DATE_END=max(self.query("obs_time")) + datetime.timedelta(seconds=file.query("XPOSURE") / 1000.), DATE_OBS=min(self.query("obs_time")), TIME_RESOLUTION=(max(self.query("obs_time")) - min(self.query("obs_time"))).seconds / file.query("NMAPS"), T_XEL=len(self.maps), EXPTIME=file.query("XPOSURE") / 1000., ATMOS_R0_MIN=min(self.query("ATMOS_R0")), ATMOS_R0_MAX=max(self.query("ATMOS_R0")), HELIOPROJECTIVE_X_MIN=xmin, HELIOPROJECTIVE_X_MAX=xmax, HELIOPROJECTIVE_Y_MIN=ymin, HELIOPROJECTIVE_Y_MAX=ymax, MU=np.mean(self.query("mu")), THETA=np.mean(self.query("theta")), S_XEL1=file.query("NAXIS1"), S_XEL2=max(self.query("ISTEP")), S_RESOLUTION=file.query("CDELT1"), WAVELENGTH_MAX=max(self.query("AWAVMAX")) / 10., WAVELENGTH_MIN=min(self.query("AWAVMIN")) / 10., EM_XEL=file.query("NAXIS3"), EM_RESOLUTION=file.query("CDELT3"), POL_STATES="IQUV" if "NAXIS4" in file.header else "", POL_XEL=file.query("NAXIS4") if "NAXIS4" in file.header else 0, DIMENSIONS=file.query("NAXIS"), BTYPE=file.query("BTYPE"), TELESCOPE="GREGOR", INSTRUMENT="GRIS", TARGET=file.query("OBS_TRGT"), FILTER=get_filter(file.query("AWAVLNTH") / 10.), IOBS=file.query("IOBS"), STEPSIZE=file.query("STEPSIZE"), OBS_MODE=file.query("OBS_MODE") ) # Checkthat all fields are contained in the observation description fields = gris_fields assert all([k in observation_description for k in fields.keys()]) self.properties = observation_description # Set is parsed flag self.is_parsed = True def query(self, attribute): """ Get query attribute either from self or search constituents for that attribute Args: attribute: name of attribute to query Returns: res: value of given attribute if the current instance has the attribute, list of attributes for all constituents if there is no attribute in "self" """ if hasattr(self, attribute): res = getattr(self, attribute) else: res = [f.query(attribute) for f in self.files] return res @property def verbose_name(self, date=None): """Make verbose name for GRIS runs, turns the date of the run, or a date passed as an argument into a string like 26aug14.001 where the sectionafter the point is the runnumber of the corresponding run. If a specific date is passed, only the first section of the string is generated. Args: date (datetime): specific date defaults to None Returns: outstring (string): verbose date """ if date is None: outstring = self.date.strftime("%d%b%y").lower() outstring += f".{int(self.runnumber):03d}" else: outstring = date.strftime("%d%b%y").lower() return outstring def matching_files(self, subfolder, pattern=None): query = self.verbose_name if not pattern: files = glob(join(self.path, subfolder, "*")) else: query += pattern files = reglob(join(self.path, subfolder), query) return files @property def obs_name(self): """Observation name unique within all data for the instrument""" return f"{self.date.strftime('%Y%m%d')}_{self.runnumber:03d}" @property def files_l0(self): return self.matching_files("level0", self._l0pattern) @property def files_l1(self): return self.matching_files("level1", self._l1pattern) @property def files_l1_split(self): files = glob(join(self.path, "level1_split", "*.fits")) res = list( filter(lambda x: gris_run_number(x) == self.runnumber, files) ) return res @property def files_l1_split_mod(self): files = glob(join(self.path, "level1_split_modified", "*.fits")) res = list( filter(lambda x: gris_run_number(x) == self.runnumber, files) ) return res @property def files_l2(self): files = glob(join(self.path, "level2", "*")) res = list(filter(lambda x: self.verbose_name in basename(x), files)) return res @property def files_l3(self): files = glob(join(self.path, "level3", "*")) res = list(filter(lambda x: self.verbose_name in basename(x), files)) return res @property def context_files(self): return self.matching_files("context_data") @property def context_folder(self): return join(self.path, 'context_data') @property def map_saves(self): res = list(filter(lambda x: x[-1] == "m", self.files_l1)) return res @property def coord_saves(self): res = list(filter(lambda x: "coord_mu" in x, self.context_files)) return res @property def mask_files(self): res = list(filter(lambda x: "pen" in x and ".sav" in x, self.files_l2)) return res @property def map_files(self): res = list(filter(lambda x: "maps" in x, self.files_l2)) return res @property def spec_files(self): res = list(filter(lambda x: "cor_spec" in x, self.files_l2)) return res @property def stokes_files(self): res = list(filter(lambda x: "stok" in x, self.files_l2)) return res @property def loc_previews(self): def filterfunc(x): return re.search(f"HMI_{self.verbose_name}\\.png|{self.runnumber}_location.png", x) res = list(filter(filterfunc, self.context_files)) return res @property def spatial_uncertainties(self): """Average Uncertainties as described by WCS""" return np.mean(self.query("CSYER1")), np.mean(self.query("CSYER2")) @property def map_previews(self): def filterfunc(x): return re.match(f"{self.verbose_name}\\.png", basename(x)) res = list(filter(filterfunc, self.context_files)) return res @property def logfiles(self): files = glob(join(self.path, basename(self.path) + ".txt")) return files @property def observers(self): if self.logfiles: observers = get_observers(self.logfiles[0]) return observers @property def was_aborted(self): # determine number of maps found map_lengths = self.map_lengths n_maps = len(map_lengths) if n_maps == 0: return None # check header for planned number of maps planned_maps = 0 planned_steps = 0 header = fitsio.getheader(self.files[0].path, ignore_missing_end=True) planned_maps = int(header["NMAPS"]) planned_steps = int(header["NSTEPS"]) if n_maps != planned_maps: return True len_first_map = map_lengths[sorted(map_lengths.keys())[0]] if len_first_map != planned_steps: return True was_aborted = any([v != len_first_map for v in map_lengths.values()]) return was_aborted def get_map_lengths(self): """Check number of slit positions and check if this number is as specified by the header Args: Returns: """ for mapnumber in self.maps: slit_numbers = [ f.slitposnumber for f in self.files if f.mapnumber == mapnumber ] self.map_lengths[mapnumber] = max(slit_numbers) def fix_times(self): """Fix all time values for consistent format in database before generating a database entry """ # parse datetime objects from strings keys = ["UT-MIN", "UT-MAX"] patterns = ["%Y-%m-%dT%H:%M:%S.%f", "%Y-%m-%dT%H:%M:%S"] for key in keys: value = self.properties[key] for p in patterns: try: time = datetime.datetime.strptime(value, p) break except ValueError: continue self.properties[key] = time def get_native(self): """Extract runwise parameter from filelist""" # Check for a file that has been successfully parsed for gris_fitsfile in self.files: if gris_fitsfile.is_parsed is True: break else: self.errors.append( "ERROR in %s: No file successfully parsed!" % (str(self)) ) return -1 # Try retrieving native keys from file, log error is unsuccessful first_file = self.files[0] for key, parameter in self.pars_native.items(): try: self.properties[key] = first_file.query(parameter) except KeyError: message = "ERROR in %s: Key %s not available in fits file!" message = message % (str(self), parameter) self.errors.append(message) # get naxisi values for i in range(1, self.properties["NAXIS"] + 1): key = f"NAXIS{i}" self.properties[key] = first_file.query(key) # fix type and value for wavelength self.properties["WAVELENG"] = int(self.properties["WAVELENG"] / 10) def get_minmax(self, key, parameter): """get parameter range from fits files Args: key: parameter: key to process, valid values are any values that are included in GrisFitsFile.properties.keys() newkey: key under which to store the values in self default is identical to original key for each original key a set of new keys appended with '-MIN' and '-MAX' are added Returns: """ try: data = [ gris_fitsfile.query(parameter) for gris_fitsfile in self.files ] # drop empty strings which are inserted instead of invalid values data = [d for d in data if d != ""] defaults = { "AZIMUT": -1.0, "ELEVATIO": -1, "SLITPOSX": 0, "SLITPOSY": 0, "FRIEDR0": 0, } if not data: print(f"No valid entries found in header for {key}") if key in defaults: print(f"Defaulting to {defaults[key]}") data.append(defaults[key]) # get min and max self.properties[key + "-MAX"] = max(data) self.properties[key + "-MIN"] = min(data) except KeyError: self.errors.append( "Missing Parameter: %s not found for %s\n" % (parameter, str(self)) ) def get_previews(self): """look for preview files and store their paths in self.previews as 'key':path Args: Returns: """ run = self.runnumber # retrieve splits_level1_folder from first fits file folder = dirname(self.files[0].filename) # navigate to root folder: folder = abspath(join(folder, "../")) mapfolder = folder.replace("/gris/", "/gris_maps/") preview_map = [] preview_loc = self.loc_previews preview_log = self.logfiles preview_map = glob( join(self.context_folder, f"???????.{run:03}.png") ) if not preview_map: preview_map = glob( join(mapfolder, "context_data", f"*{run:03}_*.gif") ) if not preview_map: preview_map = glob( join(mapfolder, "context_data", f"*{run:03}_???.png") ) self.previews = {} if preview_loc: self.previews["PREVIEWLOC"] = preview_loc[0] if preview_log: self.previews["PREVIEWLOG"] = preview_log[0] if preview_map: self.previews["PREVIEWMAP"] = preview_map[0] def make_bson(self): """construct BSON entry for run. parameters RUNNUMBER and NFILES are always inserted. Apart from that, all parameters from the argument are appended Returns: """ return self.properties.copy() def check_data_levels(self): """Check which data levels are present in parent directory todo : so far there is no checking wheter a specific run is contained in all folders, we assume that if the folder is present in the day, the corresponding run is contained in that day. this could be improved! Args: Returns: """ # Keys under which to store the result keys = ["DATALVL0", "DATALVL1", "DATALVL2", "DATALV1S"] # folders to check for, order MUST match 'keys' above! folders = ["level0", "level1", "level2", "level1_split"] path = dirname(self.files[0].filename) path = abspath(path) directory = join(path, "..") # check if given folder is present and set run flags for k, folder in zip(keys, folders): path = join(directory, folder) folder_present = isdir(path) self.properties[k] = folder_present def __str__(self): outstring = f"GrisRun#{self.runnumber:3d} Date: {self.date.strftime('%Y-%m-%d')}," outstring += f" {len(self.maps):3d} maps, {len(self.files):3d} split files" return outstring def __repr__(self): return str(self) def get_cube(self): """Retrieve data cube for run from slit files""" # Determine data shape and initialize numpy cube gris_fitsfile = fitsio.open( self.files[0].filename, ignore_missing_end=True ) shape = gris_fitsfile[0].data.shape gris_fitsfile.close() n_x = len(self.files) n_y = shape[1] nlambda = shape[-1] npol = shape[0] cube = np.ndarray([npol, n_x, n_y, nlambda]) # get data into cube: for i, fitsfile in enumerate(self.files): data = fitsio.open(fitsfile.filename, ignore_missing_end=True)[ 0 ].data cube[:, i, :, :] = data data.close() self.cube = cube def get_mean_specs(self): """Calculate mean spectra for Stokes I,Q,U and V""" # check if cube has been made if not hasattr(self, "cube"): print("Missing cube, can't plot!") return # average over spatial dimensions and slit positions specs = np.ndarray([self.cube.shape[0], self.cube.shape[-1]]) for i in range(specs.shape[0]): specs[i, :] = self.cube[i, :, :, :].mean(axis=0).mean(axis=0) self.specs = specs def plot_toti(self): """ plot self.cube totI channel """ if not hasattr(self, "cube"): print("Missing cube, not plotting!") return toplot = self.cube[0, :, :, :].mean(axis=2) plt.imshow( np.rot90(toplot), vmax=np.percentile(toplot, 95), vmin=np.percentile(toplot, 10), cmap="gray", ) plt.ion() plt.show() def plot_location(self): coords = self.calculate_bounding_box() date = min(self.query("obs_time")) fig, ax = make_loc_plot(coords[:2], coords[2:], date, uncertainties=self.spatial_uncertainties) plt.ion() plt.show() return fig,ax def plot_specs(self): """plot average spectra for given run. requres average spectra and cube """ # check if mean spectra have been calculated if not hasattr(self, "specs"): print("Missing specs, not plotting!") return for i in range(4): plt.plot(self.specs[i, :] / np.linalg.norm(self.specs[i, :])) plt.ion() plt.show() def calculate_bounding_box(self): """ Determine the bounding box for the run. The box is calculated as the area covered by the first map in the observation. If there are subsequent maps, they are ignored. Returns: xmin,xmax,ymin,ymax : Helioprojective coordinates in arcsec """ first_map_id = self.maps[0] map_files = sorted(list(filter(lambda x: x.mapnumber == first_map_id, self.files)), key=lambda x: x.filename) first_coords = map_files[0]._coords_from_wcs last_coords = map_files[-1]._coords_from_wcs x_vals = np.concatenate([first_coords[:, 0], last_coords[:, 0]]).value y_vals = np.concatenate([first_coords[:, 1], last_coords[:, 1]]).value xmin, xmax = min(x_vals), max(x_vals) ymin, ymax = min(y_vals), max(y_vals) return xmin, xmax, ymin, ymax # noinspection PyMethodOverriding class EmptyGrisRun(GrisRun): def __init__(self, path): """run constructed from a path to a single l1 or l0 filename""" runnumber = gris_run_number(path) fn = basename(path) self.date = datetime.datetime.strptime(fn.split(".")[0], "%d%b%y") self.runnumber = runnumber self.map_lengths = {} self.files = [] self.path = abspath(join(dirname(path), "..")) self.maps = [] # noinspection PyMethodOverriding @property def verbose_name(self): outstring = self.date.strftime("%d%b%y").lower() outstring += f".{int(self.runnumber):03d}" return outstring if __name__ == "__main__": files = glob("/dat/sdc/gris/20150426/level1_split/*_001_???_*.fits") gr = GrisRun(files) props = gr.parse()