""" Created on 2018-04-09 @author Carl Schaffer @mail carl.schaffer@leibniz-kis.de """ import datetime # date handling import os # path and file manipulations import sys # error handling from glob import glob from os.path import basename, exists, join, dirname from warnings import warn import astropy.io.fits as fitsio # fits reading and writing import astropy.units as u import matplotlib.pyplot as plt import numpy as np from astropy.coordinates import SkyCoord from astropy.wcs import WCS from ..generic.fits import FitsFile from ..util import calc_mu, calc_theta from ..util.util import date_from_fn, gris_obs_mode class GrisFitsFile(FitsFile): """Class for handling fits files for GRIS db When the constructor is called, the filename is stored and basic metadata parameters such as date and runnumber are extracted from the filename. Once the class function GrisFitsFile.parse() is called, metadata from the fits file is available as a dictionary stored at GrisFitsFile.properties. Any errors ocurring during opening and parsing of the file are stored as strings in GrisFitsFile.errors Args: filename: filename of target fits file Returns: """ def __init__(self, filename): """Constructor""" super().__init__(filename) assert exists(filename) self.properties = {} self.errors = [] self.filename = filename self.name = basename(filename) self.path = self.filename self.dirname = dirname(self.path) self.parse_filename() self.is_parsed = False def parse_filename(self): """Extract metadata from filename""" # Extract time from Filename name = os.path.splitext(basename(self.filename))[0].split("_") year = int(name[1][:4]) month = int(name[1][4:6]) day = int(name[1][6:]) # Get time # hh = int(name[2][:2]) # mm = int(name[2][2:4]) # outstring = int(name[2][4:6]) # self.date = datetime.datetime(year,month,day,hh,mm,outstring) self.date = datetime.datetime(year, month, day, 0, 0, 0) # Extract Run-, Map-, and Stepnumber, as well as mode string self.runnumber = int(name[4]) self.mapnumber = int(name[5]) self.slitposnumber = name[6] self.modestring = name[3] @property def verbose_runname(self): """ Get verbose run name such as 24apr14.002 Returns: outstring: verbose run name """ outstring = self.date.strftime("%d%b%y").lower() outstring += f".{int(self.runnumber):03d}" return outstring @property def _coords_from_simple_header(self): """coordinates for slit, the Keywords 'SLITPOSX' and 'SLITPOSY' are assumed to be the center of the slit while the keyword 'SLITORIE' describes the rotation of the slit w.r.t. the helioprojective axes. The algorithm further assumes that each pixel represents a square area with side length of 'STEPSIZE'. These assumptions are used to calculate the coordinates for the center of each pixel within the data array. Returns: (X,Y):ndarray(float) x and y coordinates for each pixel of the slit WARNING! The code below is not checked for correctness! We do not know what direction SLITORIE references off and coordinates provided by GREGOR are very error-prone. """ h = self.header slit_length_pixels = h["NAXIS2"] center_pixel = slit_length_pixels // 2 x_slitcenter = h["SLITPOSX"] if h["SLITPOSX"] != "" else 0 y_slitcenter = h["SLITPOSY"] if h["SLITPOSY"] != "" else 0 angle = h["SLITORIE"] if h["SLITORIE"] != "" else 0 delta_pix = [i - center_pixel for i in range(slit_length_pixels)] delta_pix = np.array(delta_pix) arcsec_per_pixel_x = h["STEPSIZE"] # assuming square pixels arcsec_per_pixel_y = h["STEPSIZE"] if h["STEPSIZE"] else 0.135 # assuming square pixels X = x_slitcenter - np.sin(angle) * delta_pix * arcsec_per_pixel_x Y = y_slitcenter - np.cos(angle) * delta_pix * arcsec_per_pixel_y message = "Using FITS file coordinates, they are probably wrong by at least 100 arcsec!" \ " Also the slit angle is definitely off!" warn(message) coord_array = np.array([X, Y]) * u.arcsec # reorder axes for consistency with other coordinate retrieval methods coord_array = np.swapaxes(coord_array, 0, 1) return coord_array @property def coords(self): try: coords = self._coords_from_wcs except AssertionError: coords = self._coords_from_simple_header return coords @property def old_header(self): """Check if the header of this file has already been processed""" old_header = True if any([k in self.header for k in ['SOLARNET']]): old_header = False return old_header @property def mu(self): """ Calculate mu from coordinates Returns: mu """ coords = self.coords.value X, Y = coords[:, 0], coords[:, 1] x = np.mean(X) y = np.mean(Y) mu = calc_mu(x, y) return mu @property def theta(self): """ Calculate theta from coordinates Returns: theta """ coords = self.coords.value X, Y = coords[:, 0], coords[:, 1] x = np.mean(X) y = np.mean(Y) theta = calc_theta(x, y) return theta @property def slit_orientation(self): """Get slit_orientation, defaults to 0. Not sure what this angle means... Returns: slit_orientation: float """ slit_orientation = self._get_value(["SLITORIE"]) slit_orientation = slit_orientation if slit_orientation != "" else 0 return slit_orientation @property def number_of_maps(self): """ Returns: nmaps: int number of maps in associated observation """ return self._get_value(["NMAPS", "SERIES"]) @property def map_index(self): """ Returns: i_map: int map index within observation """ return self._get_value(["IMAP", "ISERIE"]) @property def n_steps(self): """ Determine the number of slit scanning steps in the associated map. Returns: nsteps: number of steps in the associated map """ # get initial guess from header nsteps = self._get_value(["STEPS", "NSTEPS"]) # check if other files matching the same run are present. If so, check the running step number # for the same map to determine the actual number of steps. It is assumed, that all files belonging to the # same map are stored in the same directory pattern = ( f"gris_*_*_*_{self.runnumber:03d}_{self.mapnumber:03d}_*.fits" ) matching_files = glob(join(self.dirname, pattern)) max_steps = int( max( [ basename(m).split(".")[0].split("_")[-1] for m in matching_files ] ) ) if max_steps < nsteps: nsteps = max_steps return nsteps @property def obs_time(self): """ Returns: obs_time: observation time given in filename """ return date_from_fn(self.filename) @property def step_index(self): """ Returns: step_index: index of scanning step within current map """ return self._get_value(["ISTEP"]) @property def step_angle(self): """ Returns: step_angle: scanning angle """ return self._get_value(["STEPANGL"]) @property def wavelength_step_size(self): """Get step length along wavelength axis. We use the values from the continuum correction fitting""" return self._kw_mean(["FF1WLDSP", "FF2WLDSP"]) @property def wavelength_region(self): """Get wavelength region covered by file. Returns tuple of (upper, lower)""" step_size = self.wavelength_step_size n_steps_wl = self.data.shape[-1] # average for lower border: lower = self._kw_mean(["FF1WLOFF", "FF2WLOFF"]) # add wl steps to calculate upper border upper = lower + step_size * n_steps_wl return lower, upper @property def derotator_in(self): """ Determine whether the derotator is inserted Returns: bool: derotator in """ try: rotval = self._get_value(["ROTCODE"]) except KeyError: # if no derotator keywords are scontained, assume it doesn't exist yet return False if rotval == 0: return True else: return False @property def states(self): """ Determine number of polarimetric states Returns: n_states int: number of states """ # Number of polarimetric states is stored in the last axis. Determine # number of axes from NAXIS keyword and query value for last axis. naxes = self.query("NAXIS") keyword = f"NAXIS{naxes}" states = self.query(keyword) return states @property def obs_mode(self): """Get observation mode""" self.parse() # extract obs mode keywords states = self.states n_maps = self.number_of_maps n_steps = self.n_steps mode = gris_obs_mode(states, n_maps, n_steps) return mode def parse(self): """extract data and metadata from file""" if self.is_parsed: return def is_valid(value): valid = True if isinstance(value, fitsio.card.Undefined): valid = False elif value == "-NAN": valid = False return valid # Read file try: header = self.header # Add header to self.properties, cast strange data types where # necessary, fill invalid values with empty strings for key in header.keys(): value = header[key] # check validity: value = value if is_valid(value) else "" if key in ["COMMENT", "HISTORY"]: self.properties[key] = str(value) else: self.properties[key] = value self.is_parsed = True except: exception = sys.exc_info()[0] self.errors.append("File %s" % self.filename + str(exception)) def make_bson(self): """construct BSON document for mongodb storage""" return self.properties @property def _coords_from_wcs(self): """ Retrieve spatial coordinates for each pixel in for slices projected into the spatial dimensions. Returns a data cube shaped s the spatial dimensions Returns: arcsec, cube of (NAXIS1, NAXIS2, 2) Values are Helioprojective X and Y in arcseconds """ assert not self.old_header, f"Can't get coordinates from {self.path} ! Run header conversion first!" wcs = WCS(self.header) pixel_shape = wcs.pixel_shape n_y = pixel_shape[1] slit_list = np.zeros((n_y, self["NAXIS"])) slit_list[:, 1] = np.arange(n_y) slit_list[:, 0] = 0 converted = np.array(wcs.pixel_to_world_values(slit_list)) degrees = u.degree * converted[:, :2] arcsec = degrees.to(u.arcsec) return arcsec def plot_slit(self): m = self.full_disk_map # retrieve different data sources coordinates = self.coords X, Y = coordinates[:, 0], coordinates[:, 1] # determine fov limits for plotting plotting_padding = 300 * u.arcsec x_min = min(X) - plotting_padding x_max = max(X) + plotting_padding y_min = min(Y) - plotting_padding y_max = max(Y) + plotting_padding # Create submap of FOV and plot submap = m.submap(SkyCoord(x_min, y_min, frame=m.coordinate_frame), SkyCoord(x_max, y_max, frame=m.coordinate_frame)) fig = plt.figure() # noqa F841 ax = plt.subplot(projection=submap) im = submap.plot() # noqa:F841 # create SkyCoord objects and overplot slit_wcs = SkyCoord(X, Y, frame=submap.coordinate_frame) ax.plot_coord(slit_wcs, 'g-', label="WCS Coordinates") # Create Legend and annotations title = "Gris Slit {0}_{1:03d}_{2:03d}_{3:03d}" title = title.format(self.obs_time.strftime('%Y%m%d'), self.runnumber, self.mapnumber, self.step_index) plt.title(title) annotation = f"Spectrum taken at \n{self.obs_time.strftime('%Y-%m-%d %H:%M:%S')}" annotation += f"\nHMI Reference taken at \n{submap.date.strftime('%Y-%m-%d %H:%M:%S')}" plt.text(0.05, 0.8, annotation, horizontalalignment='left', transform=ax.transAxes) plt.tight_layout(pad=3.5) _ = plt.legend() return fig, ax def __str__(self): outstring = "GrisFitsFile %s from %s:\n\tRun: %s\n\tMap: %s\n\tSlitposition: %s\n" outstring = outstring % ( os.path.basename(self.filename), self.date, self.runnumber, self.map_index, self.slitposnumber, ) return outstring if __name__ == '__main__': gf = GrisFitsFile(sys.argv[1]) gf.parse() print(gf)