""" Created on 2018-04-09 @author Carl Schaffer @mail carl.schaffer@leibniz-kis.de """ import datetime # date handling import logging import os # path and file manipulations import sys # error handling from glob import glob from os.path import basename, exists, join, dirname from pathlib import Path import astropy.io.fits as fitsio # fits reading and writing import astropy.units as u import matplotlib.pyplot as plt import numpy as np import pytz from astroplan import Observer from astropy.coordinates import SkyCoord, Angle, get_sun from astropy.wcs import WCS from kis_tools.generic.fits import FitsFile from kis_tools.gris.coord_correction_ml import get_coords_ml from kis_tools.gris.util import telescope_to_hpc, expand_coords, calculate_slit_translation from kis_tools.util.calculations import estimate_telescope_drift from kis_tools.util.constants import gregor_coords, gris_stepsize_along_slit from kis_tools.util.util import date_from_fn, gris_obs_mode logger = logging.getLogger(__name__) class GrisFitsFile(FitsFile): """Class for handling fits files for GRIS db When the constructor is called, the filename is stored and basic metadata parameters such as date and runnumber are extracted from the filename. Once the class function GrisFitsFile.parse() is called, metadata from the fits file is available as a dictionary stored at GrisFitsFile.properties. Any errors occurring during opening and parsing of the file are stored as strings in GrisFitsFile.errors Args: filename: filename of target fits file Returns: """ def __init__(self, filename): """Constructor""" if isinstance(filename, Path): filename = str(filename.resolve()) super().__init__(filename) assert exists(filename) self.properties = {} self.errors = [] self.filename = filename self.name = basename(filename) self.path = self.filename self.dirname = dirname(self.path) self.parse_filename() self.is_parsed = False def parse_filename(self): """Extract metadata from filename""" # Extract time from Filename name = os.path.splitext(basename(self.filename))[0].split("_") year = int(name[1][:4]) month = int(name[1][4:6]) day = int(name[1][6:]) # Get time # hh = int(name[2][:2]) # mm = int(name[2][2:4]) # outstring = int(name[2][4:6]) # self.date = datetime.datetime(year,month,day,hh,mm,outstring) self.date = datetime.datetime(year, month, day, 0, 0, 0) # Extract Run-, Map-, and Stepnumber, as well as mode string self.runnumber = int(name[4]) self.mapnumber = int(name[5]) self.slitposnumber = int(name[6]) self.modestring = name[3] @property def verbose_runname(self): """ Get verbose run name such as 24apr14.002 Returns: outstring: verbose run name """ outstring = self.date.strftime("%d%b%y").lower() outstring += f".{int(self.runnumber):03d}" return outstring @property def telescope_centering(self): """ Determine state of telescope centering at the time of observation, queries GDBS for centering data Returns: outstring: x_offset, y_offset, x_uncertainty, y_uncertainty all in arcseconds """ # Query centering from GDBS try: from kis_tools.gdbs.query_gdbs import query_gdbs gdbs_usable = True x_telcenter, x_valid, x_age = query_gdbs('xsuncenter', self.obs_time) y_telcenter, y_valid, y_age = query_gdbs('ysuncenter', self.obs_time) # Determine uncertainties for centering values centering_age = max(x_age, y_age) drift = estimate_telescope_drift(centering_age.seconds / 3600) x_std, y_std = drift, drift # if the centering is older than 24 hours, discard if centering_age > datetime.timedelta(hours=24): gdbs_usable = False except IOError: gdbs_usable = False except ValueError: gdbs_usable = False # Catch instances where no numbers but error strings are written to GDBS if gdbs_usable: try: x_telcenter, y_telcenter = float(x_telcenter), float(y_telcenter) except ValueError: gdbs_usable = False if not gdbs_usable: # Determined from offset between uncentered and cross_correlated data # STD-DEV of distances in X and Y again see gris coord study for details uncertainty_no_center_x = 117.1 uncertainty_no_center_y = 182.4 x_telcenter, y_telcenter, x_std, y_std = 0, 0, uncertainty_no_center_x, uncertainty_no_center_y return x_telcenter, y_telcenter, x_std, y_std @property def _coords_from_simple_header(self): """coordinates for slit, the Keywords 'SLITPOSX' and 'SLITPOSY' are assumed to be the center of the slit while the keyword 'SLITORIE' describes the rotation of the slit w.r.t. the helioprojective axes. The algorithm further assumes that each pixel represents a square area with side length of 'STEPSIZE'. These assumptions are used to calculate the coordinates for the center of each pixel within the data array. Returns: (X,Y):ndarray(float) x and y coordinates for each pixel of the slit WARNING! The code below is not checked for correctness! We do not know what direction SLITORIE references off and coordinates provided by GREGOR are very error-prone. If you need precision better than ~50" this function is not sufficient. """ h = self.header slit_length_pixels = self.NAXIS2 if self.old_header else self.NAXIS1 # Slit center in telescope coordinates x_slitcenter = self.SLITPOSX if self.header['SLITPOSX'] != "" else 0 y_slitcenter = self.SLITPOSY if self.header['SLITPOSY'] != "" else 0 coord_ref = slit_length_pixels // 2 if "SLITCNTR" in self.header: if self.SLITCNTR.strip() != 'scancntr': raise ValueError( f'Cannot determine reference pixel for coordinates! SLITCNTR is {self.SLITCNTR} instead of scancntr') # Transform to HPC (shift by telescope centering, rotate by p0 angle) delta_x, delta_y, std_delta_x, std_delta_y = self.telescope_centering # If uncertainties are too large, use ML model to correct the coordinates from kis_tools.gris.coord_correction_ml import coord_model_stdx, coord_model_stdy if std_delta_x > coord_model_stdx or std_delta_y > coord_model_stdy: try: x_hpc, y_hpc, std_delta_x, std_delta_y = get_coords_ml(self) except ValueError as e: logger.warning(f"Error in finding ML coords for {self.__repr__()}:{e}") # Point to slitpos with 1 solar disk uncertainty if coord retrieval didn't work x_hpc, y_hpc, std_delta_x, std_delta_y = x_slitcenter, y_slitcenter, 1000, 1000 else: x_hpc, y_hpc = telescope_to_hpc(x_slitcenter, y_slitcenter, self.p0_angle, delta_x, delta_y) try: angle = self.slit_orientation except ValueError as e: logger.warning(f"Could not determine Slit orientation! \n {e}") angle = 0 X, Y = expand_coords(x_hpc, y_hpc, slit_orientation=angle, reference_pixel=coord_ref, stepsize_slit=gris_stepsize_along_slit, slit_length_pixels=slit_length_pixels) coord_shift = calculate_slit_translation(slit_number=self.slitposnumber, stepsize_perp=self.STEPSIZE, slit_orientation=angle) X = X + coord_shift[0] Y = Y + coord_shift[1] coord_array = np.array([X, Y]) * u.arcsec # reorder axes for consistency with other coordinate retrieval methods coord_array = np.swapaxes(coord_array, 0, 1) # track coordinate uncertainty within class self.coord_uncertainty = (std_delta_x, std_delta_y) return coord_array @property def coords(self): """ Returns:array(float) shape (2,len(slit)) helioprojective coordinates in arcsecnd along slit """ try: coords = self._coords_from_wcs except AssertionError: coords = self._coords_from_simple_header return coords @property def old_header(self): """Check if the header of this file has already been processed""" old_header = True if any([k in self.header for k in ["SOLARNET"]]): old_header = False return old_header @property def zenit(self): """ Get Zenit angle (90° - elevation) Returns: zenit float in degrees Raises Value error if elevation is not contained in the header """ elev = self._get_value(['ELEV_ANG', 'ELEVATIO']) if elev == '': raise ValueError(f'No elevation tracked for {self.__repr__()}') zenit = 90 - elev return zenit @property def slit_orientation(self): """Get slitorintation: clockwise angle from the solar equator, scan direction is 90° further in clockwise direction. Attempts to correct for derotator and image rotation, usually correct within a couple 10s of degrees. Returns: slitorie: float (degrees) """ # Pull values from header # p0 angle, rotates over the course of the year p0_angle = self.p0_angle # Azimut and zenit at time of observation start azimut = self._get_value(['AZIMUT']) zenit = self.zenit # Slitorientation from header slitorie = self.SLITORIE # Calculate declination of disk center from time gregor_observer = Observer(location=gregor_coords) date_tz = pytz.utc.localize(self.obs_time) time = gregor_observer.datetime_to_astropy_time(date_tz) sun = get_sun(time) dec = sun.dec # Calculate parallactic angle as described in the GREGOR derotator manual # http://www.leibniz-kis.de/fileadmin/user_upload/observatorien/gre_docs/man/GRE-KIS-MAN-0008_v001_derotator.pdf parallactic_angle = np.degrees( np.arcsin(np.sin(np.radians(azimut)) * np.cos(gregor_coords.lat.to(u.rad)) / np.cos(dec.to(u.rad)))) self.parallactic_angle = parallactic_angle const_term = -130 # determined by trial and error to match results from cross correlation with HMI alpha = -1 * parallactic_angle.to(u.degree).value + azimut - zenit - p0_angle + const_term if self.derotator_in: rotangle = self.ROTANGLE alpha = -2 * rotangle + (const_term / 2) angle = slitorie - alpha return angle @property def number_of_maps(self): """ Returns: nmaps: int number of maps in associated observation """ return self._get_value(["NMAPS", "SERIES"]) @property def map_index(self): """ Returns: i_map: int map index within observation """ return self._get_value(["IMAP", "ISERIE"]) @property def n_steps(self): """ Determine the number of slit scanning steps in the associated map. Returns: nsteps: number of steps in the associated map """ # get initial guess from header nsteps = self._get_value(["STEPS", "NSTEPS"]) # check if other files matching the same run are present. If so, check the running step number # for the same map to determine the actual number of steps. It is assumed, that all files belonging to the # same map are stored in the same directory pattern = f"gris_*_*_*_{self.runnumber:03d}_{self.mapnumber:03d}_*.fits" matching_files = glob(join(self.dirname, pattern)) max_steps = int( max([basename(m).split(".")[0].split("_")[-1] for m in matching_files]) ) if max_steps < nsteps: nsteps = max_steps return nsteps @property def obs_time(self): """ Returns: obs_time: observation time given in filename """ return date_from_fn(self.filename) @property def step_index(self): """ Returns: step_index: index of scanning step within current map """ return self._get_value(["ISTEP"]) @property def p0_angle(self): """ Returns: p0_angle: angle between telescope and solar north """ angle = self._get_value(["P0ANGLE", 'SOLAR_P0']) if angle == '': raise ValueError(f'Header of {self} contains illegal p0_angle:"{angle}"') return angle @property def step_angle(self): """ Returns: step_angle: scanning angle """ return self._get_value(["STEPANGL"]) @property def wavelength_step_size(self): """Get step length along wavelength axis. We use the values from the continuum correction fitting""" return self._kw_mean(["FF1WLDSP", "FF2WLDSP"]) @property def wavelength_region(self): """Get wavelength region covered by file. Returns tuple of (upper, lower)""" step_size = self.wavelength_step_size n_steps_wl = self.data.shape[-1] if self.old_header else self.NAXIS3 # average for lower border: lower = self._kw_mean(["FF1WLOFF", "FF2WLOFF"]) # add wl steps to calculate upper border upper = lower + step_size * n_steps_wl return lower, upper @property def derotator_in(self): """ Determine whether the derotator is inserted Returns: bool: derotator in """ try: rotval = self._get_value(["ROTCODE"]) except KeyError: # if no derotator keywords are scontained, assume it doesn't exist yet return False if rotval == 0: return True else: return False @property def states(self): """ Determine number of polarimetric states Returns: n_states int: number of states """ # Number of polarimetric states is stored in the last axis. Determine # number of axes from NAXIS keyword and query value for last axis. naxes = self.query("NAXIS") keyword = f"NAXIS{naxes}" states = self.query(keyword) return states @property def obs_mode(self): """Get observation mode""" self.parse() # extract obs mode keywords states = self.states n_maps = self.number_of_maps n_steps = self.n_steps mode = gris_obs_mode(states, n_maps, n_steps) return mode def parse(self): """extract data and metadata from file""" if self.is_parsed: return def is_valid(value): valid = True if isinstance(value, fitsio.card.Undefined): valid = False elif value == "-NAN": valid = False return valid # Read file try: header = self.header # Add header to self.properties, cast strange data types where # necessary, fill invalid values with empty strings for key in header.keys(): value = header[key] # check validity: value = value if is_valid(value) else "" if key in ["COMMENT", "HISTORY"]: self.properties[key] = str(value) else: self.properties[key] = value self.is_parsed = True except: exception = sys.exc_info()[0] self.errors.append("File %s" % self.filename + str(exception)) def make_bson(self): """construct BSON document for mongodb storage""" return self.properties @property def _coords_from_wcs(self): """ Retrieve spatial coordinates for each pixel in for slices projected into the spatial dimensions. Returns a data cube shaped s the spatial dimensions Returns: arcsec, cube of (NAXIS1, NAXIS2, 2) Values are Helioprojective X and Y in arcseconds Side-effect: Sets self.coord_uncertainty """ assert ( not self.old_header ), f"Can't get coordinates from {self.path} ! Run header conversion first!" wcs = WCS(self.header) pixel_shape = wcs.pixel_shape n_y = pixel_shape[1] slit_list = np.zeros((n_y, self["NAXIS"])) slit_list[:, 1] = np.arange(n_y) slit_list[:, 0] = 0 converted = np.array(wcs.pixel_to_world_values(slit_list)) degrees = Angle(u.degree * converted[:, :2]).wrap_at(180 * u.degree) arcsec = degrees.to(u.arcsec) # Track coord uncertainty within instance self.coord_uncertainty = (self.CSYER1, self.CSYER2) return arcsec def plot_slit(self): m = self.full_disk_map # retrieve different data sources coordinates = self.coords X, Y = coordinates[:, 0], coordinates[:, 1] # determine fov limits for plotting plotting_padding = 300 * u.arcsec x_min = min(X) - plotting_padding x_max = max(X) + plotting_padding y_min = min(Y) - plotting_padding y_max = max(Y) + plotting_padding # Create submap of FOV and plot submap = m.submap( SkyCoord(x_min, y_min, frame=m.coordinate_frame), SkyCoord(x_max, y_max, frame=m.coordinate_frame), ) fig = plt.figure() # noqa F841 ax = plt.subplot(projection=submap) im = submap.plot() # noqa:F841 # create SkyCoord objects and overplot slit_wcs = SkyCoord(X, Y, frame=submap.coordinate_frame) ax.plot_coord(slit_wcs, "g-", label="WCS Coordinates") # Create Legend and annotations title = "Gris Slit {0}_{1:03d}_{2:03d}_{3:03d}" title = title.format( self.obs_time.strftime("%Y%m%d"), self.runnumber, self.mapnumber, self.step_index, ) plt.title(title) annotation = ( f"Spectrum taken at \n{self.obs_time.strftime('%Y-%m-%d %H:%M:%S')}" ) annotation += ( f"\nHMI Reference taken at \n{submap.date.strftime('%Y-%m-%d %H:%M:%S')}" ) plt.text( 0.05, 0.8, annotation, horizontalalignment="left", transform=ax.transAxes ) plt.tight_layout(pad=3.5) _ = plt.legend() return fig, ax def __str__(self): outstring = ( "GrisFitsFile %s from %s:\n\tRun: %s\n\tMap: %s\n\tSlitposition: %s\n" ) outstring = outstring % ( os.path.basename(self.filename), self.date, self.runnumber, self.map_index, self.slitposnumber, ) return outstring if __name__ == "__main__": gf = GrisFitsFile(sys.argv[1]) gf.parse() print(gf)