translator_classes.py 16.9 KB
Newer Older
1
import datetime
2
import re
Carl Schaffer's avatar
Carl Schaffer committed
3
from glob import glob
4
from os.path import join, dirname, basename
5
from warnings import warn
6

7
import pandas as pd
8
9
10
from astropy import units as u
from astropy.coordinates import EarthLocation
from astropy.time import Time
11
from numpy import nan
12
13

from kis_tools.gris.headers.exceptions import NothingToDoError
14
from kis_tools.gris.headers.template_builder import gris_const_trans, gris_triv_trans, TRANS_SETTINGS, fitpar_kws
15
from kis_tools.gris.headers.translator import cache_translation, MetadataTranslator
16
from kis_tools.gris.util import get_observers
Carl Schaffer's avatar
Carl Schaffer committed
17
from kis_tools.util.calculations import get_distance_gregor_sun
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
from kis_tools.util.constants import gregor_coords
from kis_tools.util.util import gris_obs_mode, date_from_fn, gris_run_number


class FitsTranslator(MetadataTranslator):
    """Metadata translator for FITS standard headers.

    Understands:

    - DATE-OBS
    - INSTRUME
    - TELESCOP
    - OBSGEO-[X,Y,Z]

    Besides the fixed ``to_*`` translations, an instance can register
    per-keyword ``to_<KEYWORD>`` callables at runtime through
    `from_default`, `from_float_or_nan` and `add_wcs_methods`.
    """

    # Regular expressions identifying WCS-related header keywords.
    _wcs_kw_patterns = [
        r"CTYPE\d+",
        r"CUNIT\d+",
        r"CRPIX\d+",
        r"CRVAL\d+",
        r"CSYER\d+",
        r"CDELT\d+",
        r"PC\d+_\d+",
    ]

    @classmethod
    def can_translate(cls, header, filename=None):
        """Indicate whether this translation class can translate the
        supplied header.

        Checks the instrument value and compares with the supported
        instruments in the class

        Parameters
        ----------
        header : `dict`-like
            Header to convert to standardized form.
        filename : `str`, optional
            Name of file being translated.

        Returns
        -------
        can : `bool`
            `True` if the header is recognized by this class. `False`
            otherwise.
        """
        if cls.supported_instrument is None:
            return False

        # Protect against being able to always find a standard
        # header for instrument
        try:
            translator = cls(header, filename=filename)
            instrument = translator.to_instrument()
        except KeyError:
            return False

        return instrument == cls.supported_instrument

    @classmethod
    def _from_fits_date_string(cls, date_str, scale="utc", time_str=None):
        """Parse standard FITS ISO-style date string and return time object

        Parameters
        ----------
        date_str : `str`
            FITS format date string to convert to standard form. Bypasses
            lookup in the header.
        scale : `str`, optional
            Override the time scale from the TIMESYS header. Defaults to
            UTC.
        time_str : `str`, optional
            If provided, overrides any time component in ``date_str``,
            retaining the YYYY-MM-DD component and appending this time
            string, assumed to be of format HH:MM:SS.ss.

        Returns
        -------
        date : `astropy.time.Time`
            `~astropy.time.Time` representation of the date.
        """
        if time_str is not None:
            # Keep only the leading YYYY-MM-DD and attach the explicit time.
            date_str = f"{date_str[:10]}T{time_str}"

        return Time(date_str, format="isot", scale=scale)

    def _from_fits_date(self, date_key):
        """Calculate a date object from the named FITS header

        Uses the TIMESYS header if present to determine the time scale,
        defaulting to UTC.

        Parameters
        ----------
        date_key : `str`
            The key in the header representing a standard FITS
            ISO-style date.

        Returns
        -------
        date : `astropy.time.Time` or `None`
            `~astropy.time.Time` representation of the date, or `None`
            when ``date_key`` is not present in the header.
        """
        header = self._header
        used = [date_key]
        if "TIMESYS" in header:
            scale = header["TIMESYS"].lower()
            used.append("TIMESYS")
        else:
            scale = "utc"

        if date_key not in header:
            # Nothing was read for the date, so no cards are marked as used.
            return None

        value = self._from_fits_date_string(header[date_key], scale=scale)
        self._used_these_cards(*used)
        return value

    @cache_translation
    def to_datetime_begin(self):
        """Calculate start time of observation.

        Uses FITS standard ``DATE-OBS`` and ``TIMESYS`` headers.

        Returns
        -------
        start_time : `astropy.time.Time`
            Time corresponding to the start of the observation.
        """
        return self._from_fits_date("DATE-OBS")

    @cache_translation
    def to_datetime_end(self):
        """Calculate end time of observation.

        Uses FITS standard ``DATE-END`` and ``TIMESYS`` headers.

        Returns
        -------
        end_time : `astropy.time.Time`
            Time corresponding to the end of the observation.
        """
        return self._from_fits_date("DATE-END")

    @cache_translation
    def to_location(self):
        """Calculate the observatory location.

        Uses FITS standard ``OBSGEO-`` headers.

        Returns
        -------
        location : `astropy.coordinates.EarthLocation`
            An object representing the location of the telescope.

        Raises
        ------
        KeyError
            If one of ``OBSGEO-X``, ``OBSGEO-Y``, ``OBSGEO-Z`` is missing
            from the header.
        """
        cards = [f"OBSGEO-{c}" for c in ("X", "Y", "Z")]
        coords = [self._header[c] for c in cards]
        value = EarthLocation.from_geocentric(*coords, unit=u.m)
        self._used_these_cards(*cards)
        return value

    def from_default(self, kw, default):
        """Register ``to_<kw>`` on this instance, returning the header value
        of ``kw`` or ``default`` when the keyword is absent.

        Parameters
        ----------
        kw : `str`
            Header keyword to pass through.
        default : object
            Value returned by the generated method when ``kw`` is not in
            the header.
        """

        def check_or_default():
            # The header is consulted at call time, not at registration.
            if kw in self._header:
                return self._header[kw]
            return default

        setattr(self, f"to_{kw}", check_or_default)

    def from_float_or_nan(self, kw):
        """Register ``to_<kw>`` on this instance, returning the header value
        of ``kw`` converted to `float`, or NaN when conversion fails.

        A missing keyword is not converted to NaN: the generated method
        lets the `KeyError` from the header lookup propagate.

        Parameters
        ----------
        kw : `str`
            Header keyword to convert.
        """

        def try_float_else_nan():
            val = self._header[kw]
            try:
                return float(val)
            except (ValueError, TypeError):
                # Unparseable strings and non-numeric types (e.g. None).
                return nan

        setattr(self, f"to_{kw}", try_float_else_nan)

    def add_wcs_methods(self):
        """Register a pass-through ``to_<KEYWORD>`` method for every header
        keyword whose start matches one of ``_wcs_kw_patterns``.

        Each method is created through `from_default`, with the current
        header value as its fallback.
        """
        header = self._header

        for raw_pattern in self._wcs_kw_patterns:
            matches = re.compile(raw_pattern).match
            for keyword in filter(matches, header.keys()):
                self.from_default(keyword, header[keyword])

    @property
    def wcs_cards(self):
        """Retrieve WCS-related cards from header.

        Cards are identified by searching their keyword for any of the
        ``_wcs_kw_patterns``. Note that this uses `re.search`, whereas
        `add_wcs_methods` anchors at the start of the keyword with
        `re.match`.

        Returns
        -------
        cards : `list`
            Header cards whose keyword matches a WCS pattern.
        """
        patterns = self._wcs_kw_patterns
        # Relies on the header exposing its cards via the ``_cards`` attribute.
        return [
            card
            for card in self._header._cards
            if any(re.search(pattern, card.keyword) for pattern in patterns)
        ]


class KISTranslator(FitsTranslator):
    """Translator class for building KIS headers. It is intended to be used as a base class for all
    telescope-specific translators."""

    def _to_ORIGIN(self):
        """Return the ``ORIGIN`` value, with the KIS institute name as fallback.

        Delegates to ``default_if_empty``, which is defined outside this
        class.
        """
        # Spelling of "Institute" corrected; the value ends up in headers.
        return self.default_if_empty(
            "ORIGIN", "Leibniz Institute for Solar Physics (KIS)"
        )


class GREGORTranslator(KISTranslator):
    """Translator class for building GREGOR headers. It is intended to be used as a base class for all
    instrument-specific translators."""

    def __init__(self, *args, **kwargs):
        """Cache the geocentric GREGOR coordinates, then initialise the base.

        All arguments are forwarded unchanged to the parent constructor.
        """
        # Set before the parent initialiser runs so the OBSGEO translations
        # are usable from the start.
        self._coords = gregor_coords.to_geocentric()
        super().__init__(*args, **kwargs)

    def to_TELESCOP(self):
        """Return ``TELESCOP``, with ``"GREGOR"`` as fallback value."""
        return self.default_if_empty("TELESCOP", "GREGOR")

    def to_OBSRVTRY(self):
        """Return ``OBSRVTRY``, with the Teide Observatory as fallback value."""
        # Spelling of "Observatory" corrected; the value ends up in headers.
        return self.default_if_empty("OBSRVTRY", "Teide Observatory")

    def to_OBSGEO_X(self):
        """Return the first geocentric coordinate component as a plain number."""
        return self._coords[0].value

    def to_OBSGEO_Y(self):
        """Return the second geocentric coordinate component as a plain number."""
        return self._coords[1].value

    def to_OBSGEO_Z(self):
        """Return the third geocentric coordinate component as a plain number."""
        return self._coords[2].value
271
272


273
class GrisTranslator(GREGORTranslator):
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
    """Translate Gris Headers"""

    name = "Gris"
    supported_instrument = "Gris"

    @classmethod
    def can_translate(cls, header, filename=None):
        """Indicate whether this translation class can translate the
        supplied header.

        A header is accepted when the file name starts with ``gris_``,
        ``TELESCOP`` equals ``"GREGOR"`` and the FTS fit keyword
        ``FF1WLOFF`` is present.

        Parameters
        ----------
        header : `dict`-like
            Header to convert to standardized form.
        filename : `str`, optional
            Name of file being translated. Without a file name the header
            is never accepted.

        Returns
        -------
        can : `bool`
            `True` if the header is recognized by this class. `False`
            otherwise.

        Raises
        ------
        NothingToDoError
            If the header already contains a ``SOLARNET`` keyword.
        """
        # Without a file name the "gris_" prefix cannot be verified;
        # basename(None) would raise a TypeError.
        if filename is None or not basename(filename).startswith("gris_"):
            return False

        if "SOLARNET" in header:
            raise NothingToDoError("This header is already up to date!")

        if "TELESCOP" in header:
            required_keys = ("FF1WLOFF",)
            is_gregor = header["TELESCOP"] == "GREGOR"
            if is_gregor and all(key in header for key in required_keys):
                return True
            warn(
                "GrisTranslator: got a file which does not contain FTS-fitting results or has not been split. "
                "This translator needs a split file created with recent routines as input."
            )

        return False

315
316
317
    _const_map = gris_const_trans

    _trivial_map = gris_triv_trans
318
319
320

    @cache_translation
    def to_observation_id(self):
        """Build the observation id in the form ``<YYYYMMDD>_<run>``.

        The date comes from the ``DATE-OBS`` header and the run number from
        the file name; the run is zero-padded to three digits.
        """
        date = self._from_fits_date_string(self._header["DATE-OBS"]).strftime("%Y%m%d")
        iobs = gris_run_number(self.filename)
        return f"{date}_{iobs:03d}"

327
    def to_EXTNAME(self):
        """Build ``EXTNAME`` for a given combination of day, run, map and
        slit position.

        The observation id is extended by the ``ISERIE`` header value
        (three digits) and the ``ISTEP`` header value (four digits).
        """
        iserie = int(self._header["ISERIE"])
        istep = int(self._header["ISTEP"])
        observation_id = self.to_observation_id()
        return f"{observation_id}_{iserie:03d}_{istep:04d}"

336
    def to_POINT_ID(self):
        """Return the observation id as pointing id.

        Assumes that the telescope is re-pointed for each run.
        """
        return self.to_observation_id()
339

Carl Schaffer's avatar
Carl Schaffer committed
340
341
    @staticmethod
    def to_DATE():
        """Date of FITS creation: today's date as `datetime.date`."""
        return datetime.date.today()

Carl Schaffer's avatar
Carl Schaffer committed
345
346
    @cache_translation
    def to_DSUN_OBS(self):
        """Distance between instrument and sun center.

        Computed by ``get_distance_gregor_sun`` for the start time of the
        observation.
        """
        return get_distance_gregor_sun(self.to_DATE_BEG())

350
351
352
353
354
355
356
357
    @cache_translation
    def to_DATE_BEG(self):
        """Calculate start time of observation.

        Combines the ``DATE-OBS`` header (date part) with the ``UT`` header
        (time of day, surrounding whitespace stripped).

        Returns
        -------
        time : datetime.datetime
            Time corresponding to the start of the observation.

        Raises
        ------
        ValueError
            If the combined string does not have the form
            ``YYYY-MM-DD HH:MM:SS.ffffff``.
        """
        ut = self._header["UT"]
        do = self._header["DATE-OBS"]
        timestamp = do + " " + ut.strip()
        return datetime.datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S.%f")

367
    def to_DATE_OBS(self):
        """Added for compatibility, set to beginning of observation

        Returns
        -------
        time : datetime.datetime
            Time corresponding to the start of the observation.
        """
        return self.to_DATE_BEG()

Carl Schaffer's avatar
Carl Schaffer committed
378
379
    @cache_translation
    def to_AWAVLNTH(self):
        """Get approximate wavelength in air.

        Takes the integer value of the ``WAVELENG`` header, maps 1564 to
        1565 and returns the result multiplied by ten.
        """
        wl = int(self._header["WAVELENG"])
        if wl == 1564:
            wl = 1565
        return wl * 10

386
387
    @cache_translation
    def to_AWAVMIN(self):
        """Minimum wavelength is calculated from the wavelength offset
        determined by the FTS fit, average if two flats are given.

        Reads ``FF1WLOFF`` and ``FF2WLOFF``; when the two cannot be averaged
        (`TypeError`), the first value is returned on its own.
        """
        ff1 = self._header["FF1WLOFF"]
        ff2 = self._header["FF2WLOFF"]
        try:
            return (ff1 + ff2) / 2
        except TypeError:
            # Second flat has no numeric value: fall back to the first.
            return ff1

    @cache_translation
    def to_AWAVMAX(self):
        """Maximum wavelength is calculated from the wavelength offset
        determined by the FTS fit plus the number of pixels along the
        wavelength axis times the dispersion, average if two flats are
        given.

        Reads ``FF1WLDSP``, ``FF2WLDSP`` and ``NAXIS1``; when the two
        dispersions cannot be averaged (`TypeError`), the first is used.
        """
        ff1 = self._header["FF1WLDSP"]
        ff2 = self._header["FF2WLDSP"]
        # Number of steps from the first to the last pixel.
        nsteps = self._header["NAXIS1"] - 1
        try:
            dispersion = (ff1 + ff2) / 2
        except TypeError:
            dispersion = ff1
        return dispersion * nsteps + self.to_AWAVMIN()
414
415
416

    @cache_translation
    def to_OBS_MODE(self):
        """Observation mode is determined by the number of observed
        states, the stepping settings and the number of observed
        maps.

        The number of states is 1 for two-dimensional data; otherwise it
        is the length of the last axis (``NAXIS<NAXIS>``). The mode string
        itself is produced by ``gris_obs_mode``.
        """
        n_maps = self.to_NMAPS()
        n_steps = self.to_NSTEPS()
        step_size = self._header["STEPSIZE"]
        naxis = self._header["NAXIS"]
        states = 1 if naxis == 2 else self._header[f"NAXIS{naxis}"]
        return gris_obs_mode(states, n_maps, n_steps, step_size)

Carl Schaffer's avatar
Carl Schaffer committed
432
433
434
435
    def to_TEXPOSUR(self):
        """Return the ``EXPTIME`` header value divided by 1000.

        Implemented for Solarsoft compatibility; the result is meant to be
        in seconds (presumably ``EXPTIME`` is stored in milliseconds —
        TODO confirm).
        """
        raw_exptime = self._header["EXPTIME"]
        return raw_exptime / 1000

436
    @cache_translation
    def to_XPOSURE(self):
        """Exposure time for the entire file in seconds.

        Product of the single exposure time (``to_TEXPOSUR``) and the
        number of accumulations (``to_NSUMEXP``).
        """
        n_accumulations = self.to_NSUMEXP()
        exptime = self.to_TEXPOSUR()
        return exptime * n_accumulations
442

443
444
445
    def to_IOBS(self):
        """Return the result of ``gris_run_number`` for this file's name."""
        run_number = gris_run_number(self.filename)
        return run_number

446
    def to_EXPTIME(self):
        """Implemented for Solarsoft compatibility in seconds.

        Same value as ``XPOSURE``.
        """
        return self.to_XPOSURE()

Carl Schaffer's avatar
Carl Schaffer committed
450
451
452
    @cache_translation
    def to_OBSERVER(self):
        """Observer names, comma separated, limited to fewer than 70 characters.

        Looks for log files named ``????????.txt`` two directory levels
        above this file and passes the first one (in sorted order) to
        ``get_observers``. Names are added in the order given until the
        next one would push the joined string to 70 characters or more.

        Returns
        -------
        observers : `str`
            ``"Unknown"`` when no log file is found, otherwise the joined
            observer names.
        """
        pattern = join(dirname(dirname(self.filename)), "????????.txt")
        # glob returns matches in arbitrary order; sort so the same log file
        # is picked on every run.
        logfiles = sorted(glob(pattern))
        if not logfiles:
            return "Unknown"

        sep = ", "
        max_length = 70
        selected = []
        for observer in get_observers(logfiles[0]):
            # Checking the joined candidate keeps the result below the
            # limit by construction.
            if len(sep.join(selected + [observer])) >= max_length:
                break
            selected.append(observer)
        return sep.join(selected)

472
473
    @cache_translation
    def to_AO_LOCK(self):
        """Adaptive optics locking.

        Maps the stripped ``AOSTATE`` header value ``"on"`` to 1.0 and
        ``"off"`` to 0.0; any other value yields NaN.
        """
        state = self._header["AOSTATE"].strip()
        if state == "on":
            return 1.0
        if state == "off":
            return 0.0
        return nan

Carl Schaffer's avatar
Carl Schaffer committed
483
    def to_OBS_TRGT(self):
        """Parsed from tagging file, if not in tagging file, check
        header for tag, else default to not tagged.

        The tag table is looked up by the date and run number derived from
        the ``FILENAME`` header; the ``main_tag`` column is returned.
        """
        fn = self._header["FILENAME"]
        date = date_from_fn(fn).strftime("%Y-%m-%d")
        run = gris_run_number(fn)
        try:
            return self.tags.at[(date, run), "main_tag"]
        except KeyError:
            # No entry in the tag table: fall back to the header, if possible.
            if "TARGET" in self._header:
                return self._header["TARGET"]
            return "not tagged"
Carl Schaffer's avatar
Carl Schaffer committed
496
497

    def get_fitpars(self, fitpar_kw):
        """Register ``to_<fitpar_kw>`` on this instance.

        The generated method returns a dictionary of keywords for the
        parameters of the polynomial determined by the FTS fit. The
        dictionary is later expanded to single keywords.

        Parameters
        ----------
        fitpar_kw : `str`
            Keyword template containing the placeholder ``nn``, which is
            replaced by the two-digit parameter index. Its first three
            characters plus ``"NPOLY"`` name the header keyword holding
            the polynomial degree.
        """

        def get_all_parameters():
            npoly_key = fitpar_kw[:3] + "NPOLY"
            try:
                npoly = int(self._header[npoly_key])
            except (ValueError, TypeError):
                # Degree is not a usable number: report the template as empty.
                return {fitpar_kw: None}

            vals = {}
            for i in range(npoly + 1):
                par_key = fitpar_kw.replace("nn", f"{i:02d}")
                try:
                    vals[par_key] = self._header[par_key]
                except KeyError:
                    # A missing parameter is reported but does not abort.
                    message = f"Warning: could not find '{par_key}' in header."
                    message += (
                        " Check that the correct number of parameters has entered "
                    )
                    message += "the header and re-run calibration if necessary!"
                    warn(message)

            return vals

        setattr(self, f"to_{fitpar_kw}", get_all_parameters)

527
    def __init__(self, *args, **kwargs):
        """Parse tag_file, set up translations for keywords with
        default value, setup fit result keywords.

        All arguments are forwarded unchanged to the parent constructor.
        """
        super().__init__(*args, **kwargs)

        # Tag table indexed by (date, run); read anew for every instance.
        self.tags = pd.read_csv(
            TRANS_SETTINGS["tag_file"],
            parse_dates=["date"],
            index_col=["date", "run"],
        )

        # Keywords passed through from the header, with a fallback value.
        self.from_default("ROTCODE", -1)
        self.from_default("ROTTRACK", None)
        self.from_default("IMGSYS", "SLIT")
        self.from_default("SLITCNTR", "scancntr")
        self.from_default("ROTANGLE", 0)
        self.from_default("BSCALE", 1)
        self.from_default("BZERO", 0)

        # FTS fit results exist once per flat field (FF1..., FF2...).
        fit_results = ["WLOFF", "WLDSP", "FWHMA", "FWHMP", "STRAY", "NPOLY"]
        for fit_res in fit_results:
            for i_flat in [1, 2]:
                kw = f"FF{i_flat}{fit_res}"
                self.from_float_or_nan(kw)

        # add fit res parameters
        for fitpar_kw in fitpar_kws:
            self.get_fitpars(fitpar_kw)