""" Created on 2018-04-09 @author Carl Schaffer @mail carl.schaffer@leibniz-kis.de class for retrieving run information for a set of GrisFitsFiles If this file is called directly, it will attempt to create a GrisRun instance for target passed as the first commandline argument. If no argument is passed, it will attempt to process a dummy folder. """ import datetime import re from glob import glob # get filenames in unix style from os.path import join, dirname, abspath, isdir, basename import astropy.io.fits as fitsio import matplotlib.pyplot as plt import numpy as np from tqdm import tqdm from kis_tools.util.locplot import make_loc_plot from . import GrisFitsFile from .util import get_observers from ..util.util import reglob, gris_run_number, get_filter class GrisRun(object): """Class to handle run-wise gris data Args: gris_fits_files: list of gris files to form one run Returns: ris_run: GrisRun instance """ # Patterns for files: _l0pattern = r"(-\d\d){0,1}" _l1pattern = r"(?:-\d\d){0,1}c[cm]" # List of keywords to extract from header, values are passed # through self.query pars_native = { "BITPIX": "BITPIX", "NAXIS": "NAXIS", "BTYPE": "BTYPE", "DATE-OBS": "date", "TELESCOP": "TELESCOP", "CAMERA": "CAMERA", "WAVELENG": ["WAVELENG", "AWAVLNTH"], "STATES": "states", "AOSYSTEM": "AOSYSTEM", "AO_LOCK": "AO_LOCK", "NSUMEXP": "NSUMEXP", "SERIES": ["NSERIES", "NMAPS"], "IOBS": "IOBS", "RSUN_ARC": "RSUN_ARC", "STEPS": "NSTEPS", "STEPSIZE": "STEPSIZE", "EXPTIME": "XPOSURE", "TARGET": "OBS_TRGT", "OBS_MODE": "obs_mode", } # list of keywords where a range is deduced from the input files, values are passed through GrisFitsFile.query pars_minmax = { "UT": ["DATE-BEG"], "FRIEDR0": "ATMOS_R0", "AZIMUT": "AZIMUT", "ELEVATIO": "ELEV_ANG", "SLITPOSX": "SLITPOSX", "SLITPOSY": "SLITPOSY", } # Format string for storing revision dates in DB date_formatstring = "%Y-%m-%d %H:%M:%S" def __init__(self, gris_fits_files): # check inputs if not gris_fits_files: raise ValueError("Empty list of files passed to GrisRun") # Setup data containers self.properties = {} self.files_broken = [] self.errors = [] self.is_parsed = False # check inputs: if not isinstance(gris_fits_files, list): print("Invalid input to GrisRun!") raise TypeError # check if inputs are already instances of GrisFitsFile, if # not, generate instances elif isinstance(gris_fits_files[0], str): gris_fits_files = [ GrisFitsFile.GrisFitsFile(gff) for gff in gris_fits_files ] # store files sorted by filename self.files = sorted(gris_fits_files, key=lambda x: x.filename) # Extract run-wise parameters from first file of the run self.runnumber = self.files[0].runnumber self.properties["RUNNUMBER"] = self.runnumber self.date = self.files[0].date self.path = abspath(join(dirname(self.files[0].path), "..")) self.properties["DATE-OBS"] = self.date self.maps = list( set([gris_fitsfile.mapnumber for gris_fitsfile in self.files]) ) self.map_lengths = {} self.get_map_lengths() # Check data levels: self.check_data_levels() # Look for preview images and log files self.get_previews() def parse(self): """Performs all information gathering that requires opening individual fits files instead of only using info from their filenames Args: Returns: """ if self.is_parsed: return self.is_parsed progress_bar = tqdm( sorted(self.files, key=lambda x: x.filename, reverse=True) ) for fits_file in progress_bar: progress_bar.set_description( f"Parsing {basename(fits_file.filename)}" ) fits_file.parse() if fits_file.errors: # Get errors from file,move file to broken list for error in fits_file.errors: self.errors.append(error) self.files_broken.append( self.files.pop(self.files.index(fits_file)) ) if not self.files: self.errors.append( "ERROR in %s: No fits files for run!" % (str(self)) ) return False # Extract observation properites from files pass # Set is parsed flag self.is_parsed = True def query(self, attribute): """ Get query attribute either from self or search constituents for that attribute Args: attribute: name of attribute to query Returns: res: value of given attribute if the current instance has the attribute, list of attributes for all constituents if there is no attribute in "self" """ if hasattr(self, attribute): res = getattr(self, attribute) else: res = [f.query(attribute) for f in self.files] return res @property def verbose_name(self, date=None): """Make verbose name for GRIS runs, turns the date of the run, or a date passed as an argument into a string like 26aug14.001 where the sectionafter the point is the runnumber of the corresponding run. If a specific date is passed, only the first section of the string is generated. Args: date (datetime): specific date defaults to None Returns: outstring (string): verbose date """ if date is None: outstring = self.date.strftime("%d%b%y").lower() outstring += f".{int(self.runnumber):03d}" else: outstring = date.strftime("%d%b%y").lower() return outstring def matching_files(self, subfolder, pattern=None): query = self.verbose_name if not pattern: files = glob(join(self.path, subfolder, "*")) else: query += pattern files = reglob(join(self.path, subfolder), query) return files @property def obs_name(self): """Observation name unique within all data for the instrument""" return f"{self.date.strftime('%Y%m%d')}_{self.runnumber:03d}" @property def files_l0(self): return self.matching_files("level0", self._l0pattern) @property def files_l1(self): return self.matching_files("level1", self._l1pattern) @property def files_l1_split(self): files = glob(join(self.path, "level1_split", "*.fits")) res = list( filter(lambda x: gris_run_number(x) == self.runnumber, files) ) return res @property def files_l1_split_mod(self): files = glob(join(self.path, "level1_split_modified", "*.fits")) res = list( filter(lambda x: gris_run_number(x) == self.runnumber, files) ) return res @property def files_l2(self): files = glob(join(self.path, "level2", "*")) res = list(filter(lambda x: self.verbose_name in basename(x), files)) return res @property def files_l3(self): files = glob(join(self.path, "level3", "*")) res = list(filter(lambda x: self.verbose_name in basename(x), files)) return res @property def context_files(self): return self.matching_files("context_data") @property def context_folder(self): return join(self.path, 'context_data') @property def map_saves(self): res = list(filter(lambda x: x[-1] == "m", self.files_l1)) return res @property def coord_saves(self): res = list(filter(lambda x: "coord_mu" in x, self.context_files)) return res @property def mask_files(self): res = list(filter(lambda x: "pen" in x and ".sav" in x, self.files_l2)) return res @property def map_files(self): res = list(filter(lambda x: "maps" in x, self.files_l2)) return res @property def spec_files(self): res = list(filter(lambda x: "cor_spec" in x, self.files_l2)) return res @property def stokes_files(self): res = list(filter(lambda x: "stok" in x, self.files_l2)) return res @property def loc_previews(self): def filterfunc(x): return re.search(f"HMI_{self.verbose_name}\\.png|{self.runnumber}_location.png", x) res = list(filter(filterfunc, self.context_files)) return res @property def spatial_uncertainties(self): """Average Uncertainties as described by WCS""" return np.mean(self.query("CSYER1")), np.mean(self.query("CSYER2")) @property def map_previews(self): def filterfunc(x): return re.match(f"{self.verbose_name}\\.png", basename(x)) res = list(filter(filterfunc, self.context_files)) return res @property def logfiles(self): files = glob(join(self.path, basename(self.path) + ".txt")) return files @property def observers(self): if self.logfiles: observers = get_observers(self.logfiles[0]) return observers @property def was_aborted(self): # determine number of maps found map_lengths = self.map_lengths n_maps = len(map_lengths) if n_maps == 0: return None # check header for planned number of maps planned_maps = 0 planned_steps = 0 header = fitsio.getheader(self.files[0].path, ignore_missing_end=True) planned_maps = int(header["NMAPS"]) planned_steps = int(header["NSTEPS"]) if n_maps != planned_maps: return True len_first_map = map_lengths[sorted(map_lengths.keys())[0]] if len_first_map != planned_steps: return True was_aborted = any([v != len_first_map for v in map_lengths.values()]) return was_aborted def get_map_lengths(self): """Check number of slit positions and check if this number is as specified by the header Args: Returns: """ for mapnumber in self.maps: slit_numbers = [ f.slitposnumber for f in self.files if f.mapnumber == mapnumber ] self.map_lengths[mapnumber] = max(slit_numbers) def get_previews(self): """look for preview files and store their paths in self.previews as 'key':path Args: Returns: """ run = self.runnumber # retrieve splits_level1_folder from first fits file folder = dirname(self.files[0].filename) # navigate to root folder: folder = abspath(join(folder, "../")) mapfolder = folder.replace("/gris/", "/gris_maps/") preview_map = [] preview_loc = self.loc_previews preview_log = self.logfiles preview_map = glob( join(self.context_folder, f"???????.{run:03}.png") ) if not preview_map: preview_map = glob( join(mapfolder, "context_data", f"*{run:03}_*.gif") ) if not preview_map: preview_map = glob( join(mapfolder, "context_data", f"*{run:03}_???.png") ) self.previews = {} if preview_loc: self.previews["PREVIEWLOC"] = preview_loc[0] if preview_log: self.previews["PREVIEWLOG"] = preview_log[0] if preview_map: self.previews["PREVIEWMAP"] = preview_map[0] def check_data_levels(self): """Check which data levels are present in parent directory todo : so far there is no checking whether a specific run is contained in all folders, we assume that if the folder is present in the day, the corresponding run is contained in that day. this could be improved! Args: Returns: """ # Keys under which to store the result keys = ["DATALVL0", "DATALVL1", "DATALVL2", "DATALV1S"] # folders to check for, order MUST match 'keys' above! folders = ["level0", "level1", "level2", "level1_split"] path = dirname(self.files[0].filename) path = abspath(path) directory = join(path, "..") # check if given folder is present and set run flags for k, folder in zip(keys, folders): path = join(directory, folder) folder_present = isdir(path) self.properties[k] = folder_present def __str__(self): outstring = f"GrisRun#{self.runnumber:3d} Date: {self.date.strftime('%Y-%m-%d')}," outstring += f" {len(self.maps):3d} maps, {len(self.files):3d} split files" return outstring def __repr__(self): return str(self) def get_cube(self): """Retrieve data cube for run from slit files""" # Determine data shape and initialize numpy cube gris_fitsfile = fitsio.open( self.files[0].filename, ignore_missing_end=True ) shape = gris_fitsfile[0].data.shape gris_fitsfile.close() n_x = len(self.files) n_y = shape[1] nlambda = shape[-1] npol = shape[0] cube = np.ndarray([npol, n_x, n_y, nlambda]) # get data into cube: for i, fitsfile in enumerate(self.files): data = fitsio.open(fitsfile.filename, ignore_missing_end=True)[ 0 ].data cube[:, i, :, :] = data data.close() self.cube = cube def get_mean_specs(self): """Calculate mean spectra for Stokes I,Q,U and V""" # check if cube has been made if not hasattr(self, "cube"): print("Missing cube, can't plot!") return # average over spatial dimensions and slit positions specs = np.ndarray([self.cube.shape[0], self.cube.shape[-1]]) for i in range(specs.shape[0]): specs[i, :] = self.cube[i, :, :, :].mean(axis=0).mean(axis=0) self.specs = specs def plot_toti(self): """ plot self.cube totI channel """ if not hasattr(self, "cube"): print("Missing cube, not plotting!") return toplot = self.cube[0, :, :, :].mean(axis=2) plt.imshow( np.rot90(toplot), vmax=np.percentile(toplot, 95), vmin=np.percentile(toplot, 10), cmap="gray", ) plt.ion() plt.show() def plot_location(self): coords = self.calculate_bounding_box() date = min(self.query("obs_time")) fig, ax = make_loc_plot(coords[:2], coords[2:], date, uncertainties=self.spatial_uncertainties) return fig, ax def plot_specs(self): """plot average spectra for given run. requres average spectra and cube """ # check if mean spectra have been calculated if not hasattr(self, "specs"): print("Missing specs, not plotting!") return for i in range(4): plt.plot(self.specs[i, :] / np.linalg.norm(self.specs[i, :])) plt.ion() plt.show() def calculate_bounding_box(self): """ Determine the bounding box for the run. The box is calculated as the area covered by the first map in the observation. If there are subsequent maps, they are ignored. Returns: xmin,xmax,ymin,ymax : Helioprojective coordinates in arcsec """ first_map_id = self.maps[0] map_files = sorted(list(filter(lambda x: x.mapnumber == first_map_id, self.files)), key=lambda x: x.filename) first_coords = map_files[0]._coords_from_wcs last_coords = map_files[-1]._coords_from_wcs x_vals = np.concatenate([first_coords[:, 0], last_coords[:, 0]]).value y_vals = np.concatenate([first_coords[:, 1], last_coords[:, 1]]).value xmin, xmax = min(x_vals), max(x_vals) ymin, ymax = min(y_vals), max(y_vals) return xmin, xmax, ymin, ymax # noinspection PyMethodOverriding class EmptyGrisRun(GrisRun): def __init__(self, path): """run constructed from a path to a single l1 or l0 filename""" runnumber = gris_run_number(path) fn = basename(path) self.date = datetime.datetime.strptime(fn.split(".")[0], "%d%b%y") self.runnumber = runnumber self.map_lengths = {} self.files = [] self.path = abspath(join(dirname(path), "..")) self.maps = [] # noinspection PyMethodOverriding @property def verbose_name(self): outstring = self.date.strftime("%d%b%y").lower() outstring += f".{int(self.runnumber):03d}" return outstring if __name__ == "__main__": files = glob("/dat/sdc/gris/20150426/level1_split/*_001_???_*.fits") gr = GrisRun(files) props = gr.parse()