"""Build a DataFrame of GRIS archive calibration settings.

Author: Carl Schaffer
Date: 2018 December 19th
Mail: schaffer@leibniz-kis.de

Scripts to generate a dataframe representing the calibration settings
used for calibrating the gris archive. The general idea is to find all
calibration files and represent each call to calibration routines
occurring in these files as a line in a pandas DataFrame. Each row in
the dataframe represents a variable within the idl environment when
calling the command.
"""
import datetime
import pickle
import re
from glob import glob
from os.path import basename, exists, join

import numpy as np
import pandas as pd

from kis_tools.util.util import get_fits_header_df
from kis_tools.util.util import gris_run_number, groupby_function

##################################################
#  Processing of Calfiles
##################################################


def remove_comments(string, comment_sign=";"):
    """Remove all comments from a string.

    Splits the text into lines and drops all parts of each line that
    occur after the first comment character.

    Args:
        string: text to clean
        comment_sign: character that starts a comment (default: IDL ';')

    Returns:
        cleaned text with comments stripped
    """
    lines = string.split("\n")
    clean_lines = [line.split(comment_sign)[0] for line in lines]
    return "\n".join(clean_lines)


def process_calfile(fn):
    """Loop over the lines of a gris calibration file and log the state
    of all defined variables as well as the command each time a command
    is called.

    Args:
        fn: path to the calibration file

    Returns:
        list of dicts; one entry per detected command call, mapping all
        variables defined up to that point (plus 'filename' and
        'command') to their raw string values
    """
    with open(fn, "r") as infile:
        text = infile.read()
    # ignore all comments
    text = remove_comments(text)

    results = []
    data = {"filename": fn}
    # loop over lines
    for line in text.split("\n"):
        # check if line contains an idl command, commands are assumed
        # to be names of routines followed by an opening parenthesis or
        # a comma sign
        if re.search(r"^([^,=\(])+[,\(]", line):
            data["command"] = line.strip()
            # copy() so later assignments don't mutate stored snapshots
            results.append(data.copy())
        else:
            # if a line does not contain a command, check if it contains
            # any variable assignments; if a variable is already defined
            # it is overwritten
            matches = re.findall(r"([a-zA-Z_]+)\s*=\s*([^\n]+)", line)
            for name, value in matches:
                data[name] = value
    return results


def find_commented_files(calfiles):
    """Generate a dataframe containing a list of all files containing
    commented calibration routines.

    Args:
        calfiles: iterable of calibration file paths

    Returns:
        DataFrame with one 'filename' row per file that contains a
        commented-out gris routine call
    """
    hits = []
    for c in calfiles:
        commented = False
        # use a context manager so the handle is closed (original leaked it)
        with open(c, "r") as text:
            for line in text:
                if re.search(r"[^\S\n]*;+[^\S\n]*gris[^\n]+,", line):
                    commented = True
                    print(c)
                    print(line)
        if commented:
            hits.append({"filename": c})
    return pd.DataFrame(hits)


def get_calfile_df(
    archive="/dat/sdc/gris/", pattern="*/cal[0-9][0-9]?????.pro"
):
    """
    Construct a pandas DataFrame containing information on all calls to
    gris calibration routines. Each row in the dataframe represents a
    variable within the idl environment when calling the command.

    Args:
        archive: path to archive
        pattern: unix style pattern to select all files

    Returns:
        result: DataFrame ordered and cleaned representing the matched
            calfiles
    """
    # find all calfiles
    calfiles = glob(join(archive, pattern))

    # collect all rows first, then build the frame once
    # (DataFrame.append in a loop is quadratic and removed in pandas 2.x)
    rows = []
    for c in calfiles:
        rows.extend(process_calfile(c))
    df = pd.DataFrame(rows)

    # sometimes 'file' is used instead of map, fix this
    missing_map = df[df["map"].isnull()]
    df.loc[missing_map.index, "map"] = df.loc[missing_map.index, "file"]

    # make index by cleaning map column and adding date and run columns
    df["map"] = clean_col(df["map"])
    df["runid"] = df["map"].str.extract(r"(\d\d\w\w\w\d\d.\d\d\d)")
    df["run"] = pd.to_numeric(df["runid"].str.split(".").str.get(1))
    df["date"] = pd.to_datetime(df["runid"].str.split(".").str.get(0))
    df = df.set_index(["date", "run"])

    # separate flatfield and calibration files into separate columns
    df[["ff1", "ff2"]] = split_file_list(df.fileff)
    df[["cal1", "cal2"]] = split_file_list(df.filecal)

    # clean dark column
    df["filedc"] = clean_col(df["filedc"].astype(str))

    # extract boolean keywords from command, boolean keywords are
    # preceded by a frontslash e.g. /xtalk
    df["boolean_keywords"] = np.nan
    bool_kw = re.compile(r"(/\w+)+")  # hoisted out of the loop
    for i, r in df.iterrows():
        matches = bool_kw.findall(r["command"])
        if matches:
            df.loc[i, "boolean_keywords"] = ",".join(matches)

    # generate column for called routine
    df["main_routine"] = extract_main_routine(df.command)

    # replace invalid values with nan
    df = df.replace("None", np.nan)
    df = df.replace("nan", np.nan)

    # extract value for keywords from command call
    keywords = ["lambda", "rotator_offset", "filedc", "xtau", "data", "pzero"]
    for k in keywords:
        df[k] = extract_kw_value_from_command(df, k)

    # select relevant columns
    columns = [
        "map",
        "main_routine",
        "ff1",
        "ff2",
        "cal1",
        "filedc",
        "lambda",
        "rotator_offset",
        "xtau",
        "data",
        "pzero",
        "boolean_keywords",
    ]
    result = df[columns].copy()
    result.sort_index(inplace=True)
    return result


##################################################
#  Cleaning and ordering of results
##################################################


def extract_main_routine(column):
    """Extract name of main routine in a command line."""
    return column.str.extract(r"(^[^,\(]+)[,\(]")


def clean_col(column):
    """Make values in a column containing filenames consistent by
    removing all remaining list artefacts, trailing spaces, or
    unnecessary directory information.

    Args:
        column: pd.Series of raw filename strings

    Returns:
        cleaned pd.Series; empty values become NaN
    """
    # literal replacements: regex=False is required, "[" alone is an
    # invalid regular expression and would raise with the regex default
    column = column.str.replace("[", "", regex=False)
    column = column.str.replace("]", "", regex=False)
    column = column.str.replace("'", "", regex=False)
    column = column.replace(np.nan, "")
    column = column.str.strip()
    column = column.apply(lambda x: basename(x) if x else np.nan)
    return column


def split_file_list(column):
    """Transform a column containing a comma separated list into
    multiple columns, clean the columns afterwards.

    Args:
        column: pd.Series of comma separated filename lists

    Returns:
        DataFrame with one cleaned column per list position
    """
    df = column.str.split(",", expand=True)
    for c in df.columns:
        df[c] = clean_col(df[c].astype(str))
    return df


def extract_kw_value_from_command(df, kw):
    """Extract the value assigned to keyword ``kw`` inside the command
    string; fall back to an existing column of the same name where the
    command does not supply a value.

    Args:
        df: DataFrame with a 'command' column
        kw: keyword name to look for

    Returns:
        pd.Series of extracted values
    """
    pat = r"{}\s*=\s*([\s\d,\.\[\]+-]+)[,$]".format(kw)
    # pad with a trailing comma so the terminating [,$] always matches
    padded = df["command"] + ","
    extracted = padded.str.extract(pat)[0]
    if kw in df.columns:
        extracted = extracted.astype(df[kw].dtype)
        extracted = extracted.combine_first(df[kw])
    return extracted


def get_best_ffs(runname, l0kws):
    """Get the two best ffs for a given gris run, only flatfields.

    Procedure:
        1: Get all flats from same day
        2: Calculate the time difference to the measurement for each flat
        3: Split into 'before' and 'after' sets
        4: Try to successively find the closest flat in each of these
           categories:
               a: parameters match and delta_t < 2
               b: parameters don't match and delta_t < 2
               c: parameters match and same day
               d: parameters don't match and same day

    Args:
        runname: run identifier formatted as "28may19.001"
        l0kws: pandas DataFrame containing FITS headers for level0 files

    Returns:
        results: list of flatfields, either empty, one or two elements
    """
    print(runname)
    verbname, runnumber = re.search(
        r"(\d+\w+\d+)\.(\d\d\d)", runname
    ).groups()
    date = pd.to_datetime(verbname).date()
    runnumber = int(runnumber)

    # get all entries matching the day
    day = l0kws[str(date)][
        ["date", "run", "ACCUMULA", "EXPTIME", "MEASURE", "FILENAME"]
    ].copy()
    run = day[day.run == runnumber]
    if run.empty:
        return []

    # select all flatfield measurements; copy() so the column
    # assignments below don't write into a view of `day`
    candidates = day[day.MEASURE == "flat field"].copy()
    if candidates.empty:
        return []

    # calculate time differences in hours
    candidates["delta_t"] = candidates.FILENAME.apply(
        lambda x: get_time(x, l0kws)
    )
    candidates.delta_t = candidates.delta_t - get_time(
        f"{verbname}.{runnumber:03d}", l0kws
    )
    candidates.delta_t = candidates.delta_t.dt.total_seconds() / (60 * 60)

    # check if parameters match:
    pars_to_check = ["ACCUMULA", "EXPTIME"]
    try:
        candidates["matching_pars"] = (
            run[pars_to_check].values == candidates[pars_to_check]
        ).sum(axis=1)
    except Exception:  # comparison can fail on shape mismatch; was bare except
        print(runname)
        return []

    # split into before and after:
    before = candidates[np.sign(candidates.delta_t) == -1.0]
    after = candidates[np.sign(candidates.delta_t) == 1.0]

    hits = []
    for flat_candidates in [before, after]:
        if flat_candidates.empty:
            continue
        lt_2 = np.abs(flat_candidates.delta_t) <= 2
        match_pars = flat_candidates.matching_pars == len(pars_to_check)
        # candidate masks in decreasing order of preference (a-d above)
        combinations = [
            match_pars & lt_2,
            lt_2,
            match_pars,
            np.full((len(flat_candidates)), True, dtype=bool),
        ]
        for c in combinations:
            if c.sum() == 0:
                continue
            flat_candidates.delta_t = np.abs(flat_candidates.delta_t)
            matches = flat_candidates[c].sort_values("delta_t")
            hits.append(matches.iloc[0]["run"])
            break

    # make verbose runname as result
    res = [f"{verbname}.{int(h):03d}" for h in hits]
    return res


def get_time(runname, l0kws):
    """Return the observation datetime for a run identifier, or NaN if
    the identifier cannot be parsed or the run is not found.

    Args:
        runname: run identifier such as "28may19.001"
        l0kws: pandas DataFrame containing level0 keywords

    Returns:
        datetime.datetime or np.nan
    """
    try:
        verbname, runnumber = runname.split(".")
    except (AttributeError, ValueError):  # not a string / wrong format
        return np.nan
    date = pd.to_datetime(verbname).date()
    runnumber = int(runnumber.split("-")[0])
    day = l0kws[str(date)][
        ["date", "run", "ACCUMULA", "EXPTIME", "MEASURE", "DATE-OBS", "UTIME"]
    ]
    run = day[day.run == runnumber]
    if run.empty:
        return np.nan
    time = run.iloc[0]["UTIME"]
    time = pd.to_datetime(time).time()
    return datetime.datetime.combine(date, time)


def append_command(command, col):
    """Append a command to a column; checks if each field contains a
    value, and either appends separated by a comma or sets the value if
    none is set.

    Args:
        command: string to be appended
        col: pd.Series containing the values

    Returns:
        col: modified copy of the column
    """
    col = col.copy()
    for i, v in col.items():
        if v == v:  # NaN != NaN, so this is a not-NaN check
            col[i] = ",".join([str(v), command])
        else:
            col[i] = command
    return col


def pars_match(run_a, run_b, l0kws):
    """Check if the parameters for two given runs match.

    Args:
        run_a, run_b: gris run identifiers DDMMMYY.run such as 24apr14.000
        l0kws: pandas DataFrame containing level0 keywords

    Returns:
        result: boolean, True if the EXPTIME values of both runs agree
    """
    pars = []
    for r in [run_a, run_b]:
        verbname, runnumber = r.split(".")
        date = str(pd.to_datetime(verbname).date())
        runnumber = int(runnumber)
        day = l0kws[date]
        run = day[day.run == runnumber]
        pars.append(run[["EXPTIME"]].values.tolist())
    result = pars[0] == pars[1]
    return result


##################################################
#  Main block
##################################################


def get_l0_runlist(
    archive, force_update=False, storage_path="/dat/schaffer/data/"
):
    """Get a list of all data files contained in the */level0/
    subfolders; log, cal and textfiles are ignored. Results are cached
    as a pickle under ``storage_path``.

    Args:
        archive: archive root to scan
        force_update: rebuild the list even if a cache file exists
        storage_path: directory where the cache pickle is stored

    Returns:
        list of unique run identifiers (e.g. "28may19.001")
    """
    fn = "gris_l0_runs_" + archive.replace("/", "_") + ".pkl"
    fn = join(storage_path, fn)
    if exists(fn) and not force_update:
        print(fn)
        with open(fn, "rb") as cache_file:
            return pickle.load(cache_file)

    runpattern = r"(\d{2}\w{3}\d{2}\.\d{3})"
    files = glob(join(archive, "*/level0/*"))
    # filter files: drop logs, text and cal files, keep run-like names
    names = [basename(f) for f in files]
    names = [n for n in names if n[-3:] not in ("log", "txt", "cal")]
    searched = [re.search(runpattern, n) for n in names]
    l0_runlist = list({m.group(0) for m in searched if m})
    with open(fn, "wb+") as cache_file:
        pickle.dump(l0_runlist, cache_file)
    return l0_runlist


if __name__ == "__main__":
    archive = "/dat/sdc/gris/"

    # get dataframe from calibration files
    result = get_calfile_df(archive=archive)
    df2 = result.sort_index()
    new = result.loc["2019-09-01"]
    new.boolean_keywords = new.boolean_keywords.fillna("") + "/lazy/quiet"
    # with open(
    #     "/dat/schaffer/projects/grisred/gris_calibration_settings.csv", "a+"
    # ) as out:
    #     new.to_csv(out, header=False)

    # check which maps are present:
    by_run = groupby_function(
        sorted(get_l0_runlist("/dat/sdc/gris/", force_update=True)),
        gris_run_number,
    )
    runfiles = [a[0] for _, a in by_run.items()]

    # remove invalid or obsolete keywords from boolean_keywords column
    for invalid_option in ["/filt", "/xtalk", "/show"]:
        result.boolean_keywords = result.boolean_keywords.str.replace(
            f"{invalid_option},", ""
        )
        result.boolean_keywords = result.boolean_keywords.str.replace(
            f",{invalid_option}", ""
        )
        result.boolean_keywords = result.boolean_keywords.replace(
            f"{invalid_option}", np.nan
        )

    # ensure that all maps use the correct calibration routine
    result["main_routine"] = result.main_routine.replace(
        "gris_sp_v5", "gris_sp"
    )
    result["main_routine"] = result.main_routine.replace("gris", "gris_v6")
    result["main_routine"] = result.main_routine.replace("gris_v5", "gris_v6")

    # replace empty strings with nan
    result["boolean_keywords"] = result[["boolean_keywords"]].replace(
        "", np.nan
    )

    # add lazy and quiet keywords
    result["boolean_keywords"] = append_command(
        "/lazy,/quiet", result["boolean_keywords"]
    )

    l0kws = get_fits_header_df(runfiles, "/dat/schaffer/data/l0_keywords.pkl")
    l0kws = l0kws.reset_index()
    l0kws["datetime"] = pd.to_datetime(
        l0kws.date.astype(str).str.cat(l0kws.UT, sep=" ")
    )
    l0kws = l0kws.set_index("datetime")

    flats_rec = pd.read_pickle("recommended_flatfields.pkl")
    # copy() so the *_time columns below don't write into a view of result
    ffstudy = result[["map", "ff1", "ff2"]].copy()
    ffstudy[["ff1_rec", "ff2_rec"]] = flats_rec
    fs = ffstudy
    for c in ["map", "ff1", "ff2", "ff1_rec", "ff2_rec"]:
        print(c)
        ffstudy[c + "_time"] = ffstudy[c].apply(lambda x: get_time(x, l0kws))
    for c in ["ff1_time", "ff2_time", "ff1_rec_time", "ff2_rec_time"]:
        ffstudy[c + "diff"] = (
            ffstudy["map_time"] - ffstudy[c]
        ).dt.total_seconds() / (60 * 60)

    no_f2 = ffstudy[ffstudy.ff2.isnull()]
    matches = (no_f2.ff1 == no_f2.ff1_rec) | (no_f2.ff1 == no_f2.ff2_rec)

    spec = l0kws[l0kws["STATES"] == 1][["date", "run", "FILENAME"]]
    spec_runs = spec["FILENAME"].str.extract(r"(\d\d\w\w\w\d\d.\d\d\d)")
    result[result.map.isin(spec_runs.values.flatten())].command

    # check calibration file parameter matching
    for i, row in result.iterrows():
        if "gris_sp" in row["main_routine"]:
            res = True
        else:
            res = pars_match(row["map"], row["cal1"], l0kws)
        result.loc[i, "cal_match"] = res