Commit 0701b03f authored by Carl Schaffer's avatar Carl Schaffer
Browse files

Merge branch 'ifu_headers' into 'master'

Ifu headers

See merge request !217
parents 32241ace f3f4aa32
......@@ -16,7 +16,7 @@ pip install git+https://gitlab.leibniz-kis.de/sdc/kis_tools.git
```
The following commandline tools are provided by this package:
* BBI: bbi_gen_gifs, bbi_header_conversion, bbi_preview
* GRIS: gris_gen_maps, gris_location_plot, gris_penumbral_thumbs, gris_plot_cont_corr, gris_translate_header, grisplot, gristagger
* GRIS: gris_gen_maps, gris_location_plot, gris_parse_calib, gris_penumbral_thumbs, gris_plot_cont_corr, gris_translate_header, grisplot, gristagger
* LARS: lars_gen_preview, lars_logbook
All of them can be called from the commandline after installing the package. More information on the individual tools can be obtained using the `--help` flag
......
......@@ -71,6 +71,29 @@ optional arguments:
--disable-hmi-cache Disable caching of HMI data in /dat. Context data is copied to the user's home directory instead.
```
## gris_parse_calib
Parse calibration settings from `calDDmmmYY.pro` files. Recursively searches target folder for calibration scripts
and parses the environment at each call to one of the main calibration routine (gris_v8 etc.) calls.
Config data is filled into a pandas DataFrame. Run with pythons `-i` option to drop into an interactive frame for
further batch modification or use the output file option to store the data in CSV format for use with other tools.
The resulting data should be appended to the repository of GRIS calibration settings for use with `grisred`. Currently,
this information is stored in a csv file in the `grisred` repository (Nov 2021) this may be subject to change.
```shell
usage: gris_parse_calib [-h] [-v] [-o OUTPUT_FILE] folder_in
positional arguments:
folder_in folder to be processed
optional arguments:
-h, --help show this help message and exit
-v, --verbose don't silence warnings
-o OUTPUT_FILE, --output-file OUTPUT_FILE
output file
```
## gris_penumbral_thumbs
Generate preview images for GRIS penumbral masks, targets '*pen_mask.sav' files in the folder and produces .png versions.
```shell
......
import logging
from kis_tools.interface import BasicArgParser
from kis_tools.interface.commandline_interface import add_option_outfile
from kis_tools.util.swag import greeter
logger = logging.getLogger(__name__)
print(greeter)
parser = BasicArgParser()
add_option_outfile(parser)
args = parser.parse_args()
folder_in = args.folder_in
# Setup verbose option
verbose = args.verbose
logging_level = logging.INFO if verbose else logging.WARNING
logging.basicConfig(level=logging_level)
# Perform parsing
logger.info(f"Parsing {folder_in}")
from kis_tools.gris.calib_settings.parsing import parse_path
df = parse_path(folder_in)
# Output:
if args.output_file:
df.to_csv(args.output_file)
else:
print(df)
......@@ -5,11 +5,10 @@ from os.path import join
from pathlib import Path
import numpy as np
from kis_tools.util.util import gris_run_number, date_from_fn
from pandas import DataFrame
from tqdm import tqdm
from kis_tools.util.util import gris_run_number, date_from_fn
def parse_file(path):
"""
......@@ -38,7 +37,7 @@ def parse_file(path):
varname, value = res.groups()
env[varname] = value.strip()
main_call_match = re.search(r"^(gris(?:_v6|_v7|_sp){0,1})\s*,", line)
main_call_match = re.search(r"^(gris(?:_v[0-9]+|_sp){0,1})\s*,", line)
if main_call_match:
routine = main_call_match.group(1)
......
from kis_tools.gris.headers.ifu_translator import GrisIFUTranslator
from kis_tools.gris.headers.observationInfo import ObservationInfo
from kis_tools.gris.headers.translator_classes import *
from .translate_header import run_translate_header
from os.path import basename
from warnings import warn
from kis_tools.gris.headers.translator_classes import GrisTranslator, NothingToDoError
class GrisIFUTranslator(GrisTranslator):
name = "gris_ifu"
supported_instrument = "gris_ifu"
def __init__(self, *args, **kwargs):
super(GrisIFUTranslator, self).__init__(*args, **kwargs)
self.from_default("INSTRUME", "GRIS-IFU")
@classmethod
def can_translate(cls, header, filename=None):
"""Indicate whether this translation class can translate the
supplied header.
Parameters
----------
header : `dict`-like
Header to convert to standardized form.
filename : `str`, optional
Name of file being translated.
Returns
-------
can : `bool`
`True` if the header is recognized by this class. `False`
otherwise.
"""
if not basename(filename).startswith("gris-ifu_"):
return False
if "SOLARNET" in header:
raise NothingToDoError("This header is already up to date!")
if "TELESCOP" in header:
is_gregor = header["TELESCOP"] == "GREGOR"
if is_gregor and all([key in header.keys() for key in ["FF1WLOFF"]]):
return True
else:
warn(
f"{cls.__name__}: got a file which does not contain FTS-fitting results or has not been split."
"This translator needs a split file created with recent routines as input."
)
return False
def to_EXTNAME(self):
"""Generate extension name (date_run_map)"""
iserie = int(self._header["ISERIE"])
value = self.to_observation_id()
value += f"_{iserie:03d}"
return value
def to_FILENAME(self):
return basename(self.filename)
......@@ -31,7 +31,7 @@ from kis_tools.gris.headers.template_builder import (
_dirname,
hyphenated,
)
from kis_tools.gris.headers.wcs import GrisWCSGenerator, reorder_gris_data
from kis_tools.gris.headers.wcs_generators import get_wcs_generator
from kis_tools.util.gitinfo import get_git_info
from kis_tools.util.util import (
gen_history_time,
......@@ -181,8 +181,8 @@ def main(infile, outfile=None, wcs_generator=None, **kwargs): # noqa: C901
card = (keyword, value, comment)
return card
for index,row in HEADER_TEMPLATE.iterrows():
keyword, default_value, decimal_digits = row['KEYWORD'],row['fixed_val'],row['decimal_digits']
for index, row in HEADER_TEMPLATE.iterrows():
keyword, default_value, decimal_digits = row['KEYWORD'], row['fixed_val'], row['decimal_digits']
if keyword == "COMMENT":
cards = process_comment(default_value)
card_list += cards
......@@ -218,7 +218,8 @@ def main(infile, outfile=None, wcs_generator=None, **kwargs): # noqa: C901
card_list.append(Card("HISTORY", hist_entry))
# add WCS kws to card_list
wcs_generator = GrisWCSGenerator(infile)
wcs_generator = get_wcs_generator(infile)
wcs_cards = wcs_generator.make_wcs_cards()
wcs_index = [c.keyword for c in card_list].index("WCSNAME")
for c in wcs_cards:
......@@ -233,7 +234,7 @@ def main(infile, outfile=None, wcs_generator=None, **kwargs): # noqa: C901
# create header from card list
header = Header(card_list)
DATA = reorder_gris_data(wcs_generator.infile.data, reverse_axes=True)
DATA = wcs_generator.reordered_data()
f = PrimaryHDU(data=DATA, header=header)
if outfile is not None:
......
import datetime
import re
from glob import glob
from os.path import join, dirname
from os.path import join, dirname, basename
from warnings import warn
import pandas as pd
......@@ -294,6 +294,9 @@ class GrisTranslator(GREGORTranslator):
`True` if the header is recognized by this class. `False`
otherwise.
"""
if not basename(filename).startswith("gris_"):
return False
if "SOLARNET" in header:
raise NothingToDoError("This header is already up to date!")
......@@ -317,7 +320,7 @@ class GrisTranslator(GREGORTranslator):
def to_observation_id(self):
""" observation id is given by DAY_RUN"""
date = self._from_fits_date_string(self._header["DATE-OBS"]).strftime("%Y%m%d")
iobs = int(self._header["IOBS"])
iobs = gris_run_number(self.filename)
value = f"{date}_{iobs:03d}"
return value
......@@ -437,6 +440,9 @@ class GrisTranslator(GREGORTranslator):
exptime = self.to_TEXPOSUR()
return exptime * n_accumulations
def to_IOBS(self):
return gris_run_number(self.filename)
def to_EXPTIME(self):
"""Implemented for Solarsoft compatibility in seconds"""
return self.to_XPOSURE()
......
from os.path import basename
from kis_tools.gris.headers.wcs_generators.gris import GrisWCSGenerator, GrisIFUWCSGenerator
class NoWCSGeneratorError(object):
"""Raise if no wcs generator matches an input"""
def get_wcs_generator(infile):
filename = basename(infile)
if filename.startswith("gris_"):
return GrisWCSGenerator(infile)
elif filename.startswith("gris-ifu_"):
return GrisIFUWCSGenerator(infile)
raise NoWCSGeneratorError(f"Could not determine correct WCS generator for {infile}")
pass
from abc import ABC, abstractmethod
class WCSGenerator(ABC):
@abstractmethod
def make_wcs_cards(self):
"""Generate WCS cards for the underlying file"""
@abstractmethod
def reordered_data(self):
"""Generate WCS cards for the underlying file"""
@abstractmethod
def data_shape(self):
"""Return the shape of the data array after translation"""
@abstractmethod
def rotate_reference_coords(self, X, Y, angle):
"""Rotate a set of reference coords by a given angle"""
......@@ -10,10 +10,12 @@ from scipy.io.idl import readsav
from kis_tools.gris.GrisFitsFile import GrisFitsFile
from kis_tools.gris.headers.template_builder import HEADER_TEMPLATE
from kis_tools.gris.headers.wcs_generators.generic import WCSGenerator
from kis_tools.gris.ifu_fits_file import IFUFitsFile
from kis_tools.util.calculations import rotate_around_first_coord
class GrisWCSGenerator(object):
class GrisWCSGenerator(WCSGenerator):
def __init__(self, fits_file, cross_correlation_file=None):
"""Initialize WCS Generator class, load all necessary data into class
......@@ -55,9 +57,6 @@ class GrisWCSGenerator(object):
else:
self.coords = self.get_coords_from_header()
def get_coords_from_header(self):
"""coordinates for slit, the Keywords 'SLITPOSX' and 'SLITPOSY' are assumed to be the center of the slit
while the keyword 'SLITORIE' describes the rotation of the slit w.r.t. the helioprojective axes. The
......@@ -151,13 +150,37 @@ class GrisWCSGenerator(object):
return cards
def reordered_data(self):
"""
Reorder gris data for easier access
Args:
data: input data array
reverse_axes: reverse the order of axes, necessary if writing the data to a fits file afterwards
Returns:
"""
# add degenerate dimension to depict x and y for spatial coordinate
# change axis order from (y,stokes,x,wave) to (x,y,wave,stokes)
trans_data = np.swapaxes(self.infile.data, 0, 1)
trans_data = np.array([trans_data])
trans_data = np.swapaxes(trans_data, -1, -2)
for i in range(int(len(trans_data.shape) / 2)):
trans_data = np.swapaxes(trans_data, 0 + i, -1 - i)
pass
return trans_data
def make_wcs_cards(self):
"""Generate WCS keywords. The spatial extent is represented by introducing a degenerate axis.
The values specified in the header contain axes of SOLARX and SOLARY rotated to be parallel with
the SOLARX axis. They the also contain a rotation matrix to orient the slit correctly within the
coordinate system. This is necessary to make the specification of the values more intuitive."""
shape = reorder_gris_data(self.data_in).shape
shape = self.data_shape()
def get_angle(x, y):
v = np.array([x, y])
......@@ -170,7 +193,7 @@ class GrisWCSGenerator(object):
X, Y = self.coords
angle = get_angle(X[0] - X[-1], Y[0] - Y[-1])
X_rot, Y_rot = rotate_around_first_coord(X, Y, angle)
X_rot, Y_rot = self.rotate_reference_coords(X, Y, angle)
assert np.isclose(0, sum(X_rot - np.mean(X_rot)), 0.01)
cards = []
......@@ -229,8 +252,41 @@ class GrisWCSGenerator(object):
return cards
def rotate_reference_coords(self, X, Y, angle):
"""
Rotate the reference coords by a given angle in radians
for slit data we can directly rotate the x and y coords of the slit.
"""
X_rot, Y_rot = rotate_around_first_coord(X, Y, angle)
return X_rot, Y_rot
def data_shape(self):
shape = self.reordered_data().T.shape
return shape
class GrisIFUWCSGenerator(GrisWCSGenerator):
def __init__(self, fits_file):
self.infile = IFUFitsFile(fits_file)
self.cross_correlation_file = None
# retrieve fits data from disk
self.header_in = self.infile.header
self.data_in = self.infile.data
self.coords = self.get_coords_from_header()
def get_coords_from_header(self):
"""
Extract coordinates from the underlying data.
Returns:
(X,Y):ndarray(float) x and y coordinates in HPC/Arcsec for the leftmost column and topmost row respectively
"""
coords = self.infile.coords.value
self.coord_uncertainties = self.infile.coord_uncertainty
return coords[0, :, 0], coords[1, 0, :]
def reorder_gris_data(data, reverse_axes=False):
def reordered_data(self):
"""
Reorder gris data for easier access
Args:
......@@ -240,17 +296,25 @@ def reorder_gris_data(data, reverse_axes=False):
Returns:
"""
# add degenerate dimension to depict x and y for spatial coordinate
# change axis order from (y,stokes,x,wave) to (x,y,wave,stokes)
trans_data = np.swapaxes(data, 0, 1)
trans_data = np.array([trans_data])
trans_data = np.swapaxes(trans_data, -1, -2)
data = self.infile.data
return data
if reverse_axes:
for i in range(int(len(trans_data.shape) / 2)):
trans_data = np.swapaxes(trans_data, 0 + i, -1 - i)
pass
def rotate_reference_coords(self, X, Y, angle):
"""
Rotate the reference coords by a given angle in radians
for IFU data we need to account for different sitzes in x and y direction.
return trans_data
Essentially the code below is meant to take a horizontal slice with varying X values and a vertical slice with
varying Y values, pad each with a constant value of the other coordinate and rotate them around the first
coordinate.
"""
X, Y_placeholder = [X, np.ones(X.shape) * Y.mean()]
X_placeholder, Y = [np.ones(Y.shape) * X.mean(), Y]
X_rot, _ = rotate_around_first_coord(X, Y_placeholder, angle)
_, Y_rot = rotate_around_first_coord(X_placeholder, Y, angle)
return X_rot, Y_rot
def data_shape(self):
shape = self.reordered_data().T.shape
return shape
import logging
import os
from datetime import timedelta
from os.path import basename
import astropy.units as u
import numpy as np
from kis_tools.gris.GrisFitsFile import GrisFitsFile
from kis_tools.gris.coord_correction_ml import get_coords_ml
from kis_tools.gris.util import telescope_to_hpc
from kis_tools.util.constants import gris_uncertainty_no_center_x, gris_uncertainty_no_center_y
from kis_tools.util.util import date_from_fn, gris_run_number
logger = logging.getLogger(__name__)
class IFUFitsFile(GrisFitsFile):
def parse_filename(self):
"""Extract metadata from filename"""
# Extract time from Filename
name = os.path.splitext(basename(self.filename))[0].split("_")
self.date = date_from_fn(self.filename)
# Extract Run-, Map-, and Stepnumber, as well as mode string
self.runnumber = gris_run_number(self.filename)
self.mapnumber = int(name[5])
self.modestring = name[3]
@property
def n_steps(self):
"""
Number of steps per map
Does not account for aborted maps. Contrary to slit operation,
IFU maps are typically not too large and the error caused by the
actual difference isn't as severe.
Returns:
n_steps: number of steps per map
"""
nsteps = self["STEPSH"] * self["STEPSV"]
return nsteps
@property
def _projected_observation_end(self):
remaining_slits = self.n_steps * self.number_of_maps - (
self.query(["STEP", "ISTEP"]) + (self.mapnumber - 1) * self.n_steps)
# remaining_time = remaining_slits * self.exposure_time
# using placeholder for time between slits, I don't know how to calculate correctly,
# self.exptime does not account for dead time between exposures.
remaining_time = remaining_slits * 3
projected_end = self.obs_time + timedelta(seconds=remaining_time)
return projected_end
@property
def _coords_from_simple_header(self):
"""coordinates for slit, the Keywords 'SLITPOSX' and 'SLITPOSY' are assumed to be the center of the slit
while the keyword 'SLITORIE' describes the rotation of the slit w.r.t. the helioprojective axes. The
algorithm further assumes that each pixel represents a square area with side length of 'STEPSIZE'.
These assumptions are used to calculate the coordinates for the center of each pixel within the data array.
Returns:
(X,Y):ndarray(float) x and y coordinates for each pixel of the slit
WARNING! The code below is not checked for correctness! We do not know what direction SLITORIE references
off and coordinates provided by GREGOR are very error-prone. If you need precision better than ~50" this
function is not sufficient.
"""
x_elements = self["NAXIS1"]
y_elements = self["NAXIS2"]
# Slit center in telescope coordinates
x_slitcenter = self['SLITPOSX'] if self.header['SLITPOSX'] != "" else 0
y_slitcenter = self['SLITPOSY'] if self.header['SLITPOSY'] != "" else 0
# Stepsizes
x_stepsize = self['STEPSIZH']
y_stepsize = self['STEPSIZV']
coord_ref_x = x_elements // 2
coord_ref_y = y_elements // 2
if "SLITCNTR" in self.header:
if self['SLITCNTR'].strip() != 'scancntr':
raise ValueError(
f'Cannot determine reference pixel for coordinates! SLITCNTR is {self["SLITCNTR"]} instead of scancntr')
# Get telescope centering and associated uncertainties. Centering is the driving factor for uncertainties
delta_x, delta_y, std_delta_x, std_delta_y = self.telescope_centering
# If uncertainties are too large, use ML model to correct the coordinates
try:
p0_angle = self.p0_angle
except ValueError:
logger.warning(ValueError)
p0_angle = None
if p0_angle is not None:
from kis_tools.gris.coord_correction_ml import coord_model_stdx, coord_model_stdy
if std_delta_x > coord_model_stdx or std_delta_y > coord_model_stdy:
try:
x_hpc, y_hpc, std_delta_x, std_delta_y = get_coords_ml(self)
except ValueError as e:
logger.warning(f"Error in finding ML coords for {self.__repr__()}:{e}")
# Point to slitpos with 1 solar disk uncertainty if coord retrieval didn't work
x_hpc, y_hpc, std_delta_x, std_delta_y = x_slitcenter, y_slitcenter, \
gris_uncertainty_no_center_x, gris_uncertainty_no_center_y
else:
x_hpc, y_hpc = telescope_to_hpc(x_slitcenter, y_slitcenter, p0_angle, delta_x, delta_y)
else:
# if p0 is broken, only do centering correction
x_hpc, y_hpc = x_slitcenter - delta_x, y_slitcenter - delta_y
# Try determining orientation, use 0 if not possible
try:
angle = self.slit_orientation
except ValueError as e:
logger.warning(f"Could not determine Slit orientation! \n {e}")
angle = 0
# With the center pixel value given, the coordinates need to be expanded to a full array.
# Setup grid of pixels
delta_pix_x = [i - coord_ref_x for i in range(x_elements)]
delta_pix_y = [i - coord_ref_y for i in range(y_elements)]
delta_pix_x = np.array(delta_pix_x)
delta_pix_y = np.array(delta_pix_y)
# generate coordinates for full grid of slit pixels
X = x_hpc + np.cos(np.radians(angle)) * delta_pix_x * x_stepsize
Y = y_hpc - np.sin(np.radians(angle)) * delta_pix_y * y_stepsize
X = (X * np.ones((y_elements, 1))).T
Y = Y * np.ones((x_elements, 1))
coord_array = np.array([X, Y]) * u.arcsec
# track coordinate uncertainty within class
self.coord_uncertainty = (std_delta_x, std_delta_y)
return coord_array
def ___init__(self, filename):
super(IFUFitsFile, self).___init__(filename)
......@@ -15,7 +15,7 @@ EXTNAME,,Unique HDU name,,str,,new,"unique name for each header data unit, since
POINT_ID,,Unique (re-)pointing ID,,str,,new,A number that should be automatically incremented each time the telescope is significantly repointed. Can we get something like this for GREGOR?
OBSERVER,,Name of observer,,str,,new,
OBS_TRGT,,Observation Target,,str,,new,
IOBS,,Observation ID (unique within day),,int,,keep,"run number, only in split files available"
IOBS,,Observation ID (unique within day),,int,,new,"run number, only in split files available"
NMAPS,SERIES,Number of maps within observation,,int,,rename,Number of maps
IMAP,ISERIE,Map ID (unique within observation),,int,,rename,number of the map the fits file belongs to
NSTEPS,STEPS,Number of steps per map,,int,,rename,number of slit positions
......
......@@ -115,7 +115,7 @@ def add_arg_infile(parser):
def add_option_outfile(parser):
"""Add option for output to file"""
parser.add_argument("-f", "--output-file", default=None, help="output file")
parser.add_argument("-o", "--output-file", default=None, help="output file")
def add_option_run(parser):
......
......@@ -11,7 +11,7 @@ from scipy.io import readsav
from kis_tools.gris.GrisDay import GrisDay
from kis_tools.gris.GrisMapExtractor import GrisMapExtractor
from kis_tools.gris.GrisRun import GrisRun
from kis_tools.gris.headers.wcs import GrisWCSGenerator
from kis_tools.gris.headers.wcs_generators.gris import GrisWCSGenerator
class TestCoords(unittest.TestCase):
......
......@@ -13,6 +13,8 @@ from importer_test_data.gris_structure import (
raw_files,