Commit 7e1e7139 authored by Carl Schaffer's avatar Carl Schaffer
Browse files

Merge branch 'get_file_wrapper' into 'master'

Get file wrapper

See merge request !106
parents 1a623212 34fbd200
__version__='2.0.0'
__version__='2.1.0'
from .generic import get_file_wrapper
\ No newline at end of file
......@@ -40,6 +40,10 @@ class ChrotelFitsFile(FitsFile):
"""
# List header cleaning functions here
# Every function should accept and return a header dictionary
header_cleaning_functions = []
# List of keywords to extract from header, values are passed
# through self.query
bson_keys = {
......
from sda_kharon.file_sorter import sort_file
from .SDCMongoClient import SDCMongoClient
from .field_list import chrotel_fields
from .field_list import gris_fields
from .field_list import lars_fields
from .field_list import sdc_fields
from .fits import FitsFile
from ..chrotel.ChrotelFitsFile import ChrotelFitsFile
from ..gris.GrisFitsFile import GrisFitsFile
from ..util import auth
sdc = SDCMongoClient(**auth)
......@@ -17,3 +22,27 @@ def cleaner(instrument):
for c in db.list_collection_names():
if instrument in c:
db[c].drop()
def get_file_wrapper(filename, instrument=None):
"""
Get file wrapper (FitsFile or child class) for a fits file
Args:
filename: path to the file
instrument: name of the instrument, if not provided automatic detection is tried
Returns:
Instance of wrapped file
"""
wrappers = dict(
gris=GrisFitsFile,
chrotel=ChrotelFitsFile,
bbi=FitsFile,
lars=FitsFile
)
if instrument is None:
res = sort_file(filename)
instrument = res.instrument
wrapper = wrappers[instrument]
return wrapper(filename)
......@@ -6,6 +6,7 @@ from os.path import exists, basename
import astropy.io.fits as fitsio
import numpy as np
from ..util import calc_mu, calc_theta
from ..util.locplot import get_hmi_continuum
from sunpy.map import Map
......@@ -22,6 +23,9 @@ class FitsFile:
"""
# Implement cleaning functions here, the functions need to modify and return a header object
header_cleaning_functions = []
def __init__(self, filename):
"""Constructor"""
assert exists(filename), f"File {filename} does not exist!"
......@@ -178,7 +182,7 @@ class FitsFile:
@property
def bson_header(self):
"""
Clean fits header to make compatible with BSON. Enuwerates duplicate Keys.
Clean fits header to make compatible with BSON. Enumerates duplicate Keys.
Returns:
......@@ -193,6 +197,15 @@ class FitsFile:
del header_dict[key]
return header_dict
def get_cleaned_header(self):
"""Clean the header, applies all header cleaning functions defined in this class to the header
contained in the file. All header cleaning functions need to take a header object and return a new header"""
header_in = self.bson_header
for cleaning_func in self.header_cleaning_functions:
header_in = cleaning_func(header_in)
self.cleaned_header = header_in
return header_in
def __str__(self):
outstring = "{} {} from {}\n"
outstring = outstring.format(
......
......@@ -7,8 +7,13 @@ from unittest.mock import patch
import bson.objectid
import gridfs
import kis_tools.util.util as util
from kis_tools import get_file_wrapper
from kis_tools.generic.SDCMongoClient import SDCMongoClient
from importer_test_data import test_data
from kis_tools.gris.GrisFitsFile import GrisFitsFile
class TestGeneric(unittest.TestCase):
""" """
......@@ -37,8 +42,12 @@ class TestGeneric(unittest.TestCase):
cls.testdocument = {"foo": "bar", "answer": 42, "date-obs": "0", "DATE-BEG": "0", "DATE_BEG": "0"}
# check if testdocument exists and delete
result = sdc.testtarget_collection.find(cls.testdocument)
if result.count() > 0:
n_matching_docs=0
try:
n_matching_docs = sdc.testtarget_collection.count_documents(cls.testdocument)
except:
return
if n_matching_docs:
sdc.testtarget_collection.delete_many(cls.testdocument)
# check if testfiles exists and delete
......@@ -54,16 +63,19 @@ class TestGeneric(unittest.TestCase):
@classmethod
def tearDownClass(cls):
""" clean up after tests"""
del cls.sdc
os.remove(cls.testfile)
os.remove(cls.testfile_md5)
del cls.testfile
try:
del cls.sdc
except:
pass
print("\nRemoving testfile and db connection")
def setUp(self):
"""prepare for every single test """
self.sdc.gitinfo = self.sdc.get_current_git_version()
target = self.sdc.testtarget_gridfs
try:
self.sdc.gitinfo = self.sdc.get_current_git_version()
target = self.sdc.testtarget_gridfs
except:
return
files = target.find()
for f in files:
target.delete(f._id)
......@@ -198,5 +210,20 @@ class TestGeneric(unittest.TestCase):
)
def test_get_file_wrapper(self):
instruments = ["gris", "chrotel", "bbi", "lars"]
test_files = {instr: getattr(test_data, instr).l1_data[0] for instr in instruments}
# Try the files for the different instruments and assert that all returned objects implement a get_cleaned
# header function
for inst, test_file in test_files.items():
instance = get_file_wrapper(test_file, inst)
self.assertTrue(hasattr(instance,"get_cleaned_header"))
# Try with a file without specifying the instrument
instance = get_file_wrapper(test_file)
self.assertTrue(hasattr(instance, "get_cleaned_header"))
if __name__ == "__main__":
unittest.main()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment