# -*- coding: utf-8 -*-

3
import os
4
5
import copy
import warnings
6

7
8
import astropy.units as u
from astropy.time import Time
9
import sunpy.net.attrs as a
10
from sunpy.net.attr import AttrAnd, AttrOr, AttrMeta, AttrWalker, DataAttr, and_, or_
11
12
from sunpy.net.base_client import (BaseClient, QueryResponseTable, QueryResponseRow,
                                   convert_row_to_table)
13
14
from sunpy.util.exceptions import SunpyUserWarning
# from sunpy.net.attrs import Instrument, Level, Physobs, Provider, Time, Wavelength
15
import sdc.attrs as sattrs
16

17
18
19
import urllib.parse
import urllib.request
from urllib.error import HTTPError, URLError
20

21
import json
22
import parfive
23
24
25
26
27

# Module-level AttrWalker: the creator/applier functions registered on it below
# turn a query attribute tree into a list of query-parameter dictionaries.
walker = AttrWalker()


@walker.add_creator(AttrOr)
def create_from_or(wlk, tree):
    """
    For each subtree under the OR, create a new set of query params.

    Parameters
    ----------
    wlk : walker
        The walker dispatching on the tree; its ``create`` is called on every subtree.
    tree : `~sunpy.net.attr.AttrOr`
        The OR node whose ``attrs`` are converted one by one.

    Returns
    -------
    params : list
        Flat list collecting the results for all subtrees.
    """
    params = []
    for sub in tree.attrs:
        sub_params = wlk.create(sub)

        # Strip out one layer of nesting of lists, whatever its length, so that
        # create always returns a flat list of dicts (a multi-element list used
        # to be appended as a nested list).
        if isinstance(sub_params, list):
            params.extend(sub_params)
        else:
            params.append(sub_params)

    return params
44
45
46


@walker.add_creator(AttrAnd, DataAttr)
def create_new_param(wlk, tree):
    """
    Create one query-parameter dictionary for an AND node or a single data attribute.

    The walker's apply dispatcher fills the dictionary in place; the result is
    wrapped in a one-element list.
    """
    query_params = {}
    wlk.apply(tree, query_params)
    return [query_params]


@walker.add_applier(AttrAnd)
def iterate_over_and(wlk, tree, params):
    """Apply every attribute under the AND node to the same `params` dictionary."""
    for child in tree.attrs:
        wlk.apply(child, params)


# Map type_name from SunPy Attrs to observation description fields:
# CALIB_LEVEL - Degree of data processing
# BTYPE - IVOA Unified Content Descriptor for data ('phot.flux')
# T_RESOLUTION - (Minimal) sampling interval along the time axis [s]
#
# The default field/key name is given by Attr.type_name.upper();
# keys ending in '_' have the special significance of setting both
# `KEY_MIN` and `KEY_MAX` description fields.

_obs_fields = {'level': 'CALIB_LEVEL', 'physobs': 'BTYPE', 'sample': 'T_RESOLUTION',
               'time': 'DATE', 'wave': 'WAVELENGTH_'}


def _str_val(value, regex=False):
    """
    Format `value` for query string.
    """
    if isinstance(value, (int, float, complex)):
        return f"{value:g}"
    elif isinstance(value, Time):
82
        # Dates are stored as f"{{'$date':{int(value.unix * 1000)}}}" but queried as
83
        return f"{{'$date':'{value.isot}'}}"
84
    elif regex:
85
        return f"{{'$regex':'{str(value)}'}}"
86
    else:
87
        # Run through urllib.parse.quote() to be safe?
88
        return f"'{str(value)}'"
89
90


91
92
93
94
95
def _update_val(dictionary, key, value, block='description', regex=False):
    """
    Store the filter expression for a single value under `key` in `dictionary`.

    The expression has the form ``{'<block>.<key>':<value>}``; for keys ending in
    '_' it is a pair of conditions on ``<key>MIN`` and ``<key>MAX`` instead.
    Returns the result of `dict.update`, i.e. `None`.
    """
    formatted = _str_val(value, regex)

    # Fields that are always stored in lower case ...
    if key in ('BTYPE', 'INSTRUMENT', 'OBS_COLLECTION'):
        formatted = formatted.lower()
    # ... and those always stored in upper case.
    if key in ('POL_STATES', 'TELESCOPE'):
        formatted = formatted.upper()

    if not key.endswith('_'):
        query = f"{{'{block}.{key}':{formatted}}}"
    else:
        # Field pair with _MIN and _MAX form - require MIN <= value <= MAX.
        query = (f"{{'{block}.{key}MIN':{{'$lte':{formatted}}}}},"
                 f"{{'{block}.{key}MAX':{{'$gte':{formatted}}}}}")

    return dictionary.update({key: query})


110
def _update_range(dictionary, key, values, block='description', fullrange=False):
    """
    Store the filter expression for a value range under `key` in `dictionary`.

    `values[0]` and `values[1]` are the lower and upper end of the search range.
    Returns the result of `dict.update`, i.e. `None`.
    """
    lower, upper = _str_val(values[0]), _str_val(values[1])

    if not key.endswith('_'):
        # Plain field: lower <= field <= upper.
        query = f"{{'{block}.{key}':{{'$gte':{lower},'$lte':{upper}}}}}"
    elif fullrange:
        # MIN/MAX pair enclosing the full search range: MIN <= lower and MAX >= upper.
        query = (f"{{'{block}.{key}MIN':{{'$lte':{lower}}}}},"
                 f"{{'{block}.{key}MAX':{{'$gte':{upper}}}}}")
    else:
        # Default, MIN/MAX pair overlapping the search range: MIN <= upper and MAX >= lower.
        query = (f"{{'{block}.{key}MIN':{{'$lte':{upper}}}}},"
                 f"{{'{block}.{key}MAX':{{'$gte':{lower}}}}}")

    return dictionary.update({key: query})


# Converters from SunPy Attrs to Observation Record 'description' fields

# Generic applier for single-valued Attrs
@walker.add_applier(a.Level, a.Physobs, a.Sample, sattrs.Filter, sattrs.PolStates,
                    sattrs.DataProduct, sattrs.Telescope, sattrs.Target, sattrs.ObsName)
def _(wlk, attr, params):
    """Set `description.TYPE_NAME` from the single value of `attr`."""
    type_name = attr.type_name
    # Fall back on the upper-cased type name where no explicit field mapping exists.
    field = _obs_fields.get(type_name, type_name.upper())
    value = attr.value
    use_regex = getattr(attr, 'regex', False)
    return _update_val(params, field, value, regex=use_regex)
139
140
141
142
143
144
145
146


# OBS_COLLECTION: `instrument_observations` also defines the MongoDB collection (base path).
@walker.add_applier(a.Instrument)
def _(wlk, attr, params):
    """Set 'description.INSTRUMENT' and 'description.OBS_COLLECTION'"""
    # The collection name is stored as a plain string, not as a filter expression.
    collection = f'{attr.value.lower()}_observations'
    params.update({'OBS_COLLECTION': collection})
    field = attr.type_name.upper()
    value = attr.value
    use_regex = getattr(attr, 'regex', False)
    return _update_val(params, field, value, regex=use_regex)
148
149
150


# Time range covering the observation - require at least part of it within 'DATE_BEG', 'DATE_END'
151
152
@walker.add_applier(a.Time)
def _(wlk, attr, params):
    """Set 'description.DATE_BEG|END' [ISO Time str]"""
    if attr.end == attr.start:
        # Degenerate range: query for a single point in time.
        _update_val(params, 'DATE_', attr.start, regex=getattr(attr, 'regex', False))
    else:
        _update_range(params, 'DATE_', [attr.start, attr.end],
                      fullrange=getattr(attr, 'fullrange', False))
    # The date fields are named DATE_BEG/DATE_END rather than DATE_MIN/DATE_MAX.
    date_query = params['DATE_']
    params['DATE_'] = date_query.replace('_MIN', '_BEG').replace('_MAX', '_END')


# sattrs.Date is a point in time to fall within the observation
@walker.add_applier(sattrs.Date)
def _(wlk, attr, params):
    """Set 'description.DATE_BEG|END' [ISO Time str] for a single point in time."""
    # NOTE(review): the rename below reads params['DATE_'], so this presumably
    # relies on attr.type_name.upper() being 'DATE_' - confirm against sdc.attrs.
    field = attr.type_name.upper()
    _update_val(params, field, attr.time, regex=getattr(attr, 'regex', False))
    # The date fields are named DATE_BEG/DATE_END rather than DATE_MIN/DATE_MAX.
    date_query = params['DATE_']
    params['DATE_'] = date_query.replace('_MIN', '_BEG').replace('_MAX', '_END')


# Generic applier for Range Attrs
@walker.add_applier(a.Wavelength, sattrs.AtmosR0, sattrs.ExposureTime, sattrs.Theta, sattrs.Mu,
                    sattrs.HelioProjLon, sattrs.HelioProjLat, sattrs.SpatialResolution,
                    sattrs.SpectralResolution, sattrs.TemporalResolution, sattrs.NDimensions,
                    sattrs.PolXel, sattrs.SpatialXel1, sattrs.SpatialXel2,
                    sattrs.SpectralXel, sattrs.TimeXel)
def _(wlk, attr, params):
    """Set 'description.TYPE_NAME[_MIN|_MAX]'"""
    type_name = attr.type_name
    field = _obs_fields.get(type_name, type_name.upper())

    lower, upper = attr.min, attr.max
    # Wavelength returns quantities with default unit Angstrom; convert to nm.
    # NOTE(review): any Quantity is converted to nm here - confirm that none of
    # the other range attrs carry quantities in non-length units.
    if isinstance(lower, u.Quantity):
        lower, upper = lower.to_value(u.nm), upper.to_value(u.nm)

    # Identical limits mean a query for a single value.
    if upper == lower:
        return _update_val(params, field, lower)

    # Otherwise make sure the limits are in ascending order.
    limits = [lower, upper]
    ordered = [min(limits), max(limits)]
    return _update_range(params, field, ordered, fullrange=getattr(attr, 'fullrange', False))
192
193


194
@walker.add_applier(a.Provider, a.ExtentType, a.Source)
def _(wlk, attr, params):
    """
    Leave `params` untouched for Provider, ExtentType and Source.

    These attributes are used by the client's `_can_handle_query` and not the API.
    """
    return None
199
200
201


class KISClient(BaseClient):
    """
    Search KIS-SDC observation datasets from GridFS and retrieve [metadata] files.

    """
    # Base URL of the RESTHeart service; collection names and filters are appended to it.
    _BASE_URL = "http://dockertest:8083/sdc/"
    # Reason why the last query was rejected by `_can_handle_query` (class-wide).
    status = ""

    def __init__(self):
        """Create a client; no per-instance state is set up."""

    @classmethod
    def _status(cls):
        """Return the message set by the last failed `_can_handle_query`."""
        return cls.status

    @staticmethod
    def _make_filename(row: QueryResponseRow, path: os.PathLike = None):
        """
        Generate a filename for a file based on the observation record; to be worked out.

        Parameters
        ----------
        row : `~sunpy.net.base_client.QueryResponseRow`
            Observation record; only its ``'_id'`` entry is used.
        path : `os.PathLike`, optional
            Directory to prepend to the file name.

        Returns
        -------
        str
            ``<_id>.fits``, joined onto `path` if one is given.
        """
        # The fallback name is just the dataset _id.
        name = f"{row['_id']}.fits"

        # `path.append(name)` failed for None, str and PathLike alike; join instead.
        if path is None:
            return name
        return os.path.join(path, name)

    def _make_search(self, query_dicts):
        """
        Combine (AND) query parameters from list of params dictionaries to search paths.

        Parameters
        ----------
        query_dicts : list of dict
            One dictionary of filter expressions per query, each also holding the
            collection name under ``'OBS_COLLECTION'``. The dictionaries are not modified.

        Returns
        -------
        searches : list of str
            One ``{collection}?filter={'$and':[...]}`` string per dictionary.

        Raises
        ------
        ValueError
            If a dictionary has no ``'OBS_COLLECTION'``/``'INSTRUMENT'`` entry, or
            if the two do not match.
        """
        searches = []

        # param dicts for same OBS_COLLECTION (Instrument) could in principle be
        # combined to a single query using '$or':[] ...

        for query_params in query_dicts:
            # Work on a copy so the caller's dictionary keeps its 'OBS_COLLECTION'.
            query_params = dict(query_params)
            collection = query_params.pop('OBS_COLLECTION', None)
            instrument = query_params.get('INSTRUMENT')
            if collection is None or instrument is None:
                raise ValueError("An 'Instrument' is required to form the "
                                 f"'{{instrument}}_observations' query; got {query_params}.")
            if collection.split('_')[0] not in instrument.lower():
                raise ValueError(f"Mismatch of '{instrument}' in '{collection}'.")

            # Search for existing dict for same collection (instrument) to update:
            # index_obs = [i for i, d in enumerate(params) if collection in d.values()]
            # if len(index_obs) > 0:
            #     ~~~query_dicts[index_obs[0]].update(sub_params)~~~ need to collect them for OR

            # NOTE(review): the filter is not percent-encoded; values containing
            # spaces or other unsafe characters may need urllib.parse.quote().
            searches.append(f"{collection}?filter={{'$and':[{','.join(query_params.values())}]}}")

        return searches

    def search(self, *query) -> QueryResponseTable:
        """
        Query the SDC RESTHeart client for a list of results in the observation collection.

        Parameters
        ----------
        query : iterable or `~sunpy.net.attr.AttrAnd`, `~sunpy.net.attr.AttrOr`
            Logical combination of `sunpy.net.attrs` objects representing the query,
            or any number of such objects to be combined using the AND (``&``) operator.

        Returns
        -------
        results : `~sunpy.net.base_client.QueryResponseTable`
            A `QueryResponseTable` instance containing the query result.

        Raises
        ------
        AttributeError
            If the query is rejected by `_can_handle_query`.
        urllib.error.URLError
            If a search request fails.
        """
        query = and_(*query)

        # Not very extensively tested on hierarchical queries
        if not self._can_handle_query(query):
            raise AttributeError(f'Query not possible: {self._status()}')

        queries = self._make_search(walker.create(query))
        results = []

        for query_string in queries:
            full_url = f"{self._BASE_URL}{query_string}"
            try:
                # The context manager closes the connection once the body is read.
                with urllib.request.urlopen(full_url) as response:
                    obs_dict = json.loads(response.read()).get('_embedded', [])
            except (HTTPError, URLError) as exc:
                raise URLError(f'Unable to execute search "{full_url}": {exc}. Confirm that '
                               f'RESTHeart is running on {self._BASE_URL} and connected.') from exc

            # NOTE(review): only the first embedded record is expanded; confirm that
            # further records in the response are meant to be ignored.
            if len(obs_dict) > 0 and 'links' in obs_dict[0]:
                first_obs = obs_dict[0]
                # One result row per 'l1_data' link, each a copy of the observation record.
                for exp in first_obs['links'].get('l1_data', []):
                    obs_rec = copy.deepcopy(first_obs)
                    obs_rec['links']['l1_data'] = exp
                    results.append(obs_rec)

        return QueryResponseTable(results, client=self)

    @convert_row_to_table
    def fetch(self, query_results: QueryResponseTable, *, downloader: parfive.Downloader,
              path: os.PathLike = None, binary: bool = False, **kwargs):
        """
        Fetch GridFS dataset metadata or binary files.

        Parameters
        ----------
        query_results:
            Results to download.
        path : `str` or `pathlib.Path`, optional
            Path to the download directory; formatted with ``file`` and the row's
            ``response_block_map``. Defaults to the bare ``{file}`` template.
        downloader : `parfive.Downloader`
            The download manager to use.
        binary : bool, optional
            Fetch the binary data for the dataset (default: metadata).
        """
        if not len(query_results):
            return

        if binary:
            binfile = '/binary'
            ext = 'fits'
        else:
            binfile = ''
            ext = 'json'

        # Without a path, str(None) would name every download 'None'.
        if path is None:
            path = '{file}'

        for row in query_results:
            inst = row['description']['INSTRUMENT']
            oid = row['links']['l1_data']['$oid']
            # Content-Disposition header default is "{row['_id']['$oid']}/{oid}.{ext}" (no '.json').
            # rowpath = row['_id']['$oid']
            filepath = os.path.join(row['description']['OBS_NAME'], f"{oid}.{ext}")
            url = f"{self._BASE_URL}{inst}_l1_data.files/{oid}{binfile}"
            downloader.enqueue_file(url, filename=str(path).format(file=filepath,
                                                                   **row.response_block_map))

    @classmethod
    def _can_handle_query(cls, *query, hasinstr=False):
        """
        Check if query attributes are valid and supported by this client.

        Tests that all attributes in tree are in `~sunpy.net.attrs` or `~sunpy.net.attrs.sdc`
        and, if in registered attributes, have a registered value.

        Parameters
        ----------
        query : iterable of `~sunpy.net.attr.Attr`, `~sunpy.net.attr.AttrAnd`, `~sunpy.net.attr.AttrOr`
            Collection of query attributes.
        hasinstr : bool, optional
            Assume that `~sunpy.net.attrs.Instrument` attribute is already present.

        Returns
        -------
        bool
            `True` if the query can be handled; otherwise `False`, with the reason
            stored in the class attribute `status` whenever one is known.

        Notes
        -----
            An instrument is always required to form the 'sdc/{instrument}_observations'
            base query string, but its presence may not be checked in complex query trees.
        """
        query_attrs = {type(x) for x in query}
        sunpy_attrs = [getattr(a, x) for x in a.__all__ if isinstance(getattr(a, x), AttrMeta)]
        sdc_attrs = [getattr(a.sdc, x) for x in a.sdc.__all__ if
                     isinstance(getattr(a.sdc, x), AttrMeta)]
        supported_attrs = set(sunpy_attrs + sdc_attrs + [AttrAnd, AttrOr])

        # Remove a.Provider, a.Source, a.ExtentType here since they are not used as filters?
        if not supported_attrs.issuperset(query_attrs):
            cls.status = f"Unknown Attributes: {[y.__name__ for y in query_attrs-supported_attrs]}"
            return False

        # A single OR is handled if every one of its branches is.
        if len(query) == 1 and isinstance(query[0], AttrOr):
            return all(cls._can_handle_query(x) for x in query[0].attrs)

        registered = cls.register_values()
        supported_levels = range(3)
        for x in query:
            if isinstance(x, AttrAnd):
                hasinstr = cls._can_handle_query(*x.attrs, hasinstr=False)
                if not hasinstr:
                    return False
            elif isinstance(x, a.Instrument):
                known = [y[0].lower() for y in registered.get(type(x))]
                if x.value.lower() in known:
                    hasinstr = True
                else:
                    cls.status = f"Instrument {x.value} not in registered list\n  {known}"
                    return False
            # Treat non-str descriptors separately here.
            elif isinstance(x, a.Level):
                # A value that cannot be rounded is reported as unsupported
                # instead of raising from round().
                try:
                    level = round(x.value)
                except (TypeError, ValueError, OverflowError):
                    level = None
                if level not in supported_levels:
                    cls.status = f"Level {x.value} not in supported range {supported_levels}"
                    return False
            else:
                known = [y[0].lower() for y in registered.get(type(x), [])]
                if known and x.value.lower() not in known:
                    cls.status = f"{type(x).__name__} {x.value} not in registered list\n  {known}"
                    return False

        if not hasinstr:
            cls.status = f"No 'Instrument' found in Attributes {[y.__name__ for y in query_attrs]}"
        return hasinstr

    @classmethod
    def _attrs_module(cls):
        """Return the name and import path of the module holding the client's attrs."""
        return 'sdc', 'sdc.attrs'

    @classmethod
    def register_values(cls):
        """
        Known and supported search values for SDC data, builtins for now.

        Returns
        -------
        adict : dict
            Maps attr classes to lists of ``(value, description)`` tuples.
        """
        adict = {
            a.Provider: [("KIS-SDC",
                          "Science Data Centre at the Leibniz-Institut für Sonnenphysik")],
            # description.INSTRUMENT
            a.Instrument: [("ChroTel", "Chromospheric Telescope CCD Imager"),
                           ("BBI", "Broadband Context Imager @ Gregor"),
                           ("GRIS", "Gregor Infrared Spectrograph"),
                           ("M-lite", "M-lite 2M Imagers @ Gregor"),
                           ("LARS", "Lars is an Absolute Reference Spectrograph @ VTT")],
            #              ("ECHELLE", "Echelle grating Spectrograph @ VTT"),  # 404 error
            #              ("HELLRIDE", "HELioseismic Large Region Interferometric Device @ VTT"),
            #              ("TESOS", "TESOS/VIP 2D Fabry-Perot interferometric spectrometer @ VTT"),
            #              ("GFPI", "Gregor Fast Processes Imaging Spectrograph"),
            #              ("HiFi", "High-resolution Fast Imager @ Gregor"),   # 404 error
            #              ("TIP-II", "Tenerife Infrared Polarimeter @ VTT"),  # name?
            #              ("ZIMPOL", "Zeeman Imaging Polarimeter @ Gregor"),  # 404 error
            # description.TELESCOPE - Name of the telescope
            a.sdc.Telescope: [("ChroTel", "10 cm Chromospheric Telescope, Observatorio del Teide"),
                              ("Gregor", "150 cm Gregor Solar Telescope, Observatorio del Teide"),
                              ("VTT", "70 cm Vacuum Tower Telescope, Observatorio del Teide")],
            # description.CALIB_LEVEL - Degree of data processing - note IVOA also specifies 3 & 4.
            a.Level: [("0", "L0 - raw"),
                      ("1", "L1 - calibrated science ready data"),
                      ("2", "L2 - enhanced data products")],
            # description.BTYPE - IVOA Unified Content Descriptor for data - see also
            # https://gitlab.leibniz-kis.de/sdc/sdc_wiki/-/wikis/standards_and_references/btypes
            a.Physobs: [("phot.flux", "Photon flux"),
                        ("phot.count", "Flux expressed in counts")]
        }

        return adict