translator_classes.py 16.3 KB
Newer Older
1
import datetime
2
import re
Carl Schaffer's avatar
Carl Schaffer committed
3
from glob import glob
4
from os.path import join, dirname
5
from warnings import warn
6

7
import pandas as pd
8
9
10
from astropy import units as u
from astropy.coordinates import EarthLocation
from astropy.time import Time
11
from numpy import nan
12
13
14
15

from kis_tools.gris.headers.exceptions import NothingToDoError
from kis_tools.gris.headers.properties import gris_const_trans, gris_triv_trans, TRANS_SETTINGS, fitpar_kws
from kis_tools.gris.headers.translator import cache_translation, MetadataTranslator
16
from kis_tools.gris.util import get_observers
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
from kis_tools.util.constants import gregor_coords
from kis_tools.util.util import gris_obs_mode, date_from_fn, gris_run_number


class FitsTranslator(MetadataTranslator):
    """Metadata translator for FITS standard headers.

    Understands:

    - DATE-OBS
    - INSTRUME
    - TELESCOP
    - OBSGEO-[X,Y,Z]

    """

    _wcs_kw_patterns = [
        r"CTYPE\d+",
        r"CUNIT\d+",
        r"CRPIX\d+",
        r"CRVAL\d+",
        r"CSYER\d+",
        r"CDELT\d+",
        r"PC\d+_\d+",
    ]

    @classmethod
    def can_translate(cls, header, filename=None):
        """Report whether this translator class handles the given header.

        A translator is instantiated from the header and the instrument it
        reports is compared with ``cls.supported_instrument``.

        Parameters
        ----------
        header : `dict`-like
            Header to convert to standardized form.
        filename : `str`, optional
            Name of file being translated.

        Returns
        -------
        can : `bool`
            `True` if the instrument derived from the header equals
            ``supported_instrument``. `False` if the class declares no
            supported instrument, if a `KeyError` is raised while the
            instrument is being determined, or if the instruments differ.
        """
        expected = cls.supported_instrument
        if expected is None:
            return False

        # A KeyError while building the translator or asking it for the
        # instrument is treated as "this header is not ours".
        try:
            found = cls(header, filename=filename).to_instrument()
        except KeyError:
            return False

        return found == expected

    @classmethod
    def _from_fits_date_string(cls, date_str, scale="utc", time_str=None):
        """Convert a FITS ISO-style date string into a time object.

        Parameters
        ----------
        date_str : `str`
            FITS format date string to convert. No header lookup is
            performed; the string is used as given.
        scale : `str`, optional
            Time scale handed to `~astropy.time.Time`. Defaults to UTC.
        time_str : `str`, optional
            If provided, replaces any time component of ``date_str``: the
            first ten characters of ``date_str`` (YYYY-MM-DD) are kept and
            this string, assumed to be of format HH:MM:SS.ss, is appended
            after a ``T`` separator.

        Returns
        -------
        date : `astropy.time.Time`
            `~astropy.time.Time` representation of the date.
        """
        if time_str is None:
            iso_string = date_str
        else:
            # Keep only the leading YYYY-MM-DD and attach the given time.
            iso_string = f"{date_str[:10]}T{time_str}"

        return Time(iso_string, format="isot", scale=scale)

    def _from_fits_date(self, date_key):
        """Build a time object from the named FITS header card.

        The time scale is taken from the ``TIMESYS`` header (lower-cased)
        when that card exists and is UTC otherwise.

        Parameters
        ----------
        date_key : `str`
            The key in the header representing a standard FITS
            ISO-style date.

        Returns
        -------
        date : `astropy.time.Time` or `None`
            `~astropy.time.Time` representation of the date, or `None` when
            ``date_key`` is not present in the header.
        """
        header = self._header
        consumed = [date_key]

        scale = "utc"
        if "TIMESYS" in header:
            scale = header["TIMESYS"].lower()
            consumed.append("TIMESYS")

        if date_key not in header:
            return None

        parsed = self._from_fits_date_string(header[date_key], scale=scale)
        # Cards are only reported as used once the date has been parsed.
        self._used_these_cards(*consumed)
        return parsed

    @cache_translation
    def to_datetime_begin(self):
        """Return the start time of the observation.

        Read from the ``DATE-OBS`` card via ``_from_fits_date``, which also
        honours ``TIMESYS``.

        Returns
        -------
        start_time : `astropy.time.Time`
            Time corresponding to the start of the observation.
        """
        start_time = self._from_fits_date("DATE-OBS")
        return start_time

    @cache_translation
    def to_datetime_end(self):
        """Return the end time of the observation.

        Read from the ``DATE-END`` card via ``_from_fits_date``, which also
        honours ``TIMESYS``.

        Returns
        -------
        end_time : `astropy.time.Time`
            Time corresponding to the end of the observation.
        """
        end_time = self._from_fits_date("DATE-END")
        return end_time

    @cache_translation
    def to_location(self):
        """Return the observatory location.

        Reads the ``OBSGEO-X``, ``OBSGEO-Y`` and ``OBSGEO-Z`` cards and
        interprets them as geocentric coordinates in metres.

        Returns
        -------
        location : `astropy.coordinates.EarthLocation`
            An object representing the location of the telescope.
        """
        keywords = ["OBSGEO-" + axis for axis in ("X", "Y", "Z")]
        x, y, z = (self._header[keyword] for keyword in keywords)
        location = EarthLocation.from_geocentric(x, y, z, unit=u.m)
        self._used_these_cards(*keywords)
        return location

    def from_default(self, kw, default):
        """Attach a ``to_<kw>`` translation to this instance.

        The attached callable takes no arguments. When called it returns the
        header value stored under ``kw`` if that keyword is among the header
        keys, and ``default`` otherwise.

        Parameters
        ----------
        kw : `str`
            Header keyword; also used to build the attribute name
            ``to_<kw>``.
        default : object
            Value returned when ``kw`` is not in the header.
        """

        def check_or_default():
            header = self._header
            return header[kw] if kw in header.keys() else default

        setattr(self, f"to_{kw}", check_or_default)

    def from_float_or_nan(self, kw):
        """Attach a ``to_<kw>`` translation that yields a float or NaN.

        The attached callable takes no arguments. When called it looks up
        ``kw`` in the header and converts the value with ``float``; if the
        conversion raises `ValueError` or `TypeError`, NaN is returned
        instead. The header lookup itself is not guarded, so an error
        raised for a missing keyword propagates to the caller.

        Parameters
        ----------
        kw : `str`
            Header keyword; also used to build the attribute name
            ``to_<kw>``.
        """

        def try_float_else_nan():
            raw = self._header[kw]
            try:
                return float(raw)
            except (ValueError, TypeError):
                return nan

        setattr(self, f"to_{kw}", try_float_else_nan)

    def add_wcs_methods(self):
        """Attach a ``to_<keyword>`` translation for every WCS card present.

        Each header keyword that matches one of ``_wcs_kw_patterns`` (the
        patterns are applied with ``match``, i.e. anchored at the start of
        the keyword) is registered through ``from_default`` with its current
        header value as the fallback.
        """
        header = self._header

        for raw_pattern in self._wcs_kw_patterns:
            matcher = re.compile(raw_pattern)
            for keyword in filter(matcher.match, header.keys()):
                self.from_default(keyword, header[keyword])

    @property
    def wcs_cards(self):
        """Collect the WCS-related cards of the header.

        A card is selected when its keyword matches at least one entry of
        ``_wcs_kw_patterns``. Matching uses ``re.search``, so a pattern may
        match anywhere inside the keyword.

        Returns
        -------
        cards : `list`
            Header cards whose keyword matched, in header order.
        """
        patterns = self._wcs_kw_patterns
        selected = []
        for card in self._header._cards:
            if any(re.search(pattern, card.keyword) for pattern in patterns):
                selected.append(card)
        return selected


class KISTranslator(FitsTranslator):
    """Translator class for building KIS headers.

    It is intended to be used as a base class for all telescope-specific
    translators.
    """

    # NOTE(review): unlike the ``to_*`` methods of the subclasses this name
    # carries a leading underscore -- confirm whether that is intentional.
    def _to_ORIGIN(self):
        """Return the ``ORIGIN`` value, falling back to the institute name.

        Delegates to ``self.default_if_empty`` with the keyword and the
        fallback string. That helper is not defined in this class; it is
        presumably inherited -- verify against the base translator.
        """
        return self.default_if_empty(
            "ORIGIN", "Leibniz Institute for Solar Physics (KIS)"
        )


class GREGORTranslator(KISTranslator):
    """Translator class for building GREGOR headers.

    It is intended to be used as a base class for all instrument-specific
    translators.
    """

    def to_TELESCOP(self):
        """Return ``TELESCOP``, falling back to ``"GREGOR"``.

        Delegates to ``self.default_if_empty`` (not defined in this class;
        presumably inherited -- verify).
        """
        return self.default_if_empty("TELESCOP", "GREGOR")

    def to_OBSRVTRY(self):
        """Return ``OBSRVTRY``, falling back to ``"Teide Observatory"``.

        Delegates to ``self.default_if_empty`` (not defined in this class;
        presumably inherited -- verify).
        """
        return self.default_if_empty("OBSRVTRY", "Teide Observatory")

    def __init__(self, *args, **kwargs):
        """Store the geocentric GREGOR coordinates, then run the base init.

        All positional and keyword arguments are forwarded unchanged to the
        parent constructor.
        """
        # Set before calling the parent constructor, as in the original
        # ordering, so the attribute already exists while the base class
        # initialises.
        self._coords = gregor_coords.to_geocentric()
        super().__init__(*args, **kwargs)

    def to_OBSGEO_X(self):
        """Return the first geocentric coordinate as a plain number."""
        return self._coords[0].value

    def to_OBSGEO_Y(self):
        """Return the second geocentric coordinate as a plain number."""
        return self._coords[1].value

    def to_OBSGEO_Z(self):
        """Return the third geocentric coordinate as a plain number."""
        return self._coords[2].value
270
271


272
class GrisTranslator(GREGORTranslator):
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
    """Translate Gris Headers"""

    name = "Gris"
    supported_instrument = "Gris"

    @classmethod
    def can_translate(cls, header, filename=None):
        """Indicate whether this translation class can translate the
        supplied header.

        The header is accepted when its ``TELESCOP`` card equals
        ``"GREGOR"`` and the ``FF1WLOFF`` keyword is present. If
        ``TELESCOP`` is present but either condition fails, a warning is
        issued.

        Parameters
        ----------
        header : `dict`-like
            Header to convert to standardized form.
        filename : `str`, optional
            Name of file being translated. Not used by this check.

        Returns
        -------
        can : `bool`
            `True` if the header is recognized by this class. `False`
            otherwise.

        Raises
        ------
        NothingToDoError
            If the header already contains a ``SOLARNET`` keyword.
        """
        if "SOLARNET" in header:
            raise NothingToDoError("This header is already up to date!")

        if "TELESCOP" in header:
            is_gregor = header["TELESCOP"] == "GREGOR"
            if is_gregor and "FF1WLOFF" in header.keys():
                return True

            # The two literals are joined by the parser; the first one ends
            # with a space so the sentences do not run together.
            warn(
                "GrisTranslator: got a file which does not contain FTS-fitting results or has not been split. "
                "This translator needs a split file created with recent routines as input."
            )

        return False

311
312
313
    _const_map = gris_const_trans

    _trivial_map = gris_triv_trans
314
315
316

    @cache_translation
    def to_observation_id(self):
        """Return the observation id in the form ``<YYYYMMDD>_<run>``.

        The day is taken from ``DATE-OBS``; the run is the integer value of
        ``IOBS``, zero-padded to three digits.
        """
        header = self._header
        day = self._from_fits_date_string(header["DATE-OBS"]).strftime("%Y%m%d")
        run = int(header["IOBS"])
        return f"{day}_{run:03d}"

323
    def to_EXTNAME(self):
        """Return the EXTNAME for a combination of day, run, map and slit
        position.

        The observation id is extended by ``ISERIE`` (three digits) and
        ``ISTEP`` (four digits), each separated by an underscore.
        """
        header = self._header
        series = int(header["ISERIE"])
        step = int(header["ISTEP"])
        observation_id = self.to_observation_id()
        return observation_id + f"_{series:03d}_{step:04d}"

332
    def to_POINT_ID(self):
        """Return the pointing id, which equals the observation id.

        This assumes that the telescope is re-pointed for each run.
        """
        point_id = self.to_observation_id()
        return point_id
335

Carl Schaffer's avatar
Carl Schaffer committed
336
337
    @staticmethod
    def to_DATE():
        """Return the date of FITS creation, i.e. today's date.

        Returns
        -------
        date : datetime.date
            The current local date as given by ``datetime.date.today()``.
        """
        creation_date = datetime.date.today()
        return creation_date

    @cache_translation
    def to_DATE_BEG(self):
        """Calculate start time of observation.

        Joins the ``DATE-OBS`` card and the stripped ``UT`` card with a
        single space and parses the result with the format
        ``%Y-%m-%d %H:%M:%S.%f``.

        Returns
        -------
        time : datetime.datetime
            Time corresponding to the start of the observation.
        """
        header = self._header
        clock = header["UT"]
        day = header["DATE-OBS"]
        stamp = " ".join((day, clock.strip()))
        return datetime.datetime.strptime(stamp, "%Y-%m-%d %H:%M:%S.%f")

358
    def to_DATE_OBS(self):
        """Return the beginning of the observation (kept for compatibility).

        Returns
        -------
        time : datetime.datetime
            Time corresponding to the start of the observation, identical
            to the result of ``to_DATE_BEG``.
        """
        return self.to_DATE_BEG()

Carl Schaffer's avatar
Carl Schaffer committed
369
370
    @cache_translation
    def to_AWAVLNTH(self):
        """Get the approximate wavelength in air.

        The integer value of ``WAVELENG`` is used, with 1564 replaced by
        1565, and the result is multiplied by ten (presumably converting
        nanometres to Angstrom -- confirm).
        """
        nominal = int(self._header["WAVELENG"])
        adjusted = 1565 if nominal == 1564 else nominal
        return adjusted * 10

377
378
    @cache_translation
    def to_AWAVMIN(self):
        """Return the minimum wavelength from the FTS-fit wavelength offset.

        The mean of ``FF1WLOFF`` and ``FF2WLOFF`` is used when both values
        can be averaged; if the arithmetic raises `TypeError`, the value of
        the first flat is returned on its own.
        """
        header = self._header
        offset_first = header["FF1WLOFF"]
        offset_second = header["FF2WLOFF"]
        try:
            return (offset_first + offset_second) / 2
        except TypeError:
            # The two values could not be averaged; use the first flat.
            return offset_first

    @cache_translation
    def to_AWAVMAX(self):
        """Return the maximum wavelength.

        Computed as the minimum wavelength (``to_AWAVMIN``) plus the
        dispersion times ``NAXIS1 - 1``. The dispersion is the mean of
        ``FF1WLDSP`` and ``FF2WLDSP`` when both values can be averaged; if
        the arithmetic raises `TypeError`, the first flat's value is used.
        """
        header = self._header
        dispersion_first = header["FF1WLDSP"]
        dispersion_second = header["FF2WLDSP"]
        last_pixel = header["NAXIS1"] - 1
        try:
            dispersion = (dispersion_first + dispersion_second) / 2
        except TypeError:
            # The two values could not be averaged; use the first flat.
            dispersion = dispersion_first

        return dispersion * last_pixel + self.to_AWAVMIN()
405
406
407

    @cache_translation
    def to_OBS_MODE(self):
        """Return the observation mode string.

        The mode is derived by ``gris_obs_mode`` from the number of
        observed states, the number of maps, the number of steps and the
        step size. The number of states is 1 for two-dimensional data and
        the length of the last axis otherwise. ``to_NMAPS`` and
        ``to_NSTEPS`` are not defined in this class body; they are
        presumably provided elsewhere on the translator -- verify.
        """
        header = self._header
        map_count = self.to_NMAPS()
        step_count = self.to_NSTEPS()
        step_size = header["STEPSIZE"]
        naxis = header["NAXIS"]
        states = 1 if naxis == 2 else header[f"NAXIS{naxis}"]

        return gris_obs_mode(states, map_count, step_count, step_size)

    @cache_translation
    def to_XPOSURE(self):
        """Return the exposure time for the entire file.

        This is the product of ``to_TEXPOSUR()`` and ``to_NSUMEXP()``;
        neither is defined in this class body, so they are presumably
        provided elsewhere on the translator -- verify.
        """
        accumulations = self.to_NSUMEXP()
        single_exposure = self.to_TEXPOSUR()
        total = single_exposure * accumulations
        return total
429

430
431
432
433
    def to_EXPTIME(self):
        """Return the same value as ``to_XPOSURE`` (Solarsoft compatibility)."""
        exposure = self.to_XPOSURE()
        return exposure

Carl Schaffer's avatar
Carl Schaffer committed
434
435
436
    @cache_translation
    def to_OBSERVER(self):
        """Return the observers as a comma-separated string.

        Log files matching ``????????.txt`` are searched in the directory
        two levels above ``self.filename``. If none is found, ``"Unknown"``
        is returned. Otherwise the first match is passed to
        ``get_observers`` and names are taken in order while the joined
        string stays shorter than 70 characters; the first name that would
        exceed this ends the list.
        """
        log_dir = dirname(dirname(self.filename))
        logfiles = glob(join(log_dir, "????????.txt"))
        if not logfiles:
            return "Unknown"

        sep = ", "
        selected = []
        # Length of the joined string so far. Starting at -len(sep) makes
        # the first name cost only its own length.
        used = -len(sep)
        for observer in get_observers(logfiles[0]):
            if used + len(observer) + len(sep) >= 70:
                break
            selected.append(observer)
            used += len(sep) + len(observer)

        res = sep.join(selected)
        assert len(res) < 70
        return res

456
457
    @cache_translation
    def to_AO_LOCK(self):
        """Return the adaptive optics locking state as a number.

        The stripped ``AOSTATE`` card maps ``"on"`` to 1.0 and ``"off"`` to
        0.0; any other value yields NaN.
        """
        lock_by_state = {"on": 1.0, "off": 0.0}
        state = self._header["AOSTATE"].strip()
        return lock_by_state.get(state, nan)

Carl Schaffer's avatar
Carl Schaffer committed
467
    def to_OBS_TRGT(self):
        """Return the observation target.

        The date and run number are parsed from the ``FILENAME`` card and
        looked up in ``self.tags`` (column ``main_tag``). If that lookup
        raises `KeyError`, the ``TARGET`` card is used when present, and
        ``"not tagged"`` otherwise.
        """
        filename = self._header["FILENAME"]
        day = date_from_fn(filename).strftime("%Y-%m-%d")
        run_number = gris_run_number(filename)
        try:
            return self.tags.at[(day, run_number), "main_tag"]
        except KeyError:
            header = self._header
            return header["TARGET"] if "TARGET" in header else "not tagged"
Carl Schaffer's avatar
Carl Schaffer committed
480
481

    def get_fitpars(self, fitpar_kw):
        """Attach a ``to_<fitpar_kw>`` translation for polynomial fit
        parameters.

        The attached callable takes no arguments and returns a dictionary
        of the parameters of the polynomial determined by the FTS fit. The
        dictionary is later expanded to single keywords.

        Parameters
        ----------
        fitpar_kw : `str`
            Keyword template. Its first three characters plus ``"NPOLY"``
            name the card holding the polynomial order, and each ``"nn"``
            in it is replaced by a two-digit parameter index.
        """

        def get_all_parameters():
            order_key = fitpar_kw[:3] + "NPOLY"
            try:
                order = int(self._header[order_key])
            except (ValueError, TypeError):
                # The order card holds nothing usable as an integer.
                return {fitpar_kw: None}

            found = {}
            for index in range(order + 1):
                par_key = fitpar_kw.replace("nn", f"{index:02d}")
                if_missing = (
                    f"Warning: could not find '{par_key}' in header."
                    " Check that the correct number of parameters has entered "
                    "the header and re-run calibration if necessary!"
                )
                try:
                    found[par_key] = self._header[par_key]
                except KeyError:
                    # Missing parameters are reported and left out.
                    warn(if_missing)

            return found

        setattr(self, f"to_{fitpar_kw}", get_all_parameters)

511
    def __init__(self, *args, **kwargs):
        """Set up the translator instance.

        After the parent constructor has run, the tag file named by
        ``TRANS_SETTINGS["tag_file"]`` is read into ``self.tags`` (indexed
        by date and run), translations for keywords with a default value
        are registered, and the translations for the fit results and fit
        parameters are attached.
        """
        super().__init__(*args, **kwargs)

        self.tags = pd.read_csv(
            TRANS_SETTINGS["tag_file"],
            parse_dates=["date"],
            index_col=["date", "run"],
        )

        # Keywords kept from the header when present, else set to a default.
        keyword_defaults = (
            ("ROTCODE", -1),
            ("ROTTRACK", None),
            ("IMGSYS", "SLIT"),
            ("SLITCNTR", "scancntr"),
            ("ROTANGLE", 0),
            ("BSCALE", 1),
            ("BZERO", 0),
        )
        for keyword, fallback in keyword_defaults:
            self.from_default(keyword, fallback)

        # Fit results exist once per flat: FF1<name> and FF2<name>.
        for quantity in ("WLOFF", "WLDSP", "FWHMA", "FWHMP", "STRAY", "NPOLY"):
            for flat_index in (1, 2):
                self.from_float_or_nan(f"FF{flat_index}{quantity}")

        # add fit res parameters
        for fitpar_kw in fitpar_kws:
            self.get_fitpars(fitpar_kw)