translate_header.py 9.49 KB
Newer Older
Carl Schaffer's avatar
Carl Schaffer committed
1
"""Translate GRIS header to conform to solarnet naming standards
2
 gris_translate_header.py: Main script to translate headers. The header translation is done in three steps:
Carl Schaffer's avatar
Carl Schaffer committed
3
4
5
6
7
8
9

    Generate an ObservationInfo instance
    Generate header cards for each entry in the header template
    Write the data from the original file to a new file along with the new header.

"""

10
# standard library imports
Carl Schaffer's avatar
Carl Schaffer committed
11
import re
Carl Schaffer's avatar
Carl Schaffer committed
12
import sys
13
14
from argparse import ArgumentParser
from glob import glob
15
from math import isnan
16
from os.path import join, isdir, dirname
17
from warnings import warn, filterwarnings
Carl Schaffer's avatar
Carl Schaffer committed
18

19
# third party imports
20
from astropy.io import fits
21
from astropy.io.fits import Header, PrimaryHDU, Card
22
from astropy.io.fits.verify import VerifyWarning
23
24
from tqdm import tqdm

25
from kis_tools.gris.headers import ObservationInfo
26
# custom imports
27
from kis_tools.gris.headers.template_builder import (
28
29
30
31
32
    GRIS_PROPERTIES,
    HEADER_TEMPLATE,
    _dirname,
    hyphenated,
)
Carl Schaffer's avatar
Carl Schaffer committed
33
from kis_tools.gris.headers.wcs import GrisWCSGenerator, reorder_gris_data
34
from kis_tools.util.gitinfo import get_git_info
Carl Schaffer's avatar
Carl Schaffer committed
35
from kis_tools.util.util import (
Carl Schaffer's avatar
Carl Schaffer committed
36
37
38
    gen_history_time,
    gen_history_git,
)
39
40
from kis_tools.util.util import setup_outdir, gris_run_number, date_from_fn
from .exceptions import NothingToDoError
41

42
_git_info = get_git_info()
Carl Schaffer's avatar
Carl Schaffer committed
43

Carl Schaffer's avatar
Carl Schaffer committed
44

Carl Schaffer's avatar
Carl Schaffer committed
45
46
def read_metadata(file, hdu=0):
    """Get header from file"""
Carl Schaffer's avatar
Carl Schaffer committed
47
    fits_file = fits.open(file, ignore_missing_end=True)
48
    try:
Carl Schaffer's avatar
Carl Schaffer committed
49
50
        hdu = fits_file[hdu]
        hdu.verify("silentfix")
51
        header = fits_file[hdu].header
Carl Schaffer's avatar
Carl Schaffer committed
52

53
54
55
56
57
    except IndexError:
        header = None
    return header


Carl Schaffer's avatar
Carl Schaffer committed
58
59
def read_data(file, hdu=0):
    """Get data from file"""
Carl Schaffer's avatar
Carl Schaffer committed
60
    fits_file = fits.open(file, ignore_missing_end=True)
Carl Schaffer's avatar
Carl Schaffer committed
61
    try:
62
        data = fits_file[hdu].data
Carl Schaffer's avatar
Carl Schaffer committed
63
    except IndexError:
64
65
        data = None
    return data
Carl Schaffer's avatar
Carl Schaffer committed
66
67


Carl Schaffer's avatar
Carl Schaffer committed
68
69
def make_obs_info(fn):
    """Generate an ObservationInformation Instance """
70
    # get first HDU
71
    header = read_metadata(fn, 0)
72
73
74
75
76

    obs_info = ObservationInfo(header, pedantic=True, filename=fn)
    return obs_info


77
def process_comment(value):
Carl Schaffer's avatar
Carl Schaffer committed
78
79
80
81
82
83
84
85
86
87
    """Expand comments containing commands formatted as
    COMMAND:value

    Currently supported comands are
      - section: Wrap the value in filling characters to visually mark
        the beginning of a new section
      - dump: use the value to open a file in the parent directory of
        this file and dump the contents into the header.
    If no command is given or it is not recognized, the whole field
    added to the header.
Carl Schaffer's avatar
Carl Schaffer committed
88
    """
89
90
91
92
93
94
95
96
97
98
99
100
101
102
    card_list = []
    try:
        command, comment_value = re.match(r"^(\w+):(.*$)", value).groups()
    except AttributeError:
        return [Card(("COMMENT", value))]
    command = command.lower()
    if command == "section":
        card = ("COMMENT", 72 * "-")
        card_list.append(Card(*card))
        card = ("COMMENT", comment_value.center(72, "-"))
        card_list.append(Card(*card))

    elif command == "dump":

103
104
105
        file_to_dump = _dirname / comment_value.strip()
        assert file_to_dump.exists(), f'File {file_to_dump} to dump into header not found!'
        with open(file_to_dump) as f:
106
107
108
109
110
            for line in f:
                line = line.strip()
                card = ("COMMENT", line)
                card_list.append(Card(*card))
    else:
Carl Schaffer's avatar
Carl Schaffer committed
111
        warn(f"Could not recognize comment command {comment_value}")
112
113
        return [Card(("COMMENT", value))]
    return card_list
Carl Schaffer's avatar
Carl Schaffer committed
114

115
116

def make_header(obs_info, properties):
Carl Schaffer's avatar
Carl Schaffer committed
117
118
    """Generate header from ObservationInfo instance, cast values to
    correct types."""
119
120
121
122
123
    h = {}
    c = {}

    for key, entry in properties.items():
        comment, value_type = entry
124
        value = getattr(obs_info, key)
125
126
127
128

        try:
            value = eval(value_type)(value)
        except:
Carl Schaffer's avatar
Carl Schaffer committed
129
130
            warn(
                f"Could not cast {value} to {value_type}, using {type(value)} instead!"
131
132
133
134
            )

        c[key] = str(comment).strip()
        h[key] = value
135
136

        if "nn" in key:
137
138
139
140
141
142
            del c[key]
            del h[key]
            for i, ki in enumerate(value.keys()):
                h[ki] = value[ki]
                c[ki] = comment.replace("nn", f"{i:02d}")
    return h, c
Carl Schaffer's avatar
Carl Schaffer committed
143
144


145
def main(infile, outfile=None, wcs_generator=None, **kwargs):  # noqa: C901
Carl Schaffer's avatar
Carl Schaffer committed
146
147
148
149
150
151
152
    """
    Main script. By default it will only print the new header. If an
    output file is specified the file will be created. The force
    option lets you overwrite existing output files, if none is
    specified, the source file is overwritten.

    Params:
153
      infile: path to file
Carl Schaffer's avatar
Carl Schaffer committed
154
155
156
      outfile: path to outfile
    """

157
    overwrite = kwargs["overwrite"] if "overwrite" in kwargs else False
Carl Schaffer's avatar
Carl Schaffer committed
158

159
    oi = make_obs_info(infile)
160
161
    h, c = make_header(oi, GRIS_PROPERTIES)

Carl Schaffer's avatar
Carl Schaffer committed
162
    h["DATE_BEG"] = h["DATE_BEG"].isoformat()
163
    h["DATE_OBS"] = h["DATE_OBS"].isoformat()
Carl Schaffer's avatar
Carl Schaffer committed
164
    h["DATE"] = h["DATE"].isoformat()
Carl Schaffer's avatar
Carl Schaffer committed
165

166
    # make card list
Carl Schaffer's avatar
Carl Schaffer committed
167
    card_list = []
168

169
    def make_card(keyword, decimal_digits):
Carl Schaffer's avatar
Carl Schaffer committed
170
        """handles None, nan values and round floats"""
171
172
173
174
175
176
177
178
179
180
181
        value = h[keyword]
        comment = c[keyword]

        if isinstance(value, float):
            if isnan(value):
                value = None
            elif not isnan(decimal_digits):
                value = round(value, int(decimal_digits))
        card = (keyword, value, comment)
        return card

182
183
    for index,row in HEADER_TEMPLATE.iterrows():
        keyword, default_value, decimal_digits = row['KEYWORD'],row['fixed_val'],row['decimal_digits']
184
185
186
187
        if keyword == "COMMENT":
            cards = process_comment(default_value)
            card_list += cards
            continue
188
189
190
        if "nn" in keyword:
            pattern = keyword.replace("nn", r"\d\d")
            kws = [k for k in h.keys() if re.search(pattern, k)]
191

192
193
194
195
            for k in kws:
                card = make_card(k, decimal_digits)
                card_list.append(Card(*card))
        elif keyword not in h.keys():
Carl Schaffer's avatar
Carl Schaffer committed
196
            warn(f"Could not find value for {keyword}, skipping")
197
198
            continue
        else:
199
200
            card = make_card(keyword, decimal_digits)
            card_list.append(Card(*card))
201

202
203
204
205
206
    # fix hyphenated kws back to original form
    for i, card in enumerate(card_list):
        if card.keyword in hyphenated:
            keyword = card.keyword.replace("_", "-")
            card_list[i] = Card(keyword, card.value, card.comment)
Carl Schaffer's avatar
Carl Schaffer committed
207

Carl Schaffer's avatar
Carl Schaffer committed
208
209
210
    # add history from original file and add entry for the header
    # translation processing step
    old_hist = list(oi._header["HISTORY"])
211
    hist_main = [gen_history_time("gris_translate_header")]
212
    git_info = _git_info
213
    hist_git = gen_history_git("gris_translate_header", git_info)
Carl Schaffer's avatar
Carl Schaffer committed
214
215
216
217
    hist_entries = old_hist + hist_main + hist_git
    for hist_entry in hist_entries:
        card_list.append(Card("HISTORY", hist_entry))

218
    # add WCS kws to card_list
219
220
221
222
223
224
    wcs_generator = GrisWCSGenerator(infile)
    wcs_cards = wcs_generator.make_wcs_cards()
    wcs_index = [c.keyword for c in card_list].index("WCSNAME")
    for c in wcs_cards:
        card_list.insert(wcs_index + 1, c)
        wcs_index += 1
225

226
227
228
229
230
231
    # replace slitpos cards
    slitpos_cards = wcs_generator.slitpos_cards()
    for card in slitpos_cards:
        index = [c.keyword for c in card_list].index(card.keyword)
        card_list[index] = card

232
    # create header from card list
233
    header = Header(card_list)
234
    DATA = reorder_gris_data(wcs_generator.infile.data, reverse_axes=True)
235
    f = PrimaryHDU(data=DATA, header=header)
236
237

    if outfile is not None:
Carl Schaffer's avatar
Carl Schaffer committed
238
        f.writeto(outfile, overwrite=overwrite, output_verify="fix", checksum=True)
239
240
        header = read_metadata(outfile, 0)

241
242
    else:
        try:
Carl Schaffer's avatar
Carl Schaffer committed
243
            f.writeto(infile, overwrite=overwrite, output_verify="fix", checksum=True)
244
        except OSError:
245
            print("File already exists, use -f option to force overwriting")
246
247


248
249
250
251
252
253
254
255
256
257
258
def find_cross_corr_file(f):
    folder = dirname(f)
    context_folder = join(folder, "..", "context_data")
    rn = gris_run_number(f)
    date = date_from_fn(f)
    pattern = f"{date.strftime('%d%b%y').lower()}.{rn:03d}_coord_mu.sav"
    candidates = glob(join(context_folder, pattern))
    if candidates:
        return candidates[0]
    else:
        return None
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276


def run_translate_header(cmd_line_args):
    parser = ArgumentParser()

    parser.add_argument(
        "folder_in",
        help="folder to be processed, needs to be a GRIS level1_split folder",
    )
    parser.add_argument(
        "--folder_out",
        help="path where output is written. Defaults to input folder",
        default=None,
    )

    parser.add_argument(
        "-v",
        "--verbose",
Carl Schaffer's avatar
Carl Schaffer committed
277
        help="don't silence warnings",
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
        action="store_true",
        default=False,
    )

    args = parser.parse_args(cmd_line_args)
    folder_in = args.folder_in

    folder_out = args.folder_out
    if folder_out:
        setup_outdir(folder_out, overwrite=True)

    if not isdir(folder_in):
        raise IOError(f"Directory {folder_in} not found!")

    files = sorted(glob(join(folder_in, "*.fits")))

    if not args.verbose:
        filterwarnings("ignore", ".*", UserWarning)
        filterwarnings("ignore", ".*", VerifyWarning)

    progress_bar = tqdm(files)

    skipped = 0
    for f in progress_bar:
        progress_bar.set_description(f"Modifying {f}")
303
304
        cross_corr_file = find_cross_corr_file(f)

Carl Schaffer's avatar
Carl Schaffer committed
305
        outfile = f.replace(folder_in, folder_out) if folder_out else f
306
        try:
Carl Schaffer's avatar
Carl Schaffer committed
307
            main(f, outfile=outfile, overwrite=True)
308
309
310
311
        except NothingToDoError:
            skipped += 1
            continue
    if skipped > 0:
312
        print(f"Skipped {skipped} files which were already done.")
Carl Schaffer's avatar
Carl Schaffer committed
313
314


Carl Schaffer's avatar
Carl Schaffer committed
315
if __name__ == "__main__":
Carl Schaffer's avatar
Carl Schaffer committed
316
    main(*sys.argv[1:])