import datetime import re from glob import glob from os.path import join, dirname from warnings import warn import pandas as pd from astropy import units as u from astropy.coordinates import EarthLocation from astropy.time import Time from numpy import nan from kis_tools.gris.headers.exceptions import NothingToDoError from kis_tools.gris.headers.template_builder import gris_const_trans, gris_triv_trans, TRANS_SETTINGS, fitpar_kws from kis_tools.gris.headers.translator import cache_translation, MetadataTranslator from kis_tools.gris.util import get_observers from kis_tools.util.calculations import get_distance_gregor_sun from kis_tools.util.constants import gregor_coords from kis_tools.util.util import gris_obs_mode, date_from_fn, gris_run_number class FitsTranslator(MetadataTranslator): """Metadata translator for FITS standard headers. Understands: - DATE-OBS - INSTRUME - TELESCOP - OBSGEO-[X,Y,Z] """ _wcs_kw_patterns = [ r"CTYPE\d+", r"CUNIT\d+", r"CRPIX\d+", r"CRVAL\d+", r"CSYER\d+", r"CDELT\d+", r"PC\d+_\d+", ] @classmethod def can_translate(cls, header, filename=None): """Indicate whether this translation class can translate the supplied header. Checks the instrument value and compares with the supported instruments in the class Parameters ---------- header : `dict`-like Header to convert to standardized form. filename : `str`, optional Name of file being translated. Returns ------- can : `bool` `True` if the header is recognized by this class. `False` otherwise. """ if cls.supported_instrument is None: return False # Protect against being able to always find a standard # header for instrument try: translator = cls(header, filename=filename) instrument = translator.to_instrument() except KeyError: return False return instrument == cls.supported_instrument @classmethod def _from_fits_date_string(cls, date_str, scale="utc", time_str=None): """Parse standard FITS ISO-style date string and return time object Parameters ---------- date_str : `str` FITS format date string to convert to standard form. Bypasses lookup in the header. scale : `str`, optional Override the time scale from the TIMESYS header. Defaults to UTC. time_str : `str`, optional If provided, overrides any time component in the ``dateStr``, retaining the YYYY-MM-DD component and appending this time string, assumed to be of format HH:MM::SS.ss. Returns ------- date : `astropy.time.Time` `~astropy.time.Time` representation of the date. """ if time_str is not None: date_str = "{}T{}".format(date_str[:10], time_str) return Time(date_str, format="isot", scale=scale) def _from_fits_date(self, date_key): """Calculate a date object from the named FITS header Uses the TIMESYS header if present to determine the time scale, defaulting to UTC. Parameters ---------- dateKey : `str` The key in the header representing a standard FITS ISO-style date. Returns ------- date : `astropy.time.Time` `~astropy.time.Time` representation of the date. """ used = [date_key] if "TIMESYS" in self._header: scale = self._header["TIMESYS"].lower() used.append("TIMESYS") else: scale = "utc" if date_key in self._header: date_str = self._header[date_key] value = self._from_fits_date_string(date_str, scale=scale) self._used_these_cards(*used) else: value = None return value @cache_translation def to_datetime_begin(self): """Calculate start time of observation. Uses FITS standard ``DATE-OBS`` and ``TIMESYS`` headers. Returns ------- start_time : `astropy.time.Time` Time corresponding to the start of the observation. """ return self._from_fits_date("DATE-OBS") @cache_translation def to_datetime_end(self): """Calculate end time of observation. Uses FITS standard ``DATE-END`` and ``TIMESYS`` headers. Returns ------- start_time : `astropy.time.Time` Time corresponding to the end of the observation. """ return self._from_fits_date("DATE-END") @cache_translation def to_location(self): """Calculate the observatory location. Uses FITS standard ``OBSGEO-`` headers. Returns ------- location : `astropy.coordinates.EarthLocation` An object representing the location of the telescope. """ cards = [f"OBSGEO-{c}" for c in ("X", "Y", "Z")] coords = [self._header[c] for c in cards] value = EarthLocation.from_geocentric(*coords, unit=u.m) self._used_these_cards(*cards) return value def from_default(self, kw, default): """Generate a translation method which keeps the value of a given Keyword and fills a default if no value is given""" def check_or_default(): if kw in self._header.keys(): return self._header[kw] else: return default setattr(self, f"to_{kw}", check_or_default) def from_float_or_nan(self, kw): """Generate a translation method which tries to convert a given Keyword to float and fills a NaN if no value is given""" def try_float_else_nan(): val = self._header[kw] try: val = float(val) return val except ValueError: return nan except TypeError: return nan setattr(self, f"to_{kw}", try_float_else_nan) def add_wcs_methods(self): h = self._header patterns = self._wcs_kw_patterns for p in patterns: pattern = re.compile(p) kws = filter(pattern.match, h.keys()) for k in kws: def constant_translator(): return h[k] self.from_default(k, h[k]) pass @property def wcs_cards(self): """Retrieve WCS-related cards from header. Cards are Identified by matching with the wcs_patterns property Returns: cards: list of header cards """ h = self._header patterns = self._wcs_kw_patterns cards = [ c for c in h._cards if any([re.search(pattern, c.keyword) for pattern in patterns]) ] return cards class KISTranslator(FitsTranslator): """Translator class for building KIS headers. It is intended to be used as a base class for all telescope-specific translators.""" def _to_ORIGIN(self): return self.default_if_empty( "ORIGIN", "Leibniz Insitute for Solar Physics (KIS)" ) class GREGORTranslator(KISTranslator): """Translator class for building GREGOR headers. It is intended to be used as a base class for all instrument-specific translators.""" def to_TELESCOP(self): return self.default_if_empty("TELESCOP", "GREGOR") def to_OBSRVTRY(self): return self.default_if_empty("OBSRVTRY", "Teide Obseratory") def __init__(self, *args, **kwargs): self._coords = gregor_coords.to_geocentric() super().__init__(*args, **kwargs) def to_OBSGEO_X(self): return self._coords[0].value def to_OBSGEO_Y(self): return self._coords[1].value def to_OBSGEO_Z(self): return self._coords[2].value class GrisTranslator(GREGORTranslator): """Translate Gris Headers""" name = "Gris" supported_instrument = "Gris" @classmethod def can_translate(cls, header, filename=None): """Indicate whether this translation class can translate the supplied header. Parameters ---------- header : `dict`-like Header to convert to standardized form. filename : `str`, optional Name of file being translated. Returns ------- can : `bool` `True` if the header is recognized by this class. `False` otherwise. """ if "SOLARNET" in header: raise NothingToDoError("This header is already up to date!") if "TELESCOP" in header: is_gregor = header["TELESCOP"] == "GREGOR" if is_gregor and all([key in header.keys() for key in ["FF1WLOFF"]]): return True else: warn( "GrisTranslator: got a file which does not contain FTS-fitting results or has not been split." "This translator needs a split file created with recent routines as input." ) return False _const_map = gris_const_trans _trivial_map = gris_triv_trans @cache_translation def to_observation_id(self): """ observation id is given by DAY_RUN""" date = self._from_fits_date_string(self._header["DATE-OBS"]).strftime("%Y%m%d") iobs = int(self._header["IOBS"]) value = f"{date}_{iobs:03d}" return value def to_EXTNAME(self): """EXTNAME as hdu for a given combination of day, run, map,slitposition""" iserie = int(self._header["ISERIE"]) istep = int(self._header["ISTEP"]) value = self.to_observation_id() value += f"_{iserie:03d}_{istep:04d}" return value def to_POINT_ID(self): """Assume that telescope is re-pointed for each run""" return self.to_observation_id() @staticmethod def to_DATE(): """Date of FITS creation""" return datetime.date.today() @cache_translation def to_DSUN_OBS(self): """Distance between instrument and sun center""" return get_distance_gregor_sun(self.to_DATE_BEG()) @cache_translation def to_DATE_BEG(self): """Calculate start time of observation. Uses FITS standard ``DATE-OBS`` Returns ------- time : datetime.datetime Time corresponding to the start of the observation. """ ut = self._header["UT"] do = self._header["DATE-OBS"] string = do + " " + ut.strip() time = datetime.datetime.strptime(string, "%Y-%m-%d %H:%M:%S.%f") return time def to_DATE_OBS(self): """Added for compatibility, set to beginnning of observation Returns ------- time : datetime.datetime Time corresponding to the start of the observation. """ time = self.to_DATE_BEG() return time @cache_translation def to_AWAVLNTH(self): """Get approximate wavelength in air""" wl = int(self._header["WAVELENG"]) if wl == 1564: wl = 1565 return wl * 10 @cache_translation def to_AWAVMIN(self): """Minimum wavelength is calculated from the wavelength offset determined by the FTS fit, average if two flats are given""" ff1 = self._header["FF1WLOFF"] ff2 = self._header["FF2WLOFF"] try: value = (ff1 + ff2) / 2 except TypeError: value = ff1 return value @cache_translation def to_AWAVMAX(self): """Maximum wavelength is calculated from the wavelength offset determined by the FTS fit plus the number of pixels along the wavelength axis times the dispersion. average if two flats are given""" ff1 = self._header["FF1WLDSP"] ff2 = self._header["FF2WLDSP"] nsteps = self._header["NAXIS1"] - 1 try: dispersion = (ff1 + ff2) / 2 except TypeError: dispersion = ff1 value = dispersion * nsteps + self.to_AWAVMIN() return value @cache_translation def to_OBS_MODE(self): """Observation mode is determined by the number of observed states,the stepping settings and the number of observed maps.""" n_maps = self.to_NMAPS() n_steps = self.to_NSTEPS() step_size = self._header["STEPSIZE"] naxis = self._header["NAXIS"] if naxis == 2: states = 1 else: states = self._header[f"NAXIS{naxis}"] mode_string = gris_obs_mode(states, n_maps, n_steps, step_size) return mode_string def to_TEXPOSUR(self): """Implemented for Solarsoft compatibility in seconds""" return self._header["EXPTIME"] / 1000 @cache_translation def to_XPOSURE(self): """Exposure time for the entire file in seconds""" n_accumulations = self.to_NSUMEXP() exptime = self.to_TEXPOSUR() return exptime * n_accumulations def to_EXPTIME(self): """Implemented for Solarsoft compatibility in seconds""" return self.to_XPOSURE() @cache_translation def to_OBSERVER(self): """Observer""" logfiles = glob(join(dirname(dirname(self.filename)), "????????.txt")) if not logfiles: return "Unknown" logfile = logfiles[0] observers = get_observers(logfile) to_header = [] sep = ", " for o in observers: length = sum([len(th) for th in to_header]) + (len(to_header) - 1) * len( sep ) if length + len(o) + len(sep) < 70: to_header.append(o) else: break res = sep.join(to_header) assert len(res) < 70 return res @cache_translation def to_AO_LOCK(self): """Adaptive optics locking""" state = self._header["AOSTATE"].strip() if state == "on": return 1.0 elif state == "off": return 0.0 else: return nan def to_OBS_TRGT(self): """Parsed from tagging file, if not in tagging file, check header for tag, else default to not_tagged""" fn = self._header["FILENAME"] date = date_from_fn(fn).strftime("%Y-%m-%d") run = gris_run_number(fn) try: return self.tags.at[(date, run), "main_tag"] except KeyError: if "TARGET" in self._header: return self._header["TARGET"] else: return "not tagged" def get_fitpars(self, fitpar_kw): """Generate a dictionary of keywords for the parameters of the Polynomial determined by the FTS fit. The dictionary is later expanded to single keywords.""" def get_all_parameters(): npoly_key = fitpar_kw[:3] + "NPOLY" try: npoly = int(self._header[npoly_key]) except ValueError: return {fitpar_kw: None} except TypeError: return {fitpar_kw: None} vals = {} for i in range(npoly + 1): par_key = fitpar_kw.replace("nn", f"{i:02d}") try: vals[par_key] = self._header[par_key] except KeyError: message = f"Warning: could not find '{par_key}' in header." message += ( " Check that the correct number of parameters has entered " ) message += "the header and re-run calibration if necessary!" warn(message) return vals setattr(self, f"to_{fitpar_kw}", get_all_parameters) def __init__(self, *args, **kwargs): """Parse tag_file, set up translations for keywords with default value, setup fit result keywords""" super(GrisTranslator, self).__init__(*args, **kwargs) self.tags = pd.read_csv( TRANS_SETTINGS["tag_file"], parse_dates=["date"], index_col=["date", "run"], ) self.from_default("ROTCODE", -1) self.from_default("ROTTRACK", None) self.from_default("IMGSYS", "SLIT") self.from_default("SLITCNTR", "scancntr") self.from_default("ROTANGLE", 0) self.from_default("BSCALE", 1) self.from_default("BZERO", 0) fit_results = ["WLOFF", "WLDSP", "FWHMA", "FWHMP", "STRAY", "NPOLY"] for fit_res in fit_results: for i_flat in [1, 2]: kw = f"FF{i_flat}{fit_res}" self.from_float_or_nan(kw) # add fit res parameters for fitpar_kw in fitpar_kws: self.get_fitpars(fitpar_kw)