wcs.py 9.42 KB
Newer Older
1
import math
2
import re
3
from glob import glob
4
from math import isnan
5
from os.path import exists, join
Carl Schaffer's avatar
Carl Schaffer committed
6
7

import numpy as np
8
from astropy.io.fits import Card
Carl Schaffer's avatar
Carl Schaffer committed
9
from scipy.io.idl import readsav
10

Carl Schaffer's avatar
Carl Schaffer committed
11
12
from kis_tools.gris.GrisFitsFile import GrisFitsFile
from kis_tools.gris.headers.template_builder import HEADER_TEMPLATE
13
from kis_tools.util.calculations import rotate_around_first_coord
14
15


Carl Schaffer's avatar
Carl Schaffer committed
16
17
18
class GrisWCSGenerator(object):
    def __init__(self, fits_file, cross_correlation_file=None):
        """Initialize WCS Generator class, load all necessary data into class
19

Carl Schaffer's avatar
Carl Schaffer committed
20
21
22
23
24
25
26
27
28
29
30
31
        Args:
          fits_file: gris level1 split fits file corresponding to single slit position
          cross_correlation_file: IDL save file resulting from cross correlation with HMI

        """
        # Store information on run
        self.infile = GrisFitsFile(fits_file)

        # Assure that inputs are valid:
        if not exists(fits_file):
            message = f"FITS file {fits_file} not found."
            raise IOError(message)
32
33

        # Try to find a cross correlation file
Carl Schaffer's avatar
Carl Schaffer committed
34
        verb_name = self.infile.verbose_runname
35
36
37
38
39
40
        folder = join(self.infile.dirname, "../context_data/")
        pattern = join(folder, f"{verb_name}_coord_mu.sav")
        matches = glob(pattern)
        if matches != [] and cross_correlation_file is None:
            cross_correlation_file = matches[0]

Carl Schaffer's avatar
Carl Schaffer committed
41
42
43
44
45
        if cross_correlation_file is not None:
            if not exists(cross_correlation_file):
                message = f"Cross correlation save {cross_correlation_file} not found"
                raise IOError(message)

46
47
        self.cross_correlation_file = cross_correlation_file

Carl Schaffer's avatar
Carl Schaffer committed
48
49
50
51
52
53
        # retrieve fits data from disk
        self.header_in = self.infile.header
        self.data_in = self.infile.data

        # get coordinate info
        if cross_correlation_file is not None:
54
            self.coords = self.get_coords_from_save()
Carl Schaffer's avatar
Carl Schaffer committed
55
56
57
        else:
            self.coords = self.get_coords_from_header()

58
59
60



Carl Schaffer's avatar
Carl Schaffer committed
61
    def get_coords_from_header(self):
62
63
64
65
66
67
68
69
70
        """coordinates for slit, the Keywords 'SLITPOSX' and 'SLITPOSY' are assumed to be the center of the slit
         while the keyword 'SLITORIE' describes the rotation of the slit w.r.t. the helioprojective axes. The
         algorithm further assumes that each pixel represents a square area with side length of 'STEPSIZE'.
         These assumptions are used to calculate the coordinates for the center of each pixel within the data array.

        Returns:
            (X,Y):ndarray(float) x and y coordinates for each pixel of the slit
        """

71
        coords = self.infile.coords.value
72
        self.coord_uncertainties = self.infile.coord_uncertainty
73
        return coords[:, 0], coords[:, 1]
Carl Schaffer's avatar
Carl Schaffer committed
74

75
76
77
78
79
80
81
    def get_coords_from_save(self):
        """Extract coordinates from cross correlation file.  If the scan was performed with an angle of 0 degrees,
        the correlation file needs to be read in the opposite direction. This is accounted for.

        Returns:
            X,Y: ndarray(float) Slit coordinates in X and Y
        """
Carl Schaffer's avatar
Carl Schaffer committed
82
        fits_file = self.infile
83
84
85
86
87
88
89
90
91
92
93
94
95
        save_file = self.cross_correlation_file

        # nmaps = fits_file.number_of_maps
        # imaps = fits_file.map_index
        nsteps = fits_file.n_steps
        istep = fits_file.step_index
        angle = fits_file.step_angle

        save = readsav(save_file)
        full_coords = save["xy_coord"]

        # calculate index perpendicular to slit
        index = istep - 1
96
97
98
99
100
101

        # Adjust index according to derotator presence and scanning direction. The index needs to be flipped if
        # exactly one of these requirements is fulfilled. If both are fulfilled nothing needs to be done.
        reversed_scanning = angle == 0
        derotator_in = fits_file.derotator_in
        if reversed_scanning != derotator_in:
102
            index = nsteps - index - 1
103

104
105
106
107
        X = full_coords[0, index, :]
        Y = full_coords[1, index, :]
        return X, Y

108
109
    @property
    def header_template(self):
110
        df = HEADER_TEMPLATE
111
112
113
114
115
116
        return df

    def make_card(self, keyword, value):
        pattern = re.sub(r"\d+", "\*", keyword)  # noqa: F841, W605
        template = (
            self.header_template.query("KEYWORD.str.contains(@pattern)")
Carl Schaffer's avatar
Carl Schaffer committed
117
118
            .iloc[0]
            .to_dict()
119
120
121
122
123
        )

        number = re.search(r"(\d+)", keyword).group()
        comment = template["FITS COMMENT"]
        comment = re.sub(r"\*", number, comment)
124
125
126
127
128
129
130
131
132

        # fix NAN and round where necessary
        if isinstance(value, float):
            decimal_digits = template["decimal_digits"]
            if isnan(value):
                value = None
            elif not isnan(decimal_digits):
                value = round(value, int(decimal_digits))

133
134
135
        card = Card(keyword, value, comment)
        return card

Carl Schaffer's avatar
Carl Schaffer committed
136
137
138
139
140
141
142
143
    def slitpos_cards(self):
        """Generate Header cards for SLITPOSX and SLITPOSY"""
        X, Y = self.coords

        cards = []
        for key, coords in zip(["SLITPOSX", "SLITPOSY"], [X, Y]):
            template = (
                self.header_template.query("KEYWORD.str.contains(@key)")
Carl Schaffer's avatar
Carl Schaffer committed
144
145
                .iloc[0]
                .to_dict()
Carl Schaffer's avatar
Carl Schaffer committed
146
147
148
149
150
151
152
153
            )
            comment = template["FITS COMMENT"]
            value = coords.mean()
            card = Card(key, value, comment)
            cards.append(card)

        return cards

154
    def make_wcs_cards(self):
155
156
157
158
159
        """Generate WCS keywords. The spatial extent is represented by introducing a degenerate axis.
        The values specified in the header contain axes of SOLARX and SOLARY rotated to be parallel with
        the SOLARX axis. They the also contain a rotation matrix to orient the slit correctly within the
        coordinate system. This is necessary to make the specification of the values more intuitive."""

Carl Schaffer's avatar
Carl Schaffer committed
160
        shape = reorder_gris_data(self.data_in).shape
161
162
163
164
165
166
167
168
169

        def get_angle(x, y):
            v = np.array([x, y])
            u = np.array([0, 1])
            temp = np.dot(v, u) / (math.sqrt(np.dot(v, v)) * math.sqrt(np.dot(u, u)))
            angle = math.acos(temp)
            angle = -1 * angle if x < 0 else angle
            return angle

170
        X, Y = self.coords
171
        angle = get_angle(X[0] - X[-1], Y[0] - Y[-1])
172

173
        X_rot, Y_rot = rotate_around_first_coord(X, Y, angle)
174
        assert np.isclose(0, sum(X_rot - np.mean(X_rot)), 0.01)
175
        cards = []
176
177

        # COORDINATE TYPES
178
        c_types = ["HPLN-TAN", "HPLT-TAN", "WAVE", "STOKES"]
179
180

        # UNITS
181
        c_units = ["arcsec", "arcsec", "Angstrom", ""]
182

183
184
        # Reference Pixels
        c_rpix = [0, 0, 0, 0]
185
186
187
188
189

        # REFERENCE VALUES
        # the coordinates are rotated in such a way that they are parallel to the y-axis
        # which means that the y- value is the same along the entire axis. The information on the rotation is stored
        # in the PCi_j keywords
190
        c_rval = [X_rot[0], Y_rot[0], self.infile.wavelength_region[0], 1]
191
192
193
194
195
196

        # COORDINATE UNCERTAINTIES
        # set systematic error: Use estimated 2 arcsec if cross-correlation is successful, else use one solar disk
        # with 1000" because the only thing we can be sure of with current GREGOR coordinates is that we are indeed
        # pointing at the sun -.-
        has_ccorr = bool(self.cross_correlation_file)
197
198
199
        # Uncertainties estimated from difference between cross correlated coordinates and coordinates
        # from the telescope. They showed a significantly larger uncertainty for the Y coordinate. See documentation of
        # https://gitlab.leibniz-kis.de/sdc/sdc_importer/issues/199
200
201
        spatial_uncertainty_x = 5 if has_ccorr else self.coord_uncertainties[0]
        spatial_uncertainty_y = 5 if has_ccorr else self.coord_uncertainties[1]
202
        c_syer = [
203
204
            spatial_uncertainty_x,
            spatial_uncertainty_y,
205
206
207
208
209
            self.infile.wavelength_step_size,
            0,
        ]

        # STEP SIZES
210
        step_size = np.diff(Y_rot).mean()
211
212
213
214
215
216
217
218
219
        c_delt = [step_size, step_size, self.infile.wavelength_step_size, 1]
        for i in range(len(shape)):
            cards.append(self.make_card(f"NAXIS{i + 1}", shape[i]))
            cards.append(self.make_card(f"CTYPE{i + 1}", c_types[i]))
            cards.append(self.make_card(f"CUNIT{i + 1}", c_units[i]))
            cards.append(self.make_card(f"CRPIX{i + 1}", c_rpix[i]))
            cards.append(self.make_card(f"CRVAL{i + 1}", c_rval[i]))
            cards.append(self.make_card(f"CDELT{i + 1}", c_delt[i]))
            cards.append(self.make_card(f"CSYER{i + 1}", c_syer[i]))
220
221
222
223

        # ROTATION MATRIX
        # add rotation of spatial axes
        c, s = np.cos(-angle), np.sin(-angle)
224
225
226
227
        cards.append(self.make_card("PC1_1", c))
        cards.append(self.make_card("PC1_2", -s))
        cards.append(self.make_card("PC2_1", s))
        cards.append(self.make_card("PC2_2", c))
Carl Schaffer's avatar
Carl Schaffer committed
228

229
        return cards
230
231


232
233
234
235
236
237
def reorder_gris_data(data, reverse_axes=False):
    """
    Reorder gris data for easier access
    Args:
        data: input data array
        reverse_axes: reverse the order of axes, necessary if writing the data to a fits file afterwards
238

239
    Returns:
240

241
242
243
    """
    # add degenerate dimension to depict x and y for spatial coordinate
    # change axis order from (y,stokes,x,wave) to (x,y,wave,stokes)
244

245
246
    trans_data = np.swapaxes(data, 0, 1)
    trans_data = np.array([trans_data])
247

248
    trans_data = np.swapaxes(trans_data, -1, -2)
249

250
251
252
253
    if reverse_axes:
        for i in range(int(len(trans_data.shape) / 2)):
            trans_data = np.swapaxes(trans_data, 0 + i, -1 - i)
            pass
254

Carl Schaffer's avatar
Carl Schaffer committed
255
    return trans_data