Commit 3b201551 authored by Carl Schaffer's avatar Carl Schaffer
Browse files

Merge branch 'run_io' into 'master'

orientation and coordinate validation

See merge request !180
parents ec12625e 14e4a41b
......@@ -10,6 +10,7 @@ import os # path and file manipulations
import sys # error handling
from glob import glob
from os.path import basename, exists, join, dirname
from pathlib import Path
import astropy.io.fits as fitsio # fits reading and writing
import astropy.units as u
......@@ -20,8 +21,9 @@ from astropy.wcs import WCS
from kis_tools.generic.fits import FitsFile
from kis_tools.gris.coord_correction_ml import get_coords_ml
from kis_tools.gris.util import gregor_parallactic_angle, telescope_to_hpc
from kis_tools.gris.util import telescope_to_hpc, expand_coords, calculate_slit_translation, calculate_slit_angle
from kis_tools.util.calculations import estimate_telescope_drift
from kis_tools.util.constants import gris_stepsize_along_slit
from kis_tools.util.util import date_from_fn, gris_obs_mode
logger = logging.getLogger(__name__)
......@@ -46,6 +48,8 @@ class GrisFitsFile(FitsFile):
def __init__(self, filename):
"""Constructor"""
if isinstance(filename, Path):
filename = str(filename.resolve())
super().__init__(filename)
assert exists(filename)
self.properties = {}
......@@ -78,7 +82,7 @@ class GrisFitsFile(FitsFile):
# Extract Run-, Map-, and Stepnumber, as well as mode string
self.runnumber = int(name[4])
self.mapnumber = int(name[5])
self.slitposnumber = name[6]
self.slitposnumber = int(name[6])
self.modestring = name[3]
@property
......@@ -149,16 +153,22 @@ class GrisFitsFile(FitsFile):
WARNING! The code below is not checked for correctness! We do not know what direction SLITORIE references
off and coordinates provided by GREGOR are very error-prone.
off and coordinates provided by GREGOR are very error-prone. If you need precision better than ~50" this
function is not sufficient.
"""
h = self.header
slit_length_pixels = h["NAXIS2"] if self.old_header else h['NAXIS1']
center_pixel = slit_length_pixels // 2
slit_length_pixels = self.NAXIS2 if self.old_header else self.NAXIS1
# Slit center in telescope coordinates
x_slitcenter = h["SLITPOSX"] if h["SLITPOSX"] != "" else 0
y_slitcenter = h["SLITPOSY"] if h["SLITPOSY"] != "" else 0
x_slitcenter = self.SLITPOSX if self.header['SLITPOSX'] != "" else 0
y_slitcenter = self.SLITPOSY if self.header['SLITPOSY'] != "" else 0
coord_ref = slit_length_pixels // 2
if "SLITCNTR" in self.header:
if self.SLITCNTR.strip() != 'scancntr':
raise ValueError(
f'Cannot determine reference pixel for coordinates! SLITCNTR is {self.SLITCNTR} instead of scancntr')
# Transform to HPC (shift by telescope centering, rotate by p0 angle)
delta_x, delta_y, std_delta_x, std_delta_y = self.telescope_centering
......@@ -176,23 +186,35 @@ class GrisFitsFile(FitsFile):
else:
x_hpc, y_hpc = telescope_to_hpc(x_slitcenter, y_slitcenter, self.p0_angle, delta_x, delta_y)
# Setup grid of pixels
delta_pix = [i - center_pixel for i in range(slit_length_pixels)]
delta_pix = np.array(delta_pix)
arcsec_per_pixel_x = h["STEPSIZE"] # assuming square pixels
arcsec_per_pixel_y = (
h["STEPSIZE"] if h["STEPSIZE"] else 0.135
) # assuming square pixels
try:
angle = self.slit_orientation
except ValueError as e:
logger.warning(f"Could not determine Slit orientation! \n {e}")
angle = 0
# generate coordinates for full grid of slit pixels
X = x_hpc - np.sin(angle) * delta_pix * arcsec_per_pixel_x
Y = y_hpc - np.cos(angle) * delta_pix * arcsec_per_pixel_y
# With the center pixel value given, the coordinates need to be expanded to a full array.
X, Y = expand_coords(x_hpc, y_hpc,
slit_orientation=angle,
reference_pixel=coord_ref,
stepsize_slit=gris_stepsize_along_slit,
slit_length_pixels=slit_length_pixels)
# Coordinates need to be shifted by the step number of the slit. If the derotator is in
# or the stepangle is set to 0, each of these factors introduces a flip in the map, canceling
# each other out if both are true. If a flip is present, we need to start at the maximum slit
# positions and move slitpos steps towards slitposition 0.
reversed_scanning = self.step_angle == 0
derotator_in = self.derotator_in
slit_number = self.slitposnumber
if reversed_scanning != derotator_in:
slit_number = self.number_of_maps - slit_number - 1
coord_shift = calculate_slit_translation(slit_number=slit_number, stepsize_perp=self.STEPSIZE,
slit_orientation=angle)
X = X + coord_shift[0]
Y = Y + coord_shift[1]
coord_array = np.array([X, Y]) * u.arcsec
# reorder axes for consistency with other coordinate retrieval methods
......@@ -204,6 +226,10 @@ class GrisFitsFile(FitsFile):
@property
def coords(self):
"""
Returns:array(float) shape (2,len(slit)) helioprojective coordinates in arcsecnd along slit
"""
try:
coords = self._coords_from_wcs
except AssertionError:
......@@ -225,33 +251,54 @@ class GrisFitsFile(FitsFile):
Returns: zenit float in degrees
Raises Value error if elevation is not contained in the header
"""
zenit = 90 - self.elevation
return zenit
@property
def elevation(self):
"""
Get elevation angle
Returns: in degrees
Raises Value error if elevation is not contained in the header
"""
elev = self._get_value(['ELEV_ANG', 'ELEVATIO'])
if elev == '':
raise ValueError(f'No elevation tracked for {self.__repr__()}')
zenit = 90 - elev
return zenit
return elev
@property
def slit_orientation(self):
"""Get slitorie:
clockwise angle from the western section of the solar equator
"""Get slit orientation:
clockwise angle from the solar equator, scan direction is 90° further in clockwise direction.
Attempts to correct for derotator and image rotation, usually correct within a couple 10s of degrees.
Returns:
slitorie: float (degrees)
"""
# Pull Values from header
p0_angle = self.p0_angle
rotangle = self.ROTANGLE if 'ROTANGLE' in self.header else None
azimut = self._get_value(['AZIMUT'])
zenit = self.zenit
slitorie = self.SLITORIE
const_term = -130 # determined by trial and error to match results from cross correlation with HMI
alpha = -1 * gregor_parallactic_angle(self.obs_time) + azimut - zenit - p0_angle + const_term
if self.derotator_in:
rotangle = self.ROTANGLE
alpha = -2 * rotangle + (const_term / 2)
angle = slitorie - alpha
elev = self.elevation
# Calculate Angel
angle = calculate_slit_angle(
rotangle=rotangle,
azimuth=azimut,
elevation=elev,
p0_angle=p0_angle,
date=self.obs_time
)
# *******************************************************************************************
# **************WARNING!*********************************************************************
# *******************************************************************************************
#
# We currently do not have a validated way to calculate the orientation of the slit. If there is a
# cross correlation file, we use that, otherwise we return '0' with a warning
return angle
......
......@@ -12,18 +12,21 @@ argument is passed, it will attempt to process a dummy folder.
import datetime
import re
import sys
from glob import glob # get filenames in unix style
from os.path import join, dirname, abspath, isdir, basename
from pathlib import Path
import astropy.io.fits as fitsio
import matplotlib.pyplot as plt
import numpy as np
from pandas import to_datetime
from tqdm import tqdm
from kis_tools.gris.GrisFitsFile import GrisFitsFile
from kis_tools.gris.util import get_observers
from kis_tools.util.locplot import make_loc_plot
from kis_tools.util.util import reglob, gris_run_number
from kis_tools.util.util import reglob, gris_run_number, groupby_function
class GrisRun(object):
......@@ -82,6 +85,7 @@ class GrisRun(object):
if not gris_fits_files:
raise ValueError("Empty list of files passed to GrisRun")
# Setup data containers
self.properties = {}
self.files_broken = []
......@@ -95,7 +99,7 @@ class GrisRun(object):
# check if inputs are already instances of GrisFitsFile, if
# not, generate instances
elif isinstance(gris_fits_files[0], str):
elif not isinstance(gris_fits_files[0], GrisFitsFile):
gris_fits_files = [
GrisFitsFile(gff) for gff in gris_fits_files
]
......@@ -118,6 +122,25 @@ class GrisRun(object):
# Look for preview images and log files
self.get_previews()
@classmethod
def from_date_run(cls, date, run_number):
    """Construct a run instance from an observation date and run number.

    Globs the KIS archive mount for level1_split files matching the
    given run and builds the run from the matched file list.

    Args:
        date: datetime.datetime, or anything pandas.to_datetime accepts
        run_number: int, number of the observation run

    Returns:
        instance of cls built from the matched files
    """
    # Check date for validity, try to cast to date
    if not isinstance(date, datetime.datetime):
        date = to_datetime(date)
    # Check operating system and set path accordingly
    # Assumes mars mounted at Y: on windows
    if sys.platform == "win32":
        # Raw string: '\d' in a plain literal is an invalid escape sequence
        archive_root = Path(r'Y:\dat')
    else:
        archive_root = Path('/dat')
    archive = archive_root / 'sdc' / 'gris' / date.strftime("%Y%m%d") / 'level1_split'
    # Glob for files matching the run
    pattern = f'*l1?_{run_number:03d}*'
    # Use cls (not a hard-coded class name) so subclasses build themselves
    return cls(list(archive.glob(pattern)))
def parse(self):
"""Performs all information gathering that requires opening
individual fits files instead of only using info from their
......@@ -285,7 +308,7 @@ class GrisRun(object):
@property
def spatial_uncertainties(self):
"""Average Uncertainties as described by WCS"""
return np.mean(self.query("CSYER1")), np.mean(self.query("CSYER2"))
return self.files[0].coord_uncertainty
@property
def map_previews(self):
......@@ -525,14 +548,19 @@ class GrisRun(object):
list(filter(lambda x: x.mapnumber == first_map_id, self.files)),
key=lambda x: x.filename,
)
first_coords = map_files[0]._coords_from_wcs
last_coords = map_files[-1]._coords_from_wcs
first_coords = map_files[0].coords
last_coords = map_files[-1].coords
x1 = first_coords[0, :].value
x2 = first_coords[-1, :].value
x3 = last_coords[-1, :].value
x4 = last_coords[0, :].value
return x1, x2, x3, x4
@property
def maps_dict(self):
    """Files of this run grouped by map number.

    Returns:
        dict mapping map number -> list of files belonging to that map
    """
    return groupby_function(self.files, lambda f: f.mapnumber)
# noinspection PyMethodOverriding
class EmptyGrisRun(GrisRun):
def __init__(self, path):
......
# GRIS Curation Scripts
This is a collection of curation scripts used within the GRIS archive. They were written to solve very specific tasks and are very likely to break if anything changes within the package. Use with care, they might be broken or do something unexpected!
\ No newline at end of file
"""
Author: Carl Schaffer Date: 2018 December 19th
Mail: schaffer@leibniz-kis.de
Scripts to generate a dataframe representing the calibration settings
used for calibrating the gris archive. The general idea is to find all
calibration files and represent each call to calibration routines
ocurring in these files as a line in a pandas DataFrame. each row in
the dataframe represents a variable within the idl environment when
calling the command.
"""
import datetime
import pickle
import re
from glob import glob
from os.path import basename, join, exists
import numpy as np
import pandas as pd
from kis_tools.util.util import get_fits_header_df
from kis_tools.util.util import gris_run_number, groupby_function
##################################################
# Processing of Calfiles
##################################################
def remove_comments(string, comment_sign=";"):
    """Strip comments from a block of text.

    Everything from the first occurrence of ``comment_sign`` to the end
    of each line is dropped; line structure is preserved.
    """
    stripped = [line.split(comment_sign)[0] for line in string.split("\n")]
    return "\n".join(stripped)
def process_calfile(fn):
    """Walk a gris calibration file and snapshot the IDL environment.

    Each time a routine call is encountered, the current set of variable
    assignments (plus the command line itself) is recorded.

    Args:
        fn: path to the calibration (.pro) file

    Returns:
        list of dicts, one per command call, mapping variable names to
        their values at call time; always contains 'filename' and
        'command' keys
    """
    with open(fn, "r") as infile:
        raw_text = infile.read()
    # drop all IDL comments before scanning
    text = remove_comments(raw_text)

    # a command is a routine name followed by '(' or ','
    command_re = re.compile(r"^([^,=\(])+[,\(]")
    assignment_re = re.compile(r"([a-zA-Z_]+)\s*=\s*([^\n]+)")

    snapshots = []
    env = {"filename": fn}
    for line in text.split("\n"):
        if command_re.search(line):
            # routine call: record a copy of the current environment
            env["command"] = line.strip()
            snapshots.append(env.copy())
        else:
            # otherwise record/overwrite any variable assignments
            for name, value in assignment_re.findall(line):
                env[name] = value
    return snapshots
def find_commented_files(calfiles):
    """Generate a dataframe containing a list of all files containing
    commented-out calls to gris calibration routines.

    Args:
        calfiles: iterable of paths to calibration files

    Returns:
        DataFrame with a 'filename' column, one row per file containing
        at least one commented gris call (empty DataFrame if none)
    """
    # ';' followed by a gris call with arguments
    pattern = re.compile(r"[^\S\n]*;+[^\S\n]*gris[^\n]+,")
    flagged = []
    for c in calfiles:
        # context manager: the original left the file handle open
        with open(c, "r") as text:
            commented = False
            for line in text:
                if pattern.search(line):
                    commented = True
                    print(c)
                    print(line)
        if commented:
            flagged.append({"filename": c})
    # build once from a list: DataFrame.append was removed in pandas 2.0
    return pd.DataFrame(flagged)
def get_calfile_df(archive="/dat/sdc/gris/", pattern="*/cal[0-9][0-9]?????.pro"):
    """Construct a pandas DataFrame containing information on all
    calls to gris calibration routines. Each row in the dataframe
    represents a variable within the idl environment when calling the
    command.

    Args:
        archive: path to archive
        pattern: unix style pattern to select all files

    Returns:
        result: DataFrame ordered and cleaned representing the matched calfiles
    """
    # find all calfiles
    calfiles = glob(join(archive, pattern))
    # initialize empty dataframe and fill from calfiles
    # NOTE(review): DataFrame.append was removed in pandas 2.0 — this loop
    # requires pandas < 2; accumulating rows in a list and building the
    # frame once would be both forward-compatible and faster.
    df = pd.DataFrame()
    for c in calfiles:
        results = process_calfile(c)
        for r in results:
            df = df.append(r, ignore_index=True)
    # sometimes 'file' is used instead of map, fix this
    # NOTE(review): df.map is attribute access on a column named 'map';
    # since pandas 2.1 DataFrame.map is a method, which would shadow the
    # column — df["map"] would be safer here and below.
    missing_map = df[df.map.isnull()]
    df.loc[missing_map.index, "map"] = df.loc[missing_map.index, "file"]
    # make index by cleaning map column and adding data and run columns
    df["map"] = clean_col(df["map"])
    # runid looks like "28may19.001": verbose date + 3-digit run number
    df["runid"] = df.map.str.extract(r"(\d\d\w\w\w\d\d.\d\d\d)")
    df["run"] = pd.to_numeric(df["runid"].str.split(".").str.get(1))
    df["date"] = pd.to_datetime(df["runid"].str.split(".").str.get(0))
    df = df.set_index(["date", "run"])
    # separate flatfield and calibration files into separate columns
    df[["ff1", "ff2"]] = split_file_list(df.fileff)
    df[["cal1", "cal2"]] = split_file_list(df.filecal)
    # clean dark column
    df["filedc"] = clean_col(df["filedc"].astype(str))
    # extract boolean keywords from command, boolean keywords are
    # preceded by a frontslash e.g. /xtalk
    df["boolean_keywords"] = np.nan
    for i, r in df.iterrows():
        c = r["command"]
        matches = re.findall(r"(/\w+)+", c)
        if matches:
            df.loc[i, "boolean_keywords"] = ",".join(matches)
    # generate column for called routine
    df["main_routine"] = extract_main_routine(df.command)
    # replace invalid values with nan
    df = df.replace("None", np.nan)
    df = df.replace("nan", np.nan)
    # extract value for keywords from command call
    keywords = ["lambda", "rotator_offset", "filedc", "xtau", "data", "pzero"]
    for k in keywords:
        df[k] = extract_kw_value_from_command(df, k)
    # select relevant columns
    columns = [
        "map",
        "main_routine",
        "ff1",
        "ff2",
        "cal1",
        "filedc",
        "lambda",
        "rotator_offset",
        "xtau",
        "data",
        "pzero",
        "boolean_keywords",
    ]
    result = df[columns].copy()
    result.sort_index(inplace=True)
    return result
##################################################
# Cleaning and ordering of results
##################################################
def extract_main_routine(column):
    """Extract the called routine's name from each command string.

    The routine name is everything up to the first comma or opening
    parenthesis.
    """
    routine_pattern = r"(^[^,\(]+)[,\(]"
    return column.str.extract(routine_pattern)
def clean_col(column):
    """Make values in a column containing filenames consistent by
    removing all remaining list artefacts, trailing spaces, or
    unnecessary directory information.

    Args:
        column: pandas Series of (possibly messy) filename strings

    Returns:
        Series of bare basenames; empty/missing entries become NaN
    """
    # regex=False: '[', ']' and ''' are literal characters here, not
    # regex patterns (an unescaped '[' is an invalid regex and breaks
    # under pandas versions where str.replace defaults to regex=True)
    for artefact in ("[", "]", "'"):
        column = column.str.replace(artefact, "", regex=False)
    column = column.replace(np.nan, "")
    column = column.str.strip()
    # blank entries (originally NaN or empty) map back to NaN
    column = column.apply(lambda x: basename(x) if x else np.nan)
    return column
def split_file_list(column):
    """Expand a column holding comma-separated lists into one column
    per entry, cleaning every resulting column afterwards."""
    expanded = column.str.split(",", expand=True)
    for name in expanded.columns:
        expanded[name] = clean_col(expanded[name].astype(str))
    return expanded
def extract_kw_value_from_command(df, kw):
    """Pull the value assigned to keyword ``kw`` out of each command
    string in ``df['command']``.

    A trailing comma is appended so the terminating [,$] always matches.
    If the dataframe already has a column for ``kw``, extracted values
    take precedence and existing ones fill the gaps.
    """
    pattern = r"{}\s*=\s*([\s\d,\.\[\]+-]+)[,$]".format(kw)
    with_sentinel = df["command"] + ","
    values = with_sentinel.str.extract(pattern)[0]
    if kw in df.columns:
        values = values.astype(df[kw].dtype).combine_first(df[kw])
    return values
def get_best_ffs(runname, l0kws):
    """Get the two best ffs for a given gris run, only flatfields

    Procedure:
        1: Get all flats from same day
        2: Calculate the time difference to the measurement for each flat
        3: Split into 'before' and 'after' sets
        4: Try to successively find the closest flat in each of these categories:
            a: parameters match and delta_t < 2
            b: parameters don't match and delta_t < 2
            c: parameters match and same day
            d: parameters don't match and same day

    Args:
        runname : run identifier formatted as "28may19.001"
        l0kws : pandas DataFrame containing FITS headers for level0 files

    Returns:
        results : list of flatfields either empty one or two elements
    """
    print(runname)
    # split "28may19.001" into verbose date and 3-digit run number
    verbname, runnumber = re.search(r"(\d+\w+\d+)\.(\d\d\d)", runname).groups()
    date = pd.to_datetime(verbname).date()
    runnumber = int(runnumber)
    # get all entries matching the day
    # NOTE(review): assumes l0kws is indexed by a DatetimeIndex so that
    # l0kws[str(date)] selects all rows of that day — TODO confirm
    day = l0kws[str(date)][
        ["date", "run", "ACCUMULA", "EXPTIME", "MEASURE", "FILENAME"]
    ].copy()
    run = day[day.run == runnumber]
    if run.empty:
        return []
    # select all flatfield measurements
    candidates = day[day.MEASURE == "flat field"]
    if candidates.empty:
        return []
    # calculate time_differences in hours
    # NOTE(review): candidates is a slice of `day`; writing columns on it
    # may raise SettingWithCopyWarning — a .copy() would silence it
    candidates["delta_t"] = candidates.FILENAME.apply(lambda x: get_time(x, l0kws))
    candidates.delta_t = candidates.delta_t - get_time(
        f"{verbname}.{runnumber:03d}", l0kws
    )
    candidates.delta_t = candidates.delta_t.dt.total_seconds() / (60 * 60)
    # check if parameters match:
    pars_to_check = ["ACCUMULA", "EXPTIME"]
    # NOTE(review): bare except silently swallows any error (e.g. shape
    # mismatch when `run` has more than one row); catching ValueError
    # explicitly would be safer
    try:
        candidates["matching_pars"] = (
            run[pars_to_check].values == candidates[pars_to_check]
        ).sum(axis=1)
    except:
        print(runname)
        return []
    # split into before and after:
    before = candidates[np.sign(candidates.delta_t) == -1.0]
    after = candidates[np.sign(candidates.delta_t) == 1.0]
    hits = []
    for flat_candidates in [before, after]:
        if flat_candidates.empty:
            continue
        # selection masks: within 2 hours / matching instrument settings
        lt_2 = np.abs(flat_candidates.delta_t) <= 2
        match_pars = flat_candidates.matching_pars == len(pars_to_check)
        # fall-through priority: a, b, c, d as documented above
        combinations = [
            match_pars & lt_2,
            lt_2,
            match_pars,
            np.full((len(flat_candidates)), True, dtype=bool),
        ]
        for c in combinations:
            if c.sum() == 0:
                continue
            # use absolute time distance for ranking the candidates
            flat_candidates.delta_t = np.abs(flat_candidates.delta_t)
            matches = flat_candidates[c].sort_values("delta_t")
            hits.append(matches.iloc[0]["run"])
            break
    # make verbose runname as result
    res = [f"{verbname}.{int(h):03d}" for h in hits]
    return res
def get_time(runname, l0kws):
try:
verbname, runnumber = runname.split(".")
except:
return np.nan
date = pd.to_datetime(verbname).date()
runnumber = int(runnumber.split("-")[0])
day = l0kws[str(