# -*- coding: utf-8 -*-

import json
import os
import urllib.parse
import urllib.request
import warnings
from urllib.error import HTTPError, URLError

import astropy.units as u
import parfive
from astropy.time import Time

import sunpy.net.attrs as a
from sunpy.net.attr import AttrAnd, AttrOr, AttrMeta, AttrWalker, DataAttr, and_, or_
from sunpy.net.base_client import (BaseClient, QueryResponseTable, QueryResponseRow,
                                   convert_row_to_table)
from sunpy.util.exceptions import SunpyUserWarning
# from sunpy.net.attrs import Instrument, Level, Physobs, Provider, Time, Wavelength

import sdc.attrs as sattrs

walker = AttrWalker()


@walker.add_creator(AttrOr)
def create_from_or(wlk, tree):
    """
    For each subtree under the OR, create a new set of query params.
    """
    params = []
    for sub in tree.attrs:
        sub_params = wlk.create(sub)

        # Strip out one layer of nesting of lists
        # This means that create always returns a list of dicts.
        if isinstance(sub_params, list) and len(sub_params) == 1:
            sub_params = sub_params[0]

        params.append(sub_params)

    return params


@walker.add_creator(AttrAnd, DataAttr)
def create_new_param(wlk, tree):
    params = dict()

    # Use the apply dispatcher to convert the attrs to their query parameters
    wlk.apply(tree, params)

    return [params]


@walker.add_applier(AttrAnd)
def iterate_over_and(wlk, tree, params):
    for sub in tree.attrs:
        wlk.apply(sub, params)
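
# Illustrative example (attribute values hypothetical): the walker decomposes a query such as
#   a.Instrument('GRIS') & a.Time('2014-04-26', '2014-04-27') | a.Instrument('LARS')
# into a list of two parameter dicts, one per OR branch; the appliers below fill each dict
# with BSON filter fragments keyed by description field name.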


# Map type_name from SunPy Attrs to observation description fields:
# CALIB_LEVEL - Degree of data processing
# BTYPE - IVOA Unified Content Descriptor for data ('phot.flux')
# T_RESOLUTION - (Minimal) sampling interval along the time axis [s]
#
# The default field/key name is given by Attr.type_name.upper();
# keys ending in '_' have the special significance of setting both
# `KEY_MIN` and `KEY_MAX` description fields.

_obs_fields = {'level': 'CALIB_LEVEL', 'physobs': 'BTYPE', 'sample': 'T_RESOLUTION',
               'time': 'DATE', 'wave': 'WAVELENGTH_'}
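
# Illustrative example: the 'wave' attr maps to 'WAVELENGTH_', so a single value is matched
# against both bounds,
#   {'description.WAVELENGTH_MIN':{'$lte':656.3}},{'description.WAVELENGTH_MAX':{'$gte':656.3}}
# whereas 'level' maps to the plain field {'description.CALIB_LEVEL':...}.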


def _str_val(value, regex=False):
    """
    Format `value` for query string.
    """
    if isinstance(value, (int, float, complex)):
        return f"{value:g}"
    elif isinstance(value, Time):
        # Dates are stored as f"{{'$date':{int(value.unix * 1000)}}}" but queried as ISO time strings.
        return f"{{'$date':'{value.isot}'}}"
    elif regex:
        return f"{{'$regex':'{str(value)}'}}"
    else:
        # Run through urllib.parse.quote() to be safe?
        return f"'{str(value)}'"


def _update_val(dictionary, key, value, block='description', regex=False):
    """
    Update dictionary with field_name:value string in format parseable by BSON filter.
    """
    strval = _str_val(value, regex)
    # Some fields are always lower-case, some always upper-case...
    if key in ('BTYPE', 'INSTRUMENT', 'OBS_COLLECTION'):
        strval = strval.lower()
    if key in ('POL_STATES', 'TELESCOPE'):
        strval = strval.upper()
    # Special case for fields having a _MIN and _MAX form - require MIN <= value <= MAX:
    if key.endswith('_'):
        query = (f"{{'{block}.{key}MIN':{{'$lte':{strval}}}}},"
                 f"{{'{block}.{key}MAX':{{'$gte':{strval}}}}}")
    else:
        query = f"{{'{block}.{key}':{strval}}}"
    return dictionary.update({key: query})
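
# Illustrative example: _update_val(params, 'CALIB_LEVEL', 1) stores the fragment
#   "{'description.CALIB_LEVEL':1}"
# under params['CALIB_LEVEL']; a key ending in '_' such as 'WAVELENGTH_' instead stores
# the paired _MIN/_MAX constraints shown above.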


def _update_range(dictionary, key, values, block='description', fullrange=False):
    """
    Update dictionary with field_name:{value range} string in format parseable by BSON filter.
    """
    strvals = [_str_val(values[0]), _str_val(values[1])]
    # Special case for fields having a _MIN and _MAX form:
    if key.endswith('_'):
        if fullrange:
            # Cover the full search range: MIN <= values[0] and MAX >= values[1]
            query = (f"{{'{block}.{key}MIN':{{'$lte':{strvals[0]}}}}},"
                     f"{{'{block}.{key}MAX':{{'$gte':{strvals[1]}}}}}")
        else:
            # Default (overlap): MIN <= values[1] and MAX >= values[0]
            query = (f"{{'{block}.{key}MIN':{{'$lte':{strvals[1]}}}}},"
                     f"{{'{block}.{key}MAX':{{'$gte':{strvals[0]}}}}}")
    else:
        query = f"{{'{block}.{key}':{{'$gte':{strvals[0]},'$lte':{strvals[1]}}}}}"
    return dictionary.update({key: query})
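
# Illustrative example: _update_range(params, 'WAVELENGTH_', [1050., 1080.]) requires
# overlap with the search range,
#   "{'description.WAVELENGTH_MIN':{'$lte':1080}},{'description.WAVELENGTH_MAX':{'$gte':1050}}"
# while fullrange=True instead requires the observation to cover it entirely
# (WAVELENGTH_MIN <= 1050 and WAVELENGTH_MAX >= 1080).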


# Converters from SunPy Attrs to Observation Record 'description' fields

# Generic applier for single-valued Attrs
@walker.add_applier(a.Level, a.Physobs, a.Sample, sattrs.Filter, sattrs.PolStates,
                    sattrs.DataProduct, sattrs.Telescope, sattrs.Target, sattrs.ObsName)
def _(wlk, attr, params):
    """Set `description.TYPE_NAME`"""
    key = _obs_fields.get(attr.type_name, attr.type_name.upper())
    return _update_val(params, key, attr.value, regex=getattr(attr, 'regex', False))


# OBS_COLLECTION: `instrument_observations` also defines the MongoDB collection (base path).
@walker.add_applier(a.Instrument)
def _(wlk, attr, params):
    """Set 'description.INSTRUMENT' and 'description.OBS_COLLECTION'"""
    params.update({'OBS_COLLECTION': f'{attr.value.lower()}_observations'})
    key = attr.type_name.upper()
    return _update_val(params, key, attr.value, regex=getattr(attr, 'regex', False))
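
# Illustrative example: a.Instrument('GRIS') sets params['OBS_COLLECTION'] to
# 'gris_observations' (the collection queried by `_make_search` below) and adds the
# filter fragment "{'description.INSTRUMENT':'gris'}".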


# Time range covering the observation - require at least part of it within 'DATE_BEG', 'DATE_END'
@walker.add_applier(a.Time)
def _(wlk, attr, params):
    """Set 'description.DATE_BEG|END' [ISO Time str]"""
    if attr.end == attr.start:
        _update_val(params, 'DATE_', attr.start, regex=getattr(attr, 'regex', False))
    else:
        _update_range(params, 'DATE_', [attr.start, attr.end],
                      fullrange=getattr(attr, 'fullrange', False))
    params['DATE_'] = params['DATE_'].replace('_MIN', '_BEG').replace('_MAX', '_END')
    return
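
# Illustrative example: a.Time('2014-04-26', '2014-04-27') yields the overlap condition
#   "{'description.DATE_BEG':{'$lte':{'$date':'2014-04-27T00:00:00.000'}}},"
#   "{'description.DATE_END':{'$gte':{'$date':'2014-04-26T00:00:00.000'}}}"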


# sattrs.Date is a point in time to fall within the observation
@walker.add_applier(sattrs.Date)
def _(wlk, attr, params):
    """Set 'description.DATE_BEG|END' [ISO Time str]"""
    _update_val(params, 'DATE_', attr.time, regex=getattr(attr, 'regex', False))
    params['DATE_'] = params['DATE_'].replace('_MIN', '_BEG').replace('_MAX', '_END')
    return


# Generic applier for Range Attrs
@walker.add_applier(a.Wavelength, sattrs.AtmosR0, sattrs.ExposureTime, sattrs.Theta, sattrs.Mu,
                    sattrs.HelioProjLon, sattrs.HelioProjLat, sattrs.SpatialResolution,
                    sattrs.SpectralResolution, sattrs.TemporalResolution, sattrs.NDimensions,
                    sattrs.PolXel, sattrs.SpatialXel1, sattrs.SpatialXel2,
                    sattrs.SpectralXel, sattrs.TimeXel)
def _(wlk, attr, params):
    """Set 'description.TYPE_NAME[_MIN|_MAX]'"""
    key = _obs_fields.get(attr.type_name, attr.type_name.upper())
    # Wavelength returns quantities with default unit Angstrom; convert to nm.
    if isinstance(attr.min, u.Quantity):
        attrange = [attr.min.to_value(u.nm), attr.max.to_value(u.nm)]
    else:
        attrange = [attr.min, attr.max]
    # Are we querying for a single value, or do we still need to sort min/max?
    if attrange[1] == attrange[0]:
        return _update_val(params, key, attrange[0])
    else:
        attrange = [min(attrange), max(attrange)]
        return _update_range(params, key, attrange, fullrange=getattr(attr, 'fullrange', False))


@walker.add_applier(a.Provider, a.ExtentType, a.Source)
def _(wlk, attr, params):
    """
    Provider, ExtentType, Source are used by client `_can_handle_query` and not the API.
    """


class KISClient(BaseClient):
    """
    Search KIS-SDC observation datasets from GridFS and retrieve [metadata] files.

    """
    _BASE_URL = "http://dockertest:8083/sdc/"
    status = ""

    def __init__(self):
        """"""

    @classmethod
    def _status(cls):
        return cls.status

    @staticmethod
    def _make_filename(row: QueryResponseRow, path: os.PathLike = None):
        """
        Generate a filename for a file based on the observation record; to be worked out.
        """
        # The fallback name is just the dataset _id.
        name = f"{row['_id']}.fits"

        return os.path.join(path, name)

    def _make_search(self, query_dicts):
        """
        Combine (AND) query parameters from list of params dictionaries to search paths.
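
        Illustrative example (values hypothetical): a params dict built from
        ``a.Instrument('GRIS') & a.Level(1)`` turns into a search path like
        ``"gris_observations?filter={'$and':[{'description.INSTRUMENT':'gris'},{'description.CALIB_LEVEL':1}]}"``.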
        """
        searches = []

        # param dicts for same OBS_COLLECTION (Instrument) could in principle be
        # combined to a single query using '$or':[] ...

        for query_params in query_dicts:
            collection = query_params.pop('OBS_COLLECTION')
            instrument = query_params.get('INSTRUMENT')
            if collection.split('_')[0] not in instrument.lower():
                raise ValueError(f"Mismatch of '{instrument}' in '{collection}'.")
            # Search for existing dict for same collection (instrument) to update:
            # index_obs = [i for i, d in enumerate(params) if collection in d.values()]
            # if len(index_obs) > 0:
            #     ~~~query_dicts[index_obs[0]].update(sub_params)~~~ need to collect them for OR

            searches.append(f"{collection}?filter={{'$and':[{','.join(query_params.values())}]}}")

        return searches

    def search(self, *query) -> QueryResponseTable:
        """
        Query the SDC RESTHeart API for a list of results from the observation collection.

        Parameters
        ----------
        query : `~sunpy.net.attr.AttrAnd`, `~sunpy.net.attr.AttrOr` or iterable of `~sunpy.net.attr.Attr`
            Logical combination of `sunpy.net.attrs` objects representing the query,
            or any number of such objects to be combined using the AND (``&``) operator.

        Returns
        -------
        results : `~sunpy.net.base_client.QueryResponseTable`
            A `QueryResponseTable` instance containing the query result.
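
        Examples
        --------
        A minimal sketch, assuming a reachable RESTHeart endpoint and a registered
        instrument (values illustrative)::

            >>> import sunpy.net.attrs as a
            >>> client = KISClient()  # doctest: +SKIP
            >>> table = client.search(a.Instrument("GRIS") &
            ...                       a.Time("2014-04-26", "2014-04-27"))  # doctest: +SKIP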
        """
        query = and_(*query)

        # Not very extensively tested on hierarchical queries
        if not self._can_handle_query(query):
            raise AttributeError(f'Query not possible: {self._status()}')

        queries = self._make_search(walker.create(query))
        results = []

        for query_string in queries:
            full_url = f"{self._BASE_URL}{query_string}"
            try:
                response = urllib.request.urlopen(full_url)
                obs_dict = json.loads(response.read()).get('_embedded', [])
                results += obs_dict
            except (HTTPError, URLError) as exc:
                raise URLError(f'Unable to execute search "{full_url}": {exc}. Confirm that '
                               f'RESTHeart is running on {self._BASE_URL} and connected.')

        return QueryResponseTable(results, client=self)

    @convert_row_to_table
    def fetch(self, query_results: QueryResponseTable, *, downloader: parfive.Downloader,
              path: os.PathLike = None, binary: bool = False, **kwargs):
        """
        Fetch GridFS dataset metadata or binary files.

        Parameters
        ----------
        query_results : `~sunpy.net.base_client.QueryResponseTable`
            Results to download.
        downloader : `parfive.Downloader`
            The download manager to use.
        path : `str` or `pathlib.Path`, optional
            Path to the download directory.
        binary : bool, optional
            Fetch the binary data for the dataset (default: metadata).
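
        Notes
        -----
        A sketch of the URL queued for each linked dataset (identifiers illustrative):
        ``{_BASE_URL}gris_l1_data.files/<oid>`` for the JSON metadata, with ``/binary``
        appended when ``binary=True``.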
        """
        if not len(query_results):
            return

        if binary:
            binfile = '/binary'
            ext = 'fits'
        else:
            binfile = ''
            ext = 'json'

        for row in query_results:
            inst = row['description']['INSTRUMENT']
            rowpath = os.path.join(path, row['_id']['$oid'])
            for l1_data in row['links']['l1_data']:
                oid = l1_data['$oid']
                filename = f"{oid}.{ext}"
                url = f"{self._BASE_URL}{inst}_l1_data.files/{oid}{binfile}"
                downloader.enqueue_file(url, filename=os.path.join(rowpath, filename), max_splits=1)

    @classmethod
    def _can_handle_query(cls, *query, hasinstr=False):
        """
        Check if query attributes are valid and supported by this client.

        Tests that all attributes in tree are in `~sunpy.net.attrs` or `~sunpy.net.attrs.sdc`
        and, if in registered attributes, have a registered value.

        Parameters
        ----------
        query : iterable of `~sunpy.net.attr.Attr`, `~sunpy.net.attr.AttrAnd` or `~sunpy.net.attr.AttrOr`
            Collection of query attributes.
        hasinstr : bool, optional
            Assume that `~sunpy.net.attrs.Instrument` attribute is already present.

        Notes
        -----
            An instrument is always required to form the 'sdc/{instrument}_observations'
            base query string, but its presence may not be checked in complex query trees.
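
            Illustrative example: ``cls._can_handle_query(a.Instrument('GRIS'), a.Level(1))``
            returns `True`, while a query without a registered `~sunpy.net.attrs.Instrument`
            sets ``cls.status`` and returns `False`.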
        """
        query_attrs = set(type(x) for x in query)
        sunpy_attrs = [getattr(a, x) for x in a.__all__ if isinstance(getattr(a, x), AttrMeta)]
        sdc_attrs = [getattr(a.sdc, x) for x in a.sdc.__all__ if
                     isinstance(getattr(a.sdc, x), AttrMeta)]
        supported_attrs = set(sunpy_attrs + sdc_attrs + [AttrAnd, AttrOr])

        # Remove a.Provider, a.Source, a.ExtentType here since they are not used as filters?
        if not supported_attrs.issuperset(query_attrs):
            cls.status = f"Unknown Attributes: {[y.__name__ for y in query_attrs-supported_attrs]}"
            return False

        if len(query) == 1 and isinstance(*query, AttrOr):
            for x in query[0].attrs:
                if not cls._can_handle_query(x):
                    return False
            return True

        registered = cls.register_values()
        for x in query:
            if isinstance(x, AttrAnd):
                hasinstr = cls._can_handle_query(*x.attrs, hasinstr=False)
                if not hasinstr:
                    return False
            elif isinstance(x, a.Instrument):
                known = [y[0].lower() for y in registered.get(type(x))]
                if x.value.lower() in known:
                    hasinstr = True
                else:
                    cls.status = f"Instrument {x.value} not in registered list\n  {known}"
                    return False
            # Treat non-str descriptors separately here.
            elif isinstance(x, a.Level):
                if round(x.value) not in range(3):
                    cls.status = f"Level {x.value} not in supported range {range(3)}"
                    return False
            else:
                known = [y[0].lower() for y in registered.get(type(x), [])]
                if known and x.value.lower() not in known:
                    cls.status = f"{type(x).__name__} {x.value} not in registered list\n  {known}"
                    return False

        if not hasinstr:
            cls.status = f"No 'Instrument' found in Attributes {[y.__name__ for y in query_attrs]}"
        return hasinstr

    @classmethod
    def _attrs_module(cls):
        return 'sdc', 'sdc.attrs'

    @classmethod
    def register_values(cls):
        """
        Known and supported search values for SDC data, builtins for now.
        """
        adict = {
            a.Provider: [("KIS-SDC",
                          "Science Data Centre at the Leibniz-Institut für Sonnenphysik")],
            # description.INSTRUMENT
            a.Instrument: [("ChroTel", "Chromospheric Telescope CCD Imager"),
                           ("BBI", "Broadband Context Imager @ Gregor"),
                           ("GRIS", "Gregor Infrared Spectrograph"),
                           ("M-lite", "M-lite 2M Imagers @ Gregor"),
                           ("LARS", "Lars is an Absolute Reference Spectrograph @ VTT")],
            #              ("ECHELLE", "Echelle grating Spectrograph @ VTT"),  # 404 error
            #              ("HELLRIDE", "HELioseismic Large Region Interferometric Device @ VTT"),
            #              ("TESOS", "TESOS/VIP 2D Fabry-Perot interferometric spectrometer @ VTT"),
            #              ("GFPI", "Gregor Fast Processes Imaging Spectrograph"),
            #              ("HiFi", "High-resolution Fast Imager @ Gregor"),   # 404 error
            #              ("TIP-II", "Tenerife Infrared Polarimeter @ VTT"),  # name?
            #              ("ZIMPOL", "Zeeman Imaging Polarimeter @ Gregor"),  # 404 error
            # description.TELESCOPE - Name of the telescope
            a.sdc.Telescope: [("ChroTel", "10 cm Chromospheric Telescope, Observatorio del Teide"),
                              ("Gregor", "150 cm Gregor Solar Telescope, Observatorio del Teide"),
                              ("VTT", "70 cm Vacuum Tower Telescope, Observatorio del Teide")],
            # description.CALIB_LEVEL - Degree of data processing - note IVOA also specifies 3 & 4.
            a.Level: [("0", "L0 - raw"),
                      ("1", "L1 - calibrated science ready data"),
                      ("2", "L2 - enhanced data products")],
            # description.BTYPE - IVOA Unified Content Descriptor for data - see also
            # https://gitlab.leibniz-kis.de/sdc/sdc_wiki/-/wikis/standards_and_references/btypes
            a.Physobs: [("phot.flux", "Photon flux"),
                        ("phot.count", "Flux expressed in counts")]
        }

        return adict