"""Translate GRIS header to conform to solarnet naming standards gris_translate_header.py: Main script to translate headers. The header translation is done in three steps: Generate an ObservationInfo instance Generate header cards for each entry in the header template Write the data from the original file to a new file along with the new header. """ # standard library imports import re import sys from argparse import ArgumentParser from glob import glob from math import isnan from os.path import join, isdir, dirname from warnings import warn, filterwarnings # third party imports from astropy.io import fits from astropy.io.fits import Header, PrimaryHDU, Card from astropy.io.fits.verify import VerifyWarning from tqdm import tqdm from kis_tools.gris.headers import ObservationInfo from kis_tools.gris.headers.exceptions import NothingToDoError # custom imports from kis_tools.gris.headers.template_builder import ( GRIS_PROPERTIES, HEADER_TEMPLATE, _dirname, hyphenated, ) from kis_tools.gris.headers.wcs import GrisWCSGenerator, reorder_gris_data from kis_tools.util.gitinfo import get_git_info from kis_tools.util.util import ( gen_history_time, gen_history_git, groupby_gris_run, ) from kis_tools.util.util import setup_outdir, gris_run_number, date_from_fn _git_info = get_git_info() def read_metadata(file, hdu=0): """Get header from file""" fits_file = fits.open(file, memmap=False, ignore_missing_end=True) try: hdu = fits_file[hdu] hdu.verify("silentfix") header = fits_file[hdu].header except IndexError: header = None fits_file.close() return header def read_data(file, hdu=0): """Get data from file""" fits_file = fits.open(file, memmap=False, ignore_missing_end=True) try: data = fits_file[hdu].data except IndexError: data = None fits_file.close() return data def make_obs_info(fn): """Generate an ObservationInformation Instance """ # get first HDU header = read_metadata(fn, 0) obs_info = ObservationInfo(header, pedantic=True, filename=fn) return obs_info def process_comment(value): """Expand comments containing commands formatted as COMMAND:value Currently supported comands are - section: Wrap the value in filling characters to visually mark the beginning of a new section - dump: use the value to open a file in the parent directory of this file and dump the contents into the header. If no command is given or it is not recognized, the whole field added to the header. """ card_list = [] try: command, comment_value = re.match(r"^(\w+):(.*$)", value).groups() except AttributeError: return [Card(("COMMENT", value))] command = command.lower() if command == "section": card = ("COMMENT", 72 * "-") card_list.append(Card(*card)) card = ("COMMENT", comment_value.center(72, "-")) card_list.append(Card(*card)) elif command == "dump": file_to_dump = _dirname / comment_value.strip() assert file_to_dump.exists(), f'File {file_to_dump} to dump into header not found!' with open(file_to_dump) as f: for line in f: line = line.strip() card = ("COMMENT", line) card_list.append(Card(*card)) else: warn(f"Could not recognize comment command {comment_value}") return [Card(("COMMENT", value))] return card_list def make_header(obs_info, properties): """Generate header from ObservationInfo instance, cast values to correct types.""" h = {} c = {} for key, entry in properties.items(): comment, value_type = entry value = getattr(obs_info, key) try: value = eval(value_type)(value) except: warn( f"Could not cast {value} to {value_type}, using {type(value)} instead!" ) c[key] = str(comment).strip() h[key] = value if "nn" in key: del c[key] del h[key] for i, ki in enumerate(value.keys()): h[ki] = value[ki] c[ki] = comment.replace("nn", f"{i:02d}") return h, c def main(infile, outfile=None, wcs_generator=None, **kwargs): # noqa: C901 """ Main script. By default it will only print the new header. If an output file is specified the file will be created. The force option lets you overwrite existing output files, if none is specified, the source file is overwritten. Params: infile: path to file outfile: path to outfile """ overwrite = kwargs["overwrite"] if "overwrite" in kwargs else False oi = make_obs_info(infile) h, c = make_header(oi, GRIS_PROPERTIES) h["DATE_BEG"] = h["DATE_BEG"].isoformat() h["DATE_OBS"] = h["DATE_OBS"].isoformat() h["DATE"] = h["DATE"].isoformat() # make card list card_list = [] def make_card(keyword, decimal_digits): """handles None, nan values and round floats""" value = h[keyword] comment = c[keyword] if isinstance(value, float): if isnan(value): value = None elif not isnan(decimal_digits): value = round(value, int(decimal_digits)) card = (keyword, value, comment) return card for index,row in HEADER_TEMPLATE.iterrows(): keyword, default_value, decimal_digits = row['KEYWORD'],row['fixed_val'],row['decimal_digits'] if keyword == "COMMENT": cards = process_comment(default_value) card_list += cards continue if "nn" in keyword: pattern = keyword.replace("nn", r"\d\d") kws = [k for k in h.keys() if re.search(pattern, k)] for k in kws: card = make_card(k, decimal_digits) card_list.append(Card(*card)) elif keyword not in h.keys(): warn(f"Could not find value for {keyword}, skipping") continue else: card = make_card(keyword, decimal_digits) card_list.append(Card(*card)) # fix hyphenated kws back to original form for i, card in enumerate(card_list): if card.keyword in hyphenated: keyword = card.keyword.replace("_", "-") card_list[i] = Card(keyword, card.value, card.comment) # add history from original file and add entry for the header # translation processing step old_hist = list(oi._header["HISTORY"]) hist_main = [gen_history_time("gris_translate_header")] git_info = _git_info hist_git = gen_history_git("gris_translate_header", git_info) hist_entries = old_hist + hist_main + hist_git for hist_entry in hist_entries: card_list.append(Card("HISTORY", hist_entry)) # add WCS kws to card_list wcs_generator = GrisWCSGenerator(infile) wcs_cards = wcs_generator.make_wcs_cards() wcs_index = [c.keyword for c in card_list].index("WCSNAME") for c in wcs_cards: card_list.insert(wcs_index + 1, c) wcs_index += 1 # replace slitpos cards slitpos_cards = wcs_generator.slitpos_cards() for card in slitpos_cards: index = [c.keyword for c in card_list].index(card.keyword) card_list[index] = card # create header from card list header = Header(card_list) DATA = reorder_gris_data(wcs_generator.infile.data, reverse_axes=True) f = PrimaryHDU(data=DATA, header=header) if outfile is not None: f.writeto(outfile, overwrite=overwrite, output_verify="fix", checksum=True) header = read_metadata(outfile, 0) else: try: f.writeto(infile, overwrite=overwrite, output_verify="fix", checksum=True) except OSError: print("File already exists, use -f option to force overwriting") def find_cross_corr_file(f): folder = dirname(f) context_folder = join(folder, "..", "context_data") rn = gris_run_number(f) date = date_from_fn(f) pattern = f"{date.strftime('%d%b%y').lower()}.{rn:03d}_coord_mu.sav" candidates = glob(join(context_folder, pattern)) if candidates: return candidates[0] else: return None def run_translate_header(cmd_line_args): parser = ArgumentParser() parser.add_argument( "folder_in", help="folder to be processed, needs to be a GRIS level1_split folder", ) parser.add_argument( "--folder_out", help="path where output is written. Defaults to input folder", default=None, ) parser.add_argument( "-r", "--run", help="run number", default=None, ) parser.add_argument( "-v", "--verbose", help="don't silence warnings", action="store_true", default=False, ) args = parser.parse_args(cmd_line_args) folder_in = args.folder_in folder_out = args.folder_out if folder_out: setup_outdir(folder_out, overwrite=True) # Get list of input files if not isdir(folder_in): raise IOError(f"Directory {folder_in} not found!") files = sorted(glob(join(folder_in, "*.fits"))) if not files: files = sorted(glob(join(folder_in, 'level1_split', "*.fits"))) # If run is given, filter by run if args.run is not None: files = groupby_gris_run(files)[run] # Filter warnigns by default unless verbose flag is set if not args.verbose: filterwarnings("ignore", ".*", UserWarning) filterwarnings("ignore", ".*", VerifyWarning) # Main Loop progress_bar = tqdm(files) skipped = 0 for f in progress_bar: progress_bar.set_description(f"Modifying {f}") outfile = f.replace(folder_in, folder_out) if folder_out else f try: main(f, outfile=outfile, overwrite=True) except NothingToDoError: skipped += 1 continue if skipped > 0: print(f"Skipped {skipped} files which were already done.") if __name__ == "__main__": main(*sys.argv[1:])