#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Created by schaffer at 11/6/19 Collection of utility functions specific to GRIS """ import datetime import re from collections import namedtuple from glob import glob from itertools import chain from os.path import basename from pathlib import Path import astropy.units as u import numpy as np import pandas as pd import pytz from astroplan import Observer from astropy.coordinates import SkyCoord, GCRS, get_sun from astropy.io.fits.hdu.hdulist import fitsopen as fitsopen from sunpy.coordinates import Helioprojective from kis_tools.util.calculations import rotate_around_origin from kis_tools.util.constants import gregor_coords from kis_tools.util.util import gris_run_number def get_observers(logfile): return extract_names(get_candidates(logfile)) def get_candidates(logfile): with open(logfile, "r") as f: text = f.read().strip() text = "\n".join(text.split("\n")[:5]) observer_parts = [] patterns = [ r"[a-zA-Z\-]*[ \t\r\f\v]*(?:observer|remote)+\w*[ \t\r\f\v]*:?\s+(.+)\n", r"(?:\n|^)*\s*([a-z ,\.]+)\n", ] for p in patterns: res = re.findall(p, text, re.IGNORECASE) if res: for r in res: observer_parts.append(r) break return ",".join(observer_parts) def extract_names(obs_candidates): # check for Capitalized names obs_candidates.replace(" and ", ",") observers = [c for c in obs_candidates.split(",") if re.search(r"[A-Z][a-z]", c)] return observers def observer_data_frame(): files = glob("/dat/sdc/gris/*/*.txt") logfiles = list(filter(lambda x: re.match(r"\d{8}\.txt", basename(x)), files)) observers = [] dates = [] for f in logfiles: observers.append(",".join(get_observers(f))) date = datetime.datetime.strptime(basename(f), "%Y%m%d.txt") dates.append(date) df = pd.DataFrame(dict(date=dates, observers=observers)) df = df.set_index("date") print(df.head()) def extract_position_kws_from_manolo(raw_gris_file): """ Extract coordinate information from an observation, possibly spliced into multiple gris files. Extracts date and run information from the filename, searches for files of the same observation and determines coordinate info. Coordinate info is collected for every step of the first map found. The keywords are averaged across the map, resulting in the coordinates of the map center. Args: raw_gris_file: path or stub of gris file name like 14apr20.001[-01][cc|r] Returns: pos: dictionary with member access to position values """ # get coord info and bounding box folder = Path(raw_gris_file).parent assert folder.exists(), f"Input folder {folder} not found!" runnumber = gris_run_number(raw_gris_file) pattern = f"*.{runnumber:03d}*" files = [f for f in folder.glob(pattern) if not str(f).endswith('m')] print(files) headers = [] for f in sorted(files): with fitsopen(f) as hdu: hdu[0].verify("silentfix") headers.append(hdu[0].header) assert headers, f"Empty list of headers, are there files in {str(folder)}? Pattern was: '{pattern}'." pos_kws = [ "IFILE", "SLITORIE", "SLITPOSX", "SLITPOSY", "SLITLATI", "SLITLONG", "B0ANGLE", "P0ANGLE", "PARAANGL", "AZIMUT", "ELEVATIO", "L0ANGLE", "ROTANGLE", "ROTCODE", "FILENAME", "RA", "DEC", ] PositionInfo = namedtuple("PositionInfo", field_names=pos_kws) def info_from_env(env): # default for rotcode and angle, they were added in 2016 defaults = dict( ROTCODE=1, ROTANGLE=0, ) kwargs = {**defaults, **{k: env[k] for k in pos_kws if k in env}} return PositionInfo(**kwargs) env = {} pos_infos = None trigger_kw = "ISTEP" step = 0 for key, value in chain(*[h.items() for h in headers]): if str(value).strip() == '' and key in env.keys(): value = env[key] env[key] = value next_map = key == 'ISERIE' and value == 2 if next_map: # Stop iteration after first map break if key == trigger_kw and value != 1: if pos_infos is None: pos_infos = [None] * env["STEPS"] pos_infos[step] = info_from_env(env) step += 1 pos_infos[step] = info_from_env(env) pos_infos = np.array(pos_infos) # coords map 1 indexes = {k: i for i, k in enumerate(pos_kws)} pos = dict( slitposx=pos_infos[:, indexes["SLITPOSX"]].astype(float).mean(), slitposy=pos_infos[:, indexes["SLITPOSY"]].astype(float).mean(), slitlati=pos_infos[:, indexes["SLITLATI"]].astype(float).mean(), slitlong=pos_infos[:, indexes["SLITLONG"]].astype(float).mean(), slitorie=pos_infos[:, indexes["SLITORIE"]].astype(float).mean(), l0_angle=pos_infos[:, indexes["L0ANGLE"]].astype(float).mean(), b0_angle=pos_infos[:, indexes["B0ANGLE"]].astype(float).mean(), p0_angle=pos_infos[:, indexes["P0ANGLE"]].astype(float).mean(), parallax_angle=pos_infos[:, indexes["PARAANGL"]].astype(float).mean(), azimuth=pos_infos[:, indexes["AZIMUT"]].astype(float).mean(), elevation=pos_infos[:, indexes["ELEVATIO"]].astype(float).mean(), date_beg=headers[0]["DATE-OBS"] + " " + headers[0]["UT"] ) return pos def gregor_parallactic_angle(time, hpc_x=None, hpc_y=None): """ Calculates parallactic angle for GREGOR based on a time and helioprojective coordinates on the sun. Args: time: time of observation hpc_x: helioprojective x (defaults to sun center) hpc_y: helioprojective y (defaults to sun center) Returns: angle: parallactic angle in degrees """ # Define observer and add timezone (assume tenerife time) gregor_observer = Observer(location=gregor_coords) date_tz = pytz.utc.localize(time) time = gregor_observer.datetime_to_astropy_time(date_tz) # Build Sky Coord object, default to sun center at time if hpc_x and hpc_y: sun_coords = SkyCoord(hpc_x * u.arcsec, hpc_y * u.arcsec, frame=Helioprojective, observer='earth', obstime=time) sun_coords.representation_type = 'spherical' sun_coords = sun_coords.transform_to(GCRS) else: sun_coords = get_sun(time) angle = gregor_observer.parallactic_angle(time, sun_coords) angle = angle.to('degree').value return angle def telescope_to_hpc(slitposx=None, slitposy=None, p0_angle=None, xcenter=None, ycenter=None): """ Transform GRIS header coordinate data to HPC coordinates Args: slitposx: float SLITPOSX in arcsec slitposy: float SLITPOSY in arcsec p0_angle: float p0_angle in degrees xcenter: float X_center in arcsec (stored in GDBS upon centering the telescope) ycenter: float Y_center in arcsec (stored in GDBS upon centering the telescope) Returns: hpc_x, hpc_y tuple of helioprojective coordinates in arcseconds """ xi = slitposx - xcenter yi = slitposy - ycenter rotated = rotate_around_origin((xi, yi), np.radians(p0_angle)) return rotated