client.py 18.7 KB
Newer Older
1
# -*- coding: utf-8 -*-
2
3
4
5
#
# Authors: Derek Homeier  <derek.homeier@aperiosoftware.com>
#          Andrew Leonard <drew@aperiosoftware.com>
#
6

7
import os
8
import re
9
10
import copy
import warnings
11

12
13
import astropy.units as u
from astropy.time import Time
14
import sunpy.net.attrs as a
15
from sunpy.net.attr import AttrAnd, AttrOr, AttrMeta, AttrWalker, DataAttr, and_, or_
16
17
from sunpy.net.base_client import (BaseClient, QueryResponseTable, QueryResponseRow,
                                   convert_row_to_table)
18
19
from sunpy.util.exceptions import SunpyUserWarning
# from sunpy.net.attrs import Instrument, Level, Physobs, Provider, Time, Wavelength
20
import sdc.attrs as sattrs
21

22
23
24
import urllib.parse
import urllib.request
from urllib.error import HTTPError, URLError
25

26
import json
27
import parfive
28
29
30
31
32

walker = AttrWalker()


@walker.add_creator(AttrOr)
def create_from_or(wlk, tree):
    """
    Create one set of query parameters for each subtree under the OR.
    """
    def _unwrap(created):
        # Strip one layer of list nesting so that create() always
        # returns a flat list of dicts.
        if isinstance(created, list) and len(created) == 1:
            return created[0]
        return created

    return [_unwrap(wlk.create(branch)) for branch in tree.attrs]


@walker.add_creator(AttrAnd, DataAttr)
def create_new_param(wlk, tree):
    """Build a single query-parameter dict from an AND tree or a lone attribute."""
    query_params = dict()
    # Dispatch through the applier registry to translate each attr
    # into its query parameter entry.
    wlk.apply(tree, query_params)
    return [query_params]


@walker.add_applier(AttrAnd)
def iterate_over_and(wlk, tree, params):
    """Apply every child attribute of an AND tree onto the same params dict."""
    for child in tree.attrs:
        wlk.apply(child, params)


# Map type_name from SunPy Attrs to observation description fields:
# CALIB_LEVEL - Degree of data processing
# BTYPE - IVOA Unified Content Descriptor for data ('phot.flux')
# T_RESOLUTION - (Minimal) sampling interval along the time axis [s]
#
# The default field/key name is given by Attr.type_name.upper();
# keys ending in '_' have the special significance of setting both
# `KEY_MIN` and `KEY_MAX` description fields.

_obs_fields = {'level': 'CALIB_LEVEL', 'physobs': 'BTYPE', 'sample': 'T_RESOLUTION',
               'time': 'DATE', 'wave': 'WAVELENGTH_'}


def _str_val(value, regex=False):
    """
    Format `value` for query string.
    """
    if isinstance(value, (int, float, complex)):
        return f"{value:g}"
    elif isinstance(value, Time):
87
        # Dates are stored as f"{{'$date':{int(value.unix * 1000)}}}" but queried as
88
        return f"{{'$date':'{value.isot}'}}"
89
    elif regex:
90
        return f"{{'$regex':'{str(value)}'}}"
91
    else:
92
        # Run through urllib.parse.quote() to be safe?
93
        return f"'{str(value)}'"
94
95


96
97
98
99
100
def _update_val(dictionary, key, value, block='description', regex=False):
    """
    Store a ``field_name:value`` string parseable by a BSON filter in `dictionary`.
    """
    strval = _str_val(value, regex)
    # Some fields are stored all-lowercase, others all-uppercase.
    if key in ('BTYPE', 'INSTRUMENT', 'OBS_COLLECTION'):
        strval = strval.lower()
    elif key in ('POL_STATES', 'TELESCOPE'):
        strval = strval.upper()
    if key.endswith('_'):
        # Fields with a _MIN and _MAX form - require MIN <= value <= MAX.
        query = (f"{{'{block}.{key}MIN':{{'$lte':{strval}}}}},"
                 f"{{'{block}.{key}MAX':{{'$gte':{strval}}}}}")
    else:
        query = f"{{'{block}.{key}':{strval}}}"
    dictionary[key] = query


def _update_range(dictionary, key, values, block='description', fullrange=False):
    """
    Store a ``field_name:{value range}`` string parseable by a BSON filter in `dictionary`.
    """
    lo, hi = _str_val(values[0]), _str_val(values[1])
    if key.endswith('_'):
        # Fields with a _MIN and _MAX form:
        if fullrange:
            # Cover the full search range: MIN <= values[0] and MAX >= values[1].
            query = (f"{{'{block}.{key}MIN':{{'$lte':{lo}}}}},"
                     f"{{'{block}.{key}MAX':{{'$gte':{hi}}}}}")
        else:
            # Default (overlap): MIN <= values[1] and MAX >= values[0].
            query = (f"{{'{block}.{key}MIN':{{'$lte':{hi}}}}},"
                     f"{{'{block}.{key}MAX':{{'$gte':{lo}}}}}")
    else:
        query = f"{{'{block}.{key}':{{'$gte':{lo},'$lte':{hi}}}}}"
    dictionary[key] = query


# Converters from SunPy Attrs to Observation Record 'description' fields

# Generic applier for single-valued Attrs
@walker.add_applier(a.Level, a.Physobs, a.Sample, sattrs.Filter, sattrs.PolStates,
                    sattrs.DataProduct, sattrs.Telescope, sattrs.Target, sattrs.ObsName)
def _(wlk, attr, params):
    """Set `description.TYPE_NAME`"""
    use_regex = getattr(attr, 'regex', False)
    field = _obs_fields.get(attr.type_name, attr.type_name.upper())
    return _update_val(params, field, attr.value, regex=use_regex)


# OBS_COLLECTION: `instrument_observations` also defines the MongoDB collection (base path).
@walker.add_applier(a.Instrument)
def _(wlk, attr, params):
    """Set 'description.INSTRUMENT' and 'description.OBS_COLLECTION'"""
    params['OBS_COLLECTION'] = f'{attr.value.lower()}_observations'
    return _update_val(params, attr.type_name.upper(), attr.value,
                       regex=getattr(attr, 'regex', False))


# Time range covering the observation - require at least part of it within 'DATE_BEG', 'DATE_END'
@walker.add_applier(a.Time)
def _(wlk, attr, params):
    """Set 'description.DATE_BEG|END' [ISO Time str]"""
    if attr.start == attr.end:
        _update_val(params, 'DATE_', attr.start, regex=getattr(attr, 'regex', False))
    else:
        _update_range(params, 'DATE_', [attr.start, attr.end],
                      fullrange=getattr(attr, 'fullrange', False))
    # The stored field names are DATE_BEG/DATE_END rather than generic _MIN/_MAX.
    params['DATE_'] = params['DATE_'].replace('_MIN', '_BEG').replace('_MAX', '_END')


# sattrs.Date is a point in time to fall within the observation
@walker.add_applier(sattrs.Date)
def _(wlk, attr, params):
    """Set 'description.DATE_BEG|END' [ISO Time str]"""
    # Use the 'DATE_' key explicitly (as the a.Time applier does) so _update_val
    # builds the DATE_MIN <= time <= DATE_MAX query under params['DATE_'].
    # The previous attr.type_name.upper() stored the query under 'DATE', so the
    # params['DATE_'] lookup below raised a KeyError.
    _update_val(params, 'DATE_', attr.time, regex=getattr(attr, 'regex', False))
    params['DATE_'] = params['DATE_'].replace('_MIN', '_BEG').replace('_MAX', '_END')
    return


# Generic applier for Range Attrs
@walker.add_applier(a.Wavelength, sattrs.AtmosR0, sattrs.ExposureTime, sattrs.Theta, sattrs.Mu,
                    sattrs.HelioProjLon, sattrs.HelioProjLat, sattrs.SpatialResolution,
                    sattrs.SpectralResolution, sattrs.TemporalResolution, sattrs.NDimensions,
                    sattrs.PolXel, sattrs.SpatialXel1, sattrs.SpatialXel2,
                    sattrs.SpectralXel, sattrs.TimeXel)
def _(wlk, attr, params):
    """Set 'description.TYPE_NAME[_MIN|_MAX]'"""
    field = _obs_fields.get(attr.type_name, attr.type_name.upper())
    # Wavelength returns quantities with default unit Angstrom; convert to nm.
    if isinstance(attr.min, u.Quantity):
        lo, hi = attr.min.to_value(u.nm), attr.max.to_value(u.nm)
    else:
        lo, hi = attr.min, attr.max
    if lo == hi:
        # Degenerate range: query for the single value.
        return _update_val(params, field, lo)
    # Sort the bounds before building the range query.
    return _update_range(params, field, [min(lo, hi), max(lo, hi)],
                         fullrange=getattr(attr, 'fullrange', False))


199
@walker.add_applier(a.Provider, a.ExtentType, a.Source)
200
def _(wlk, attr, params):
201
202
203
    """
    Provider, ExtentType, Source are used by client `_can_handle_query` and not the API.
    """
204
205
206


class KISClient(BaseClient):
    """
    Search KIS-SDC observation datasets from GridFS and retrieve [metadata] files.

    Queries are translated into RESTHeart filter strings and sent to the SDC
    MongoDB front end at `_BASE_URL`; results are returned as a
    `~sunpy.net.base_client.QueryResponseTable`.
    """
    _BASE_URL = "http://dockertest:8083/sdc/"
    # Human-readable reason why the last `_can_handle_query` check failed.
    status = ""

    def __init__(self):
        """Create a client instance; no connection is opened until `search`."""

    @classmethod
    def _status(cls):
        """Return the status message recorded by the last `_can_handle_query`."""
        return cls.status

    @staticmethod
    def _make_filename(row: QueryResponseRow, path: os.PathLike = None):
        """
        Generate a filename for a file based on the observation record; to be worked out.
        """
        # The fallback name is just the dataset _id.
        name = f"{row['_id']}.fits"

        # `os.PathLike` has no `append`; join directory and file name instead
        # (the previous `path.append(name)` raised AttributeError).
        return name if path is None else os.path.join(path, name)

    def _make_search(self, query_dicts):
        """
        Combine (AND) query parameters from list of params dictionaries to search paths.

        Parameters
        ----------
        query_dicts : list of dict
            One dict of BSON filter snippets per (OR-branch) query; each must
            contain an 'OBS_COLLECTION' entry naming the MongoDB collection.

        Returns
        -------
        searches : list of str
            Relative URL paths including the RESTHeart ``filter=`` query string.

        Raises
        ------
        ValueError
            If the instrument filter does not match the collection name.
        """
        searches = []

        # param dicts for same OBS_COLLECTION (Instrument) could in principle be
        # combined to a single query using '$or':[] ...

        for query_params in query_dicts:
            collection = query_params.pop('OBS_COLLECTION')
            # 'INSTRUMENT' holds the full filter snippet (not just the name),
            # so this consistency check is a substring test.
            instrument = query_params.get('INSTRUMENT')
            if collection.split('_')[0] not in instrument.lower():
                raise ValueError(f"Mismatch of '{instrument}' in '{collection}'.")

            # Search for existing dict for same collection (instrument) to update:
            # index_obs = [i for i, d in enumerate(params) if collection in d.values()]
            # if len(index_obs) > 0:
            #     ~~~query_dicts[index_obs[0]].update(sub_params)~~~ need to collect them for OR

            searches.append(f"{collection}?filter={{'$and':[{','.join(query_params.values())}]}}")

        return searches

    def search(self, *query) -> QueryResponseTable:
        """
        Query the SDC RESTHeart client for a list of results in the observation collection.

        Parameters
        ----------
        query : iterable or `~sunpy.net.attr.AttrAnd`, `~sunpy.net.attr.AttrOr`
            Logical combination of `sunpy.net.attrs` objects representing the query,
            or any number of such objects to be combined using the AND (``&``) operator.

        Returns
        -------
        results : `~sunpy.net.base_client.QueryResponseTable`
            A `QueryResponseTable` instance containing the query result.

        Raises
        ------
        AttributeError
            If the query attributes cannot be handled by this client.
        URLError
            If the RESTHeart service cannot be reached or the search fails.
        """
        # Unit converters for 'description' fields, keyed on the first 5 chars.
        # NOTE(review): 'T_RES' is a time resolution [s] per _obs_fields, so
        # u.deg looks suspicious here - confirm intended unit.
        converters = {'ATMOS': lambda x: x * u.cm,
                      'DATE_': lambda x: Time(x['$date'] * 1e-3, format='unix'),
                      'EXPTI': lambda x: x * u.s,
                      'HPLN_': lambda x: x * u.arcsec,
                      'HPLT_': lambda x: x * u.arcsec,
                      'S_RES': lambda x: x * u.arcsec,
                      'THETA': lambda x: x * u.deg,
                      'T_RES': lambda x: x * u.deg,
                      'WAVEL': lambda x: x * u.nm}

        query = and_(*query)

        # Not very extensively tested on hierarchical queries
        if not self._can_handle_query(query):
            raise AttributeError(f'Query not possible: {self._status()}')

        queries = self._make_search(walker.create(query))
        results = []

        for query_str in queries:
            full_url = f"{self._BASE_URL}{query_str}"
            try:
                response = urllib.request.urlopen(full_url)
                obs_dict = json.loads(response.read()).get('_embedded', [])
                if len(obs_dict) > 0 and 'links' in obs_dict[0]:
                    # Check if a.Level was set in this query (defaults to 1)
                    cl = re.compile(r"{'description.CALIB_LEVEL':(?P<level>\d+)}").search(query_str)
                    if cl:
                        data_type = f"l{int(cl['level'])}_data"
                    else:
                        data_type = "l1_data"
                    data_ln = obs_dict[0]['links'].get('data_ln', [])
                    # Flatten the 'description' sub-dict into the record and
                    # attach units to the known fields.
                    obs_dict[0].update(obs_dict[0].pop('description', dict()))
                    for k in obs_dict[0].keys():
                        if k[:5] in converters:
                            obs_dict[0][k] = converters[k[:5]](obs_dict[0][k])
                    # One result row per linked data file ('data_ln' entry).
                    obs_rec = []
                    for exp in data_ln:
                        obs_rec.append(copy.deepcopy(obs_dict[0]))
                        obs_rec[-1]['links'] = exp
                        obs_rec[-1]['data_type'] = data_type
                    results += obs_rec
            except (HTTPError, URLError) as exc:
                # Chain the original error for easier debugging.
                raise URLError(f'Unable to execute search "{full_url}": {exc}. Confirm that '
                               f'RESTHeart is running on {self._BASE_URL} and connected.') from exc

        return QueryResponseTable(results, client=self)

    @convert_row_to_table
    def fetch(self, query_results: QueryResponseTable, *, downloader: parfive.Downloader,
              path: os.PathLike = None, binary: bool = False, **kwargs):
        """
        Fetch GridFS dataset metadata or binary files.

        Parameters
        ----------
        query_results:
            Results to download.
        path : `str` or `pathlib.Path`, optional
            Path to the download directory
        downloader : `parfive.Downloader`
            The download manager to use.
        binary : bool, optional
            Fetch the binary data for the dataset (default: metadata).
        """
        if not len(query_results):
            return

        if binary:
            binfile = '/binary'
            ext = 'fits'
        else:
            binfile = ''
            ext = 'json'

        for row in query_results:
            inst = row['INSTRUMENT']
            oid = row['links']['$oid']
            # Content-Disposition header default is "{row['_id']['$oid']}/{oid}.{ext}" (no '.json').
            # rowpath = row['_id']['$oid']
            filepath = os.path.join(row['OBS_NAME'], f"{oid}.{ext}")
            url = f"{self._BASE_URL}{inst}_{row['data_type']}.files/{oid}{binfile}"
            downloader.enqueue_file(url, filename=str(path).format(file=filepath,
                                                                   **row.response_block_map))

    @classmethod
    def _can_handle_query(cls, *query, hasinstr=False):
        """
        Check if query attributes are valid and supported by this client.

        Tests that all attributes in tree are in `~sunpy.net.attrs` or `~sunpy.net.attrs.sdc`
        and, if in registered attributes, have a registered value.

        Parameters
        ----------
        query : iterable of `~sunpy.net.attr.Attr`, `~sunpy.net.attr.AttrAnd`, `~sunpy.net.attr.AttrOr`
            Collection of query attributes.
        hasinstr : bool, optional
            Assume that `~sunpy.net.attrs.Instrument` attribute is already present.

        Notes
        -----
            An instrument is always required to form the 'sdc/{instrument}_observations'
            base query string, but its presence may not be checked in complex query trees.
        """
        query_attrs = set(type(x) for x in query)
        sunpy_attrs = [getattr(a, x) for x in a.__all__ if isinstance(getattr(a, x), AttrMeta)]
        sdc_attrs = [getattr(a.sdc, x) for x in a.sdc.__all__ if
                     isinstance(getattr(a.sdc, x), AttrMeta)]
        supported_attrs = set(sunpy_attrs + sdc_attrs + [AttrAnd, AttrOr])

        # Remove a.Provider, a.Source, a.ExtentType here since they are not used as filters?
        if not supported_attrs.issuperset(query_attrs):
            cls.status = f"Unknown Attributes: {[y.__name__ for y in query_attrs-supported_attrs]}"
            return False

        # A single top-level OR: every branch must be independently valid.
        if len(query) == 1 and isinstance(*query, AttrOr):
            for x in query[0].attrs:
                if not cls._can_handle_query(x):
                    return False
            return True

        registered = cls.register_values()
        for x in query:
            if isinstance(x, AttrAnd):
                hasinstr = cls._can_handle_query(*x.attrs, hasinstr=False)
                if not hasinstr:
                    return False
            elif isinstance(x, a.Instrument):
                known = [y[0].lower() for y in registered.get(type(x))]
                if x.value.lower() in known:
                    hasinstr = True
                else:
                    cls.status = f"Instrument {x.value} not in registered list\n  {known}"
                    return False
            # Treat non-str descriptors separately here.
            elif isinstance(x, a.Level):
                if round(x.value) not in range(3):
                    # Report the actual supported range (0-2), matching the check above.
                    cls.status = f"Level {x.value} not in supported range {range(3)}"
                    return False
            else:
                known = [y[0].lower() for y in registered.get(type(x), [])]
                if known and x.value.lower() not in known:
                    cls.status = f"{type(x).__name__} {x.value} not in registered list\n  {known}"
                    return False

        if not hasinstr:
            cls.status = f"No 'Instrument' found in Attributes {[y.__name__ for y in query_attrs]}"
        return hasinstr

    @classmethod
    def _attrs_module(cls):
        """Return the (name, import path) of the module providing extra attrs."""
        return 'sdc', 'sdc.attrs'

    @classmethod
    def register_values(cls):
        """
        Known and supported search values for SDC data, builtins for now.
        """
        adict = {
            a.Provider: [("KIS-SDC",
                          "Science Data Centre at the Leibniz-Institut für Sonnenphysik")],
            # description.INSTRUMENT
            a.Instrument: [("ChroTel", "Chromospheric Telescope CCD Imager"),
                           ("BBI", "Broadband Context Imager @ Gregor"),
                           ("GRIS", "Gregor Infrared Spectrograph"),
                           ("M-lite", "M-lite 2M Imagers @ Gregor"),
                           ("LARS", "Lars is an Absolute Reference Spectrograph @ VTT")],
            #              ("ECHELLE", "Echelle grating Spectrograph @ VTT"),  # 404 error
            #              ("HELLRIDE", "HELioseismic Large Region Interferometric Device @ VTT"),
            #              ("TESOS", "TESOS/VIP 2D Fabry-Perot interferometric spectrometer @ VTT"),
            #              ("GFPI", "Gregor Fast Processes Imaging Spectrograph"),
            #              ("HiFi", "High-resolution Fast Imager @ Gregor"),   # 404 error
            #              ("TIP-II", "Tenerife Infrared Polarimeter @ VTT"),  # name?
            #              ("ZIMPOL", "Zeeman Imaging Polarimeter @ Gregor"),  # 404 error
            # description.TELESCOPE - Name of the telescope
            a.sdc.Telescope: [("ChroTel", "10 cm Chromospheric Telescope, Observatorio del Teide"),
                              ("Gregor", "150 cm Gregor Solar Telescope, Observatorio del Teide"),
                              ("VTT", "70 cm Vacuum Tower Telescope, Observatorio del Teide")],
            # description.CALIB_LEVEL - Degree of data processing - note IVOA also specifies 3 & 4.
            a.Level: [("0", "L0 - raw"),
                      ("1", "L1 - calibrated science ready data"),
                      ("2", "L2 - enhanced data products")],
            # description.BTYPE` - IVOA Unified Content Desciptor for data - see also
            # https://gitlab.leibniz-kis.de/sdc/sdc_wiki/-/wikis/standards_and_references/btypes
            a.Physobs: [("phot.flux", "Photon flux"),
                        ("phot.count", "Flux expressed in counts")]
        }

        return adict