import datetime
import re
from glob import glob
from os.path import join, dirname
from warnings import warn

import pandas as pd
from astropy import units as u
from astropy.coordinates import EarthLocation
from astropy.time import Time
from numpy import nan

from kis_tools.gris.headers.exceptions import NothingToDoError
from kis_tools.gris.headers.template_builder import gris_const_trans, gris_triv_trans, TRANS_SETTINGS, fitpar_kws
from kis_tools.gris.headers.translator import cache_translation, MetadataTranslator
from kis_tools.gris.util import get_observers
from kis_tools.util.calculations import get_distance_gregor_sun
from kis_tools.util.constants import gregor_coords
from kis_tools.util.util import gris_obs_mode, date_from_fn, gris_run_number


class FitsTranslator(MetadataTranslator):
    """Metadata translator for FITS standard headers.

    Understands:

    - DATE-OBS
    - INSTRUME
    - TELESCOP
    - OBSGEO-[X,Y,Z]

    In addition it offers helpers that register ``to_<KEYWORD>`` methods on
    the instance at runtime (`from_default`, `from_float_or_nan`,
    `add_wcs_methods`).
    """

    # Regular expressions identifying WCS-related header keywords.
    _wcs_kw_patterns = [
        r"CTYPE\d+",
        r"CUNIT\d+",
        r"CRPIX\d+",
        r"CRVAL\d+",
        r"CSYER\d+",
        r"CDELT\d+",
        r"PC\d+_\d+",
    ]

    @classmethod
    def can_translate(cls, header, filename=None):
        """Indicate whether this translation class can translate the
        supplied header.

        Checks the instrument value and compares with the supported
        instruments in the class

        Parameters
        ----------
        header : `dict`-like
            Header to convert to standardized form.
        filename : `str`, optional
            Name of file being translated.

        Returns
        -------
        can : `bool`
            `True` if the header is recognized by this class. `False`
            otherwise.
        """
        if cls.supported_instrument is None:
            return False

        # Protect against being able to always find a standard
        # header for instrument
        try:
            instrument = cls(header, filename=filename).to_instrument()
        except KeyError:
            return False

        return instrument == cls.supported_instrument

    @classmethod
    def _from_fits_date_string(cls, date_str, scale="utc", time_str=None):
        """Parse standard FITS ISO-style date string and return time object

        Parameters
        ----------
        date_str : `str`
            FITS format date string to convert to standard form. Bypasses
            lookup in the header.
        scale : `str`, optional
            Override the time scale from the TIMESYS header. Defaults to
            UTC.
        time_str : `str`, optional
            If provided, overrides any time component in ``date_str``,
            retaining the YYYY-MM-DD component and appending this time
            string, assumed to be of format HH:MM:SS.ss.

        Returns
        -------
        date : `astropy.time.Time`
            `~astropy.time.Time` representation of the date.
        """
        if time_str is not None:
            # Only the first ten characters (YYYY-MM-DD) of the date survive.
            date_str = "{}T{}".format(date_str[:10], time_str)

        return Time(date_str, format="isot", scale=scale)

    def _from_fits_date(self, date_key):
        """Calculate a date object from the named FITS header

        Uses the TIMESYS header if present to determine the time scale,
        defaulting to UTC.

        Parameters
        ----------
        date_key : `str`
            The key in the header representing a standard FITS
            ISO-style date.

        Returns
        -------
        date : `astropy.time.Time` or `None`
            `~astropy.time.Time` representation of the date, or `None` if
            ``date_key`` is not present in the header.
        """
        used = [date_key]
        if "TIMESYS" in self._header:
            scale = self._header["TIMESYS"].lower()
            used.append("TIMESYS")
        else:
            scale = "utc"

        if date_key not in self._header:
            return None

        value = self._from_fits_date_string(self._header[date_key], scale=scale)
        # Cards are only reported as used once the date was actually parsed.
        self._used_these_cards(*used)
        return value

    @cache_translation
    def to_datetime_begin(self):
        """Calculate start time of observation.

        Uses FITS standard ``DATE-OBS`` and ``TIMESYS`` headers.

        Returns
        -------
        start_time : `astropy.time.Time`
            Time corresponding to the start of the observation.
        """
        return self._from_fits_date("DATE-OBS")

    @cache_translation
    def to_datetime_end(self):
        """Calculate end time of observation.

        Uses FITS standard ``DATE-END`` and ``TIMESYS`` headers.

        Returns
        -------
        end_time : `astropy.time.Time`
            Time corresponding to the end of the observation.
        """
        return self._from_fits_date("DATE-END")

    @cache_translation
    def to_location(self):
        """Calculate the observatory location.

        Uses FITS standard ``OBSGEO-`` headers.

        Returns
        -------
        location : `astropy.coordinates.EarthLocation`
            An object representing the location of the telescope.

        Raises
        ------
        KeyError
            If one of the ``OBSGEO-X/Y/Z`` cards is missing.
        """
        cards = [f"OBSGEO-{c}" for c in ("X", "Y", "Z")]
        coords = [self._header[c] for c in cards]
        value = EarthLocation.from_geocentric(*coords, unit=u.m)
        self._used_these_cards(*cards)
        return value

    def from_default(self, kw, default):
        """Register a translation method which keeps the value of a given
        keyword and fills a default if the keyword is not in the header.

        The method is stored on the instance as ``to_<kw>`` and takes no
        arguments.

        Parameters
        ----------
        kw : `str`
            Header keyword to look up.
        default : object
            Value returned when ``kw`` is not present in the header.
        """

        def check_or_default():
            if kw in self._header:
                return self._header[kw]
            return default

        setattr(self, f"to_{kw}", check_or_default)

    def from_float_or_nan(self, kw):
        """Register a translation method which converts the value of a given
        keyword to float and yields NaN if the value cannot be converted.

        The method is stored on the instance as ``to_<kw>`` and takes no
        arguments. A keyword that is missing from the header is not handled
        here; the lookup error propagates to the caller.

        Parameters
        ----------
        kw : `str`
            Header keyword to look up.
        """

        def try_float_else_nan():
            val = self._header[kw]
            try:
                return float(val)
            except (ValueError, TypeError):
                return nan

        setattr(self, f"to_{kw}", try_float_else_nan)

    def add_wcs_methods(self):
        """Register a ``to_<keyword>`` method for every header keyword that
        matches one of the WCS keyword patterns.

        Each registered method returns the keyword's current header value,
        falling back to the value seen at registration time.
        """
        header = self._header

        for raw_pattern in self._wcs_kw_patterns:
            pattern = re.compile(raw_pattern)
            for keyword in filter(pattern.match, header.keys()):
                # from_default binds the keyword through its own arguments,
                # so every registered method refers to the right keyword.
                self.from_default(keyword, header[keyword])

    @property
    def wcs_cards(self):
        """Retrieve WCS-related cards from header. Cards are identified by
        matching their keyword against the WCS keyword patterns.

        Returns:
            cards: list of header cards
        """
        patterns = self._wcs_kw_patterns
        return [
            card
            for card in self._header._cards
            if any(re.search(pattern, card.keyword) for pattern in patterns)
        ]


class KISTranslator(FitsTranslator):
    """Translator class for building KIS headers. It is intended to be used as a base class for all
    telescope-specific translators."""

    def _to_ORIGIN(self):
        """Return the ORIGIN value, defaulting to the KIS institute name.

        NOTE(review): ``default_if_empty`` is not defined in this file and is
        presumably inherited; it looks like it keeps an existing header value
        and only falls back to the given default - confirm.
        NOTE(review): the leading underscore breaks the ``to_<KEYWORD>``
        naming used by the sibling translators - confirm this is intentional.
        """
        return self.default_if_empty(
            "ORIGIN", "Leibniz Institute for Solar Physics (KIS)"
        )


class GREGORTranslator(KISTranslator):
    """Translator class for building GREGOR headers. It is intended to be used as a base class for all
    instrument-specific translators."""

    def __init__(self, *args, **kwargs):
        """Store the geocentric GREGOR coordinates, then initialise the base class.

        All arguments are passed through unchanged to the parent initialiser.
        """
        # Assigned before the parent initialiser runs, so the coordinates
        # already exist while the base class sets itself up.
        self._coords = gregor_coords.to_geocentric()
        super().__init__(*args, **kwargs)

    def to_TELESCOP(self):
        """Return the telescope name, defaulting to ``GREGOR``.

        NOTE(review): ``default_if_empty`` is presumably inherited and keeps
        an existing header value - confirm.
        """
        return self.default_if_empty("TELESCOP", "GREGOR")

    def to_OBSRVTRY(self):
        """Return the observatory name, defaulting to ``Teide Observatory``."""
        return self.default_if_empty("OBSRVTRY", "Teide Observatory")

    def to_OBSGEO_X(self):
        """Return the first geocentric coordinate of GREGOR without its unit."""
        return self._coords[0].value

    def to_OBSGEO_Y(self):
        """Return the second geocentric coordinate of GREGOR without its unit."""
        return self._coords[1].value

    def to_OBSGEO_Z(self):
        """Return the third geocentric coordinate of GREGOR without its unit."""
        return self._coords[2].value
271
272


273
class GrisTranslator(GREGORTranslator):
    """Translate Gris Headers"""

    name = "Gris"
    supported_instrument = "Gris"

    # Keyword maps provided by the template builder.
    _const_map = gris_const_trans

    _trivial_map = gris_triv_trans

    def __init__(self, *args, **kwargs):
        """Parse tag_file, set up translations for keywords with
        default value, setup fit result keywords"""
        super().__init__(*args, **kwargs)

        # Tag table indexed by (date, run); consulted by to_OBS_TRGT.
        self.tags = pd.read_csv(
            TRANS_SETTINGS["tag_file"],
            parse_dates=["date"],
            index_col=["date", "run"],
        )

        # Keywords that keep their header value or fall back to a default.
        self.from_default("ROTCODE", -1)
        self.from_default("ROTTRACK", None)
        self.from_default("IMGSYS", "SLIT")
        self.from_default("SLITCNTR", "scancntr")
        self.from_default("ROTANGLE", 0)
        self.from_default("BSCALE", 1)
        self.from_default("BZERO", 0)

        # Fit results exist once per flat field (FF1..., FF2...).
        fit_results = ["WLOFF", "WLDSP", "FWHMA", "FWHMP", "STRAY", "NPOLY"]
        for fit_res in fit_results:
            for i_flat in [1, 2]:
                self.from_float_or_nan(f"FF{i_flat}{fit_res}")

        # add fit res parameters
        for fitpar_kw in fitpar_kws:
            self.get_fitpars(fitpar_kw)

    @classmethod
    def can_translate(cls, header, filename=None):
        """Indicate whether this translation class can translate the
        supplied header.

        Parameters
        ----------
        header : `dict`-like
            Header to convert to standardized form.
        filename : `str`, optional
            Name of file being translated.

        Returns
        -------
        can : `bool`
            `True` if the header is recognized by this class. `False`
            otherwise.

        Raises
        ------
        NothingToDoError
            If the header already contains a ``SOLARNET`` keyword.
        """
        if "SOLARNET" in header:
            raise NothingToDoError("This header is already up to date!")

        if "TELESCOP" not in header:
            return False

        # FF1WLOFF is written by the FTS fit, its presence marks a usable file.
        if header["TELESCOP"] == "GREGOR" and "FF1WLOFF" in header:
            return True

        warn(
            "GrisTranslator: got a file which does not contain FTS-fitting results or has not been split. "
            "This translator needs a split file created with recent routines as input."
        )
        return False

    @cache_translation
    def to_observation_id(self):
        """Observation id is given by DAY_RUN, i.e. ``YYYYMMDD_RRR``."""
        date = self._from_fits_date_string(self._header["DATE-OBS"]).strftime("%Y%m%d")
        iobs = int(self._header["IOBS"])
        return f"{date}_{iobs:03d}"

    def to_EXTNAME(self):
        """EXTNAME as hdu for a given combination of day, run,
        map, slitposition"""
        iserie = int(self._header["ISERIE"])
        istep = int(self._header["ISTEP"])
        return f"{self.to_observation_id()}_{iserie:03d}_{istep:04d}"

    def to_POINT_ID(self):
        """Assume that telescope is re-pointed for each run"""
        return self.to_observation_id()

    @staticmethod
    def to_DATE():
        """Date of FITS creation, taken as the current local date."""
        return datetime.date.today()

    @cache_translation
    def to_DSUN_OBS(self):
        """Distance between instrument and sun center"""
        return get_distance_gregor_sun(self.to_DATE_BEG())

    @cache_translation
    def to_DATE_BEG(self):
        """Calculate start time of observation.

        Combines the ``DATE-OBS`` and ``UT`` header values.

        Returns
        -------
        time : datetime.datetime
            Time corresponding to the start of the observation.

        Raises
        ------
        ValueError
            If the combined string does not match
            ``%Y-%m-%d %H:%M:%S.%f``.
        """
        ut = self._header["UT"]
        date_obs = self._header["DATE-OBS"]
        stamp = date_obs + " " + ut.strip()
        return datetime.datetime.strptime(stamp, "%Y-%m-%d %H:%M:%S.%f")

    def to_DATE_OBS(self):
        """Added for compatibility, set to beginning of observation

        Returns
        -------
        time : datetime.datetime
            Time corresponding to the start of the observation.
        """
        return self.to_DATE_BEG()

    @cache_translation
    def to_AWAVLNTH(self):
        """Get approximate wavelength in air

        The integer ``WAVELENG`` value is multiplied by ten
        (presumably nm to Angstrom - confirm); 1564 is mapped to 1565 first.
        """
        wl = int(self._header["WAVELENG"])
        if wl == 1564:
            wl = 1565
        return wl * 10

    @cache_translation
    def to_AWAVMIN(self):
        """Minimum wavelength is calculated from the wavelength offset
        determined by the FTS fit, average if two flats are given"""
        ff1 = self._header["FF1WLOFF"]
        ff2 = self._header["FF2WLOFF"]
        try:
            return (ff1 + ff2) / 2
        except TypeError:
            # The second flat carries no numeric value, use the first only.
            return ff1

    @cache_translation
    def to_AWAVMAX(self):
        """Maximum wavelength is calculated from the wavelength offset
        determined by the FTS fit plus the number of pixels along the
        wavelength axis times the dispersion. average if two flats are
        given"""
        ff1 = self._header["FF1WLDSP"]
        ff2 = self._header["FF2WLDSP"]
        nsteps = self._header["NAXIS1"] - 1
        try:
            dispersion = (ff1 + ff2) / 2
        except TypeError:
            # The second flat carries no numeric value, use the first only.
            dispersion = ff1
        return dispersion * nsteps + self.to_AWAVMIN()

    @cache_translation
    def to_OBS_MODE(self):
        """Observation mode is determined by the number of observed
        states, the stepping settings and the number of observed
        maps."""
        n_maps = self.to_NMAPS()
        n_steps = self.to_NSTEPS()
        step_size = self._header["STEPSIZE"]
        naxis = self._header["NAXIS"]
        # Two-dimensional data holds a single state, otherwise the length of
        # the last axis is used as the number of states.
        states = 1 if naxis == 2 else self._header[f"NAXIS{naxis}"]
        return gris_obs_mode(states, n_maps, n_steps, step_size)

    def to_TEXPOSUR(self):
        """Single exposure time: header ``EXPTIME`` divided by 1000.

        Implemented for Solarsoft compatibility, in seconds
        (presumably ``EXPTIME`` is stored in milliseconds - confirm).
        """
        return self._header["EXPTIME"] / 1000

    @cache_translation
    def to_XPOSURE(self):
        """Exposure time for the entire file in seconds"""
        n_accumulations = self.to_NSUMEXP()
        return self.to_TEXPOSUR() * n_accumulations

    def to_EXPTIME(self):
        """Implemented for Solarsoft compatibility in seconds, same value
        as XPOSURE"""
        return self.to_XPOSURE()

    @cache_translation
    def to_OBSERVER(self):
        """Observer names read from the log file of the observing day.

        The log file is searched two directory levels above the translated
        file. Names are joined with ``", "``; as many leading names as fit
        into fewer than 70 characters are kept. Returns ``"Unknown"`` if no
        log file is found.
        """
        logfiles = glob(join(dirname(dirname(self.filename)), "????????.txt"))
        if not logfiles:
            return "Unknown"
        observers = get_observers(logfiles[0])

        sep = ", "
        max_length = 70
        selected = []
        joined_length = 0  # always equals len(sep.join(selected))
        for observer in observers:
            if selected:
                candidate_length = joined_length + len(sep) + len(observer)
            else:
                candidate_length = len(observer)
            if candidate_length >= max_length:
                break
            selected.append(observer)
            joined_length = candidate_length
        return sep.join(selected)

    @cache_translation
    def to_AO_LOCK(self):
        """Adaptive optics locking: 1.0 for "on", 0.0 for "off", NaN else"""
        state = self._header["AOSTATE"].strip()
        if state == "on":
            return 1.0
        if state == "off":
            return 0.0
        return nan

    def to_OBS_TRGT(self):
        """Parsed from tagging file, if not in tagging file, check
        header for tag, else default to not tagged"""
        fn = self._header["FILENAME"]
        date = date_from_fn(fn).strftime("%Y-%m-%d")
        run = gris_run_number(fn)
        try:
            return self.tags.at[(date, run), "main_tag"]
        except KeyError:
            if "TARGET" in self._header:
                return self._header["TARGET"]
            return "not tagged"

    def get_fitpars(self, fitpar_kw):
        """Register a translation method ``to_<fitpar_kw>`` which generates
        a dictionary of keywords for the parameters of the polynomial
        determined by the FTS fit. The dictionary is later expanded to
        single keywords.

        Parameters
        ----------
        fitpar_kw : `str`
            Keyword template; its first three characters select the
            ``...NPOLY`` keyword and ``nn`` is replaced by the two-digit
            parameter index.
        """

        def get_all_parameters():
            npoly_key = fitpar_kw[:3] + "NPOLY"
            try:
                npoly = int(self._header[npoly_key])
            except (ValueError, TypeError):
                # No usable polynomial degree, report the template as empty.
                return {fitpar_kw: None}

            vals = {}
            for i in range(npoly + 1):
                par_key = fitpar_kw.replace("nn", f"{i:02d}")
                try:
                    vals[par_key] = self._header[par_key]
                except KeyError:
                    message = f"Warning: could not find '{par_key}' in header."
                    message += (
                        " Check that the correct number of parameters has entered "
                    )
                    message += "the header and re-run calibration if necessary!"
                    warn(message)

            return vals

        setattr(self, f"to_{fitpar_kw}", get_all_parameters)