#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Created by schaffer at 11/6/19 Collection of utility functions specific to GRIS """ import datetime import re from collections import namedtuple from glob import glob from itertools import chain from os.path import basename from pathlib import Path import numpy as np import pandas as pd from astropy.io.fits.hdu.hdulist import fitsopen as fitsopen from kis_tools.util.util import gris_run_number def get_observers(logfile): return extract_names(get_candidates(logfile)) def get_candidates(logfile): with open(logfile, "r") as f: text = f.read().strip() text = "\n".join(text.split("\n")[:5]) observer_parts = [] patterns = [ r"[a-zA-Z\-]*[ \t\r\f\v]*(?:observer|remote)+\w*[ \t\r\f\v]*:?\s+(.+)\n", r"(?:\n|^)*\s*([a-z ,\.]+)\n", ] for p in patterns: res = re.findall(p, text, re.IGNORECASE) if res: for r in res: observer_parts.append(r) break return ",".join(observer_parts) def extract_names(obs_candidates): # check for Capitalized names obs_candidates.replace(" and ", ",") observers = [c for c in obs_candidates.split(",") if re.search(r"[A-Z][a-z]", c)] return observers def observer_data_frame(): files = glob("/dat/sdc/gris/*/*.txt") logfiles = list(filter(lambda x: re.match(r"\d{8}\.txt", basename(x)), files)) observers = [] dates = [] for f in logfiles: observers.append(",".join(get_observers(f))) date = datetime.datetime.strptime(basename(f), "%Y%m%d.txt") dates.append(date) df = pd.DataFrame(dict(date=dates, observers=observers)) df = df.set_index("date") print(df.head()) def extract_position_kws_from_manolo(raw_gris_file): """ Extract coordinate information from an observation, possibly spliced into multiple gris files. Extracts date and run information from the filename, searches for files of the same observation and determines coordinate info. Coordinate info is collected for every step of the first map found. The keywords are averaged across the map, resulting in the coordinates of the map center. Args: raw_gris_file: path or stub of gris file name like 14apr20.001[-01][cc|r] Returns: pos: dictionary with member access to position values """ # get coord info and bounding box folder = Path(raw_gris_file).parent assert folder.exists(), f"Input folder {folder} not found!" runnumber = gris_run_number(raw_gris_file) pattern = f"*.{runnumber:03d}*" files = [f for f in folder.glob(pattern) if not str(f).endswith('m')] print(files) headers = [] for f in sorted(files): with fitsopen(f) as hdu: hdu[0].verify("silentfix") headers.append(hdu[0].header) assert headers, f"Empty list of headers, are there files in {str(folder)}? Pattern was: '{pattern}'." pos_kws = [ "IFILE", "SLITORIE", "SLITPOSX", "SLITPOSY", "SLITLATI", "SLITLONG", "B0ANGLE", "P0ANGLE", "PARAANGL", "AZIMUT", "ELEVATIO", "L0ANGLE", "ROTANGLE", "ROTCODE", "FILENAME", "RA", "DEC", ] PositionInfo = namedtuple("PositionInfo", field_names=pos_kws) def info_from_env(env): # default for rotcode and angle, they were added in 2016 defaults = dict( ROTCODE=1, ROTANGLE=0, ) kwargs = {**defaults, **{k: env[k] for k in pos_kws if k in env}} return PositionInfo(**kwargs) env = {} pos_infos = None trigger_kw = "ISTEP" for key, value in chain(*[h.items() for h in headers]): if str(value).strip() == '' and key in env.keys(): value = env[key] env[key] = value next_map = key == 'ISERIE' and value == 2 if next_map: # Stop iteration after first map break if key == trigger_kw and value != 1: if pos_infos is None: pos_infos = [None] * env["STEPS"] pos_infos[env["ISTEP"] - 1] = info_from_env(env) pos_infos[env["ISTEP"] - 1] = info_from_env(env) pos_infos = np.array(pos_infos) # coords map 1 indexes = {k: i for i, k in enumerate(pos_kws)} pos = dict( slitposx=pos_infos[:, indexes["SLITPOSX"]].astype(float).mean(), slitposy=pos_infos[:, indexes["SLITPOSY"]].astype(float).mean(), slitlati=pos_infos[:, indexes["SLITLATI"]].astype(float).mean(), slitlong=pos_infos[:, indexes["SLITLONG"]].astype(float).mean(), slitorie=pos_infos[:, indexes["SLITORIE"]].astype(float).mean(), l0_angle=pos_infos[:, indexes["L0ANGLE"]].astype(float).mean(), b0_angle=pos_infos[:, indexes["B0ANGLE"]].astype(float).mean(), p0_angle=pos_infos[:, indexes["P0ANGLE"]].astype(float).mean(), parallax_angle=pos_infos[:, indexes["PARAANGL"]].astype(float).mean(), azimuth=pos_infos[:, indexes["AZIMUT"]].astype(float).mean(), elevation=pos_infos[:, indexes["ELEVATIO"]].astype(float).mean(), date_beg=headers[0]["DATE-OBS"] + " " + headers[0]["UT"] ) return pos