import math import re from glob import glob from math import isnan from os.path import exists, join from pathlib import Path import joblib import numpy as np from astropy.io.fits import Card from pandas import to_datetime from scipy.io.idl import readsav from kis_tools.gris.GrisFitsFile import GrisFitsFile from kis_tools.gris.headers.template_builder import HEADER_TEMPLATE from kis_tools.util.calculations import rotate_around_first_coord _coord_model_path = Path(__file__).parent.parent / 'gris_coordinate_study' / 'multi_tree_stdx38_stdy36.gz' _coord_model = joblib.load(_coord_model_path) class GrisWCSGenerator(object): def __init__(self, fits_file, cross_correlation_file=None): """Initialize WCS Generator class, load all necessary data into class Args: fits_file: gris level1 split fits file corresponding to single slit position cross_correlation_file: IDL save file resulting from cross correlation with HMI """ # Store information on run self.infile = GrisFitsFile(fits_file) # Assure that inputs are valid: if not exists(fits_file): message = f"FITS file {fits_file} not found." raise IOError(message) # Try to find a cross correlation file verb_name = self.infile.verbose_runname folder = join(self.infile.dirname, "../context_data/") pattern = join(folder, f"{verb_name}_coord_mu.sav") matches = glob(pattern) if matches != [] and cross_correlation_file is None: cross_correlation_file = matches[0] if cross_correlation_file is not None: if not exists(cross_correlation_file): message = f"Cross correlation save {cross_correlation_file} not found" raise IOError(message) self.cross_correlation_file = cross_correlation_file # retrieve fits data from disk self.header_in = self.infile.header self.data_in = self.infile.data # get coordinate info if cross_correlation_file is not None: self.coords = self.get_coords_from_save() else: self.coords = self.get_coords_from_header() def get_coords_ml(self): """ Determine the coordinates of the observation from a pre-trained statistical model. The model was trained trained and evaluated against a dataset of original data and results of a manual cross correlation with hmi. Albeit not being totaly accurate, the model has been shown to systematically reduce the distance between the given and the actual coodinates. The typical accuracy (stdev) is around 40" Returns: x,y tuple of the estimated observation center """ features = ['SLITPOSX', 'SLITPOSY', 'AZIMUT', 'P0ANGLE'] vals = [self.infile.header[f] for f in features] # Insert bogus parallax angle, it is 0 for all cases I observed # so far anyway plus is not used very strongly by the model vals.insert(2, 0) date = to_datetime(self.infile['DATE-OBS'] + self.infile['UT']) # Insert Zenit vals.insert(4, 90 - self.infile.header['ELEVATIO']) vals.append(date.day_of_year) vals.append(date.year) return _coord_model.predict(np.array(vals).reshape(-1, 1).T) def get_coords_from_header(self): """coordinates for slit, the Keywords 'SLITPOSX' and 'SLITPOSY' are assumed to be the center of the slit while the keyword 'SLITORIE' describes the rotation of the slit w.r.t. the helioprojective axes. The algorithm further assumes that each pixel represents a square area with side length of 'STEPSIZE'. These assumptions are used to calculate the coordinates for the center of each pixel within the data array. Returns: (X,Y):ndarray(float) x and y coordinates for each pixel of the slit """ coords = self.infile.coords.value return coords[:, 0], coords[:, 1] def get_coords_from_save(self): """Extract coordinates from cross correlation file. If the scan was performed with an angle of 0 degrees, the correlation file needs to be read in the opposite direction. This is accounted for. Returns: X,Y: ndarray(float) Slit coordinates in X and Y """ fits_file = self.infile save_file = self.cross_correlation_file # nmaps = fits_file.number_of_maps # imaps = fits_file.map_index nsteps = fits_file.n_steps istep = fits_file.step_index angle = fits_file.step_angle save = readsav(save_file) full_coords = save["xy_coord"] # calculate index perpendicular to slit index = istep - 1 # Adjust index according to derotator presence and scanning direction. The index needs to be flipped if # exactly one of these requirements is fulfilled. If both are fulfilled nothing needs to be done. reversed_scanning = angle == 0 derotator_in = fits_file.derotator_in if reversed_scanning != derotator_in: index = nsteps - index - 1 X = full_coords[0, index, :] Y = full_coords[1, index, :] return X, Y @property def header_template(self): df = HEADER_TEMPLATE return df def make_card(self, keyword, value): pattern = re.sub(r"\d+", "\*", keyword) # noqa: F841, W605 template = ( self.header_template.query("KEYWORD.str.contains(@pattern)") .iloc[0] .to_dict() ) number = re.search(r"(\d+)", keyword).group() comment = template["FITS COMMENT"] comment = re.sub(r"\*", number, comment) # fix NAN and round where necessary if isinstance(value, float): decimal_digits = template["decimal_digits"] if isnan(value): value = None elif not isnan(decimal_digits): value = round(value, int(decimal_digits)) card = Card(keyword, value, comment) return card def slitpos_cards(self): """Generate Header cards for SLITPOSX and SLITPOSY""" X, Y = self.coords cards = [] for key, coords in zip(["SLITPOSX", "SLITPOSY"], [X, Y]): template = ( self.header_template.query("KEYWORD.str.contains(@key)") .iloc[0] .to_dict() ) comment = template["FITS COMMENT"] value = coords.mean() card = Card(key, value, comment) cards.append(card) return cards def make_wcs_cards(self): """Generate WCS keywords. The spatial extent is represented by introducing a degenerate axis. The values specified in the header contain axes of SOLARX and SOLARY rotated to be parallel with the SOLARX axis. They the also contain a rotation matrix to orient the slit correctly within the coordinate system. This is necessary to make the specification of the values more intuitive.""" shape = reorder_gris_data(self.data_in).shape def get_angle(x, y): v = np.array([x, y]) u = np.array([0, 1]) temp = np.dot(v, u) / (math.sqrt(np.dot(v, v)) * math.sqrt(np.dot(u, u))) angle = math.acos(temp) angle = -1 * angle if x < 0 else angle return angle X, Y = self.coords angle = get_angle(X[0] - X[-1], Y[0] - Y[-1]) X_rot, Y_rot = rotate_around_first_coord(X, Y, angle) assert np.isclose(0, sum(X_rot - np.mean(X_rot)), 0.01) cards = [] # COORDINATE TYPES c_types = ["HPLN-TAN", "HPLT-TAN", "WAVE", "STOKES"] # UNITS c_units = ["arcsec", "arcsec", "Angstrom", ""] # Reference Pixels c_rpix = [0, 0, 0, 0] # REFERENCE VALUES # the coordinates are rotated in such a way that they are parallel to the y-axis # which means that the y- value is the same along the entire axis. The information on the rotation is stored # in the PCi_j keywords c_rval = [X_rot[0], Y_rot[0], self.infile.wavelength_region[0], 1] # COORDINATE UNCERTAINTIES # set systematic error: Use estimated 2 arcsec if cross-correlation is successful, else use one solar disk # with 1000" because the only thing we can be sure of with current GREGOR coordinates is that we are indeed # pointing at the sun -.- has_ccorr = bool(self.cross_correlation_file) # Uncertainties estimated from difference between cross correlated coordinates and coordinates # from the telescope. They showed a significantly larger uncertainty for the Y coordinate. See documentation of # https://gitlab.leibniz-kis.de/sdc/sdc_importer/issues/199 spatial_uncertainty_x = 5 if has_ccorr else 141 spatial_uncertainty_y = 5 if has_ccorr else 186 c_syer = [ spatial_uncertainty_x, spatial_uncertainty_y, self.infile.wavelength_step_size, 0, ] # STEP SIZES step_size = np.diff(Y_rot).mean() c_delt = [step_size, step_size, self.infile.wavelength_step_size, 1] for i in range(len(shape)): cards.append(self.make_card(f"NAXIS{i + 1}", shape[i])) cards.append(self.make_card(f"CTYPE{i + 1}", c_types[i])) cards.append(self.make_card(f"CUNIT{i + 1}", c_units[i])) cards.append(self.make_card(f"CRPIX{i + 1}", c_rpix[i])) cards.append(self.make_card(f"CRVAL{i + 1}", c_rval[i])) cards.append(self.make_card(f"CDELT{i + 1}", c_delt[i])) cards.append(self.make_card(f"CSYER{i + 1}", c_syer[i])) # ROTATION MATRIX # add rotation of spatial axes c, s = np.cos(-angle), np.sin(-angle) cards.append(self.make_card("PC1_1", c)) cards.append(self.make_card("PC1_2", -s)) cards.append(self.make_card("PC2_1", s)) cards.append(self.make_card("PC2_2", c)) return cards def reorder_gris_data(data, reverse_axes=False): """ Reorder gris data for easier access Args: data: input data array reverse_axes: reverse the order of axes, necessary if writing the data to a fits file afterwards Returns: """ # add degenerate dimension to depict x and y for spatial coordinate # change axis order from (y,stokes,x,wave) to (x,y,wave,stokes) trans_data = np.swapaxes(data, 0, 1) trans_data = np.array([trans_data]) trans_data = np.swapaxes(trans_data, -1, -2) if reverse_axes: for i in range(int(len(trans_data.shape) / 2)): trans_data = np.swapaxes(trans_data, 0 + i, -1 - i) pass return trans_data