# -*- coding: utf-8 -*-
#
# Authors: Derek Homeier
#          Andrew Leonard
#
import os
import re
import copy
import warnings

import astropy.units as u
from astropy.time import Time

import sunpy.net.attrs as a
from sunpy.net.attr import AttrAnd, AttrOr, AttrMeta, AttrWalker, DataAttr, and_, or_
from sunpy.net.base_client import (BaseClient, QueryResponseTable, QueryResponseRow,
                                   convert_row_to_table)
from sunpy.util.exceptions import SunpyUserWarning
# from sunpy.net.attrs import Instrument, Level, Physobs, Provider, Time, Wavelength
import sdc.attrs as sattrs
import urllib.parse
import urllib.request
from urllib.error import HTTPError, URLError
import json
import parfive

walker = AttrWalker()


@walker.add_creator(AttrOr)
def create_from_or(wlk, tree):
    """
    Create one new set of query parameters for each subtree under the OR.
    """
    param_sets = []
    for subtree in tree.attrs:
        created = wlk.create(subtree)
        # Unwrap one layer of list nesting so that create() always
        # returns a flat list of dicts.
        if isinstance(created, list) and len(created) == 1:
            created = created[0]
        param_sets.append(created)
    return param_sets


@walker.add_creator(AttrAnd, DataAttr)
def create_new_param(wlk, tree):
    """
    Create a single query-parameter dict from an AND tree or a lone attr.
    """
    query_params = dict()
    # Delegate to the apply dispatcher to convert each attr into its
    # query parameters.
    wlk.apply(tree, query_params)
    return [query_params]


@walker.add_applier(AttrAnd)
def iterate_over_and(wlk, tree, params):
    """
    Apply every attr below the AND node to the same parameter dict.
    """
    for subtree in tree.attrs:
        wlk.apply(subtree, params)


# Map type_name from SunPy Attrs to observation description fields:
# CALIB_LEVEL  - Degree of data processing
# BTYPE        - IVOA Unified Content Descriptor for data ('phot.flux')
# T_RESOLUTION - (Minimal) sampling interval along the time axis [s]
#
# The default field/key name is given by Attr.type_name.upper();
# keys ending in '_' have the special significance of setting both
# `KEY_MIN` and `KEY_MAX` description fields.
_obs_fields = {'level': 'CALIB_LEVEL',
               'physobs': 'BTYPE',
               'sample': 'T_RESOLUTION',
               'time': 'DATE',
               'wave': 'WAVELENGTH_'}


def _str_val(value, regex=False):
    """
    Format `value` for use inside a query string.
    """
    # Plain numbers are rendered compactly.
    if isinstance(value, (int, float, complex)):
        return f"{value:g}"
    # Dates are stored as f"{{'$date':{int(value.unix * 1000)}}}" but queried as
    if isinstance(value, Time):
        return f"{{'$date':'{value.isot}'}}"
    if regex:
        return f"{{'$regex':'{str(value)}'}}"
    # Run through urllib.parse.quote() to be safe?
    return f"'{str(value)}'"


def _update_val(dictionary, key, value, block='description', regex=False):
    """
    Update `dictionary` with a field_name:value string in a format
    parseable by a BSON filter.
    """
    formatted = _str_val(value, regex)
    # Case conventions differ per field: some are always stored
    # lowercase, others uppercase.
    if key in ('BTYPE', 'INSTRUMENT', 'OBS_COLLECTION'):
        formatted = formatted.lower()
    elif key in ('POL_STATES', 'TELESCOPE'):
        formatted = formatted.upper()
    # Keys ending in '_' expand to a MIN/MAX pair:
    # require MIN <= value <= MAX.
    if key.endswith('_'):
        query = (f"{{'{block}.{key}MIN':{{'$lte':{formatted}}}}},"
                 f"{{'{block}.{key}MAX':{{'$gte':{formatted}}}}}")
    else:
        query = f"{{'{block}.{key}':{formatted}}}"
    return dictionary.update({key: query})


def _update_range(dictionary, key, values, block='description', fullrange=False):
    """
    Update `dictionary` with a field_name:{value range} string in a format
    parseable by a BSON filter.
    """
    lo, hi = _str_val(values[0]), _str_val(values[1])
    # Keys ending in '_' have separate MIN and MAX description fields.
    if key.endswith('_'):
        if fullrange:
            # Cover full search range: MIN >= values[:] >= MAX
            query = (f"{{'{block}.{key}MIN':{{'$lte':{lo}}}}},"
                     f"{{'{block}.{key}MAX':{{'$gte':{hi}}}}}")
        else:
            # Default: MIN <= values[1], MAX >= values[0]
            query = (f"{{'{block}.{key}MIN':{{'$lte':{hi}}}}},"
                     f"{{'{block}.{key}MAX':{{'$gte':{lo}}}}}")
    else:
        query = f"{{'{block}.{key}':{{'$gte':{lo},'$lte':{hi}}}}}"
    return dictionary.update({key: query})


# Converters from SunPy Attrs to Observation Record 'description' fields
# Generic applier for single-valued Attrs
@walker.add_applier(a.Level, a.Physobs, a.Sample, sattrs.Filter, sattrs.PolStates,
                    sattrs.DataProduct, sattrs.Telescope, sattrs.Target, sattrs.ObsName)
def _(wlk, attr, params):
    """Set `description.TYPE_NAME`"""
    key = _obs_fields.get(attr.type_name, attr.type_name.upper())
    return _update_val(params, key, attr.value, regex=getattr(attr, 'regex', False))


# OBS_COLLECTION: `instrument_observations` also defines the MongoDB collection (base path).
@walker.add_applier(a.Instrument)
def _(wlk, attr, params):
    """Set 'description.INSTRUMENT' and 'description.OBS_COLLECTION'"""
    # The MongoDB collection (base path) is derived from the instrument name.
    params.update({'OBS_COLLECTION': f'{attr.value.lower()}_observations'})
    key = attr.type_name.upper()
    return _update_val(params, key, attr.value, regex=getattr(attr, 'regex', False))


# Time range covering the observation - require at least part of it within 'DATE_BEG', 'DATE_END'
@walker.add_applier(a.Time)
def _(wlk, attr, params):
    """Set 'description.DATE_BEG|END' [ISO Time str]"""
    if attr.end == attr.start:
        _update_val(params, 'DATE_', attr.start, regex=getattr(attr, 'regex', False))
    else:
        _update_range(params, 'DATE_', [attr.start, attr.end],
                      fullrange=getattr(attr, 'fullrange', False))
    # Observations store the interval as DATE_BEG/DATE_END, not DATE_MIN/DATE_MAX.
    params['DATE_'] = params['DATE_'].replace('_MIN', '_BEG').replace('_MAX', '_END')
    return


# sattrs.Date is a point in time to fall within the observation
@walker.add_applier(sattrs.Date)
def _(wlk, attr, params):
    """Set 'description.DATE_BEG|END' [ISO Time str]"""
    # BUGFIX: use the 'DATE_' min/max key (as in the a.Time applier) so that
    # _update_val() produces the DATE_MIN/DATE_MAX pair renamed below;
    # attr.type_name.upper() stored under a plain key and the subsequent
    # params['DATE_'] lookup raised KeyError.
    _update_val(params, 'DATE_', attr.time, regex=getattr(attr, 'regex', False))
    params['DATE_'] = params['DATE_'].replace('_MIN', '_BEG').replace('_MAX', '_END')
    return


# Generic applier for Range Attrs
@walker.add_applier(a.Wavelength, sattrs.AtmosR0, sattrs.ExposureTime, sattrs.Theta, sattrs.Mu,
                    sattrs.HelioProjLon, sattrs.HelioProjLat, sattrs.SpatialResolution,
                    sattrs.SpectralResolution, sattrs.TemporalResolution, sattrs.NDimensions,
                    sattrs.PolXel, sattrs.SpatialXel1, sattrs.SpatialXel2, sattrs.SpectralXel,
                    sattrs.TimeXel)
def _(wlk, attr, params):
    """Set 'description.TYPE_NAME[_MIN|_MAX]'"""
    key = _obs_fields.get(attr.type_name, attr.type_name.upper())
    # Wavelength returns quantities with default unit Angstrom; convert to nm.
    # NOTE(review): this assumes only wavelength-like attrs carry Quantity
    # values - a Quantity in another unit (e.g. seconds) would raise a
    # UnitConversionError here; confirm against the sdc.attrs definitions.
    if isinstance(attr.min, u.Quantity):
        attrange = [attr.min.to_value(u.nm), attr.max.to_value(u.nm)]
    else:
        attrange = [attr.min, attr.max]
    # Are we querying for only a single parameter, or need to sort min/max yet?
    if attrange[1] == attrange[0]:
        return _update_val(params, key, attrange[0])
    else:
        attrange = [min(attrange), max(attrange)]
        return _update_range(params, key, attrange, fullrange=getattr(attr, 'fullrange', False))


@walker.add_applier(a.Provider, a.ExtentType, a.Source)
def _(wlk, attr, params):
    """
    Provider, ExtentType, Source are used by client `_can_handle_query` and not the API.
    """


class KISClient(BaseClient):
    """
    Search KIS-SDC observation datasets from GridFS and retrieve [metadata] files.
    """
    _BASE_URL = "http://dockertest:8083/sdc/"
    # Human-readable reason the last _can_handle_query() check failed.
    status = ""

    def __init__(self):
        """"""

    @classmethod
    def _status(cls):
        """Return the status message from the last `_can_handle_query` check."""
        return cls.status

    @staticmethod
    def _make_filename(row: QueryResponseRow, path: os.PathLike = None):
        """
        Generate a filename for a file based on the observation record; to be worked out.
        """
        # The fallback name is just the dataset _id.
        name = f"{row['_id']}.fits"
        # BUGFIX: os.PathLike has no append(); join the path components instead
        # (and tolerate the documented default of path=None).
        if path is None:
            return name
        return os.path.join(path, name)

    def _make_search(self, query_dicts):
        """
        Combine (AND) query parameters from list of params dictionaries to search paths.
        """
        searches = []
        # param dicts for same OBS_COLLECTION (Instrument) could in principle be
        # combined to a single query using '$or':[] ...
        for query_params in query_dicts:
            collection = query_params.pop('OBS_COLLECTION')
            # NOTE: 'INSTRUMENT' holds the full filter fragment
            # ("{'description.INSTRUMENT':'gris'}"), so this is a substring check.
            instrument = query_params.get('INSTRUMENT')
            if not collection.split('_')[0] in instrument.lower():
                # BUGFIX: closing quote was missing around {collection}.
                raise ValueError(f"Mismatch of '{instrument}' in '{collection}'.")
            # Search for existing dict for same collection (instrument) to update:
            # index_obs = [i for i, d in enumerate(params) if collection in d.values()]
            # if len(index_obs) > 0:
            #     ~~~query_dicts[index_obs[0]].update(sub_params)~~~ need to collect them for OR
            searches.append(f"{collection}?filter={{'$and':[{','.join(query_params.values())}]}}")
        return searches

    def search(self, *query) -> QueryResponseTable:
        """
        Query the SDC RESTHeart client for a list of results in the observation collection.

        Parameters
        ----------
        query : iterable of `~sunpy.net.attr.AttrAnd`, `~sunpy.net.attr.AttrOr`
            Logical combination of `sunpy.net.attrs` objects representing the query,
            or any number of such objects to be combined using the AND (``&``) operator.

        Returns
        -------
        results : `~sunpy.net.base_client.QueryResponseTable`
            A `QueryResponseTable` instance containing the query result.
        """
        # Unit converters for 'description' fields, matched on the first
        # five characters of the field name.
        converters = {'ATMOS': lambda x: x * u.cm,
                      'DATE_': lambda x: Time(x['$date'] * 1e-3, format='unix'),
                      'EXPTI': lambda x: x * u.s,
                      'HPLN_': lambda x: x * u.arcsec,
                      'HPLT_': lambda x: x * u.arcsec,
                      'S_RES': lambda x: x * u.arcsec,
                      'THETA': lambda x: x * u.deg,
                      # BUGFIX: T_RESOLUTION is a sampling interval along the time
                      # axis in seconds (see field mapping above), not degrees.
                      'T_RES': lambda x: x * u.s,
                      'WAVEL': lambda x: x * u.nm}
        query = and_(*query)
        # Not very extensively tested on hierarchical queries
        if not self._can_handle_query(query):
            raise AttributeError(f'Query not possible: {self._status()}')
        queries = self._make_search(walker.create(query))
        results = []
        for query_str in queries:
            full_url = f"{self._BASE_URL}{query_str}"
            try:
                response = urllib.request.urlopen(full_url)
                obs_dict = json.loads(response.read()).get('_embedded', [])
                if len(obs_dict) > 0 and 'links' in obs_dict[0]:
                    # Check if a.Level was set in this query (defaults to 1).
                    # BUGFIX: the group name was missing - '(?P\d+)' is an invalid
                    # pattern that made re.compile() raise; use '(?P<level>\d+)'.
                    cl = re.compile(r"{'description.CALIB_LEVEL':(?P<level>\d+)}").search(query_str)
                    if cl:
                        data_type = f"l{int(cl['level'])}_data"
                    else:
                        data_type = "l1_data"
                    data_ln = obs_dict[0]['links'].get('data_ln', [])
                    # Flatten the 'description' sub-document into the record and
                    # attach units to the known numeric fields.
                    obs_dict[0].update(obs_dict[0].pop('description', dict()))
                    for k in obs_dict[0].keys():
                        if k[:5] in converters:
                            obs_dict[0][k] = converters[k[:5]](obs_dict[0][k])
                    # One result row per linked data file.
                    obs_rec = []
                    for exp in data_ln:
                        obs_rec.append(copy.deepcopy(obs_dict[0]))
                        obs_rec[-1]['links'] = exp
                        obs_rec[-1]['data_type'] = data_type
                    results += obs_rec
            except (HTTPError, URLError) as exc:
                raise URLError(f'Unable to execute search "{full_url}": {exc}. Confirm that '
                               f'RESTHeart is running on {self._BASE_URL} and connected.') from exc
        return QueryResponseTable(results, client=self)

    @convert_row_to_table
    def fetch(self, query_results: QueryResponseTable, *, downloader: parfive.Downloader,
              path: os.PathLike = None, binary: bool = False, **kwargs):
        """
        Fetch GridFS dataset metadata or binary files.

        Parameters
        ----------
        query_results:
            Results to download.
        path : `str` or `pathlib.Path`, optional
            Path to the download directory
        downloader : `parfive.Downloader`
            The download manager to use.
        binary : bool, optional
            Fetch the binary data for the dataset (default: metadata).
        """
        if not len(query_results):
            return
        if binary:
            binfile = '/binary'
            ext = 'fits'
        else:
            binfile = ''
            ext = 'json'
        for row in query_results:
            inst = row['INSTRUMENT']
            oid = row['links']['$oid']
            # Content-Disposition header default is "{row['_id']['$oid']}/{oid}.{ext}" (no '.json').
            # rowpath = row['_id']['$oid']
            filepath = os.path.join(row['OBS_NAME'], f"{oid}.{ext}")
            url = f"{self._BASE_URL}{inst}_{row['data_type']}.files/{oid}{binfile}"
            downloader.enqueue_file(url, filename=str(path).format(file=filepath,
                                                                   **row.response_block_map))

    @classmethod
    def _can_handle_query(cls, *query, hasinstr=False):
        """
        Check if query attributes are valid and supported by this client.

        Tests that all attributes in tree are in `~sunpy.net.attrs` or
        `~sunpy.net.attrs.sdc` and, if in registered attributes, have a
        registered value.

        Parameters
        ----------
        query : iterable of `~sunpy.net.attr.Attr`, `~sunpy.net.attr.AttrAnd`, `~sunpy.net.attr.AttrOr`
            Collection of query attributes.
        hasinstr : bool, optional
            Assume that `~sunpy.net.attrs.Instrument` attribute is already present.

        Notes
        -----
        An instrument is always required to form the 'sdc/{instrument}_observations'
        base query string, but its presence may not be checked in complex query trees.
        """
        query_attrs = set(type(x) for x in query)
        sunpy_attrs = [getattr(a, x) for x in a.__all__ if isinstance(getattr(a, x), AttrMeta)]
        sdc_attrs = [getattr(a.sdc, x) for x in a.sdc.__all__
                     if isinstance(getattr(a.sdc, x), AttrMeta)]
        supported_attrs = set(sunpy_attrs + sdc_attrs + [AttrAnd, AttrOr])
        # Remove a.Provider, a.Source, a.ExtentType here since they are not used as filters?
        if not supported_attrs.issuperset(query_attrs):
            cls.status = f"Unknown Attributes: {[y.__name__ for y in query_attrs-supported_attrs]}"
            return False
        # A single OR node: every branch must be independently valid.
        if len(query) == 1 and isinstance(*query, AttrOr):
            for x in query[0].attrs:
                if not cls._can_handle_query(x):
                    return False
            return True
        registered = cls.register_values()
        for x in query:
            if isinstance(x, AttrAnd):
                hasinstr = cls._can_handle_query(*x.attrs, hasinstr=False)
                if not hasinstr:
                    return False
            elif isinstance(x, a.Instrument):
                known = [y[0].lower() for y in registered.get(type(x))]
                if x.value.lower() in known:
                    hasinstr = True
                else:
                    cls.status = f"Instrument {x.value} not in registered list\n {known}"
                    return False
            # Treat non-str descriptors separately here.
            elif isinstance(x, a.Level):
                if round(x.value) not in range(3):
                    # BUGFIX: message said range(2) while the check accepts levels 0-2.
                    cls.status = f"Level {x.value} not in supported range {range(3)}"
                    return False
            else:
                known = [y[0].lower() for y in registered.get(type(x), [])]
                if known and x.value.lower() not in known:
                    cls.status = f"{type(x).__name__} {x.value} not in registered list\n {known}"
                    return False
        if not hasinstr:
            cls.status = f"No 'Instrument' found in Attributes {[y.__name__ for y in query_attrs]}"
        return hasinstr

    @classmethod
    def _attrs_module(cls):
        """Return the (name, import path) of the client's attrs module."""
        return 'sdc', 'sdc.attrs'

    @classmethod
    def register_values(cls):
        """
        Known and supported search values for SDC data, builtins for now.
        """
        adict = {
            a.Provider: [("KIS-SDC",
                          "Science Data Centre at the Leibniz-Institut für Sonnenphysik")],
            # description.INSTRUMENT
            a.Instrument: [("ChroTel", "Chromospheric Telescope CCD Imager"),
                           ("BBI", "Broadband Context Imager @ Gregor"),
                           ("GRIS", "Gregor Infrared Spectrograph"),
                           ("M-lite", "M-lite 2M Imagers @ Gregor"),
                           ("LARS", "Lars is an Absolute Reference Spectrograph @ VTT")],
            # ("ECHELLE", "Echelle grating Spectrograph @ VTT"),  # 404 error
            # ("HELLRIDE", "HELioseismic Large Region Interferometric Device @ VTT"),
            # ("TESOS", "TESOS/VIP 2D Fabry-Perot interferometric spectrometer @ VTT"),
            # ("GFPI", "Gregor Fast Processes Imaging Spectrograph"),
            # ("HiFi", "High-resolution Fast Imager @ Gregor"),  # 404 error
            # ("TIP-II", "Tenerife Infrared Polarimeter @ VTT"),  # name?
            # ("ZIMPOL", "Zeeman Imaging Polarimeter @ Gregor"),  # 404 error
            # description.TELESCOPE - Name of the telescope
            a.sdc.Telescope: [("ChroTel", "10 cm Chromospheric Telescope, Observatorio del Teide"),
                              ("Gregor", "150 cm Gregor Solar Telescope, Observatorio del Teide"),
                              ("VTT", "70 cm Vacuum Tower Telescope, Observatorio del Teide")],
            # description.CALIB_LEVEL - Degree of data processing - note IVOA also specifies 3 & 4.
            a.Level: [("0", "L0 - raw"),
                      ("1", "L1 - calibrated science ready data"),
                      ("2", "L2 - enhanced data products")],
            # description.BTYPE - IVOA Unified Content Descriptor for data - see also
            # https://gitlab.leibniz-kis.de/sdc/sdc_wiki/-/wikis/standards_and_references/btypes
            a.Physobs: [("phot.flux", "Photon flux"),
                        ("phot.count", "Flux expressed in counts")]
        }
        return adict