wcs.py 10.6 KB
Newer Older
1
import math
2
import re
3
from glob import glob
4
from math import isnan
5
from os.path import exists, join
6
from pathlib import Path
Carl Schaffer's avatar
Carl Schaffer committed
7

8
import joblib
Carl Schaffer's avatar
Carl Schaffer committed
9
import numpy as np
10
from astropy.io.fits import Card
11
from pandas import to_datetime
Carl Schaffer's avatar
Carl Schaffer committed
12
from scipy.io.idl import readsav
13

Carl Schaffer's avatar
Carl Schaffer committed
14
15
from kis_tools.gris.GrisFitsFile import GrisFitsFile
from kis_tools.gris.headers.template_builder import HEADER_TEMPLATE
16
from kis_tools.util.calculations import rotate_around_first_coord
17

18
19
20
_coord_model_path = Path(__file__).parent.parent / 'gris_coordinate_study' / 'multi_tree_stdx38_stdy36.gz'
_coord_model = joblib.load(_coord_model_path)

21

Carl Schaffer's avatar
Carl Schaffer committed
22
23
24
class GrisWCSGenerator(object):
    def __init__(self, fits_file, cross_correlation_file=None):
        """Initialize WCS Generator class, load all necessary data into class
25

Carl Schaffer's avatar
Carl Schaffer committed
26
27
28
29
30
31
32
33
34
35
36
37
        Args:
          fits_file: gris level1 split fits file corresponding to single slit position
          cross_correlation_file: IDL save file resulting from cross correlation with HMI

        """
        # Store information on run
        self.infile = GrisFitsFile(fits_file)

        # Assure that inputs are valid:
        if not exists(fits_file):
            message = f"FITS file {fits_file} not found."
            raise IOError(message)
38
39

        # Try to find a cross correlation file
Carl Schaffer's avatar
Carl Schaffer committed
40
        verb_name = self.infile.verbose_runname
41
42
43
44
45
46
        folder = join(self.infile.dirname, "../context_data/")
        pattern = join(folder, f"{verb_name}_coord_mu.sav")
        matches = glob(pattern)
        if matches != [] and cross_correlation_file is None:
            cross_correlation_file = matches[0]

Carl Schaffer's avatar
Carl Schaffer committed
47
48
49
50
51
        if cross_correlation_file is not None:
            if not exists(cross_correlation_file):
                message = f"Cross correlation save {cross_correlation_file} not found"
                raise IOError(message)

52
53
        self.cross_correlation_file = cross_correlation_file

Carl Schaffer's avatar
Carl Schaffer committed
54
55
56
57
58
59
        # retrieve fits data from disk
        self.header_in = self.infile.header
        self.data_in = self.infile.data

        # get coordinate info
        if cross_correlation_file is not None:
60
            self.coords = self.get_coords_from_save()
Carl Schaffer's avatar
Carl Schaffer committed
61
62
63
        else:
            self.coords = self.get_coords_from_header()

64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
    def get_coords_ml(self):
        """
        Determine the coordinates of the observation from a pre-trained statistical model.
        The model was trained trained and evaluated against a dataset of original data and
        results of a manual cross correlation with hmi. Albeit not being totaly accurate,
        the model has been shown to systematically reduce the distance between the given
        and the actual coodinates. The typical accuracy (stdev) is around 40"
        Returns: x,y tuple of the estimated observation center

        """
        features = ['SLITPOSX', 'SLITPOSY', 'AZIMUT', 'P0ANGLE']
        vals = [self.infile.header[f] for f in features]

        # Insert bogus parallax angle, it is 0 for all cases I observed
        # so far anyway plus is not used very strongly by the model
        vals.insert(2, 0)
        date = to_datetime(self.infile['DATE-OBS'] + self.infile['UT'])

        # Insert Zenit
        vals.insert(4, 90 - self.infile.header['ELEVATIO'])
        vals.append(date.day_of_year)
        vals.append(date.year)
        return _coord_model.predict(np.array(vals).reshape(-1, 1).T)

Carl Schaffer's avatar
Carl Schaffer committed
88
    def get_coords_from_header(self):
89
90
91
92
93
94
95
96
97
        """coordinates for slit, the Keywords 'SLITPOSX' and 'SLITPOSY' are assumed to be the center of the slit
         while the keyword 'SLITORIE' describes the rotation of the slit w.r.t. the helioprojective axes. The
         algorithm further assumes that each pixel represents a square area with side length of 'STEPSIZE'.
         These assumptions are used to calculate the coordinates for the center of each pixel within the data array.

        Returns:
            (X,Y):ndarray(float) x and y coordinates for each pixel of the slit
        """

98
        coords = self.infile.coords.value
99
        return coords[:, 0], coords[:, 1]
Carl Schaffer's avatar
Carl Schaffer committed
100

101
102
103
104
105
106
107
    def get_coords_from_save(self):
        """Extract coordinates from cross correlation file.  If the scan was performed with an angle of 0 degrees,
        the correlation file needs to be read in the opposite direction. This is accounted for.

        Returns:
            X,Y: ndarray(float) Slit coordinates in X and Y
        """
Carl Schaffer's avatar
Carl Schaffer committed
108
        fits_file = self.infile
109
110
111
112
113
114
115
116
117
118
119
120
121
        save_file = self.cross_correlation_file

        # nmaps = fits_file.number_of_maps
        # imaps = fits_file.map_index
        nsteps = fits_file.n_steps
        istep = fits_file.step_index
        angle = fits_file.step_angle

        save = readsav(save_file)
        full_coords = save["xy_coord"]

        # calculate index perpendicular to slit
        index = istep - 1
122
123
124
125
126
127

        # Adjust index according to derotator presence and scanning direction. The index needs to be flipped if
        # exactly one of these requirements is fulfilled. If both are fulfilled nothing needs to be done.
        reversed_scanning = angle == 0
        derotator_in = fits_file.derotator_in
        if reversed_scanning != derotator_in:
128
            index = nsteps - index - 1
129

130
131
132
133
        X = full_coords[0, index, :]
        Y = full_coords[1, index, :]
        return X, Y

134
135
    @property
    def header_template(self):
136
        df = HEADER_TEMPLATE
137
138
139
140
141
142
        return df

    def make_card(self, keyword, value):
        pattern = re.sub(r"\d+", "\*", keyword)  # noqa: F841, W605
        template = (
            self.header_template.query("KEYWORD.str.contains(@pattern)")
Carl Schaffer's avatar
Carl Schaffer committed
143
144
            .iloc[0]
            .to_dict()
145
146
147
148
149
        )

        number = re.search(r"(\d+)", keyword).group()
        comment = template["FITS COMMENT"]
        comment = re.sub(r"\*", number, comment)
150
151
152
153
154
155
156
157
158

        # fix NAN and round where necessary
        if isinstance(value, float):
            decimal_digits = template["decimal_digits"]
            if isnan(value):
                value = None
            elif not isnan(decimal_digits):
                value = round(value, int(decimal_digits))

159
160
161
        card = Card(keyword, value, comment)
        return card

Carl Schaffer's avatar
Carl Schaffer committed
162
163
164
165
166
167
168
169
    def slitpos_cards(self):
        """Generate Header cards for SLITPOSX and SLITPOSY"""
        X, Y = self.coords

        cards = []
        for key, coords in zip(["SLITPOSX", "SLITPOSY"], [X, Y]):
            template = (
                self.header_template.query("KEYWORD.str.contains(@key)")
Carl Schaffer's avatar
Carl Schaffer committed
170
171
                .iloc[0]
                .to_dict()
Carl Schaffer's avatar
Carl Schaffer committed
172
173
174
175
176
177
178
179
            )
            comment = template["FITS COMMENT"]
            value = coords.mean()
            card = Card(key, value, comment)
            cards.append(card)

        return cards

180
    def make_wcs_cards(self):
181
182
183
184
185
        """Generate WCS keywords. The spatial extent is represented by introducing a degenerate axis.
        The values specified in the header contain axes of SOLARX and SOLARY rotated to be parallel with
        the SOLARX axis. They the also contain a rotation matrix to orient the slit correctly within the
        coordinate system. This is necessary to make the specification of the values more intuitive."""

Carl Schaffer's avatar
Carl Schaffer committed
186
        shape = reorder_gris_data(self.data_in).shape
187
188
189
190
191
192
193
194
195

        def get_angle(x, y):
            v = np.array([x, y])
            u = np.array([0, 1])
            temp = np.dot(v, u) / (math.sqrt(np.dot(v, v)) * math.sqrt(np.dot(u, u)))
            angle = math.acos(temp)
            angle = -1 * angle if x < 0 else angle
            return angle

196
        X, Y = self.coords
197
        angle = get_angle(X[0] - X[-1], Y[0] - Y[-1])
198

199
        X_rot, Y_rot = rotate_around_first_coord(X, Y, angle)
200
        assert np.isclose(0, sum(X_rot - np.mean(X_rot)), 0.01)
201
        cards = []
202
203

        # COORDINATE TYPES
204
        c_types = ["HPLN-TAN", "HPLT-TAN", "WAVE", "STOKES"]
205
206

        # UNITS
207
        c_units = ["arcsec", "arcsec", "Angstrom", ""]
208

209
210
        # Reference Pixels
        c_rpix = [0, 0, 0, 0]
211
212
213
214
215

        # REFERENCE VALUES
        # the coordinates are rotated in such a way that they are parallel to the y-axis
        # which means that the y- value is the same along the entire axis. The information on the rotation is stored
        # in the PCi_j keywords
216
        c_rval = [X_rot[0], Y_rot[0], self.infile.wavelength_region[0], 1]
217
218
219
220
221
222

        # COORDINATE UNCERTAINTIES
        # set systematic error: Use estimated 2 arcsec if cross-correlation is successful, else use one solar disk
        # with 1000" because the only thing we can be sure of with current GREGOR coordinates is that we are indeed
        # pointing at the sun -.-
        has_ccorr = bool(self.cross_correlation_file)
223
224
225
226
227
        # Uncertainties estimated from difference between cross correlated coordinates and coordinates
        # from the telescope. They showed a significantly larger uncertainty for the Y coordinate. See documentation of
        # https://gitlab.leibniz-kis.de/sdc/sdc_importer/issues/199
        spatial_uncertainty_x = 5 if has_ccorr else 141
        spatial_uncertainty_y = 5 if has_ccorr else 186
228
        c_syer = [
229
230
            spatial_uncertainty_x,
            spatial_uncertainty_y,
231
232
233
234
235
            self.infile.wavelength_step_size,
            0,
        ]

        # STEP SIZES
236
        step_size = np.diff(Y_rot).mean()
237
238
239
240
241
242
243
244
245
        c_delt = [step_size, step_size, self.infile.wavelength_step_size, 1]
        for i in range(len(shape)):
            cards.append(self.make_card(f"NAXIS{i + 1}", shape[i]))
            cards.append(self.make_card(f"CTYPE{i + 1}", c_types[i]))
            cards.append(self.make_card(f"CUNIT{i + 1}", c_units[i]))
            cards.append(self.make_card(f"CRPIX{i + 1}", c_rpix[i]))
            cards.append(self.make_card(f"CRVAL{i + 1}", c_rval[i]))
            cards.append(self.make_card(f"CDELT{i + 1}", c_delt[i]))
            cards.append(self.make_card(f"CSYER{i + 1}", c_syer[i]))
246
247
248
249

        # ROTATION MATRIX
        # add rotation of spatial axes
        c, s = np.cos(-angle), np.sin(-angle)
250
251
252
253
        cards.append(self.make_card("PC1_1", c))
        cards.append(self.make_card("PC1_2", -s))
        cards.append(self.make_card("PC2_1", s))
        cards.append(self.make_card("PC2_2", c))
Carl Schaffer's avatar
Carl Schaffer committed
254

255
        return cards
256
257


258
259
260
261
262
263
def reorder_gris_data(data, reverse_axes=False):
    """
    Reorder gris data for easier access
    Args:
        data: input data array
        reverse_axes: reverse the order of axes, necessary if writing the data to a fits file afterwards
264

265
    Returns:
266

267
268
269
    """
    # add degenerate dimension to depict x and y for spatial coordinate
    # change axis order from (y,stokes,x,wave) to (x,y,wave,stokes)
270

271
272
    trans_data = np.swapaxes(data, 0, 1)
    trans_data = np.array([trans_data])
273

274
    trans_data = np.swapaxes(trans_data, -1, -2)
275

276
277
278
279
    if reverse_axes:
        for i in range(int(len(trans_data.shape) / 2)):
            trans_data = np.swapaxes(trans_data, 0 + i, -1 - i)
            pass
280

Carl Schaffer's avatar
Carl Schaffer committed
281
    return trans_data