Commit 6517dad5 authored by Carl Schaffer's avatar Carl Schaffer
Browse files

adding IFU fits file along with WCS generation for IFU

parent a3d14b96
......@@ -31,7 +31,8 @@ from kis_tools.gris.headers.template_builder import (
from kis_tools.gris.headers.wcs import GrisWCSGenerator, reorder_gris_data
from kis_tools.gris.headers.wcs_generators import get_wcs_generator
from kis_tools.gris.headers.wcs_generators.gris import reorder_gris_data
from kis_tools.util.gitinfo import get_git_info
from kis_tools.util.util import (
......@@ -218,7 +219,8 @@ def main(infile, outfile=None, wcs_generator=None, **kwargs): # noqa: C901
card_list.append(Card("HISTORY", hist_entry))
# add WCS kws to card_list
wcs_generator = GrisWCSGenerator(infile)
wcs_generator = get_wcs_generator(infile)
wcs_cards = wcs_generator.make_wcs_cards()
wcs_index = [c.keyword for c in card_list].index("WCSNAME")
for c in wcs_cards:
from os.path import basename
from kis_tools.gris.headers.wcs_generators.gris import GrisWCSGenerator, GrisIFUWCSGenerator
class NoWCSGeneratorError(object):
"""Raise if no wcs generator matches an input"""
def get_wcs_generator(infile):
filename = basename(infile)
if filename.startswith("gris_"):
return GrisWCSGenerator(infile)
elif filename.startswith("gris-ifu_"):
return GrisIFUWCSGenerator(infile)
raise NoWCSGeneratorError(f"Could not determine correct WCS generator for {infile}")
from abc import ABC, abstractmethod
class WCSGenerator(ABC):
def make_wcs_cards(self):
"""Generate WCS cards for the underlying file"""
......@@ -10,10 +10,12 @@ from import readsav
from kis_tools.gris.GrisFitsFile import GrisFitsFile
from kis_tools.gris.headers.template_builder import HEADER_TEMPLATE
from kis_tools.gris.headers.wcs_generators.generic import WCSGenerator
from kis_tools.gris.ifu_fits_file import IFUFitsFile
from kis_tools.util.calculations import rotate_around_first_coord
class GrisWCSGenerator(object):
class GrisWCSGenerator(WCSGenerator):
def __init__(self, fits_file, cross_correlation_file=None):
"""Initialize WCS Generator class, load all necessary data into class
......@@ -55,9 +57,6 @@ class GrisWCSGenerator(object):
self.coords = self.get_coords_from_header()
def get_coords_from_header(self):
"""coordinates for slit, the Keywords 'SLITPOSX' and 'SLITPOSY' are assumed to be the center of the slit
while the keyword 'SLITORIE' describes the rotation of the slit w.r.t. the helioprojective axes. The
......@@ -70,7 +69,7 @@ class GrisWCSGenerator(object):
coords = self.infile.coords.value
self.coord_uncertainties = self.infile.coord_uncertainty
return coords[:, 0], coords[:, 1]
return coords[0, :, 0], coords[1, 0, :]
def get_coords_from_save(self):
"""Extract coordinates from cross correlation file. If the scan was performed with an angle of 0 degrees,
......@@ -114,8 +113,8 @@ class GrisWCSGenerator(object):
pattern = re.sub(r"\d+", "\*", keyword) # noqa: F841, W605
template = (
number ="(\d+)", keyword).group()
......@@ -141,8 +140,8 @@ class GrisWCSGenerator(object):
for key, coords in zip(["SLITPOSX", "SLITPOSY"], [X, Y]):
template = (
comment = template["FITS COMMENT"]
value = coords.mean()
......@@ -254,3 +253,95 @@ def reorder_gris_data(data, reverse_axes=False):
return trans_data
class GrisIFUWCSGenerator(GrisWCSGenerator):
def __init__(self, fits_file):
self.infile = IFUFitsFile(fits_file)
self.cross_correlation_file = None
# retrieve fits data from disk
self.header_in = self.infile.header
self.data_in =
self.coords = self.get_coords_from_header()
def make_wcs_cards(self):
"""Generate WCS keywords. The spatial extent is represented by introducing a degenerate axis.
The values specified in the header contain axes of SOLARX and SOLARY rotated to be parallel with
the SOLARX axis. They the also contain a rotation matrix to orient the slit correctly within the
coordinate system. This is necessary to make the specification of the values more intuitive."""
shape = self.data_in.T.shape
def get_angle(x, y):
v = np.array([x, y])
u = np.array([0, 1])
temp =, u) / (math.sqrt(, v)) * math.sqrt(, u)))
angle = math.acos(temp)
angle = -1 * angle if x < 0 else angle
return angle
X, Y = self.coords
angle = get_angle(X[0] - X[-1], Y[0] - Y[-1])
X, Y_placeholder = [X, np.ones(X.shape) * Y.mean()]
X_placeholder, Y = [np.ones(Y.shape) * X.mean(), Y]
X_rot, _ = rotate_around_first_coord(X, Y_placeholder, angle)
_, Y_rot = rotate_around_first_coord(X_placeholder, Y, angle)
assert np.isclose(0, sum(X_rot - np.mean(X_rot)), 0.01)
cards = []
c_types = ["HPLN-TAN", "HPLT-TAN", "WAVE", "STOKES"]
c_units = ["arcsec", "arcsec", "Angstrom", ""]
# Reference Pixels
# CRPIX values are 1-indexed according to the fits standard
c_rpix = [1, 1, 1, 1]
# the coordinates are rotated in such a way that they are parallel to the y-axis
# which means that the y- value is the same along the entire axis. The information on the rotation is stored
# in the PCi_j keywords
c_rval = [X_rot[0], Y_rot[0], self.infile.wavelength_region[0], 1]
# set systematic error: Use estimated 2 arcsec if cross-correlation is successful, else use one solar disk
# with 1000" because the only thing we can be sure of with current GREGOR coordinates is that we are indeed
# pointing at the sun -.-
has_ccorr = bool(self.cross_correlation_file)
# Uncertainties estimated from difference between cross correlated coordinates and coordinates
# from the telescope. They showed a significantly larger uncertainty for the Y coordinate. See documentation of
spatial_uncertainty_x = 5 if has_ccorr else self.coord_uncertainties[0]
spatial_uncertainty_y = 5 if has_ccorr else self.coord_uncertainties[1]
c_syer = [
step_size = np.diff(Y_rot).mean()
c_delt = [step_size, step_size, self.infile.wavelength_step_size, 1]
for i in range(len(shape)):
cards.append(self.make_card(f"NAXIS{i + 1}", shape[i]))
cards.append(self.make_card(f"CTYPE{i + 1}", c_types[i]))
cards.append(self.make_card(f"CUNIT{i + 1}", c_units[i]))
cards.append(self.make_card(f"CRPIX{i + 1}", c_rpix[i]))
cards.append(self.make_card(f"CRVAL{i + 1}", c_rval[i]))
cards.append(self.make_card(f"CDELT{i + 1}", c_delt[i]))
cards.append(self.make_card(f"CSYER{i + 1}", c_syer[i]))
# add rotation of spatial axes
c, s = np.cos(-angle), np.sin(-angle)
cards.append(self.make_card("PC1_1", c))
cards.append(self.make_card("PC1_2", -s))
cards.append(self.make_card("PC2_1", s))
cards.append(self.make_card("PC2_2", c))
return cards
import logging
import os
from datetime import timedelta
from os.path import basename
import astropy.units as u
import numpy as np
from kis_tools.gris.GrisFitsFile import GrisFitsFile
from kis_tools.gris.coord_correction_ml import get_coords_ml
from kis_tools.gris.util import telescope_to_hpc
from kis_tools.util.constants import gris_uncertainty_no_center_x, gris_uncertainty_no_center_y
from kis_tools.util.util import date_from_fn, gris_run_number
logger = logging.getLogger(__name__)
class IFUFitsFile(GrisFitsFile):
def parse_filename(self):
"""Extract metadata from filename"""
# Extract time from Filename
name = os.path.splitext(basename(self.filename))[0].split("_") = date_from_fn(self.filename)
# Extract Run-, Map-, and Stepnumber, as well as mode string
self.runnumber = gris_run_number(self.filename)
self.mapnumber = int(name[5])
self.modestring = name[3]
def n_steps(self):
Number of steps per map
Does not account for aborted maps. Contrary to slit operation,
IFU maps are typically not too large and the error caused by the
actual difference isn't as severe.
n_steps: number of steps per map
nsteps = self["STEPSH"] * self["STEPSV"]
return nsteps
def _projected_observation_end(self):
remaining_slits = self.n_steps * self.number_of_maps - (
self.query(["STEP", "ISTEP"]) + (self.mapnumber - 1) * self.n_steps)
# remaining_time = remaining_slits * self.exposure_time
# using placeholder for time between slits, I don't know how to calculate correctly,
# self.exptime does not account for dead time between exposures.
remaining_time = remaining_slits * 3
projected_end = self.obs_time + timedelta(seconds=remaining_time)
return projected_end
def _coords_from_simple_header(self):
"""coordinates for slit, the Keywords 'SLITPOSX' and 'SLITPOSY' are assumed to be the center of the slit
while the keyword 'SLITORIE' describes the rotation of the slit w.r.t. the helioprojective axes. The
algorithm further assumes that each pixel represents a square area with side length of 'STEPSIZE'.
These assumptions are used to calculate the coordinates for the center of each pixel within the data array.
(X,Y):ndarray(float) x and y coordinates for each pixel of the slit
WARNING! The code below is not checked for correctness! We do not know what direction SLITORIE references
off and coordinates provided by GREGOR are very error-prone. If you need precision better than ~50" this
function is not sufficient.
x_elements = self["NAXIS1"]
y_elements = self["NAXIS2"]
# Slit center in telescope coordinates
x_slitcenter = self['SLITPOSX'] if self.header['SLITPOSX'] != "" else 0
y_slitcenter = self['SLITPOSY'] if self.header['SLITPOSY'] != "" else 0
# Stepsizes
x_stepsize = self['STEPSIZH']
y_stepsize = self['STEPSIZV']
coord_ref_x = x_elements // 2
coord_ref_y = y_elements // 2
if "SLITCNTR" in self.header:
if self['SLITCNTR'].strip() != 'scancntr':
raise ValueError(
f'Cannot determine reference pixel for coordinates! SLITCNTR is {self["SLITCNTR"]} instead of scancntr')
# Get telescope centering and associated uncertainties. Centering is the driving factor for uncertainties
delta_x, delta_y, std_delta_x, std_delta_y = self.telescope_centering
# If uncertainties are too large, use ML model to correct the coordinates
p0_angle = self.p0_angle
except ValueError:
p0_angle = None
if p0_angle is not None:
from kis_tools.gris.coord_correction_ml import coord_model_stdx, coord_model_stdy
if std_delta_x > coord_model_stdx or std_delta_y > coord_model_stdy:
x_hpc, y_hpc, std_delta_x, std_delta_y = get_coords_ml(self)
except ValueError as e:
logger.warning(f"Error in finding ML coords for {self.__repr__()}:{e}")
# Point to slitpos with 1 solar disk uncertainty if coord retrieval didn't work
x_hpc, y_hpc, std_delta_x, std_delta_y = x_slitcenter, y_slitcenter, \
gris_uncertainty_no_center_x, gris_uncertainty_no_center_y
x_hpc, y_hpc = telescope_to_hpc(x_slitcenter, y_slitcenter, p0_angle, delta_x, delta_y)
# if p0 is broken, only do centering correction
x_hpc, y_hpc = x_slitcenter - delta_x, y_slitcenter - delta_y
# Try determining orientation, use 0 if not possible
angle = self.slit_orientation
except ValueError as e:
logger.warning(f"Could not determine Slit orientation! \n {e}")
angle = 0
# With the center pixel value given, the coordinates need to be expanded to a full array.
# Setup grid of pixels
delta_pix_x = [i - coord_ref_x for i in range(x_elements)]
delta_pix_y = [i - coord_ref_y for i in range(y_elements)]
delta_pix_x = np.array(delta_pix_x)
delta_pix_y = np.array(delta_pix_y)
# generate coordinates for full grid of slit pixels
X = x_hpc + np.cos(np.radians(angle)) * delta_pix_x * x_stepsize
Y = y_hpc - np.sin(np.radians(angle)) * delta_pix_y * y_stepsize
X = (X * np.ones((y_elements, 1))).T
Y = Y * np.ones((x_elements, 1))
coord_array = np.array([X, Y]) * u.arcsec
# track coordinate uncertainty within class
self.coord_uncertainty = (std_delta_x, std_delta_y)
return coord_array
def ___init__(self, filename):
super(IFUFitsFile, self).___init__(filename)
......@@ -11,7 +11,7 @@ from import readsav
from kis_tools.gris.GrisDay import GrisDay
from kis_tools.gris.GrisMapExtractor import GrisMapExtractor
from kis_tools.gris.GrisRun import GrisRun
from kis_tools.gris.headers.wcs import GrisWCSGenerator
from kis_tools.gris.headers.wcs_generators.gris import GrisWCSGenerator
class TestCoords(unittest.TestCase):
......@@ -13,6 +13,8 @@ from importer_test_data.gris_structure import (
folder as structured_gris_folder, coord_save,
from import idl
from kis_tools.gris.GrisArchive import GrisArchive
from kis_tools.gris.GrisDay import GrisDay
from kis_tools.gris.GrisFitsFile import GrisFitsFile
......@@ -23,11 +25,10 @@ from kis_tools.gris.calib_settings.parsing import (
parse_file, parse_archive,
from kis_tools.gris.coord_correction_ml import get_coords_ml
from kis_tools.gris.headers.wcs import reorder_gris_data
from kis_tools.gris.headers.wcs_generators.gris import reorder_gris_data
from kis_tools.gris.util import expand_coords, calculate_slit_translation
from kis_tools.util.constants import gris_stepsize_along_slit, gris_stepsize_perpendicular_to_slit
from kis_tools.util.util import groupby_gris_run
from import idl
_testdata = "/dat/sdc/testing_data_for_importer/gris/"
......@@ -12,7 +12,7 @@ from import Map
from kis_tools.gris.headers.template_builder import HEADER_TEMPLATE
from kis_tools.gris.headers.translate_header import main
from kis_tools.gris.headers.wcs import GrisWCSGenerator
from kis_tools.gris.headers.wcs_generators.gris import GrisWCSGenerator
class TestTranslation(unittest.TestCase):
......@@ -16,7 +16,7 @@ from matplotlib.figure import Figure
from kis_tools.gris.GrisFitsFile import GrisFitsFile
from kis_tools.gris.GrisMapExtractor import run_gris_gen_maps
from kis_tools.gris.headers.translate_header import main
from kis_tools.gris.headers.wcs import GrisWCSGenerator
from kis_tools.gris.headers.wcs_generators.gris import GrisWCSGenerator
from kis_tools.util.locplot import make_loc_plot
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment