translate_header.py 9.95 KB
Newer Older
Carl Schaffer's avatar
Carl Schaffer committed
1
"""Translate GRIS header to conform to solarnet naming standards
2
 gris_translate_header.py: Main script to translate headers. The header translation is done in three steps:
Carl Schaffer's avatar
Carl Schaffer committed
3
4
5
6
7
8
9

    Generate an ObservationInfo instance
    Generate header cards for each entry in the header template
    Write the data from the original file to a new file along with the new header.

"""

10
# standard library imports
Carl Schaffer's avatar
Carl Schaffer committed
11
import re
Carl Schaffer's avatar
Carl Schaffer committed
12
import sys
13
14
from argparse import ArgumentParser
from glob import glob
15
from math import isnan
16
from os.path import join, isdir, dirname
17
from warnings import warn, filterwarnings
Carl Schaffer's avatar
Carl Schaffer committed
18

19
# third party imports
20
from astropy.io import fits
21
from astropy.io.fits import Header, PrimaryHDU, Card
22
from astropy.io.fits.verify import VerifyWarning
23
24
from tqdm import tqdm

25
from kis_tools.gris.headers import ObservationInfo
Carl Schaffer's avatar
Carl Schaffer committed
26
from kis_tools.gris.headers.exceptions import NothingToDoError
27
# custom imports
28
from kis_tools.gris.headers.template_builder import (
29
30
31
32
33
    GRIS_PROPERTIES,
    HEADER_TEMPLATE,
    _dirname,
    hyphenated,
)
34
from kis_tools.gris.headers.wcs_generators import get_wcs_generator
35
from kis_tools.util.gitinfo import get_git_info
Carl Schaffer's avatar
Carl Schaffer committed
36
from kis_tools.util.util import (
Carl Schaffer's avatar
Carl Schaffer committed
37
    gen_history_time,
38
    gen_history_git, groupby_gris_run,
Carl Schaffer's avatar
Carl Schaffer committed
39
)
40
from kis_tools.util.util import setup_outdir, gris_run_number, date_from_fn
41

42
_git_info = get_git_info()
Carl Schaffer's avatar
Carl Schaffer committed
43

Carl Schaffer's avatar
Carl Schaffer committed
44

Carl Schaffer's avatar
Carl Schaffer committed
45
46
def read_metadata(file, hdu=0):
    """Get header from file"""
47
    fits_file = fits.open(file, memmap=False, ignore_missing_end=True)
48
    try:
Carl Schaffer's avatar
Carl Schaffer committed
49
50
        hdu = fits_file[hdu]
        hdu.verify("silentfix")
51
        header = fits_file[hdu].header
Carl Schaffer's avatar
Carl Schaffer committed
52

53
54
    except IndexError:
        header = None
55
    fits_file.close()
56
57
58
    return header


Carl Schaffer's avatar
Carl Schaffer committed
59
60
def read_data(file, hdu=0):
    """Get data from file"""
61
    fits_file = fits.open(file, memmap=False, ignore_missing_end=True)
Carl Schaffer's avatar
Carl Schaffer committed
62
    try:
63
        data = fits_file[hdu].data
Carl Schaffer's avatar
Carl Schaffer committed
64
    except IndexError:
65
        data = None
66
    fits_file.close()
67
    return data
Carl Schaffer's avatar
Carl Schaffer committed
68
69


Carl Schaffer's avatar
Carl Schaffer committed
70
71
def make_obs_info(fn):
    """Generate an ObservationInformation Instance """
72
    # get first HDU
73
    header = read_metadata(fn, 0)
74
75
76
77
78

    obs_info = ObservationInfo(header, pedantic=True, filename=fn)
    return obs_info


79
def process_comment(value):
Carl Schaffer's avatar
Carl Schaffer committed
80
81
82
83
84
85
86
87
88
89
    """Expand comments containing commands formatted as
    COMMAND:value

    Currently supported comands are
      - section: Wrap the value in filling characters to visually mark
        the beginning of a new section
      - dump: use the value to open a file in the parent directory of
        this file and dump the contents into the header.
    If no command is given or it is not recognized, the whole field
    added to the header.
Carl Schaffer's avatar
Carl Schaffer committed
90
    """
91
92
93
94
95
96
97
98
99
100
101
102
103
104
    card_list = []
    try:
        command, comment_value = re.match(r"^(\w+):(.*$)", value).groups()
    except AttributeError:
        return [Card(("COMMENT", value))]
    command = command.lower()
    if command == "section":
        card = ("COMMENT", 72 * "-")
        card_list.append(Card(*card))
        card = ("COMMENT", comment_value.center(72, "-"))
        card_list.append(Card(*card))

    elif command == "dump":

105
106
107
        file_to_dump = _dirname / comment_value.strip()
        assert file_to_dump.exists(), f'File {file_to_dump} to dump into header not found!'
        with open(file_to_dump) as f:
108
109
110
111
112
            for line in f:
                line = line.strip()
                card = ("COMMENT", line)
                card_list.append(Card(*card))
    else:
Carl Schaffer's avatar
Carl Schaffer committed
113
        warn(f"Could not recognize comment command {comment_value}")
114
115
        return [Card(("COMMENT", value))]
    return card_list
Carl Schaffer's avatar
Carl Schaffer committed
116

117
118

def make_header(obs_info, properties):
Carl Schaffer's avatar
Carl Schaffer committed
119
120
    """Generate header from ObservationInfo instance, cast values to
    correct types."""
121
122
123
124
125
    h = {}
    c = {}

    for key, entry in properties.items():
        comment, value_type = entry
126
        value = getattr(obs_info, key)
127
128
129
130

        try:
            value = eval(value_type)(value)
        except:
Carl Schaffer's avatar
Carl Schaffer committed
131
132
            warn(
                f"Could not cast {value} to {value_type}, using {type(value)} instead!"
133
134
135
136
            )

        c[key] = str(comment).strip()
        h[key] = value
137
138

        if "nn" in key:
139
140
141
142
143
144
            del c[key]
            del h[key]
            for i, ki in enumerate(value.keys()):
                h[ki] = value[ki]
                c[ki] = comment.replace("nn", f"{i:02d}")
    return h, c
Carl Schaffer's avatar
Carl Schaffer committed
145
146


147
def main(infile, outfile=None, wcs_generator=None, **kwargs):  # noqa: C901
Carl Schaffer's avatar
Carl Schaffer committed
148
149
150
151
152
153
154
    """
    Main script. By default it will only print the new header. If an
    output file is specified the file will be created. The force
    option lets you overwrite existing output files, if none is
    specified, the source file is overwritten.

    Params:
155
      infile: path to file
Carl Schaffer's avatar
Carl Schaffer committed
156
157
158
      outfile: path to outfile
    """

159
    overwrite = kwargs["overwrite"] if "overwrite" in kwargs else False
Carl Schaffer's avatar
Carl Schaffer committed
160

161
    oi = make_obs_info(infile)
162
163
    h, c = make_header(oi, GRIS_PROPERTIES)

Carl Schaffer's avatar
Carl Schaffer committed
164
    h["DATE_BEG"] = h["DATE_BEG"].isoformat()
165
    h["DATE_OBS"] = h["DATE_OBS"].isoformat()
Carl Schaffer's avatar
Carl Schaffer committed
166
    h["DATE"] = h["DATE"].isoformat()
Carl Schaffer's avatar
Carl Schaffer committed
167

168
    # make card list
Carl Schaffer's avatar
Carl Schaffer committed
169
    card_list = []
170

171
    def make_card(keyword, decimal_digits):
Carl Schaffer's avatar
Carl Schaffer committed
172
        """handles None, nan values and round floats"""
173
174
175
176
177
178
179
180
181
182
183
        value = h[keyword]
        comment = c[keyword]

        if isinstance(value, float):
            if isnan(value):
                value = None
            elif not isnan(decimal_digits):
                value = round(value, int(decimal_digits))
        card = (keyword, value, comment)
        return card

184
185
    for index, row in HEADER_TEMPLATE.iterrows():
        keyword, default_value, decimal_digits = row['KEYWORD'], row['fixed_val'], row['decimal_digits']
186
187
188
189
        if keyword == "COMMENT":
            cards = process_comment(default_value)
            card_list += cards
            continue
190
191
192
        if "nn" in keyword:
            pattern = keyword.replace("nn", r"\d\d")
            kws = [k for k in h.keys() if re.search(pattern, k)]
193

194
195
196
197
            for k in kws:
                card = make_card(k, decimal_digits)
                card_list.append(Card(*card))
        elif keyword not in h.keys():
Carl Schaffer's avatar
Carl Schaffer committed
198
            warn(f"Could not find value for {keyword}, skipping")
199
200
            continue
        else:
201
202
            card = make_card(keyword, decimal_digits)
            card_list.append(Card(*card))
203

204
205
206
207
208
    # fix hyphenated kws back to original form
    for i, card in enumerate(card_list):
        if card.keyword in hyphenated:
            keyword = card.keyword.replace("_", "-")
            card_list[i] = Card(keyword, card.value, card.comment)
Carl Schaffer's avatar
Carl Schaffer committed
209

Carl Schaffer's avatar
Carl Schaffer committed
210
211
212
    # add history from original file and add entry for the header
    # translation processing step
    old_hist = list(oi._header["HISTORY"])
213
    hist_main = [gen_history_time("gris_translate_header")]
214
    git_info = _git_info
215
    hist_git = gen_history_git("gris_translate_header", git_info)
Carl Schaffer's avatar
Carl Schaffer committed
216
217
218
219
    hist_entries = old_hist + hist_main + hist_git
    for hist_entry in hist_entries:
        card_list.append(Card("HISTORY", hist_entry))

220
    # add WCS kws to card_list
221
222

    wcs_generator = get_wcs_generator(infile)
223
224
225
226
227
    wcs_cards = wcs_generator.make_wcs_cards()
    wcs_index = [c.keyword for c in card_list].index("WCSNAME")
    for c in wcs_cards:
        card_list.insert(wcs_index + 1, c)
        wcs_index += 1
228

229
230
231
232
233
234
    # replace slitpos cards
    slitpos_cards = wcs_generator.slitpos_cards()
    for card in slitpos_cards:
        index = [c.keyword for c in card_list].index(card.keyword)
        card_list[index] = card

235
    # create header from card list
236
    header = Header(card_list)
237
    DATA = wcs_generator.reordered_data()
238
    f = PrimaryHDU(data=DATA, header=header)
239
240

    if outfile is not None:
Carl Schaffer's avatar
Carl Schaffer committed
241
        f.writeto(outfile, overwrite=overwrite, output_verify="fix", checksum=True)
242
243
        header = read_metadata(outfile, 0)

244
245
    else:
        try:
Carl Schaffer's avatar
Carl Schaffer committed
246
            f.writeto(infile, overwrite=overwrite, output_verify="fix", checksum=True)
247
        except OSError:
248
            print("File already exists, use -f option to force overwriting")
249
250


251
252
253
254
255
256
257
258
259
260
261
def find_cross_corr_file(f):
    folder = dirname(f)
    context_folder = join(folder, "..", "context_data")
    rn = gris_run_number(f)
    date = date_from_fn(f)
    pattern = f"{date.strftime('%d%b%y').lower()}.{rn:03d}_coord_mu.sav"
    candidates = glob(join(context_folder, pattern))
    if candidates:
        return candidates[0]
    else:
        return None
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276


def run_translate_header(cmd_line_args):
    parser = ArgumentParser()

    parser.add_argument(
        "folder_in",
        help="folder to be processed, needs to be a GRIS level1_split folder",
    )
    parser.add_argument(
        "--folder_out",
        help="path where output is written. Defaults to input folder",
        default=None,
    )

277
278
279
280
281
    parser.add_argument(
        "-r",
        "--run",
        help="run number",
        default=None,
Carl Schaffer's avatar
Carl Schaffer committed
282
        type=int
283
284
    )

285
286
287
    parser.add_argument(
        "-v",
        "--verbose",
Carl Schaffer's avatar
Carl Schaffer committed
288
        help="don't silence warnings",
289
290
291
292
293
294
295
296
297
298
299
        action="store_true",
        default=False,
    )

    args = parser.parse_args(cmd_line_args)
    folder_in = args.folder_in

    folder_out = args.folder_out
    if folder_out:
        setup_outdir(folder_out, overwrite=True)

300
    # Get list of input files
301
302
    if not isdir(folder_in):
        raise IOError(f"Directory {folder_in} not found!")
303
    files = sorted(glob(join(folder_in, "*.fits")))
304
305
    if not files:
        files = sorted(glob(join(folder_in, 'level1_split', "*.fits")))
306
307
    # If run is given, filter by run
    if args.run is not None:
Carl Schaffer's avatar
typo    
Carl Schaffer committed
308
        files = groupby_gris_run(files)[args.run]
309

310
    # Filter warnigns by default unless verbose flag is set
311
312
313
314
    if not args.verbose:
        filterwarnings("ignore", ".*", UserWarning)
        filterwarnings("ignore", ".*", VerifyWarning)

315
    # Main Loop
316
317
318
319
    progress_bar = tqdm(files)
    skipped = 0
    for f in progress_bar:
        progress_bar.set_description(f"Modifying {f}")
320

Carl Schaffer's avatar
Carl Schaffer committed
321
        outfile = f.replace(folder_in, folder_out) if folder_out else f
322
        try:
Carl Schaffer's avatar
Carl Schaffer committed
323
            main(f, outfile=outfile, overwrite=True)
324
325
326
327
        except NothingToDoError:
            skipped += 1
            continue
    if skipped > 0:
328
        print(f"Skipped {skipped} files which were already done.")
Carl Schaffer's avatar
Carl Schaffer committed
329
330


Carl Schaffer's avatar
Carl Schaffer committed
331
if __name__ == "__main__":
Carl Schaffer's avatar
Carl Schaffer committed
332
    main(*sys.argv[1:])