Commit d43921e5 authored by Carl Schaffer's avatar Carl Schaffer
Browse files

Merge branch 'IFU_translation_tests' into 'master'

Refining IFU Header generation

See merge request !219
parents 49355fac f4476833
# Manual for header Translation
# IF you ever need to touch the template builder... just re-write it. May the force be with you!
Collection of Software to translate headers from the existing style to SOLARNET style. The tools are built on the [lsst metadata translator](https://github.com/lsst/astro_metadata_translator).
A coarse overview on the most important files and their function follows.
- **sdc_header_template.xlsx**: contains a template for the header includingthe order of the keywords and some additional info such as the values for constant keywords, or command strings to generate comment strings structuring the header. Also includes the types and the display precision for floating point values.
......
from os.path import basename
from warnings import warn
from kis_tools.gris.headers.template_builder import GRIS_IFU_PROPERTIES
from kis_tools.gris.headers.translator_classes import GrisTranslator, NothingToDoError
class GrisIFUTranslator(GrisTranslator):
name = "gris_ifu"
supported_instrument = "gris_ifu"
instrument = "gris-ifu"
PROPERTIES = GRIS_IFU_PROPERTIES
def __init__(self, *args, **kwargs):
super(GrisIFUTranslator, self).__init__(*args, **kwargs)
self.from_default("INSTRUME", "GRIS-IFU")
self.add_fts_fit_results()
@classmethod
def can_translate(cls, header, filename=None):
"""Indicate whether this translation class can translate the
......@@ -48,6 +52,15 @@ class GrisIFUTranslator(GrisTranslator):
return False
def to_STEPSV(self):
return self._header['STEPSV']
def to_STEPSH(self):
return self._header['STEPSH']
def to_DBLESAMP(self):
return self._header['DBLESAMP']
def to_EXTNAME(self):
"""Generate extension name (date_run_map)"""
iserie = int(self._header["ISERIE"])
......
......@@ -19,7 +19,6 @@ import logging
import astropy.time
from .template_builder import GRIS_PROPERTIES as PROPERTIES
from .translator import MetadataTranslator
log = logging.getLogger(__name__)
......@@ -55,7 +54,6 @@ class ObservationInfo:
The supplied translator class was not a MetadataTranslator.
"""
_PROPERTIES = PROPERTIES
"""All the properties supported by this class with associated
documentation."""
......@@ -83,11 +81,13 @@ class ObservationInfo:
# Create an instance for this header
translator = translator_class(header, filename=filename)
self._PROPERTIES = translator.PROPERTIES
# Store the translator
self._translator = translator
self.translator_class_name = translator_class.__name__
self.__class__.add_class_properties(self._PROPERTIES)
# Loop over each property and request the translated form
for t in self._PROPERTIES:
# prototype code
......@@ -201,6 +201,15 @@ class ObservationInfo:
property = f"_{p}"
setattr(self, property, state[p])
@classmethod
def add_class_properties(cls, PROPERTIES):
# Initialize the internal properties (underscored) and add the associated
# getter methods.
for name, description in PROPERTIES.items():
setattr(cls, f"_{name}", None)
setattr(cls, name, property(_make_property(name, *description)))
pass
# Method to add the standard properties
def _make_property(property, doc, return_type):
......@@ -233,9 +242,3 @@ def _make_property(property, doc, return_type):
"""
return getter
# Initialize the internal properties (underscored) and add the associated
# getter methods.
for name, description in ObservationInfo._PROPERTIES.items():
setattr(ObservationInfo, f"_{name}", None)
setattr(ObservationInfo, name, property(_make_property(name, *description)))
......@@ -29,6 +29,7 @@ _dirname = Path(dirname(realpath(__file__)))
# translation settings
TRANS_SETTINGS = dict(
header_template=_dirname.parent / "resources" / "sdc_header_templates.csv",
ifu_kws=_dirname.parent / "resources" / "ifu_additional_kws.csv",
tag_file=_dirname.parent / "resources" / "gris_tagged_runs.csv",
)
......@@ -45,7 +46,27 @@ hyphenated = df.KEYWORD.str.contains("-")
df.KEYWORD = df.KEYWORD.str.replace("-", "_")
hyphenated = df[hyphenated].KEYWORD.values.tolist()
HEADER_TEMPLATE = df[["KEYWORD", "fixed_val", "decimal_digits", 'FITS COMMENT']]
template_df = df.copy()
def get_header_template(instrument):
columns = ["KEYWORD", "fixed_val", "decimal_digits", 'FITS COMMENT']
df_out = pd.DataFrame()
if instrument == "gris":
df_out = template_df[columns].copy()
elif instrument == "gris-ifu":
df_ifu = template_df.copy().reset_index(drop=True)
ifu_kws = pd.read_csv(TRANS_SETTINGS['ifu_kws'])
for i, row in ifu_kws.iterrows():
idx_kw = df_ifu[df_ifu.KEYWORD == 'IMAP'].index.to_list()[0]
df_ifu = pd.concat([df_ifu.iloc[:idx_kw + 1], pd.DataFrame([row]), df_ifu.iloc[idx_kw + 1:]]).reset_index(
drop=True)
df_out = df_ifu[columns].copy()
df_out.dropna(subset=["KEYWORD"], inplace=True)
return df_out
HEADER_TEMPLATE = get_header_template('gris-ifu')
# Filter out comments, they are processed separately
df = df.query('KEYWORD != "COMMENT"')
......@@ -62,7 +83,15 @@ for i, r in data.iterrows():
ret_type = r["type"]
comment = r["FITS COMMENT"]
props[kw] = (comment, ret_type)
GRIS_PROPERTIES = props
GRIS_PROPERTIES = props.copy()
ifu_df = pd.read_csv(TRANS_SETTINGS["ifu_kws"])
for i, r in ifu_df.iterrows():
props[r["KEYWORD"]] = (r["FITS COMMENT"], r["type"])
del_kws = ["STEPSIZE"]
for kw in del_kws:
if kw in props: del props["STEPSIZE"]
GRIS_IFU_PROPERTIES = props.copy()
# generate constant translators
data = df[["KEYWORD", "fixed_val"]]
......
......@@ -26,10 +26,8 @@ from kis_tools.gris.headers import ObservationInfo
from kis_tools.gris.headers.exceptions import NothingToDoError
# custom imports
from kis_tools.gris.headers.template_builder import (
GRIS_PROPERTIES,
HEADER_TEMPLATE,
_dirname,
hyphenated,
hyphenated, get_header_template,
)
from kis_tools.gris.headers.wcs_generators import get_wcs_generator
from kis_tools.util.gitinfo import get_git_info
......@@ -115,13 +113,14 @@ def process_comment(value):
return card_list
def make_header(obs_info, properties):
def make_header(obs_info):
"""Generate header from ObservationInfo instance, cast values to
correct types."""
h = {}
c = {}
for key, entry in properties.items():
# noinspection PyProtectedMember
for key, entry in obs_info._PROPERTIES.items():
comment, value_type = entry
value = getattr(obs_info, key)
......@@ -159,7 +158,7 @@ def main(infile, outfile=None, wcs_generator=None, **kwargs): # noqa: C901
overwrite = kwargs["overwrite"] if "overwrite" in kwargs else False
oi = make_obs_info(infile)
h, c = make_header(oi, GRIS_PROPERTIES)
h, c = make_header(oi)
h["DATE_BEG"] = h["DATE_BEG"].isoformat()
h["DATE_OBS"] = h["DATE_OBS"].isoformat()
......@@ -181,7 +180,8 @@ def main(infile, outfile=None, wcs_generator=None, **kwargs): # noqa: C901
card = (keyword, value, comment)
return card
for index, row in HEADER_TEMPLATE.iterrows():
template = get_header_template(oi._translator.instrument)
for index, row in template.iterrows():
keyword, default_value, decimal_digits = row['KEYWORD'], row['fixed_val'], row['decimal_digits']
if keyword == "COMMENT":
cards = process_comment(default_value)
......@@ -218,7 +218,6 @@ def main(infile, outfile=None, wcs_generator=None, **kwargs): # noqa: C901
card_list.append(Card("HISTORY", hist_entry))
# add WCS kws to card_list
wcs_generator = get_wcs_generator(infile)
wcs_cards = wcs_generator.make_wcs_cards()
wcs_index = [c.keyword for c in card_list].index("WCSNAME")
......
......@@ -15,14 +15,11 @@ __all__ = ("MetadataTranslator", "StubTranslator", "cache_translation")
import logging
import math
import warnings
from abc import abstractmethod
import astropy.units as u
from astropy.coordinates import Angle
from .template_builder import GRIS_PROPERTIES as PROPERTIES
log = logging.getLogger(__name__)
......@@ -87,8 +84,8 @@ class MetadataTranslator:
supported_instrument = None
"""Name of instrument understood by this translation class."""
@staticmethod
def _make_const_mapping(property_key, constant):
@classmethod
def _make_const_mapping(cls, property_key, constant):
"""Make a translator method that returns a constant value.
Parameters
......@@ -107,8 +104,8 @@ class MetadataTranslator:
def constant_translator(self):
return constant
if property_key in PROPERTIES:
property_doc, return_type = PROPERTIES[property_key]
if property_key in cls.PROPERTIES:
property_doc, return_type = cls.PROPERTIES[property_key]
else:
return_type = type(constant).__name__
property_doc = f"Returns constant value for '{property_key}' property"
......@@ -122,15 +119,16 @@ class MetadataTranslator:
"""
return constant_translator
@staticmethod
@classmethod
def _make_trivial_mapping(
property_key,
header_key,
default=None,
minimum=None,
maximum=None,
unit=None,
checker=None,
cls,
property_key,
header_key,
default=None,
minimum=None,
maximum=None,
unit=None,
checker=None,
):
"""Make a translator method returning a header value.
......@@ -173,8 +171,8 @@ class MetadataTranslator:
Function implementing a translator with the specified
parameters.
"""
if property_key in PROPERTIES:
property_doc, return_type = PROPERTIES[property_key]
if property_key in cls.PROPERTIES:
property_doc, return_type = cls.PROPERTIES[property_key]
else:
return_type = "str` or `numbers.Number"
property_doc = (
......@@ -288,7 +286,7 @@ class MetadataTranslator:
method = f"to_{property_key}"
translator.__name__ = f"{method}_trivial"
setattr(cls, method, cache_translation(translator, method=method))
if property_key not in PROPERTIES:
if property_key not in cls.PROPERTIES:
log.warning(
f"Unexpected trivial translator for '{property_key}' defined in {cls}"
)
......@@ -300,7 +298,7 @@ class MetadataTranslator:
method = f"to_{property_key}"
translator.__name__ = f"{method}_constant"
setattr(cls, method, translator)
if property_key not in PROPERTIES:
if property_key not in cls.PROPERTIES:
log.warning(
f"Unexpected constant translator for '{property_key}' defined in {cls}"
)
......@@ -455,13 +453,13 @@ class MetadataTranslator:
return value
def quantity_from_card(
self,
keywords,
unit,
default=None,
minimum=None,
maximum=None,
checker=None,
self,
keywords,
unit,
default=None,
minimum=None,
maximum=None,
checker=None,
):
"""Calculate a Astropy Quantity from a header card and a unit.
......@@ -571,80 +569,3 @@ def _make_abstract_translator_method(property, doc, return_type):
The translated property.
"""
return to_property
# Make abstract methods for all the translators methods.
# Unfortunately registering them as abstractmethods does not work
# as these assignments come after the class has been created.
# Assigning to __abstractmethods__ directly does work but interacts
# poorly with the metaclass automatically generating methods from
# _trivialMap and _constMap.
for name, description in PROPERTIES.items():
setattr(
MetadataTranslator,
f"to_{name}",
abstractmethod(_make_abstract_translator_method(name, *description)),
)
class StubTranslator(MetadataTranslator):
"""Translator where all the translations are stubbed out and issue
warnings.
This translator can be used as a base class whilst developing a new
translator. It allows testing to proceed without being required to fully
define all translation methods. Once complete the class should be
removed from the inheritance tree.
"""
pass
def _make_stub_translator_method(property, doc, return_type):
"""Create a an stub translation method for this property.
Parameters
----------
property : `str`
Name of the translator for property to be created.
doc : `str`
Description of the property.
return_type : `str`
Type of this property (used in the doc string).
Returns
-------
m : `function`
Stub translator method for this property.
"""
def to_stub(self):
warnings.warn(
f"Please implement translator for property '{property}' for translator {self}",
stacklevel=3,
)
return None
to_stub.__doc__ = f"""Unimplemented translator for {property}.
{doc}
Issues a warning reminding the implementer to override this method.
Returns
-------
{property} : `None`
Always returns `None`.
"""
return to_stub
# Create stub translation methods
for name, description in PROPERTIES.items():
setattr(
StubTranslator,
f"to_{name}",
_make_stub_translator_method(name, *description),
)
......@@ -11,7 +11,8 @@ from astropy.time import Time
from numpy import nan
from kis_tools.gris.headers.exceptions import NothingToDoError
from kis_tools.gris.headers.template_builder import gris_const_trans, gris_triv_trans, TRANS_SETTINGS, fitpar_kws
from kis_tools.gris.headers.template_builder import gris_const_trans, gris_triv_trans, TRANS_SETTINGS, fitpar_kws, \
GRIS_PROPERTIES
from kis_tools.gris.headers.translator import cache_translation, MetadataTranslator
from kis_tools.gris.util import get_observers
from kis_tools.util.calculations import get_distance_gregor_sun
......@@ -272,8 +273,9 @@ class GREGORTranslator(KISTranslator):
class GrisTranslator(GREGORTranslator):
"""Translate Gris Headers"""
PROPERTIES = GRIS_PROPERTIES
name = "Gris"
instrument = "gris"
supported_instrument = "Gris"
@classmethod
......@@ -506,7 +508,10 @@ class GrisTranslator(GREGORTranslator):
except ValueError:
return {fitpar_kw: None}
except TypeError:
return {fitpar_kw: None}
if "2" in npoly_key:
npoly = self._header[npoly_key.replace("2", "1")] # use FF1NPOLY if there is no second flatfield
else:
return {fitpar_kw: None}
vals = {}
for i in range(npoly + 1):
par_key = fitpar_kw.replace("nn", f"{i:02d}")
......@@ -543,12 +548,14 @@ class GrisTranslator(GREGORTranslator):
self.from_default("BSCALE", 1)
self.from_default("BZERO", 0)
self.add_fts_fit_results()
def add_fts_fit_results(self):
fit_results = ["WLOFF", "WLDSP", "FWHMA", "FWHMP", "STRAY", "NPOLY"]
for fit_res in fit_results:
for i_flat in [1, 2]:
kw = f"FF{i_flat}{fit_res}"
self.from_float_or_nan(kw)
# add fit res parameters
for fitpar_kw in fitpar_kws:
self.get_fitpars(fitpar_kw)
......@@ -9,13 +9,14 @@ from astropy.io.fits import Card
from scipy.io.idl import readsav
from kis_tools.gris.GrisFitsFile import GrisFitsFile
from kis_tools.gris.headers.template_builder import HEADER_TEMPLATE
from kis_tools.gris.headers.template_builder import get_header_template
from kis_tools.gris.headers.wcs_generators.generic import WCSGenerator
from kis_tools.gris.ifu_fits_file import IFUFitsFile
from kis_tools.util.calculations import rotate_around_first_coord
class GrisWCSGenerator(WCSGenerator):
instrument = "gris"
def __init__(self, fits_file, cross_correlation_file=None):
"""Initialize WCS Generator class, load all necessary data into class
......@@ -106,16 +107,22 @@ class GrisWCSGenerator(WCSGenerator):
@property
def header_template(self):
df = HEADER_TEMPLATE
return df
try:
return self._header_template
except AttributeError:
self._header_template = get_header_template(self.instrument)
return self._header_template
def make_card(self, keyword, value):
pattern = re.sub(r"\d+", "\*", keyword) # noqa: F841, W605
template = (
self.header_template.query("KEYWORD.str.contains(@pattern)")
.iloc[0]
.to_dict()
)
try:
template = (
self.header_template.query("KEYWORD.str.contains(@pattern)")
.iloc[0]
.to_dict()
)
except IndexError:
raise IndexError(f"Could not retrieve template for {pattern}")
number = re.search(r"(\d+)", keyword).group()
comment = template["FITS COMMENT"]
......@@ -266,6 +273,7 @@ class GrisWCSGenerator(WCSGenerator):
class GrisIFUWCSGenerator(GrisWCSGenerator):
instrument = "gris"
def __init__(self, fits_file):
self.infile = IFUFitsFile(fits_file)
self.cross_correlation_file = None
......
......@@ -69,6 +69,7 @@ class IFUFitsFile(GrisFitsFile):
off and coordinates provided by GREGOR are very error-prone. If you need precision better than ~50" this
function is not sufficient.
"""
logger.warning("Coordinate calculation is not tested! Use at own risk!")
x_elements = self["NAXIS1"]
y_elements = self["NAXIS2"]
......@@ -140,6 +141,9 @@ class IFUFitsFile(GrisFitsFile):
# track coordinate uncertainty within class
self.coord_uncertainty = (std_delta_x, std_delta_y)
# Setting High coordinate uncertainty, don't have time to implement correct calculations
self.coord_uncertainty = (1000, 1000)
return coord_array
def ___init__(self, filename):
......
KEYWORD,OLD_KW,FITS COMMENT,fixed_val,type,decimal_digits,action,Comment
STEPSV,,Steps in the axis perpendicular to the slit,,int,,keep,
STEPSH,,Steps in the axis parallel to the slit,,int,,keep,
DBLESAMP,,Double sampling active,,str,,keep,
import re
import unittest
from os.path import basename
from pathlib import Path
from tempfile import TemporaryDirectory
......@@ -11,9 +12,9 @@ from importer_test_data import translation_files
from kis_headers import validate_header
from sunpy.map import Map
from kis_tools.gris.headers.template_builder import HEADER_TEMPLATE
from kis_tools.gris.headers.template_builder import GRIS_PROPERTIES, GRIS_IFU_PROPERTIES
from kis_tools.gris.headers.translate_header import main
from kis_tools.gris.headers.wcs_generators.gris import GrisWCSGenerator
from kis_tools.gris.headers.wcs_generators import get_wcs_generator
class TestTranslation(unittest.TestCase):
......@@ -25,60 +26,70 @@ class TestTranslation(unittest.TestCase):
"""
with TemporaryDirectory() as td:
# Perform translation of sample file
resultfile = Path(td) / "outfile.fits"
main(
translation_files[0],
outfile=resultfile,
wcs_generator=GrisWCSGenerator(translation_files[0]),
)
# Validate Observer coordinates using Sunpy Map
m = Map(str(resultfile))
loc_from_file = EarthLocation.from_geocentric(
m.fits_header["OBSGEO-X"] * meter,
m.fits_header["OBSGEO-Y"] * meter,
m.fits_header["OBSGEO-Z"] * meter,
).to_geodetic()
ref_coordinates = (-16.5107, 28.3018, 2390)
for ref_coord, header_coord in zip(ref_coordinates, loc_from_file):
message = f"Observer Coordinates for GREGOR don't match:\nExpected: {ref_coordinates}\nFrom header:{loc_from_file}"
self.assertAlmostEqual(
ref_coord, header_coord.value, places=4, msg=message
for i, sample_file in enumerate(translation_files):
resultfile = Path(td) / f"outfile_{i + 1:02d}.fits"
# if 'gris-ifu' in sample_file: continue
wcs_gen = get_wcs_generator(sample_file)
main(
sample_file,
outfile=resultfile,
wcs_generator=wcs_gen,
)
del m
# Check header contents against original template
header = Header.fromfile(resultfile)
header_keys = [*header.keys()]
tmpl_keys = [record[0] for _,record in HEADER_TEMPLATE.iterrows()]
# Check keys
for k in tmpl_keys:
if k == "END":
continue
# Make regexps to check keywords
k_regexp = k
# Wildcards
k_regexp = k_regexp.replace("*", ".*")
# Two digit numbers
k_regexp = k_regexp.replace("nn", r"\d{2}")
# Different separators
k_regexp = re.sub(r"[\-_]", r"[\-_]", k_regexp)
# Check if key is present in header
in_header = any((re.match(k_regexp, hk) for hk in header_keys))
self.assertTrue(in_header, f"{k} not found in translated header!")
# Check SDC archive compliance with validate header tool
self.assertIsInstance(validate_header(header, instrument='gris'), Header,
"Header archive compatibility failed")
# Check DSUN_REF
self.assertIn("DSUN_OBS", header)
# Check that the header can be used to retrieve a WCS object
wcs = WCS(header)