gris.py 11.9 KB
Newer Older
1
import math
2
import re
3
from glob import glob
4
from math import isnan
5
from os.path import exists, join
Carl Schaffer's avatar
Carl Schaffer committed
6
7

import numpy as np
8
from astropy.io.fits import Card
Carl Schaffer's avatar
Carl Schaffer committed
9
from scipy.io.idl import readsav
10

Carl Schaffer's avatar
Carl Schaffer committed
11
12
from kis_tools.gris.GrisFitsFile import GrisFitsFile
from kis_tools.gris.headers.template_builder import HEADER_TEMPLATE
13
14
from kis_tools.gris.headers.wcs_generators.generic import WCSGenerator
from kis_tools.gris.ifu_fits_file import IFUFitsFile
15
from kis_tools.util.calculations import rotate_around_first_coord
16
17


18
class GrisWCSGenerator(WCSGenerator):
Carl Schaffer's avatar
Carl Schaffer committed
19
20
    def __init__(self, fits_file, cross_correlation_file=None):
        """Initialize WCS Generator class, load all necessary data into class
21

Carl Schaffer's avatar
Carl Schaffer committed
22
23
24
25
26
27
28
29
30
31
32
33
        Args:
          fits_file: gris level1 split fits file corresponding to single slit position
          cross_correlation_file: IDL save file resulting from cross correlation with HMI

        """
        # Store information on run
        self.infile = GrisFitsFile(fits_file)

        # Assure that inputs are valid:
        if not exists(fits_file):
            message = f"FITS file {fits_file} not found."
            raise IOError(message)
34
35

        # Try to find a cross correlation file
Carl Schaffer's avatar
Carl Schaffer committed
36
        verb_name = self.infile.verbose_runname
37
38
39
40
41
42
        folder = join(self.infile.dirname, "../context_data/")
        pattern = join(folder, f"{verb_name}_coord_mu.sav")
        matches = glob(pattern)
        if matches != [] and cross_correlation_file is None:
            cross_correlation_file = matches[0]

Carl Schaffer's avatar
Carl Schaffer committed
43
44
45
46
47
        if cross_correlation_file is not None:
            if not exists(cross_correlation_file):
                message = f"Cross correlation save {cross_correlation_file} not found"
                raise IOError(message)

48
49
        self.cross_correlation_file = cross_correlation_file

Carl Schaffer's avatar
Carl Schaffer committed
50
51
52
53
54
55
        # retrieve fits data from disk
        self.header_in = self.infile.header
        self.data_in = self.infile.data

        # get coordinate info
        if cross_correlation_file is not None:
56
            self.coords = self.get_coords_from_save()
Carl Schaffer's avatar
Carl Schaffer committed
57
58
59
60
        else:
            self.coords = self.get_coords_from_header()

    def get_coords_from_header(self):
61
62
63
64
65
66
67
68
69
        """coordinates for slit, the Keywords 'SLITPOSX' and 'SLITPOSY' are assumed to be the center of the slit
         while the keyword 'SLITORIE' describes the rotation of the slit w.r.t. the helioprojective axes. The
         algorithm further assumes that each pixel represents a square area with side length of 'STEPSIZE'.
         These assumptions are used to calculate the coordinates for the center of each pixel within the data array.

        Returns:
            (X,Y):ndarray(float) x and y coordinates for each pixel of the slit
        """

70
        coords = self.infile.coords.value
71
        self.coord_uncertainties = self.infile.coord_uncertainty
72
        return coords[:, 0], coords[:, 1]
Carl Schaffer's avatar
Carl Schaffer committed
73

74
75
76
77
78
79
80
    def get_coords_from_save(self):
        """Extract coordinates from cross correlation file.  If the scan was performed with an angle of 0 degrees,
        the correlation file needs to be read in the opposite direction. This is accounted for.

        Returns:
            X,Y: ndarray(float) Slit coordinates in X and Y
        """
Carl Schaffer's avatar
Carl Schaffer committed
81
        fits_file = self.infile
82
83
84
85
86
87
88
89
90
91
92
93
94
        save_file = self.cross_correlation_file

        # nmaps = fits_file.number_of_maps
        # imaps = fits_file.map_index
        nsteps = fits_file.n_steps
        istep = fits_file.step_index
        angle = fits_file.step_angle

        save = readsav(save_file)
        full_coords = save["xy_coord"]

        # calculate index perpendicular to slit
        index = istep - 1
95
96
97
98
99
100

        # Adjust index according to derotator presence and scanning direction. The index needs to be flipped if
        # exactly one of these requirements is fulfilled. If both are fulfilled nothing needs to be done.
        reversed_scanning = angle == 0
        derotator_in = fits_file.derotator_in
        if reversed_scanning != derotator_in:
101
            index = nsteps - index - 1
102

103
104
105
106
        X = full_coords[0, index, :]
        Y = full_coords[1, index, :]
        return X, Y

107
108
    @property
    def header_template(self):
109
        df = HEADER_TEMPLATE
110
111
112
113
114
115
        return df

    def make_card(self, keyword, value):
        pattern = re.sub(r"\d+", "\*", keyword)  # noqa: F841, W605
        template = (
            self.header_template.query("KEYWORD.str.contains(@pattern)")
116
117
                .iloc[0]
                .to_dict()
118
119
120
121
122
        )

        number = re.search(r"(\d+)", keyword).group()
        comment = template["FITS COMMENT"]
        comment = re.sub(r"\*", number, comment)
123
124
125
126
127
128
129
130
131

        # fix NAN and round where necessary
        if isinstance(value, float):
            decimal_digits = template["decimal_digits"]
            if isnan(value):
                value = None
            elif not isnan(decimal_digits):
                value = round(value, int(decimal_digits))

132
133
134
        card = Card(keyword, value, comment)
        return card

Carl Schaffer's avatar
Carl Schaffer committed
135
136
137
138
139
140
141
142
    def slitpos_cards(self):
        """Generate Header cards for SLITPOSX and SLITPOSY"""
        X, Y = self.coords

        cards = []
        for key, coords in zip(["SLITPOSX", "SLITPOSY"], [X, Y]):
            template = (
                self.header_template.query("KEYWORD.str.contains(@key)")
143
144
                    .iloc[0]
                    .to_dict()
Carl Schaffer's avatar
Carl Schaffer committed
145
146
147
148
149
150
151
152
            )
            comment = template["FITS COMMENT"]
            value = coords.mean()
            card = Card(key, value, comment)
            cards.append(card)

        return cards

153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
    def reordered_data(self):
        """
        Reorder gris data for easier access
        Args:
            data: input data array
            reverse_axes: reverse the order of axes, necessary if writing the data to a fits file afterwards

        Returns:

        """
        # add degenerate dimension to depict x and y for spatial coordinate
        # change axis order from (y,stokes,x,wave) to (x,y,wave,stokes)

        trans_data = np.swapaxes(self.infile.data, 0, 1)
        trans_data = np.array([trans_data])

        trans_data = np.swapaxes(trans_data, -1, -2)

        for i in range(int(len(trans_data.shape) / 2)):
            trans_data = np.swapaxes(trans_data, 0 + i, -1 - i)
            pass

        return trans_data

177
    def make_wcs_cards(self):
178
179
180
181
182
        """Generate WCS keywords. The spatial extent is represented by introducing a degenerate axis.
        The values specified in the header contain axes of SOLARX and SOLARY rotated to be parallel with
        the SOLARX axis. They the also contain a rotation matrix to orient the slit correctly within the
        coordinate system. This is necessary to make the specification of the values more intuitive."""

183
        shape = self.data_shape()
184
185
186
187
188
189
190
191
192

        def get_angle(x, y):
            v = np.array([x, y])
            u = np.array([0, 1])
            temp = np.dot(v, u) / (math.sqrt(np.dot(v, v)) * math.sqrt(np.dot(u, u)))
            angle = math.acos(temp)
            angle = -1 * angle if x < 0 else angle
            return angle

193
        X, Y = self.coords
194
        angle = get_angle(X[0] - X[-1], Y[0] - Y[-1])
195

196
        X_rot, Y_rot = self.rotate_reference_coords(X, Y, angle)
197
        assert np.isclose(0, sum(X_rot - np.mean(X_rot)), 0.01)
198
        cards = []
199
200

        # COORDINATE TYPES
201
        c_types = ["HPLN-TAN", "HPLT-TAN", "WAVE", "STOKES"]
202
203

        # UNITS
204
        c_units = ["arcsec", "arcsec", "Angstrom", ""]
205

206
        # Reference Pixels
207
208
        # CRPIX values are 1-indexed according to the fits standard
        c_rpix = [1, 1, 1, 1]
209
210
211
212
213

        # REFERENCE VALUES
        # the coordinates are rotated in such a way that they are parallel to the y-axis
        # which means that the y- value is the same along the entire axis. The information on the rotation is stored
        # in the PCi_j keywords
214
        c_rval = [X_rot[0], Y_rot[0], self.infile.wavelength_region[0], 1]
215
216
217
218
219
220

        # COORDINATE UNCERTAINTIES
        # set systematic error: Use estimated 2 arcsec if cross-correlation is successful, else use one solar disk
        # with 1000" because the only thing we can be sure of with current GREGOR coordinates is that we are indeed
        # pointing at the sun -.-
        has_ccorr = bool(self.cross_correlation_file)
221
222
223
        # Uncertainties estimated from difference between cross correlated coordinates and coordinates
        # from the telescope. They showed a significantly larger uncertainty for the Y coordinate. See documentation of
        # https://gitlab.leibniz-kis.de/sdc/sdc_importer/issues/199
224
225
        spatial_uncertainty_x = 5 if has_ccorr else self.coord_uncertainties[0]
        spatial_uncertainty_y = 5 if has_ccorr else self.coord_uncertainties[1]
226
        c_syer = [
227
228
            spatial_uncertainty_x,
            spatial_uncertainty_y,
229
230
231
232
233
            self.infile.wavelength_step_size,
            0,
        ]

        # STEP SIZES
234
        step_size = np.diff(Y_rot).mean()
235
236
237
238
239
240
241
242
243
        c_delt = [step_size, step_size, self.infile.wavelength_step_size, 1]
        for i in range(len(shape)):
            cards.append(self.make_card(f"NAXIS{i + 1}", shape[i]))
            cards.append(self.make_card(f"CTYPE{i + 1}", c_types[i]))
            cards.append(self.make_card(f"CUNIT{i + 1}", c_units[i]))
            cards.append(self.make_card(f"CRPIX{i + 1}", c_rpix[i]))
            cards.append(self.make_card(f"CRVAL{i + 1}", c_rval[i]))
            cards.append(self.make_card(f"CDELT{i + 1}", c_delt[i]))
            cards.append(self.make_card(f"CSYER{i + 1}", c_syer[i]))
244
245
246
247

        # ROTATION MATRIX
        # add rotation of spatial axes
        c, s = np.cos(-angle), np.sin(-angle)
248
249
250
251
        cards.append(self.make_card("PC1_1", c))
        cards.append(self.make_card("PC1_2", -s))
        cards.append(self.make_card("PC2_1", s))
        cards.append(self.make_card("PC2_2", c))
Carl Schaffer's avatar
Carl Schaffer committed
252

253
        return cards
254

255
256
257
258
259
260
261
    def rotate_reference_coords(self, X, Y, angle):
        """
        Rotate the reference coords by a given angle in radians
        for slit data we can directly rotate the x and y coords of the slit.
        """
        X_rot, Y_rot = rotate_around_first_coord(X, Y, angle)
        return X_rot, Y_rot
262

263
264
265
    def data_shape(self):
        shape = self.reordered_data().T.shape
        return shape
266
267
268
269
270
271
272
273
274
275
276


class GrisIFUWCSGenerator(GrisWCSGenerator):
    def __init__(self, fits_file):
        self.infile = IFUFitsFile(fits_file)
        self.cross_correlation_file = None
        # retrieve fits data from disk
        self.header_in = self.infile.header
        self.data_in = self.infile.data
        self.coords = self.get_coords_from_header()

277
    def get_coords_from_header(self):
278
279
        """
        Extract coordinates from the underlying data.
280
281

        Returns:
282
            (X,Y):ndarray(float) x and y coordinates in HPC/Arcsec for the leftmost column and topmost row respectively
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
        """

        coords = self.infile.coords.value
        self.coord_uncertainties = self.infile.coord_uncertainty
        return coords[0, :, 0], coords[1, 0, :]

    def reordered_data(self):
        """
        Reorder gris data for easier access
        Args:
            data: input data array
            reverse_axes: reverse the order of axes, necessary if writing the data to a fits file afterwards

        Returns:

        """

        data = self.infile.data
        return data

303
304
305
306
    def rotate_reference_coords(self, X, Y, angle):
        """
        Rotate the reference coords by a given angle in radians
        for IFU data we need to account for different sitzes in x and y direction.
307

308
309
310
311
        Essentially the code below is meant to take a horizontal slice with varying X values and a vertical slice with
        varying Y values, pad each with a constant value of the other coordinate and rotate them around the first
        coordinate.
        """
312
313
314
315
        X, Y_placeholder = [X, np.ones(X.shape) * Y.mean()]
        X_placeholder, Y = [np.ones(Y.shape) * X.mean(), Y]
        X_rot, _ = rotate_around_first_coord(X, Y_placeholder, angle)
        _, Y_rot = rotate_around_first_coord(X_placeholder, Y, angle)
316
        return X_rot, Y_rot
317

318
319
320
    def data_shape(self):
        shape = self.reordered_data().T.shape
        return shape