diff --git a/.gitignore b/.gitignore index 12ab2d61..23f84355 100644 --- a/.gitignore +++ b/.gitignore @@ -4,5 +4,6 @@ *.idea *.DS_Store analysis/webservice/algorithms/doms/domsconfig.ini -data-access/nexustiles/config/datastores.ini +data-access/nexustiles/backends/nexusproto/config/datastores.ini +data-access/nexustiles/config/datasets.ini venv/ diff --git a/CHANGELOG.md b/CHANGELOG.md index 94e2fa60..60cb6081 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,12 +8,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - SDAP-506: - Added STAC Catalog endpoint for matchup outputs +- SDAP-472: + - Support for Zarr backend (gridded data only) + - Dataset management endpoints for Zarr datasets +- SDAP-498: Support for satellite variable units & other dataset-level metadata ### Changed - SDAP-493: - Updated /job endpoint to use `executionId` terminology for consistency with existing `/cdmsresults` endpoint - Updated /job endpoint with details about number of primary and secondary tiles. - SDAP-500: Improvements to SDAP Asynchronous Jobs - SDAP-499: Added page number to default filename for matchup output +- SDAP-472: Overhauled `data-access` to support multiple backends for simultaneous support of multiple ARD formats ### Deprecated ### Removed - SDAP-493: diff --git a/analysis/conda-requirements.txt b/analysis/conda-requirements.txt index e27bdeae..22dff066 100644 --- a/analysis/conda-requirements.txt +++ b/analysis/conda-requirements.txt @@ -22,7 +22,8 @@ pytz==2021.1 utm==0.6.0 shapely==1.7.1 backports.functools_lru_cache==1.6.1 -boto3==1.16.63 +boto3>=1.16.63 +botocore==1.24.21 pillow==8.1.0 mpld3=0.5.1 tornado==6.1 @@ -33,4 +34,4 @@ gdal==3.2.1 mock==4.0.3 importlib_metadata==4.11.4 #singledispatch==3.4.0.3 - +schema diff --git a/analysis/setup.py b/analysis/setup.py index 8fbc617e..2e09815c 100644 --- a/analysis/setup.py +++ b/analysis/setup.py @@ -17,8 +17,11 @@ import setuptools from subprocess import check_call, CalledProcessError -with open('../VERSION.txt', 'r') as f: - __version__ = f.read() +try: + with open('../VERSION.txt', 'r') as f: + __version__ = f.read() +except Exception: + __version__ = None try: diff --git a/analysis/webservice/algorithms/DailyDifferenceAverage.py b/analysis/webservice/algorithms/DailyDifferenceAverage.py index 05274fc2..c6c84951 100644 --- a/analysis/webservice/algorithms/DailyDifferenceAverage.py +++ b/analysis/webservice/algorithms/DailyDifferenceAverage.py @@ -21,7 +21,8 @@ import numpy as np import pytz -from nexustiles.nexustiles import NexusTileService, NexusTileServiceException +from nexustiles.nexustiles import NexusTileService +from nexustiles.exception import NexusTileServiceException from shapely.geometry import box from webservice.NexusHandler import nexus_handler diff --git a/analysis/webservice/algorithms/StandardDeviationSearch.py b/analysis/webservice/algorithms/StandardDeviationSearch.py index ae0566f1..26451cb1 100644 --- a/analysis/webservice/algorithms/StandardDeviationSearch.py +++ b/analysis/webservice/algorithms/StandardDeviationSearch.py @@ -19,7 +19,7 @@ from datetime import datetime from functools import partial -from nexustiles.nexustiles import NexusTileServiceException +from nexustiles.exception import NexusTileServiceException from pytz import timezone from webservice.NexusHandler import nexus_handler diff --git a/analysis/webservice/algorithms/doms/BaseDomsHandler.py b/analysis/webservice/algorithms/doms/BaseDomsHandler.py index f1ac0923..bc7db09e 100644 ---
a/analysis/webservice/algorithms/doms/BaseDomsHandler.py +++ b/analysis/webservice/algorithms/doms/BaseDomsHandler.py @@ -78,14 +78,15 @@ def default(self, obj): class DomsQueryResults(NexusResults): def __init__(self, results=None, args=None, bounds=None, count=None, details=None, computeOptions=None, - executionId=None, status_code=200, page_num=None, page_size=None): - NexusResults.__init__(self, results=results, meta=None, stats=None, computeOptions=computeOptions, + executionId=None, status_code=200, page_num=None, page_size=None, meta=None): + NexusResults.__init__(self, results=results, meta=meta, stats=None, computeOptions=computeOptions, status_code=status_code) self.__args = args self.__bounds = bounds self.__count = count self.__details = details self.__executionId = str(executionId) + self.__meta = meta if meta is not None else {} if self.__details is None: self.__details = {} @@ -98,13 +99,13 @@ def toJson(self): bounds = self.__bounds.toMap() if self.__bounds is not None else {} return json.dumps( {"executionId": self.__executionId, "data": self.results(), "params": self.__args, "bounds": bounds, - "count": self.__count, "details": self.__details}, indent=4, cls=DomsEncoder) + "count": self.__count, "details": self.__details, "metadata": self.__meta}, indent=4, cls=DomsEncoder) def toCSV(self): - return DomsCSVFormatter.create(self.__executionId, self.results(), self.__args, self.__details) + return DomsCSVFormatter.create(self.__executionId, self.results(), self.__args, self.__details, self.__meta) def toNetCDF(self): - return DomsNetCDFFormatter.create(self.__executionId, self.results(), self.__args, self.__details) + return DomsNetCDFFormatter.create(self.__executionId, self.results(), self.__args, self.__details, self.__meta) def filename(self): return f'CDMS_{self.__executionId}_page{self.__details["pageNum"]}' @@ -112,12 +113,13 @@ def filename(self): class DomsCSVFormatter: @staticmethod - def create(executionId, results, params, details): + def create(executionId, results, params, details, metadata): csv_mem_file = io.StringIO() try: DomsCSVFormatter.__addConstants(csv_mem_file) DomsCSVFormatter.__addDynamicAttrs(csv_mem_file, executionId, results, params, details) + DomsCSVFormatter.__addMetadata(csv_mem_file, metadata) csv.writer(csv_mem_file).writerow([]) DomsCSVFormatter.__packValues(csv_mem_file, results) @@ -135,7 +137,11 @@ def is_empty(s): name = variable['cf_variable_name'] - return name if not is_empty(name) else variable['variable_name'] + header_name = name if not is_empty(name) else variable['variable_name'] + + unit = variable.get('variable_unit', None) + + return f'{header_name} ({unit})' if unit is not None else header_name @staticmethod def __packValues(csv_mem_file, results): @@ -288,10 +294,31 @@ def __addDynamicAttrs(csvfile, executionId, results, params, details): writer.writerows(global_attrs) + @staticmethod + def __addMetadata(csvfile, meta): + def meta_dict_to_list(meta_dict: dict, prefix='metadata') -> list: + attrs = [] + + for key in meta_dict: + new_key = key if prefix == '' else f'{prefix}.{key}' + value = meta_dict[key] + + if isinstance(value, dict): + attrs.extend(meta_dict_to_list(value, new_key)) + else: + attrs.append(dict(MetadataAttribute=new_key, Value=value)) + + return attrs + + metadata_attrs = meta_dict_to_list(meta) + + writer = csv.DictWriter(csvfile, ['MetadataAttribute', 'Value']) + writer.writerows(metadata_attrs) + class DomsNetCDFFormatter: @staticmethod - def create(executionId, results, params, details): +
def create(executionId, results, params, details, metadata): t = tempfile.mkstemp(prefix="cdms_", suffix=".nc") tempFileName = t[1] @@ -335,6 +362,30 @@ def create(executionId, results, params, details): dataset.CDMS_page_num = details["pageNum"] dataset.CDMS_page_size = details["pageSize"] + # Flatten the dataset metadata dict and attach each entry as a global attribute + + def meta_dict_to_list(meta_dict: dict, prefix='metadata') -> list: + attrs = [] + + for key in meta_dict: + new_key = key if prefix == '' else f'{prefix}.{key}' + value = meta_dict[key] + + if value is None: + value = 'NULL' + elif isinstance(value, list): + value = json.dumps(value) + + if isinstance(value, dict): + attrs.extend(meta_dict_to_list(value, new_key)) + else: + attrs.append((new_key, value)) + + return attrs + + for attr in meta_dict_to_list(metadata): + setattr(dataset, *attr) + insituDatasets = params["matchup"] insituLinks = set() for insitu in insituDatasets: @@ -534,7 +585,8 @@ def writeGroup(self): self.__enrichVariable(data_variable, min_data, max_data, has_depth=None, unit=units[variable]) data_variable[:] = np.ma.masked_invalid(variables[variable]) data_variable.long_name = name - data_variable.standard_name = cf_name + if cf_name: + data_variable.standard_name = cf_name # # Lists may include 'None" values, to calc min these must be filtered out diff --git a/analysis/webservice/algorithms/doms/ResultsRetrieval.py b/analysis/webservice/algorithms/doms/ResultsRetrieval.py index f03c1caa..0b26056a 100644 --- a/analysis/webservice/algorithms/doms/ResultsRetrieval.py +++ b/analysis/webservice/algorithms/doms/ResultsRetrieval.py @@ -19,6 +19,11 @@ from . import ResultsStorage from webservice.NexusHandler import nexus_handler from webservice.webmodel import NexusProcessingException +from nexustiles.nexustiles import NexusTileService + +import logging + +log = logging.getLogger(__name__) @nexus_handler @@ -48,5 +53,22 @@ def calc(self, computeOptions, **args): with ResultsStorage.ResultsRetrieval(self.config) as storage: params, stats, data = storage.retrieveResults(execution_id, trim_data=simple_results, page_num=page_num, page_size=page_size) + try: + ds_metadata = {} + ds_meta_primary_name = params['primary'] + + primary_metadata = NexusTileService.get_metadata_for_dataset(ds_meta_primary_name) + + ds_metadata['primary'] = {ds_meta_primary_name: primary_metadata} + + ds_metadata['secondary'] = {} + + for secondary_ds_name in params['matchup'].split(','): + ds_metadata['secondary'][secondary_ds_name] = NexusTileService.get_metadata_for_dataset(secondary_ds_name) + except Exception: + log.warning('Could not build dataset metadata dict due to an error', exc_info=True) + ds_metadata = {} + return BaseDomsHandler.DomsQueryResults(results=data, args=params, details=stats, bounds=None, count=len(data), - computeOptions=None, executionId=execution_id, page_num=page_num, page_size=page_size) + computeOptions=None, executionId=execution_id, page_num=page_num, + page_size=page_size, meta=dict(datasets=ds_metadata)) diff --git a/analysis/webservice/algorithms/doms/subsetter.py b/analysis/webservice/algorithms/doms/subsetter.py index bf63fc88..c8ae8d79 100644 --- a/analysis/webservice/algorithms/doms/subsetter.py +++ b/analysis/webservice/algorithms/doms/subsetter.py @@ -24,6 +24,8 @@ from webservice.algorithms.doms.insitu import query_insitu from webservice.webmodel import NexusProcessingException, NexusResults +from nexustiles.nexustiles import NexusTileService + from .
import BaseDomsHandler ISO_8601 = '%Y-%m-%dT%H:%M:%S%z' @@ -302,6 +304,20 @@ def toCsv(self): logging.info('Converting result to CSV') for dataset_name, results in dataset_results.items(): + try: + ds_metadata = NexusTileService.get_metadata_for_dataset(dataset_name) + except Exception: + ds_metadata = {} + + ds_vars = ds_metadata.get('variables', []) + + variable_dict = {} + variable_dict_cf = {} + + for v in ds_vars: + variable_dict[v['name']] = v + variable_dict_cf[v['cf_standard_name']] = v + rows = [] headers = [ 'longitude', 'latitude', 'time' ] - data_variables = list(set([keys for result in results for keys in result['data'].keys()])) - data_variables.sort() + + data_variables = [] + data_variable_headers = [] + + for dv in sorted(list(set([keys for result in results for keys in result['data'].keys()]))): + data_variables.append(dv) + + if dv in variable_dict_cf and variable_dict_cf[dv]["unit"] is not None: + data_variable_headers.append(f'{dv} ({variable_dict_cf[dv]["unit"]})') + elif dv in variable_dict and variable_dict[dv]["unit"] is not None: + data_variable_headers.append(f'{dv} ({variable_dict[dv]["unit"]})') + else: + data_variable_headers.append(dv) if 'id' in list(set([keys for result in results for keys in result.keys()])): headers.append('id') - headers.extend(data_variables) + headers.extend(data_variable_headers) + for i, result in enumerate(results): cols = [] diff --git a/analysis/webservice/algorithms_spark/DailyDifferenceAverageSpark.py b/analysis/webservice/algorithms_spark/DailyDifferenceAverageSpark.py index b4245783..12f7deec 100644 --- a/analysis/webservice/algorithms_spark/DailyDifferenceAverageSpark.py +++ b/analysis/webservice/algorithms_spark/DailyDifferenceAverageSpark.py @@ -324,7 +324,7 @@ def calculate_diff(tile_service_factory, tile_ids, bounding_wkt, dataset, climat for tile_id in tile_ids: # Get the dataset tile try: - dataset_tile = get_dataset_tile(tile_service, wkt.loads(bounding_wkt.value), tile_id) + dataset_tile = get_dataset_tile(tile_service, wkt.loads(bounding_wkt.value), tile_id, dataset.value) except NoDatasetTile: # This should only happen if all measurements in a tile become masked after applying the bounding polygon continue @@ -348,12 +348,12 @@ def calculate_diff(tile_service_factory, tile_ids, bounding_wkt, dataset, climat return chain(*diff_generators) -def get_dataset_tile(tile_service, search_bounding_shape, tile_id): +def get_dataset_tile(tile_service, search_bounding_shape, tile_id, dataset): the_time = datetime.now() try: # Load the dataset tile - dataset_tile = tile_service.find_tile_by_id(tile_id)[0] + dataset_tile = tile_service.find_tile_by_id(tile_id, ds=dataset)[0] # Mask it to the search domain dataset_tile = tile_service.mask_tiles_to_polygon(search_bounding_shape, [dataset_tile])[0] except IndexError: diff --git a/analysis/webservice/algorithms_spark/HofMoellerSpark.py b/analysis/webservice/algorithms_spark/HofMoellerSpark.py index d50006a0..e876a11b 100644 --- a/analysis/webservice/algorithms_spark/HofMoellerSpark.py +++ b/analysis/webservice/algorithms_spark/HofMoellerSpark.py @@ -44,12 +44,12 @@ class HofMoellerCalculator(object): def hofmoeller_stats(tile_service_factory, metrics_callback, tile_in_spark): (latlon, tile_id, index, - min_lat, max_lat, min_lon, max_lon) = tile_in_spark + min_lat, max_lat, min_lon, max_lon, dataset) = tile_in_spark tile_service = tile_service_factory() try: # Load the dataset tile - tile = tile_service.find_tile_by_id(tile_id, metrics_callback=metrics_callback)[0] +
tile = tile_service.find_tile_by_id(tile_id, metrics_callback=metrics_callback, ds=dataset)[0] calculation_start = datetime.now() # Mask it to the search domain tile = tile_service.mask_tiles_to_bbox(min_lat, max_lat, @@ -352,7 +352,7 @@ def calc(self, compute_options, **args): min_lon, min_lat, max_lon, max_lat = bbox.bounds - nexus_tiles_spark = [(self._latlon, tile.tile_id, x, min_lat, max_lat, min_lon, max_lon) for x, tile in + nexus_tiles_spark = [(self._latlon, tile.tile_id, x, min_lat, max_lat, min_lon, max_lon, tile.dataset) for x, tile in enumerate(self._get_tile_service().find_tiles_in_box(min_lat, max_lat, min_lon, max_lon, ds, start_time, end_time, metrics_callback=metrics_record.record_metrics, @@ -408,7 +408,7 @@ def calc(self, compute_options, **args): min_lon, min_lat, max_lon, max_lat = bbox.bounds - nexus_tiles_spark = [(self._latlon, tile.tile_id, x, min_lat, max_lat, min_lon, max_lon) for x, tile in + nexus_tiles_spark = [(self._latlon, tile.tile_id, x, min_lat, max_lat, min_lon, max_lon, tile.dataset) for x, tile in enumerate(self._get_tile_service().find_tiles_in_box(min_lat, max_lat, min_lon, max_lon, ds, start_time, end_time, metrics_callback=metrics_record.record_metrics, diff --git a/analysis/webservice/algorithms_spark/Matchup.py b/analysis/webservice/algorithms_spark/Matchup.py index 7c7f551b..defed2be 100644 --- a/analysis/webservice/algorithms_spark/Matchup.py +++ b/analysis/webservice/algorithms_spark/Matchup.py @@ -41,6 +41,8 @@ from webservice.webmodel import NexusProcessingException from webservice.webmodel.NexusExecutionResults import ExecutionStatus +from nexustiles.nexustiles import NexusTileService + EPOCH = timezone('UTC').localize(datetime(1970, 1, 1)) ISO_8601 = '%Y-%m-%dT%H:%M:%S%z' @@ -485,14 +487,36 @@ def from_nexus_point(nexus_point, tile=None): else: data_vals = [nexus_point.data_vals] + ds_metadata = NexusTileService.get_metadata_for_dataset(tile.dataset) + + if ds_metadata is not None: + ds_vars = ds_metadata.get('variables', []) + else: + ds_vars = [] + + variable_dict = {} + + for v in ds_vars: + variable_dict[v['name']] = v + data = [] for data_val, variable in zip(data_vals, tile.variables): if data_val: + if variable.variable_name in variable_dict: + standard_name = variable_dict[variable.variable_name]['cf_standard_name'] + unit = variable_dict[variable.variable_name]['unit'] + else: + standard_name = variable.standard_name + unit = None + + if standard_name is None or standard_name == '': + standard_name = variable.standard_name + data.append(DataPoint( variable_name=variable.variable_name, variable_value=data_val, - cf_variable_name=variable.standard_name, - variable_unit=None + cf_variable_name=standard_name, + variable_unit=unit )) point.data = data @@ -841,9 +865,9 @@ def match_satellite_to_insitu(tile_ids, primary_b, secondary_b, parameter_b, tt_ tile_service = tile_service_factory() # Determine the spatial temporal extents of this partition of tiles - tiles_bbox = tile_service.get_bounding_box(tile_ids) - tiles_min_time = tile_service.get_min_time(tile_ids) - tiles_max_time = tile_service.get_max_time(tile_ids) + tiles_bbox = tile_service.get_bounding_box(tile_ids, ds=primary_b.value) + tiles_min_time = tile_service.get_min_time(tile_ids, ds=primary_b.value) + tiles_max_time = tile_service.get_max_time(tile_ids, ds=primary_b.value) # Increase spatial extents by the radius tolerance matchup_min_lon, matchup_min_lat = add_meters_to_lon_lat(tiles_bbox.bounds[0], tiles_bbox.bounds[1], @@ -922,7 +946,7 @@ def 
match_satellite_to_insitu(tile_ids, primary_b, secondary_b, parameter_b, tt_ edge_results = [] for tile in matchup_tiles: # Retrieve tile data and convert to lat/lon projection - tiles = tile_service.find_tile_by_id(tile.tile_id, fetch_data=True) + tiles = tile_service.find_tile_by_id(tile.tile_id, fetch_data=True, ds=secondary_b.value) tile = tiles[0] valid_indices = tile.get_indices() @@ -948,14 +972,14 @@ def match_satellite_to_insitu(tile_ids, primary_b, secondary_b, parameter_b, tt_ # The actual matching happens in the generator. This is so that we only load 1 tile into memory at a time match_generators = [match_tile_to_point_generator(tile_service, tile_id, m_tree, edge_results, bounding_wkt_b.value, - parameter_b.value, rt_b.value, aeqd_proj) for tile_id - in tile_ids] + parameter_b.value, rt_b.value, aeqd_proj, primary_b.value) + for tile_id in tile_ids] return chain(*match_generators) def match_tile_to_point_generator(tile_service, tile_id, m_tree, edge_results, search_domain_bounding_wkt, - search_parameter, radius_tolerance, aeqd_proj): + search_parameter, radius_tolerance, aeqd_proj, primary_ds): from nexustiles.model.nexusmodel import NexusPoint from webservice.algorithms_spark.Matchup import DomsPoint # Must import DomsPoint or Spark complains @@ -963,7 +987,7 @@ def match_tile_to_point_generator(tile_service, tile_id, m_tree, edge_results, s try: the_time = datetime.now() tile = tile_service.mask_tiles_to_polygon(wkt.loads(search_domain_bounding_wkt), - tile_service.find_tile_by_id(tile_id))[0] + tile_service.find_tile_by_id(tile_id, ds=primary_ds))[0] print("%s Time to load tile %s" % (str(datetime.now() - the_time), tile_id)) except IndexError: # This should only happen if all measurements in a tile become masked after applying the bounding polygon diff --git a/analysis/webservice/algorithms_spark/NexusCalcSparkHandler.py b/analysis/webservice/algorithms_spark/NexusCalcSparkHandler.py index 4499773a..e0334676 100644 --- a/analysis/webservice/algorithms_spark/NexusCalcSparkHandler.py +++ b/analysis/webservice/algorithms_spark/NexusCalcSparkHandler.py @@ -362,6 +362,9 @@ def _create_metrics_record(self): SparkAccumulatorMetricsField(key='solr', description='Cumulative time to fetch data from Solr', accumulator=self._sc.accumulator(0)), + SparkAccumulatorMetricsField(key='backend', + description='Cumulative time to fetch data from external backend(s)', + accumulator=self._sc.accumulator(0)), SparkAccumulatorMetricsField(key='calculation', description='Cumulative time to do calculations', accumulator=self._sc.accumulator(0)), diff --git a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py index 90ae14d9..804d3eca 100644 --- a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py +++ b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py @@ -483,8 +483,9 @@ def calc_average_on_day(tile_service_factory, metrics_callback, normalize_dates, timestamps[0], timestamps[-1], rows=5000, - metrics_callback=metrics_callback) - + metrics_callback=metrics_callback, + distinct=True) + calculation_start = datetime.now() tile_dict = {} diff --git a/analysis/webservice/config/web.ini b/analysis/webservice/config/web.ini index 85849758..a9e3dda8 100644 --- a/analysis/webservice/config/web.ini +++ b/analysis/webservice/config/web.ini @@ -29,4 +29,4 @@ static_enabled=true static_dir=static [modules] -module_dirs=webservice.algorithms,webservice.algorithms_spark,webservice.algorithms.doms \ No newline at end of file 
+module_dirs=webservice.algorithms,webservice.algorithms_spark,webservice.algorithms.doms,webservice.management \ No newline at end of file diff --git a/analysis/webservice/management/Datasets.py b/analysis/webservice/management/Datasets.py new file mode 100644 index 00000000..40b267fd --- /dev/null +++ b/analysis/webservice/management/Datasets.py @@ -0,0 +1,242 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from yaml import load +import json +from webservice.NexusHandler import nexus_handler +from nexustiles.nexustiles import NexusTileService +from webservice.webmodel import NexusRequestObject, NexusProcessingException + +from schema import Schema, Or, SchemaError +from schema import Optional as Opt + +from urllib.parse import urlparse +try: + from yaml import CLoader as Loader +except ImportError: + from yaml import Loader + + +CONFIG_SCHEMA = Schema({ + Or('variable', 'variables'): Or(str, [str]), + 'coords': { + 'latitude': str, + 'longitude': str, + 'time': str, + Opt('depth'): str + }, + Opt('aws'): { + Opt('accessKeyID'): str, + Opt('secretAccessKey'): str, + 'public': bool, + Opt('region'): str + } +}) + + +class DatasetManagement: + @classmethod + def validate(cls): + pass + + @staticmethod + def parse_config(request: NexusRequestObject): + content_type = request.get_headers()['Content-Type'] + + if content_type in ['application/json', 'application/x-json']: + config_dict = json.loads(request.get_request_body()) + elif content_type == 'application/yaml': + config_dict = load(request.get_request_body(), Loader=Loader) + else: + raise NexusProcessingException(reason='Invalid Content-Type header', code=400) + + try: + CONFIG_SCHEMA.validate(config_dict) + + if 'aws' in config_dict: + if not config_dict['aws']['public']: + if 'accessKeyID' not in config_dict['aws'] or 'secretAccessKey' not in config_dict['aws']: + raise NexusProcessingException( + reason='Must provide AWS creds for non-public bucket', + code=400 + ) + except SchemaError as e: + raise NexusProcessingException( + reason=str(e), + code=400 + ) + + return config_dict + + +class Response: + def __init__(self, response): + self.response = response if response is not None else {} + + def toJson(self): + return json.dumps(self.response) + + +@nexus_handler +class DatasetAdd(DatasetManagement): + name = 'Add dataset' + path = '/datasets/add' + description = "Add new Zarr dataset to running SDAP instance" + params = { + "name": { + "name": "Dataset name", + "type": "string", + "description": "Name of new dataset to add" + }, + "path": { + "name": "Path or URL", + "type": "string", + "description": "Path/URL of Zarr group" + }, + "body": { + "name": "Request body", + "type": "application/json OR application/yaml", + "description": "POST request body. 
Config options for Zarr (variable, coords, aws (if applicable))" } } + + def __init__(self, **args): + pass + + def calc(self, request: NexusRequestObject, **args): + try: + config = DatasetManagement.parse_config(request) + except Exception as e: + raise NexusProcessingException( + reason=repr(e), + code=400 + ) + + name = request.get_argument('name') + + if name is None: + raise NexusProcessingException( + reason='Name argument must be provided', + code=400 + ) + + path = request.get_argument('path') + + if path is None: + raise NexusProcessingException( + reason='Path argument must be provided', + code=400 + ) + + try: + if urlparse(path).scheme not in ['file', '', 's3']: + raise NexusProcessingException( + reason='Dataset URL must be for a local file or S3 URL', + code=400 + ) + except ValueError: + raise NexusProcessingException( + reason='Could not parse path URL', code=400 + ) + + try: + NexusTileService.user_ds_add(name, path, config) + except Exception as e: + raise NexusProcessingException( + reason=repr(e), + code=500 + ) + + +@nexus_handler +class DatasetUpdate(DatasetManagement): + name = 'Update dynamically added dataset' + path = '/datasets/update' + description = "Update Zarr dataset in running SDAP instance" + params = { + "name": { + "name": "Dataset name", + "type": "string", + "description": "Name of dataset to update" + }, + "body": { + "name": "Request body", + "type": "application/json OR application/yaml", + "description": "POST request body. Config options for Zarr (variable, coords, aws (if applicable))" + } + } + + def __init__(self, **args): + pass + + def calc(self, request: NexusRequestObject, **args): + try: + config = DatasetManagement.parse_config(request) + except Exception as e: + raise NexusProcessingException( + reason=repr(e), + code=400 + ) + + name = request.get_argument('name') + + if name is None: + raise NexusProcessingException( + reason='Name argument must be provided', + code=400 + ) + + try: + return Response(NexusTileService.user_ds_update(name, config)) + except Exception as e: + raise NexusProcessingException( + reason=repr(e), + code=500 + ) + + +@nexus_handler +class DatasetDelete(DatasetManagement): + name = 'Remove dataset' + path = '/datasets/remove' + description = "Remove Zarr dataset from running SDAP instance" + params = { + "name": { + "name": "Dataset name", + "type": "string", + "description": "Name of dataset to remove" + } + } + + def __init__(self, **args): + pass + + def calc(self, request: NexusRequestObject, **args): + name = request.get_argument('name') + + if name is None: + raise NexusProcessingException( + reason='Name argument must be provided', + code=400 + ) + + try: + return Response(NexusTileService.user_ds_delete(name)) + except Exception as e: + raise NexusProcessingException( + reason=repr(e), + code=500 + ) + diff --git a/analysis/webservice/management/__init__.py b/analysis/webservice/management/__init__.py new file mode 100644 index 00000000..7c9f5ef4 --- /dev/null +++ b/analysis/webservice/management/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from webservice.management.Datasets import DatasetAdd \ No newline at end of file diff --git a/analysis/webservice/nexus_tornado/app_builders/NexusAppBuilder.py b/analysis/webservice/nexus_tornado/app_builders/NexusAppBuilder.py index afe7d690..01798583 100644 --- a/analysis/webservice/nexus_tornado/app_builders/NexusAppBuilder.py +++ b/analysis/webservice/nexus_tornado/app_builders/NexusAppBuilder.py @@ -53,7 +53,7 @@ def set_modules(self, module_dir, algorithm_config, remote_collections=None, max NexusHandler.executeInitializers(algorithm_config) self.log.info("Initializing request ThreadPool to %s" % max_request_threads) - tile_service_factory = partial(NexusTileService, False, False, algorithm_config) + tile_service_factory = partial(NexusTileService, algorithm_config) handler_args_builder = HandlerArgsBuilder( max_request_threads, tile_service_factory, diff --git a/analysis/webservice/nexus_tornado/request/handlers/NexusRequestHandler.py b/analysis/webservice/nexus_tornado/request/handlers/NexusRequestHandler.py index 95bddf46..7a559679 100644 --- a/analysis/webservice/nexus_tornado/request/handlers/NexusRequestHandler.py +++ b/analysis/webservice/nexus_tornado/request/handlers/NexusRequestHandler.py @@ -75,6 +75,35 @@ def get(self): except Exception as e: self.async_onerror_callback(str(e), 500) + @tornado.gen.coroutine + def post(self): + self.logger.info("Received %s" % self._request_summary()) + + request = NexusRequestObject(self) + + # create NexusCalcHandler which will process the request + instance = self.__clazz(**self._clazz_init_args) + + try: + # process the request asynchronously on a different thread, + # the current tornado handler is still available to get other user requests + results = yield tornado.ioloop.IOLoop.current().run_in_executor(self.executor, instance.calc, request) + + if results: + try: + self.set_status(results.status_code) + except AttributeError: + pass + + renderer = NexusRendererFactory.get_renderer(request) + renderer.render(self, results) + + except NexusProcessingException as e: + self.async_onerror_callback(e.reason, e.code) + + except Exception as e: + self.async_onerror_callback(str(e), 500) + def async_onerror_callback(self, reason, code=500): self.logger.error("Error processing request", exc_info=True) diff --git a/analysis/webservice/webmodel/NexusRequestObject.py b/analysis/webservice/webmodel/NexusRequestObject.py index bbd28280..18962364 100644 --- a/analysis/webservice/webmodel/NexusRequestObject.py +++ b/analysis/webservice/webmodel/NexusRequestObject.py @@ -35,6 +35,12 @@ def __init__(self, reqHandler): self.requestHandler = reqHandler StatsComputeOptions.__init__(self) + def get_headers(self): + return self.requestHandler.request.headers + + def get_request_body(self): + return self.requestHandler.request.body + def get_argument(self, name, default=None): return self.requestHandler.get_argument(name, default=default) diff --git a/data-access/nexustiles/AbstractTileService.py b/data-access/nexustiles/AbstractTileService.py new file mode 100644 index 00000000..c418180e --- /dev/null +++ 
b/data-access/nexustiles/AbstractTileService.py @@ -0,0 +1,212 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from abc import ABC, abstractmethod +from functools import reduce + +import numpy as np +import numpy.ma as ma +from copy import deepcopy + + +class AbstractTileService(ABC): + # @staticmethod + # @abstractmethod + # def open_dataset(dataset_s, **kwargs): + # pass + + # @abstractmethod + # def try_connect(self) -> bool: + # raise NotImplementedError() + + def __init__(self, dataset_name): + self._name = dataset_name + self._ds_info = {} + + @abstractmethod + def get_dataseries_list(self, simple=False): + raise NotImplementedError() + + @abstractmethod + def find_tile_by_id(self, tile_id, **kwargs): + raise NotImplementedError() + + @abstractmethod + def find_tiles_by_id(self, tile_ids, ds=None, **kwargs): + raise NotImplementedError() + + @abstractmethod + def find_days_in_range_asc(self, min_lat, max_lat, min_lon, max_lon, dataset, start_time, end_time, + metrics_callback=None, **kwargs): + raise NotImplementedError() + + @abstractmethod + def find_tile_by_polygon_and_most_recent_day_of_year(self, bounding_polygon, ds, day_of_year, **kwargs): + """ + Given a bounding polygon, dataset, and day of year, find tiles in that dataset with the same bounding + polygon and the closest day of year. 
+ + For example: + given a polygon minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; and day of year=32 + search for first tile in MY_DS with identical bbox and day_of_year <= 32 (sorted by day_of_year desc) + + Valid matches: + minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; day of year = 32 + minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; day of year = 30 + + Invalid matches: + minx=1, miny=0, maxx=2, maxy=1; dataset=MY_DS; day of year = 32 + minx=0, miny=0, maxx=1, maxy=1; dataset=MY_OTHER_DS; day of year = 32 + minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; day of year = 30 if minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; day of year = 32 also exists + + :param bounding_polygon: The exact bounding polygon of tiles to search for + :param ds: The dataset name being searched + :param day_of_year: Tile day of year to search for, tile nearest to this day (without going over) will be returned + :return: List of one tile from ds with bounding_polygon on or before day_of_year or raise NexusTileServiceException if no tile found + """ + raise NotImplementedError() + + @abstractmethod + def find_all_tiles_in_box_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs): + raise NotImplementedError() + + @abstractmethod + def find_all_tiles_in_polygon_at_time(self, bounding_polygon, dataset, time, **kwargs): + raise NotImplementedError() + + @abstractmethod + def find_tiles_in_box(self, min_lat, max_lat, min_lon, max_lon, ds=None, start_time=0, end_time=-1, **kwargs): + # Find tiles that fall in the given box in the Solr index + raise NotImplementedError() + + @abstractmethod + def find_tiles_in_polygon(self, bounding_polygon, ds=None, start_time=0, end_time=-1, **kwargs): + # Find tiles that fall within the polygon in the Solr index + raise NotImplementedError() + + @abstractmethod + def find_tiles_by_metadata(self, metadata, ds=None, start_time=0, end_time=-1, **kwargs): + """ + Return list of tiles whose metadata matches the specified metadata, start_time, end_time. + :param metadata: List of metadata values to search for tiles e.g ["river_id_i:1", "granule_s:granule_name"] + :param ds: The dataset name to search + :param start_time: The start time to search for tiles + :param end_time: The end time to search for tiles + :return: A list of tiles + """ + raise NotImplementedError() + + @abstractmethod + def find_tiles_by_exact_bounds(self, bounds, ds, start_time, end_time, **kwargs): + """ + The method will return tiles with the exact given bounds within the time range. It differs from + find_tiles_in_polygon in that only tiles with exactly the given bounds will be returned as opposed to + doing a polygon intersection with the given bounds. 
+ + :param bounds: (minx, miny, maxx, maxy) bounds to search for + :param ds: Dataset name to search + :param start_time: Start time to search (seconds since epoch) + :param end_time: End time to search (seconds since epoch) + :param kwargs: fetch_data: True/False = whether or not to retrieve tile data + :return: + """ + raise NotImplementedError() + + @abstractmethod + def find_all_boundary_tiles_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs): + raise NotImplementedError() + + @abstractmethod + def get_min_max_time_by_granule(self, ds, granule_name): + raise NotImplementedError() + + @abstractmethod + def get_dataset_overall_stats(self, ds): + raise NotImplementedError() + + @abstractmethod + def get_stats_within_box_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs): + raise NotImplementedError() + + @abstractmethod + def get_bounding_box(self, tile_ids): + """ + Retrieve a bounding box that encompasses all of the tiles represented by the given tile ids. + :param tile_ids: List of tile ids + :return: shapely.geometry.Polygon that represents the smallest bounding box that encompasses all of the tiles + """ + raise NotImplementedError() + + @abstractmethod + def get_min_time(self, tile_ids, ds=None): + """ + Get the minimum tile date from the list of tile ids + :param tile_ids: List of tile ids + :param ds: Filter by a specific dataset. Defaults to None (queries all datasets) + :return: long time in seconds since epoch + """ + raise NotImplementedError() + + @abstractmethod + def get_max_time(self, tile_ids, ds=None): + """ + Get the maximum tile date from the list of tile ids + :param tile_ids: List of tile ids + :param ds: Filter by a specific dataset. Defaults to None (queries all datasets) + :return: long time in seconds since epoch + """ + raise NotImplementedError() + + @abstractmethod + def get_distinct_bounding_boxes_in_polygon(self, bounding_polygon, ds, start_time, end_time): + """ + Get a list of distinct tile bounding boxes from all tiles within the given polygon and time range. + :param bounding_polygon: The bounding polygon of tiles to search for + :param ds: The dataset name to search + :param start_time: The start time to search for tiles + :param end_time: The end time to search for tiles + :return: A list of distinct bounding boxes (as shapely polygons) for tiles in the search polygon + """ + raise NotImplementedError() + + @abstractmethod + def get_tile_count(self, ds, bounding_polygon=None, start_time=0, end_time=-1, metadata=None, **kwargs): + """ + Return number of tiles that match search criteria. 
+ :param ds: The dataset name to search + :param bounding_polygon: The polygon to search for tiles + :param start_time: The start time to search for tiles + :param end_time: The end time to search for tiles + :param metadata: List of metadata values to search for tiles e.g ["river_id_i:1", "granule_s:granule_name"] + :return: number of tiles that match search criteria + """ + raise NotImplementedError() + + @abstractmethod + def fetch_data_for_tiles(self, *tiles): + raise NotImplementedError() + + @abstractmethod + def _metadata_store_docs_to_tiles(self, *store_docs): + raise NotImplementedError() + + @abstractmethod + def update_metadata(self, solr_doc): + raise NotImplementedError() + + def get_metadata(self, dataset=None): # ds as param for nexusproto backend + return deepcopy(self._ds_info) + + diff --git a/data-access/nexustiles/backends/__init__.py b/data-access/nexustiles/backends/__init__.py new file mode 100644 index 00000000..8afd240a --- /dev/null +++ b/data-access/nexustiles/backends/__init__.py @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + diff --git a/data-access/nexustiles/backends/nexusproto/__init__.py b/data-access/nexustiles/backends/nexusproto/__init__.py new file mode 100644 index 00000000..8afd240a --- /dev/null +++ b/data-access/nexustiles/backends/nexusproto/__init__.py @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + diff --git a/data-access/nexustiles/backends/nexusproto/backend.py b/data-access/nexustiles/backends/nexusproto/backend.py new file mode 100644 index 00000000..d86a594a --- /dev/null +++ b/data-access/nexustiles/backends/nexusproto/backend.py @@ -0,0 +1,604 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import configparser +import copy +import logging +import sys +import json +from datetime import datetime +from functools import reduce + +import numpy as np +import numpy.ma as ma +import pkg_resources +from pytz import timezone, UTC +from shapely.geometry import MultiPolygon, box + +from .dao import CassandraProxy +from .dao import DynamoProxy +from .dao import S3Proxy +from .dao import SolrProxy +from .dao import ElasticsearchProxy + +from nexustiles.model.nexusmodel import Tile, BBox, TileStats, TileVariable +from nexustiles.exception import NexusTileServiceException +from nexustiles.AbstractTileService import AbstractTileService + +EPOCH = timezone('UTC').localize(datetime(1970, 1, 1)) + +logger = logging.getLogger(__name__) + + +class NexusprotoTileService(AbstractTileService): + def __init__(self, skipDatastore=False, skipMetadatastore=False, config=None): + AbstractTileService.__init__(self, None) + self._datastore = None + self._metadatastore = None + + self._config = configparser.RawConfigParser() + self._config.read(NexusprotoTileService._get_config_files('config/datastores.ini')) + + if config: + self.override_config(config) + + if not skipDatastore: + datastore = self._config.get("datastore", "store") + if datastore == "cassandra": + self._datastore = CassandraProxy.CassandraProxy(self._config) + elif datastore == "s3": + self._datastore = S3Proxy.S3Proxy(self._config) + elif datastore == "dynamo": + self._datastore = DynamoProxy.DynamoProxy(self._config) + else: + raise ValueError("Error reading datastore from config file") + + if not skipMetadatastore: + metadatastore = self._config.get("metadatastore", "store", fallback='solr') + if metadatastore == "solr": + self._metadatastore = SolrProxy.SolrProxy(self._config) + elif metadatastore == "elasticsearch": + self._metadatastore = ElasticsearchProxy.ElasticsearchProxy(self._config) + + def override_config(self, config): + for section in config.sections(): + if self._config.has_section(section): # only override preexisting section, ignores the other + for option in config.options(section): + if config.get(section, option) is not None: + self._config.set(section, option, config.get(section, option)) + + def get_dataseries_list(self, simple=False): + if simple: + return self._metadatastore.get_data_series_list_simple() + else: + return self._metadatastore.get_data_series_list() + + def find_tile_by_id(self, tile_id, **kwargs): + return self._metadatastore.find_tile_by_id(tile_id) + + def find_tiles_by_id(self, tile_ids, ds=None, **kwargs): + return self._metadatastore.find_tiles_by_id(tile_ids, ds=ds, **kwargs) + + def find_days_in_range_asc(self, min_lat, max_lat, min_lon, max_lon, dataset, start_time, end_time, + metrics_callback=None, **kwargs): + start = datetime.now() + result = self._metadatastore.find_days_in_range_asc(min_lat, max_lat, min_lon, max_lon, dataset, start_time, + end_time, + **kwargs) + duration = (datetime.now() - start).total_seconds() + if metrics_callback: + metrics_callback(solr=duration) + return result + + def find_tile_by_polygon_and_most_recent_day_of_year(self, bounding_polygon, ds, 
day_of_year, **kwargs): + """ + Given a bounding polygon, dataset, and day of year, find tiles in that dataset with the same bounding + polygon and the closest day of year. + + For example: + given a polygon minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; and day of year=32 + search for first tile in MY_DS with identical bbox and day_of_year <= 32 (sorted by day_of_year desc) + + Valid matches: + minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; day of year = 32 + minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; day of year = 30 + + Invalid matches: + minx=1, miny=0, maxx=2, maxy=1; dataset=MY_DS; day of year = 32 + minx=0, miny=0, maxx=1, maxy=1; dataset=MY_OTHER_DS; day of year = 32 + minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; day of year = 30 if minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; day of year = 32 also exists + + :param bounding_polygon: The exact bounding polygon of tiles to search for + :param ds: The dataset name being searched + :param day_of_year: Tile day of year to search for, tile nearest to this day (without going over) will be returned + :return: List of one tile from ds with bounding_polygon on or before day_of_year or raise NexusTileServiceException if no tile found + """ + try: + tile = self._metadatastore.find_tile_by_polygon_and_most_recent_day_of_year(bounding_polygon, ds, + day_of_year) + except IndexError: + raise NexusTileServiceException("No tile found.").with_traceback(sys.exc_info()[2]) + + return tile + + def find_all_tiles_in_box_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs): + return self._metadatastore.find_all_tiles_in_box_at_time(min_lat, max_lat, min_lon, max_lon, dataset, time, + rows=5000, + **kwargs) + + def find_all_tiles_in_polygon_at_time(self, bounding_polygon, dataset, time, **kwargs): + return self._metadatastore.find_all_tiles_in_polygon_at_time(bounding_polygon, dataset, time, rows=5000, + **kwargs) + + def find_tiles_in_box(self, min_lat, max_lat, min_lon, max_lon, ds=None, start_time=0, end_time=-1, **kwargs): + # Find tiles that fall in the given box in the Solr index + if type(start_time) is datetime: + start_time = (start_time - EPOCH).total_seconds() + if type(end_time) is datetime: + end_time = (end_time - EPOCH).total_seconds() + return self._metadatastore.find_all_tiles_in_box_sorttimeasc(min_lat, max_lat, min_lon, max_lon, ds, start_time, + end_time, **kwargs) + + def find_tiles_in_polygon(self, bounding_polygon, ds=None, start_time=0, end_time=-1, **kwargs): + # Find tiles that fall within the polygon in the Solr index + if 'sort' in list(kwargs.keys()): + tiles = self._metadatastore.find_all_tiles_in_polygon(bounding_polygon, ds, start_time, end_time, **kwargs) + else: + tiles = self._metadatastore.find_all_tiles_in_polygon_sorttimeasc(bounding_polygon, ds, start_time, + end_time, + **kwargs) + return tiles + + def find_tiles_by_metadata(self, metadata, ds=None, start_time=0, end_time=-1, **kwargs): + """ + Return list of tiles whose metadata matches the specified metadata, start_time, end_time. 
+ :param metadata: List of metadata values to search for tiles e.g ["river_id_i:1", "granule_s:granule_name"] + :param ds: The dataset name to search + :param start_time: The start time to search for tiles + :param end_time: The end time to search for tiles + :return: A list of tiles + """ + tiles = self._metadatastore.find_all_tiles_by_metadata(metadata, ds, start_time, end_time, **kwargs) + + return tiles + + def get_tiles_by_metadata(self, metadata, ds=None, start_time=0, end_time=-1, **kwargs): + """ + Return list of tiles that matches the specified metadata, start_time, end_time with tile data outside of time + range properly masked out. + :param metadata: List of metadata values to search for tiles e.g ["river_id_i:1", "granule_s:granule_name"] + :param ds: The dataset name to search + :param start_time: The start time to search for tiles + :param end_time: The end time to search for tiles + :return: A list of tiles + """ + tiles = self.find_tiles_by_metadata(metadata, ds, start_time, end_time, **kwargs) + tiles = self.mask_tiles_to_time_range(start_time, end_time, tiles) + + return tiles + + def find_tiles_by_exact_bounds(self, bounds, ds, start_time, end_time, **kwargs): + """ + The method will return tiles with the exact given bounds within the time range. It differs from + find_tiles_in_polygon in that only tiles with exactly the given bounds will be returned as opposed to + doing a polygon intersection with the given bounds. + + :param bounds: (minx, miny, maxx, maxy) bounds to search for + :param ds: Dataset name to search + :param start_time: Start time to search (seconds since epoch) + :param end_time: End time to search (seconds since epoch) + :param kwargs: fetch_data: True/False = whether or not to retrieve tile data + :return: + """ + tiles = self._metadatastore.find_tiles_by_exact_bounds(bounds[0], bounds[1], bounds[2], bounds[3], ds, + start_time, + end_time) + return tiles + + def find_all_boundary_tiles_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs): + return self._metadatastore.find_all_boundary_tiles_at_time(min_lat, max_lat, min_lon, max_lon, dataset, time, + rows=5000, + **kwargs) + + def get_tiles_bounded_by_box(self, min_lat, max_lat, min_lon, max_lon, ds=None, start_time=0, end_time=-1, + **kwargs): + tiles = self.find_tiles_in_box(min_lat, max_lat, min_lon, max_lon, ds, start_time, end_time, **kwargs) + tiles = self.mask_tiles_to_bbox(min_lat, max_lat, min_lon, max_lon, tiles) + if 0 <= start_time <= end_time: + tiles = self.mask_tiles_to_time_range(start_time, end_time, tiles) + + return tiles + + def get_tiles_bounded_by_polygon(self, polygon, ds=None, start_time=0, end_time=-1, **kwargs): + tiles = self.find_tiles_in_polygon(polygon, ds, start_time, end_time, + **kwargs) + tiles = self.mask_tiles_to_polygon(polygon, tiles) + if 0 <= start_time <= end_time: + tiles = self.mask_tiles_to_time_range(start_time, end_time, tiles) + + return tiles + + def get_min_max_time_by_granule(self, ds, granule_name): + start_time, end_time = self._metadatastore.find_min_max_date_from_granule(ds, granule_name) + + return start_time, end_time + + def get_dataset_overall_stats(self, ds): + return self._metadatastore.get_data_series_stats(ds) + + def get_tiles_bounded_by_box_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs): + tiles = self.find_all_tiles_in_box_at_time(min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs) + tiles = self.mask_tiles_to_bbox_and_time(min_lat, max_lat, min_lon, max_lon, time, time, tiles) 
+ + return tiles + + def get_tiles_bounded_by_polygon_at_time(self, polygon, dataset, time, **kwargs): + tiles = self.find_all_tiles_in_polygon_at_time(polygon, dataset, time, **kwargs) + tiles = self.mask_tiles_to_polygon_and_time(polygon, time, time, tiles) + + return tiles + + def get_boundary_tiles_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs): + tiles = self.find_all_boundary_tiles_at_time(min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs) + tiles = self.mask_tiles_to_bbox_and_time(min_lat, max_lat, min_lon, max_lon, time, time, tiles) + + return tiles + + def get_stats_within_box_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs): + tiles = self._metadatastore.find_all_tiles_within_box_at_time(min_lat, max_lat, min_lon, max_lon, dataset, time, + **kwargs) + + return tiles + + def get_bounding_box(self, tile_ids): + """ + Retrieve a bounding box that encompasses all of the tiles represented by the given tile ids. + :param tile_ids: List of tile ids + :return: shapely.geometry.Polygon that represents the smallest bounding box that encompasses all of the tiles + """ + tiles = self.find_tiles_by_id(tile_ids, fl=['tile_min_lat', 'tile_max_lat', 'tile_min_lon', 'tile_max_lon'], + fetch_data=False, rows=len(tile_ids)) + + tiles = self._metadata_store_docs_to_tiles(*tiles) + + polys = [] + for tile in tiles: + polys.append(box(tile.bbox.min_lon, tile.bbox.min_lat, tile.bbox.max_lon, tile.bbox.max_lat)) + return box(*MultiPolygon(polys).bounds) + + def get_min_time(self, tile_ids, ds=None): + """ + Get the minimum tile date from the list of tile ids + :param tile_ids: List of tile ids + :param ds: Filter by a specific dataset. Defaults to None (queries all datasets) + :return: long time in seconds since epoch + """ + min_time = self._metadatastore.find_min_date_from_tiles(tile_ids, ds=ds) + return int((min_time - EPOCH).total_seconds()) + + def get_max_time(self, tile_ids, ds=None): + """ + Get the maximum tile date from the list of tile ids + :param tile_ids: List of tile ids + :param ds: Filter by a specific dataset. Defaults to None (queries all datasets) + :return: long time in seconds since epoch + """ + max_time = self._metadatastore.find_max_date_from_tiles(tile_ids, ds=ds) + return int((max_time - EPOCH).total_seconds()) + + def get_distinct_bounding_boxes_in_polygon(self, bounding_polygon, ds, start_time, end_time): + """ + Get a list of distinct tile bounding boxes from all tiles within the given polygon and time range. 
+ :param bounding_polygon: The bounding polygon of tiles to search for + :param ds: The dataset name to search + :param start_time: The start time to search for tiles + :param end_time: The end time to search for tiles + :return: A list of distinct bounding boxes (as shapely polygons) for tiles in the search polygon + """ + bounds = self._metadatastore.find_distinct_bounding_boxes_in_polygon(bounding_polygon, ds, start_time, end_time) + return [box(*b) for b in bounds] + + def mask_tiles_to_bbox(self, min_lat, max_lat, min_lon, max_lon, tiles): + + for tile in tiles: + tile.latitudes = ma.masked_outside(tile.latitudes, min_lat, max_lat) + tile.longitudes = ma.masked_outside(tile.longitudes, min_lon, max_lon) + + # Or together the masks of the individual arrays to create the new mask + data_mask = ma.getmaskarray(tile.times)[:, np.newaxis, np.newaxis] \ + | ma.getmaskarray(tile.latitudes)[np.newaxis, :, np.newaxis] \ + | ma.getmaskarray(tile.longitudes)[np.newaxis, np.newaxis, :] + + # If this is multi-var, need to mask each variable separately. + if tile.is_multi: + # Combine space/time mask with existing mask on data + data_mask = reduce(np.logical_or, [tile.data[0].mask, data_mask]) + + num_vars = len(tile.data) + multi_data_mask = np.repeat(data_mask[np.newaxis, ...], num_vars, axis=0) + tile.data = ma.masked_where(multi_data_mask, tile.data) + else: + tile.data = ma.masked_where(data_mask, tile.data) + + tiles[:] = [tile for tile in tiles if not tile.data.mask.all()] + + return tiles + + def mask_tiles_to_bbox_and_time(self, min_lat, max_lat, min_lon, max_lon, start_time, end_time, tiles): + for tile in tiles: + tile.times = ma.masked_outside(tile.times, start_time, end_time) + tile.latitudes = ma.masked_outside(tile.latitudes, min_lat, max_lat) + tile.longitudes = ma.masked_outside(tile.longitudes, min_lon, max_lon) + + # Or together the masks of the individual arrays to create the new mask + data_mask = ma.getmaskarray(tile.times)[:, np.newaxis, np.newaxis] \ + | ma.getmaskarray(tile.latitudes)[np.newaxis, :, np.newaxis] \ + | ma.getmaskarray(tile.longitudes)[np.newaxis, np.newaxis, :] + + tile.data = ma.masked_where(data_mask, tile.data) + + tiles[:] = [tile for tile in tiles if not tile.data.mask.all()] + + return tiles + + def mask_tiles_to_polygon(self, bounding_polygon, tiles): + + min_lon, min_lat, max_lon, max_lat = bounding_polygon.bounds + + return self.mask_tiles_to_bbox(min_lat, max_lat, min_lon, max_lon, tiles) + + def mask_tiles_to_polygon_and_time(self, bounding_polygon, start_time, end_time, tiles): + min_lon, min_lat, max_lon, max_lat = bounding_polygon.bounds + + return self.mask_tiles_to_bbox_and_time(min_lat, max_lat, min_lon, max_lon, start_time, end_time, tiles) + + def mask_tiles_to_time_range(self, start_time, end_time, tiles): + """ + Masks data in tiles to specified time range. + :param start_time: The start time to search for tiles + :param end_time: The end time to search for tiles + :param tiles: List of tiles + :return: A list tiles with data masked to specified time range + """ + if 0 <= start_time <= end_time: + for tile in tiles: + tile.times = ma.masked_outside(tile.times, start_time, end_time) + + # Or together the masks of the individual arrays to create the new mask + data_mask = ma.getmaskarray(tile.times)[:, np.newaxis, np.newaxis] \ + | ma.getmaskarray(tile.latitudes)[np.newaxis, :, np.newaxis] \ + | ma.getmaskarray(tile.longitudes)[np.newaxis, np.newaxis, :] + + # If this is multi-var, need to mask each variable separately. 
+ if tile.is_multi: + # Combine space/time mask with existing mask on data + data_mask = reduce(np.logical_or, [tile.data[0].mask, data_mask]) + + num_vars = len(tile.data) + multi_data_mask = np.repeat(data_mask[np.newaxis, ...], num_vars, axis=0) + tile.data = ma.masked_where(multi_data_mask, tile.data) + else: + tile.data = ma.masked_where(data_mask, tile.data) + + tiles[:] = [tile for tile in tiles if not tile.data.mask.all()] + + return tiles + + def get_tile_count(self, ds, bounding_polygon=None, start_time=0, end_time=-1, metadata=None, **kwargs): + """ + Return number of tiles that match search criteria. + :param ds: The dataset name to search + :param bounding_polygon: The polygon to search for tiles + :param start_time: The start time to search for tiles + :param end_time: The end time to search for tiles + :param metadata: List of metadata values to search for tiles e.g ["river_id_i:1", "granule_s:granule_name"] + :return: number of tiles that match search criteria + """ + return self._metadatastore.get_tile_count(ds, bounding_polygon, start_time, end_time, metadata, **kwargs) + + def fetch_data_for_tiles(self, *tiles): + + nexus_tile_ids = set([tile.tile_id for tile in tiles]) + matched_tile_data = self._datastore.fetch_nexus_tiles(*nexus_tile_ids) + + tile_data_by_id = {str(a_tile_data.tile_id): a_tile_data for a_tile_data in matched_tile_data} + + missing_data = nexus_tile_ids.difference(list(tile_data_by_id.keys())) + if len(missing_data) > 0: + raise Exception("Missing data for tile_id(s) %s." % missing_data) + + for a_tile in tiles: + lats, lons, times, data, meta, is_multi_var = tile_data_by_id[a_tile.tile_id].get_lat_lon_time_data_meta() + + a_tile.latitudes = lats + a_tile.longitudes = lons + a_tile.times = times + a_tile.data = data + a_tile.meta_data = meta + a_tile.is_multi = is_multi_var + + del (tile_data_by_id[a_tile.tile_id]) + + return tiles + + def _metadata_store_docs_to_tiles(self, *store_docs): + + tiles = [] + for store_doc in store_docs: + tile = Tile() + try: + tile.tile_id = store_doc['id'] + except KeyError: + pass + + try: + min_lat = store_doc['tile_min_lat'] + min_lon = store_doc['tile_min_lon'] + max_lat = store_doc['tile_max_lat'] + max_lon = store_doc['tile_max_lon'] + + if isinstance(min_lat, list): + min_lat = min_lat[0] + if isinstance(min_lon, list): + min_lon = min_lon[0] + if isinstance(max_lat, list): + max_lat = max_lat[0] + if isinstance(max_lon, list): + max_lon = max_lon[0] + + tile.bbox = BBox(min_lat, max_lat, min_lon, max_lon) + except KeyError: + pass + + try: + tile.dataset = store_doc['dataset_s'] + except KeyError: + pass + + try: + tile.dataset_id = store_doc['dataset_id_s'] + except KeyError: + pass + + try: + tile.granule = store_doc['granule_s'] + except KeyError: + pass + + try: + tile.min_time = datetime.strptime(store_doc['tile_min_time_dt'], "%Y-%m-%dT%H:%M:%SZ").replace( + tzinfo=UTC) + except KeyError: + pass + + try: + tile.max_time = datetime.strptime(store_doc['tile_max_time_dt'], "%Y-%m-%dT%H:%M:%SZ").replace( + tzinfo=UTC) + except KeyError: + pass + + try: + tile.section_spec = store_doc['sectionSpec_s'] + except KeyError: + pass + + try: + tile.tile_stats = TileStats( + store_doc['tile_min_val_d'], store_doc['tile_max_val_d'], + store_doc['tile_avg_val_d'], store_doc['tile_count_i'] + ) + except KeyError: + pass + + try: + # Ensure backwards compatibility by working with old + # tile_var_name_s and tile_standard_name_s fields to + + # will be overwritten if tile_var_name_ss is present + # as well. 
+ if '[' in store_doc['tile_var_name_s']: + var_names = json.loads(store_doc['tile_var_name_s']) + else: + var_names = [store_doc['tile_var_name_s']] + + standard_name = store_doc.get( + 'tile_standard_name_s', + json.dumps([None] * len(var_names)) + ) + if '[' in standard_name: + standard_names = json.loads(standard_name) + else: + standard_names = [standard_name] + + tile.variables = [] + for var_name, standard_name in zip(var_names, standard_names): + tile.variables.append(TileVariable( + variable_name=var_name, + standard_name=standard_name + )) + except KeyError: + pass + + if 'tile_var_name_ss' in store_doc: + tile.variables = [] + for var_name in store_doc['tile_var_name_ss']: + standard_name_key = f'{var_name}.tile_standard_name_s' + standard_name = store_doc.get(standard_name_key) + tile.variables.append(TileVariable( + variable_name=var_name, + standard_name=standard_name + )) + + tiles.append(tile) + + return tiles + + def pingSolr(self): + status = self._metadatastore.ping() + if status and status["status"] == "OK": + return True + else: + return False + + def update_metadata(self, solr_doc): + variables = solr_doc.get('variables_s', None) + + dataset = solr_doc['dataset_s'] + + if dataset not in self._ds_info: + self._ds_info[dataset] = {} + + if variables is not None: + variables = json.loads(variables) + + if isinstance(variables, dict): + variables = [variables] + else: + variables = [] + + extra_meta = solr_doc.get('meta_s', None) + + self._ds_info[dataset]['variables'] = variables + + if extra_meta is not None: + try: + extra_meta = json.loads(extra_meta) + except json.JSONDecodeError: + pass + + self._ds_info[dataset]['metadata'] = extra_meta + + def get_metadata(self, dataset=None): + if dataset is None: + logger.error('Cannot pull metadata for nexusproto without specifying dataset name') + return {} + else: + return copy.deepcopy(self._ds_info[dataset]) + + + @staticmethod + def _get_config_files(filename): + log = logging.getLogger(__name__) + candidates = [] + extensions = ['.default', ''] + for extension in extensions: + try: + candidate = pkg_resources.resource_filename(__name__, filename + extension) + log.info('use config file {}'.format(filename + extension)) + candidates.append(candidate) + except KeyError as ke: + log.warning('configuration file {} not found'.format(filename + extension)) + + return candidates diff --git a/data-access/nexustiles/config/datastores.ini.default b/data-access/nexustiles/backends/nexusproto/config/datastores.ini.default similarity index 100% rename from data-access/nexustiles/config/datastores.ini.default rename to data-access/nexustiles/backends/nexusproto/config/datastores.ini.default diff --git a/data-access/nexustiles/dao/CassandraProxy.py b/data-access/nexustiles/backends/nexusproto/dao/CassandraProxy.py similarity index 100% rename from data-access/nexustiles/dao/CassandraProxy.py rename to data-access/nexustiles/backends/nexusproto/dao/CassandraProxy.py diff --git a/data-access/nexustiles/dao/DynamoProxy.py b/data-access/nexustiles/backends/nexusproto/dao/DynamoProxy.py similarity index 100% rename from data-access/nexustiles/dao/DynamoProxy.py rename to data-access/nexustiles/backends/nexusproto/dao/DynamoProxy.py diff --git a/data-access/nexustiles/dao/ElasticsearchProxy.py b/data-access/nexustiles/backends/nexusproto/dao/ElasticsearchProxy.py similarity index 100% rename from data-access/nexustiles/dao/ElasticsearchProxy.py rename to data-access/nexustiles/backends/nexusproto/dao/ElasticsearchProxy.py diff --git 
a/data-access/nexustiles/dao/S3Proxy.py b/data-access/nexustiles/backends/nexusproto/dao/S3Proxy.py similarity index 100% rename from data-access/nexustiles/dao/S3Proxy.py rename to data-access/nexustiles/backends/nexusproto/dao/S3Proxy.py diff --git a/data-access/nexustiles/dao/SolrProxy.py b/data-access/nexustiles/backends/nexusproto/dao/SolrProxy.py similarity index 99% rename from data-access/nexustiles/dao/SolrProxy.py rename to data-access/nexustiles/backends/nexusproto/dao/SolrProxy.py index ca1033a3..29d0d492 100644 --- a/data-access/nexustiles/dao/SolrProxy.py +++ b/data-access/nexustiles/backends/nexusproto/dao/SolrProxy.py @@ -189,7 +189,8 @@ def get_data_series_list_simple(self): l.append({ "shortName": g, "title": g, - "tileCount": v + "tileCount": v, + "type": 'nexusproto' }) l = sorted(l, key=lambda entry: entry["title"]) return l diff --git a/data-access/nexustiles/dao/__init__.py b/data-access/nexustiles/backends/nexusproto/dao/__init__.py similarity index 100% rename from data-access/nexustiles/dao/__init__.py rename to data-access/nexustiles/backends/nexusproto/dao/__init__.py diff --git a/data-access/nexustiles/backends/zarr/__init__.py b/data-access/nexustiles/backends/zarr/__init__.py new file mode 100644 index 00000000..8afd240a --- /dev/null +++ b/data-access/nexustiles/backends/zarr/__init__.py @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + diff --git a/data-access/nexustiles/backends/zarr/backend.py b/data-access/nexustiles/backends/zarr/backend.py new file mode 100644 index 00000000..86081a27 --- /dev/null +++ b/data-access/nexustiles/backends/zarr/backend.py @@ -0,0 +1,535 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
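For context on the new Zarr backend that follows: each Zarr dataset is described by a small per-dataset config dict that ZarrBackend.__init__ reads. A minimal sketch of that shape, inferred from how __init__ below consumes it (the dataset name, store path, and variable/coordinate names are made-up examples, not values from this patch):

    config = {
        'variable': 'analysed_sst',   # or 'variables': ['var_a', 'var_b'] for multi-variable stores
        'coords': {
            'latitude': 'lat',
            'longitude': 'lon',
            'time': 'time'            # an optional 'depth' coordinate name may also be given
        },
        'aws': {'public': True}       # only consulted for s3:// paths; private buckets provide
                                      # 'accessKeyID' and 'secretAccessKey' instead
    }
    backend = ZarrBackend('MY_ZARR_DS', 's3://my-bucket/my-dataset.zarr', config=config)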
+ +import logging +import sys +from datetime import datetime +from urllib.parse import urlparse + +import numpy as np +import numpy.ma as ma +import s3fs +import xarray as xr +from nexustiles.AbstractTileService import AbstractTileService +from nexustiles.exception import NexusTileServiceException +from nexustiles.model.nexusmodel import Tile, BBox, TileVariable +from pytz import timezone +from shapely.geometry import MultiPolygon, box +from yarl import URL + +EPOCH = timezone('UTC').localize(datetime(1970, 1, 1)) +ISO_8601 = '%Y-%m-%dT%H:%M:%S%z' + +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + datefmt="%Y-%m-%dT%H:%M:%S", stream=sys.stdout) +logger = logging.getLogger(__name__) + + +class ZarrBackend(AbstractTileService): + def __init__(self, dataset_name, path, config=None): + AbstractTileService.__init__(self, dataset_name) + self.__config = config if config is not None else {} + + logger.info(f'Opening zarr backend at {path} for dataset {self._name}') + + url = urlparse(path) + + self.__url = path + + self.__store_type = url.scheme + self.__host = url.netloc + self.__path = url.path + + if 'variable' in config: + data_vars = config['variable'] + elif 'variables' in config: + data_vars = config['variables'] + else: + raise KeyError('Data variables not provided in config') + + if isinstance(data_vars, str): + self.__variables = [data_vars] + elif isinstance(data_vars, list): + self.__variables = data_vars + else: + raise TypeError(f'Improper type for variables config: {type(data_vars)}') + + self.__variables = [v.strip('\"\'') for v in self.__variables] + + self.__longitude = config['coords']['longitude'] + self.__latitude = config['coords']['latitude'] + self.__time = config['coords']['time'] + + self.__depth = config['coords'].get('depth') + + if self.__store_type in ['', 'file']: + store = self.__path + elif self.__store_type == 's3': + try: + aws_cfg = self.__config['aws'] + + if aws_cfg['public']: + # region = aws_cfg.get('region', 'us-west-2') + # store = f'https://{self.__host}.s3.{region}.amazonaws.com{self.__path}' + s3 = s3fs.S3FileSystem(True) + store = s3fs.S3Map(root=path, s3=s3, check=False) + else: + s3 = s3fs.S3FileSystem(False, key=aws_cfg['accessKeyID'], secret=aws_cfg['secretAccessKey']) + store = s3fs.S3Map(root=path, s3=s3, check=False) + except Exception as e: + logger.error(f'Failed to open zarr dataset at {self.__path}, ignoring it. Cause: {e}') + raise NexusTileServiceException(f'Cannot open S3 dataset ({e})') + else: + raise ValueError(self.__store_type) + + try: + self.__ds: xr.Dataset = xr.open_zarr(store, consolidated=True) + except Exception as e: + logger.error(f'Failed to open zarr dataset at {self.__path}, ignoring it. Cause: {e}') + raise NexusTileServiceException(f'Cannot open dataset ({e})') + + lats = self.__ds[self.__latitude].to_numpy() + delta = lats[1] - lats[0] + + if delta < 0: + logger.warning(f'Latitude coordinate for {self._name} is in descending order. 
Flipping it to ascending') + self.__ds = self.__ds.isel({self.__latitude: slice(None, None, -1)}) + + def get_dataseries_list(self, simple=False): + ds = { + "shortName": self._name, + "title": self._name, + "type": "zarr" + } + + if not simple: + min_date = self.get_min_time([]) + max_date = self.get_max_time([]) + ds['start'] = min_date + ds['end'] = max_date + ds['iso_start'] = datetime.utcfromtimestamp(min_date).strftime(ISO_8601) + ds['iso_end'] = datetime.utcfromtimestamp(max_date).strftime(ISO_8601) + + ds['metadata'] = dict(self.__ds.attrs) + + return [ds] + + def find_tile_by_id(self, tile_id, **kwargs): + return [tile_id] + + def find_tiles_by_id(self, tile_ids, ds=None, **kwargs): + return tile_ids + + def find_days_in_range_asc(self, min_lat, max_lat, min_lon, max_lon, dataset, start_time, end_time, + metrics_callback=None, **kwargs): + start = datetime.now() + + if not isinstance(start_time, datetime): + start_time = datetime.utcfromtimestamp(start_time) + + if not isinstance(end_time, datetime): + end_time = datetime.utcfromtimestamp(end_time) + + sel = { + self.__latitude: slice(min_lat, max_lat), + self.__longitude: slice(min_lon, max_lon), + self.__time: slice(start_time, end_time) + } + + times = self.__ds.sel(sel)[self.__time].to_numpy() + + if np.issubdtype(times.dtype, np.datetime64): + times = ((times - np.datetime64(EPOCH)) / 1e9).astype(int) + + times = sorted(times.tolist()) + + if metrics_callback: + metrics_callback(backend=(datetime.now() - start).total_seconds()) + + return times + + def find_tile_by_polygon_and_most_recent_day_of_year(self, bounding_polygon, ds, day_of_year, **kwargs): + """ + Given a bounding polygon, dataset, and day of year, find tiles in that dataset with the same bounding + polygon and the closest day of year. 
+ + For example: + given a polygon minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; and day of year=32 + search for first tile in MY_DS with identical bbox and day_of_year <= 32 (sorted by day_of_year desc) + + Valid matches: + minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; day of year = 32 + minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; day of year = 30 + + Invalid matches: + minx=1, miny=0, maxx=2, maxy=1; dataset=MY_DS; day of year = 32 + minx=0, miny=0, maxx=1, maxy=1; dataset=MY_OTHER_DS; day of year = 32 + minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; day of year = 30 if minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; day of year = 32 also exists + + :param bounding_polygon: The exact bounding polygon of tiles to search for + :param ds: The dataset name being searched + :param day_of_year: Tile day of year to search for, tile nearest to this day (without going over) will be returned + :return: List of one tile from ds with bounding_polygon on or before day_of_year or raise NexusTileServiceException if no tile found + """ + + times = self.__ds[self.__time].to_numpy() + + to_doy = lambda dt: datetime.utcfromtimestamp(int(dt)).timetuple().tm_yday + + vfunc = np.vectorize(to_doy) + days_of_year = vfunc(times.astype(datetime) / 1e9) + + try: + time = times[np.where(days_of_year <= day_of_year)[0][-1]].astype(datetime) / 1e9 + except IndexError: + raise NexusTileServiceException(reason='No tiles matched') + + min_lon, min_lat, max_lon, max_lat = bounding_polygon.bounds + + return self.find_tiles_in_box( + min_lat, max_lat, min_lon, max_lon, ds, time, time + ) + + def find_all_tiles_in_box_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs): + return self.find_tiles_in_box(min_lat, max_lat, min_lon, max_lon, dataset, time, time, **kwargs) + + def find_all_tiles_in_polygon_at_time(self, bounding_polygon, dataset, time, **kwargs): + return self.find_tiles_in_polygon(bounding_polygon, dataset, time, time, **kwargs) + + def find_tiles_in_box(self, min_lat, max_lat, min_lon, max_lon, ds=None, start_time=0, end_time=-1, **kwargs): + if type(start_time) is datetime: + start_time = (start_time - EPOCH).total_seconds() + if type(end_time) is datetime: + end_time = (end_time - EPOCH).total_seconds() + + params = { + 'min_lat': min_lat, + 'max_lat': max_lat, + 'min_lon': min_lon, + 'max_lon': max_lon + } + + times = None + + if 0 <= start_time <= end_time: + if kwargs.get('distinct', True): + times_asc = self.find_days_in_range_asc(min_lat, max_lat, min_lon, max_lon, ds, start_time, end_time) + times = [(t, t) for t in times_asc] + else: + times = [(start_time, end_time)] + + if 'depth' in kwargs: + params['depth'] = kwargs['depth'] + elif 'min_depth' in kwargs or 'max_depth' in kwargs: + params['min_depth'] = kwargs.get('min_depth') + params['max_depth'] = kwargs.get('max_depth') + + if times: + return [ZarrBackend.__to_url(self._name, min_time=t[0], max_time=t[1], **params) for t in times] + else: + return [ZarrBackend.__to_url(self._name, **params)] + + def find_tiles_in_polygon(self, bounding_polygon, ds=None, start_time=None, end_time=None, **kwargs): + # Find tiles that fall within the polygon in the Solr index + bounds = bounding_polygon.bounds + + min_lon = bounds[0] + min_lat = bounds[1] + max_lon = bounds[2] + max_lat = bounds[3] + + return self.find_tiles_in_box(min_lat, max_lat, min_lon, max_lon, ds, start_time, end_time, **kwargs) + + def find_tiles_by_metadata(self, metadata, ds=None, start_time=0, end_time=-1, **kwargs): + """ + Return list of tiles whose metadata 
matches the specified metadata, start_time, end_time. + :param metadata: List of metadata values to search for tiles e.g ["river_id_i:1", "granule_s:granule_name"] + :param ds: The dataset name to search + :param start_time: The start time to search for tiles + :param end_time: The end time to search for tiles + :return: A list of tiles + """ + raise NotImplementedError() + + def find_tiles_by_exact_bounds(self, bounds, ds, start_time, end_time, **kwargs): + """ + The method will return tiles with the exact given bounds within the time range. It differs from + find_tiles_in_polygon in that only tiles with exactly the given bounds will be returned as opposed to + doing a polygon intersection with the given bounds. + + :param bounds: (minx, miny, maxx, maxy) bounds to search for + :param ds: Dataset name to search + :param start_time: Start time to search (seconds since epoch) + :param end_time: End time to search (seconds since epoch) + :param kwargs: fetch_data: True/False = whether or not to retrieve tile data + :return: + """ + min_lon = bounds[0] + min_lat = bounds[1] + max_lon = bounds[2] + max_lat = bounds[3] + + return self.find_tiles_in_box(min_lat, max_lat, min_lon, max_lon, ds, start_time, end_time, **kwargs) + + def find_all_boundary_tiles_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs): + # Due to the precise nature of gridded Zarr's subsetting, it doesn't make sense to have a boundary region like + # this + return [] + + def get_min_max_time_by_granule(self, ds, granule_name): + raise NotImplementedError() + + def get_dataset_overall_stats(self, ds): + raise NotImplementedError() + + def get_stats_within_box_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs): + raise NotImplementedError() + + def get_bounding_box(self, tile_ids): + """ + Retrieve a bounding box that encompasses all of the tiles represented by the given tile ids. + :param tile_ids: List of tile ids + :return: shapely.geometry.Polygon that represents the smallest bounding box that encompasses all of the tiles + """ + + bounds = [ + ( + float(URL(u).query['min_lon']), + float(URL(u).query['min_lat']), + float(URL(u).query['max_lon']), + float(URL(u).query['max_lat']) + ) + for u in tile_ids + ] + + poly = MultiPolygon([box(*b) for b in bounds]) + + return box(*poly.bounds) + + def __get_ds_min_max_date(self): + min_date = self.__ds[self.__time].min().to_numpy() + max_date = self.__ds[self.__time].max().to_numpy() + + if np.issubdtype(min_date.dtype, np.datetime64): + min_date = ((min_date - np.datetime64(EPOCH)) / 1e9).astype(int).item() + + if np.issubdtype(max_date.dtype, np.datetime64): + max_date = ((max_date - np.datetime64(EPOCH)) / 1e9).astype(int).item() + + return min_date, max_date + + def get_min_time(self, tile_ids, ds=None): + """ + Get the minimum tile date from the list of tile ids + :param tile_ids: List of tile ids + :param ds: Filter by a specific dataset. Defaults to None (queries all datasets) + :return: long time in seconds since epoch + """ + times = list(filter(lambda x: x is not None, [int(URL(tid).query['min_time']) for tid in tile_ids])) + + if len(times) == 0: + min_date, max_date = self.__get_ds_min_max_date() + return min_date + else: + return min(times) + + def get_max_time(self, tile_ids, ds=None): + """ + Get the maximum tile date from the list of tile ids + :param tile_ids: List of tile ids + :param ds: Filter by a specific dataset. 
Defaults to None (queries all datasets) + :return: long time in seconds since epoch + """ + times = list(filter(lambda x: x is not None, [int(URL(tid).query['max_time']) for tid in tile_ids])) + + if len(tile_ids) == 0: + min_date, max_date = self.__get_ds_min_max_date() + return max_date + else: + return max(times) + + def get_distinct_bounding_boxes_in_polygon(self, bounding_polygon, ds, start_time, end_time): + """ + Get a list of distinct tile bounding boxes from all tiles within the given polygon and time range. + :param bounding_polygon: The bounding polygon of tiles to search for + :param ds: The dataset name to search + :param start_time: The start time to search for tiles + :param end_time: The end time to search for tiles + :return: A list of distinct bounding boxes (as shapely polygons) for tiles in the search polygon + """ + raise NotImplementedError() + + def get_tile_count(self, ds, bounding_polygon=None, start_time=0, end_time=-1, metadata=None, **kwargs): + """ + Return number of tiles that match search criteria. + :param ds: The dataset name to search + :param bounding_polygon: The polygon to search for tiles + :param start_time: The start time to search for tiles + :param end_time: The end time to search for tiles + :param metadata: List of metadata values to search for tiles e.g ["river_id_i:1", "granule_s:granule_name"] + :return: number of tiles that match search criteria + """ + raise NotImplementedError() + + def fetch_data_for_tiles(self, *tiles): + for tile in tiles: + self.__fetch_data_for_tile(tile) + + return tiles + + def __fetch_data_for_tile(self, tile: Tile): + bbox: BBox = tile.bbox + + min_lat = None + min_lon = None + max_lat = None + max_lon = None + + min_time = tile.min_time + max_time = tile.max_time + + # if min_time: + # min_time = datetime.utcfromtimestamp(min_time) + # + # if max_time: + # max_time = datetime.utcfromtimestamp(max_time) + + if bbox: + min_lat = bbox.min_lat + min_lon = bbox.min_lon + max_lat = bbox.max_lat + max_lon = bbox.max_lon + + sel_g = { + self.__latitude: slice(min_lat, max_lat), + self.__longitude: slice(min_lon, max_lon), + } + + sel_t = {} + + if min_time is None and max_time is None: + sel_t = None + method = None + elif min_time == max_time: + sel_t[self.__time] = [min_time] # List, otherwise self.__time dim will be dropped + method = 'nearest' + else: + sel_t[self.__time] = slice(min_time, max_time) + method = None + + tile.variables = [ + TileVariable(v, v) for v in self.__variables + ] + + matched = self.__ds.sel(sel_g) #.sel(sel_t, method=method) + + if sel_t is not None: + matched = matched.sel(sel_t, method=method) + + tile.latitudes = ma.masked_invalid(matched[self.__latitude].to_numpy()) + tile.longitudes = ma.masked_invalid(matched[self.__longitude].to_numpy()) + + times = matched[self.__time].to_numpy() + + if np.issubdtype(times.dtype, np.datetime64): + times = ((times - np.datetime64(EPOCH)) / 1e9).astype(int) + + tile.times = ma.masked_invalid(times) + + var_data = [matched[var].to_numpy() for var in self.__variables] + + if len(self.__variables) > 1: + tile.data = ma.masked_invalid(var_data) + tile.is_multi = True + else: + tile.data = ma.masked_invalid(var_data[0]) + tile.is_multi = False + + + def _metadata_store_docs_to_tiles(self, *store_docs): + return [ZarrBackend.__nts_url_to_tile(d) for d in store_docs] + + def update_metadata(self, solr_doc): + raise NotImplementedError() + + @staticmethod + def __nts_url_to_tile(nts_url): + tile = Tile() + + url = URL(nts_url) + + tile.tile_id = nts_url + + try: 
+ min_lat = float(url.query['min_lat']) + min_lon = float(url.query['min_lon']) + max_lat = float(url.query['max_lat']) + max_lon = float(url.query['max_lon']) + + tile.bbox = BBox(min_lat, max_lat, min_lon, max_lon) + except KeyError: + pass + + tile.dataset = url.path + tile.dataset_id = url.path + + try: + # tile.min_time = int(url.query['min_time']) + tile.min_time = datetime.utcfromtimestamp(int(url.query['min_time'])) + except KeyError: + pass + + try: + # tile.max_time = int(url.query['max_time']) + tile.max_time = datetime.utcfromtimestamp(int(url.query['max_time'])) + except KeyError: + pass + + tile.meta_data = {} + + return tile + + @staticmethod + def __to_url(dataset, **kwargs): + if 'dataset' in kwargs: + del kwargs['dataset'] + + if 'ds' in kwargs: + del kwargs['ds'] + + params = {} + + # If any params are numpy dtypes, extract them to base python types + for kw in kwargs: + v = kwargs[kw] + + if v is None: + continue + + if isinstance(v, np.generic): + v = v.item() + + params[kw] = v + + return str(URL.build( + scheme='nts', + host='', + path=dataset, + query=params + )) + + diff --git a/data-access/nexustiles/config/datasets.ini.default b/data-access/nexustiles/config/datasets.ini.default new file mode 100644 index 00000000..9f586cf2 --- /dev/null +++ b/data-access/nexustiles/config/datasets.ini.default @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +[solr] +host=http://localhost:8983 +core=nexusdatasets diff --git a/data-access/nexustiles/exception.py b/data-access/nexustiles/exception.py new file mode 100644 index 00000000..d6ed2c64 --- /dev/null +++ b/data-access/nexustiles/exception.py @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
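A note on how the Zarr backend above identifies tiles: since a Zarr store has no per-tile metadata, __to_url() synthesizes tile ids as nts:// URLs whose path is the dataset name and whose query string carries the subset bounds, and __nts_url_to_tile() parses them back into Tile objects. A rough sketch of that round trip with made-up bounds, mirroring the yarl calls in backend.py above (the exact URL text is illustrative, not normative):

    from yarl import URL

    # Build an id the way __to_url() does: dataset name in the path, bounds in the query
    tile_id = str(URL.build(scheme='nts', host='', path='MY_ZARR_DS',
                            query={'min_lat': '-10.0', 'max_lat': '10.0',
                                   'min_lon': '100.0', 'max_lon': '120.0',
                                   'min_time': '1577836800', 'max_time': '1577836800'}))

    # Recover the bounds the way __nts_url_to_tile() and get_bounding_box() do
    url = URL(tile_id)
    south, north = float(url.query['min_lat']), float(url.query['max_lat'])
    west, east = float(url.query['min_lon']), float(url.query['max_lon'])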
+ +class NexusTileServiceException(Exception): + def __init__(self, reason): + Exception.__init__(self, reason) diff --git a/data-access/nexustiles/nexustiles.py b/data-access/nexustiles/nexustiles.py index a3aa61e9..ef64e8f8 100644 --- a/data-access/nexustiles/nexustiles.py +++ b/data-access/nexustiles/nexustiles.py @@ -14,25 +14,33 @@ # limitations under the License. import configparser +import json import logging import sys -import json +import threading from datetime import datetime -from functools import wraps, reduce +from functools import reduce, wraps +from time import sleep +from typing import Dict, Union import numpy as np import numpy.ma as ma import pkg_resources +import pysolr from pytz import timezone, UTC -from shapely.geometry import MultiPolygon, box +from shapely.geometry import box +from webservice.webmodel import DatasetNotFoundException, NexusProcessingException +from webservice.NexusHandler import nexus_initializer +from yarl import URL + +from .AbstractTileService import AbstractTileService +from .backends.nexusproto.backend import NexusprotoTileService +from .backends.zarr.backend import ZarrBackend +from .model.nexusmodel import Tile, BBox, TileStats, TileVariable -from .dao import CassandraProxy -from .dao import DynamoProxy -from .dao import S3Proxy -from .dao import SolrProxy -from .dao import ElasticsearchProxy +from .exception import NexusTileServiceException -from .model.nexusmodel import Tile, BBox, TileStats, TileVariable +from requests.structures import CaseInsensitiveDict EPOCH = timezone('UTC').localize(datetime(1970, 1, 1)) @@ -40,7 +48,7 @@ level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt="%Y-%m-%dT%H:%M:%S", stream=sys.stdout) -logger = logging.getLogger("testing") +logger = logging.getLogger("nexus-tile-svc") def tile_data(default_fetch=True): @@ -50,13 +58,27 @@ def fetch_data_for_func(*args, **kwargs): metadatastore_start = datetime.now() metadatastore_docs = func(*args, **kwargs) metadatastore_duration = (datetime.now() - metadatastore_start).total_seconds() - tiles = args[0]._metadata_store_docs_to_tiles(*metadatastore_docs) + + # Try to determine source dataset to route calls to proper backend + guessed_dataset = None + + if 'ds' in kwargs: + guessed_dataset = kwargs['ds'] + elif 'dataset' in kwargs: + guessed_dataset = kwargs['dataset'] + else: + for arg in args: + if isinstance(arg, str) and arg in NexusTileService.backends: + guessed_dataset = arg + break + + tiles = NexusTileService._get_backend(guessed_dataset)._metadata_store_docs_to_tiles(*metadatastore_docs) cassandra_duration = 0 if ('fetch_data' in kwargs and kwargs['fetch_data']) or ('fetch_data' not in kwargs and default_fetch): if len(tiles) > 0: cassandra_start = datetime.now() - args[0].fetch_data_for_tiles(*tiles) + NexusTileService._get_backend(guessed_dataset).fetch_data_for_tiles(*tiles) cassandra_duration += (datetime.now() - cassandra_start).total_seconds() if 'metrics_callback' in kwargs and kwargs['metrics_callback'] is not None: @@ -74,38 +96,289 @@ def fetch_data_for_func(*args, **kwargs): return tile_data_decorator -class NexusTileServiceException(Exception): - pass +def catch_not_implemented(func): + def wrapper(*args, **kwargs): + try: + return func(*args, **kwargs) + except NotImplementedError: + raise NexusTileServiceException('Action unsupported by backend') + + return wrapper -class NexusTileService(object): - def __init__(self, skipDatastore=False, skipMetadatastore=False, config=None): - self._datastore = None - 
self._metadatastore = None +SOLR_LOCK = threading.Lock() +DS_LOCK = threading.Lock() +thread_local = threading.local() + + +@nexus_initializer +class NTSInitializer: + def __init__(self): + self._log = logger.getChild('init') + + def init(self, config): + self._log.info('*** RUNNING NTS INITIALIZATION ***') + NexusTileService(config) + + +class NexusTileService: + backends: Dict[Union[None, str], Dict[str, Union[AbstractTileService, bool]]] = {} + + ds_config = None + + __update_thread = None + + @staticmethod + def __update_datasets_loop(): + while True: + with DS_LOCK: + NexusTileService._update_datasets() + sleep(3600) + def __init__(self, config=None): self._config = configparser.RawConfigParser() - self._config.read(NexusTileService._get_config_files('config/datastores.ini')) + self._config.read(NexusTileService._get_config_files('config/datasets.ini')) + + self._alg_config = config + + if not NexusTileService.backends: + NexusTileService.ds_config = configparser.RawConfigParser() + NexusTileService.ds_config.read(NexusTileService._get_config_files('config/datasets.ini')) + + default_backend = {"backend": NexusprotoTileService(False, False, config), 'up': True} + + NexusTileService.backends[None] = default_backend + NexusTileService.backends['__nexusproto__'] = default_backend if config: self.override_config(config) - if not skipDatastore: - datastore = self._config.get("datastore", "store") - if datastore == "cassandra": - self._datastore = CassandraProxy.CassandraProxy(self._config) - elif datastore == "s3": - self._datastore = S3Proxy.S3Proxy(self._config) - elif datastore == "dynamo": - self._datastore = DynamoProxy.DynamoProxy(self._config) + if not NexusTileService.__update_thread: + NexusTileService.__update_thread = threading.Thread( + target=NexusTileService.__update_datasets_loop, + name='dataset_update', + daemon=True + ) + + logger.info('Starting dataset refresh thread') + + NexusTileService.__update_thread.start() + + @staticmethod + def _get_backend(dataset_s) -> AbstractTileService: + if dataset_s is not None: + dataset_s = dataset_s + + with DS_LOCK: + if dataset_s not in NexusTileService.backends: + logger.warning(f'Dataset {dataset_s} not currently loaded. 
Checking to see if it was recently' + f'added') + NexusTileService._update_datasets() + if dataset_s not in NexusTileService.backends: + raise DatasetNotFoundException(reason=f'Dataset {dataset_s} is not currently loaded/ingested') + + b = NexusTileService.backends[dataset_s] + + # if not b['up']: + # success = b['backend'].try_connect() + # + # if not success: + # raise NexusProcessingException(reason=f'Dataset {dataset_s} is currently unavailable') + # else: + # NexusTileService.backends[dataset_s]['up'] = True + + return b['backend'] + + + @staticmethod + def _get_datasets_store(): + solr_url = NexusTileService.ds_config.get("solr", "host") + solr_core = NexusTileService.ds_config.get("solr", "core") + solr_kwargs = {} + + if NexusTileService.ds_config.has_option("solr", "time_out"): + solr_kwargs["timeout"] = NexusTileService.ds_config.get("solr", "time_out") + + with SOLR_LOCK: + solrcon = getattr(thread_local, 'solrcon', None) + if solrcon is None: + solr_url = '%s/solr/%s' % (solr_url, solr_core) + solrcon = pysolr.Solr(solr_url, **solr_kwargs) + thread_local.solrcon = solrcon + + solrcon = solrcon + + return solrcon + + @staticmethod + def _update_datasets(): + update_logger = logging.getLogger("nexus-tile-svc.backends") + solrcon = NexusTileService._get_datasets_store() + + update_logger.info('Executing Solr query to check for new datasets') + + present_datasets = {None, '__nexusproto__'} + next_cursor_mark = '*' + + added_datasets = 0 + + while True: + response = solrcon.search('*:*', cursorMark=next_cursor_mark, sort='id asc') + + try: + response_cursor_mark = response.nextCursorMark + except AttributeError: + break + + if response_cursor_mark == next_cursor_mark: + break else: - raise ValueError("Error reading datastore from config file") + next_cursor_mark = response_cursor_mark + + for dataset in response.docs: + d_id = dataset['dataset_s'] + store_type = dataset.get('store_type_s', 'nexusproto') + + present_datasets.add(d_id) + + if d_id in NexusTileService.backends: + continue + # is_up = NexusTileService.backends[d_id]['backend'].try_connect() + + added_datasets += 1 + + if store_type == 'nexus_proto' or store_type == 'nexusproto': + update_logger.info(f"Detected new nexusproto dataset {d_id}, using default nexusproto backend") + NexusTileService.backends[d_id] = NexusTileService.backends[None] + NexusTileService.backends[d_id]['backend'].update_metadata(dataset) + elif store_type == 'zarr': + update_logger.info(f"Detected new zarr dataset {d_id}, opening new zarr backend") + + ds_config = json.loads(dataset['config'][0]) + try: + NexusTileService.backends[d_id] = { + 'backend': ZarrBackend(dataset_name=dataset['dataset_s'], **ds_config), + 'up': True + } + except NexusTileServiceException: + added_datasets -= 1 + else: + update_logger.warning(f'Unsupported backend {store_type} for dataset {d_id}') + added_datasets -= 1 + + removed_datasets = set(NexusTileService.backends.keys()).difference(present_datasets) + + if len(removed_datasets) > 0: + update_logger.info(f'{len(removed_datasets)} old datasets marked for removal') + + for dataset in removed_datasets: + update_logger.info(f"Removing dataset {dataset}") + del NexusTileService.backends[dataset] + + update_logger.info(f'Finished dataset update: {added_datasets} added, {len(removed_datasets)} removed, ' + f'{len(NexusTileService.backends) - 2} total') + + # Update cfg (ie, creds) of dataset + @staticmethod + def user_ds_update(name, config): + solr = NexusTileService._get_datasets_store() + + docs = 
solr.search(f'dataset_s:{name}').docs + + if len(docs) != 1: + raise ValueError(f'Given name must match exactly one existing dataset; matched {len(docs)}') + + ds = docs[0] + + if 'source_s' not in ds or ds['source_s'] == 'collection_config': + raise ValueError('Provided dataset is source_s in collection config and cannot be deleted') + + config_dict = json.loads(ds['config'][0]) + + config_dict['config'] = config + + solr.delete(id=ds['id']) + solr.add([{ + 'id': name, + 'dataset_s': name, + 'latest_update_l': int(datetime.now().timestamp()), + 'store_type_s': ds['store_type_s'], + 'config': json.dumps(config_dict), + 'source_s': 'user_added' + }]) + solr.commit() + + logger.info(f'Updated dataset {name} in Solr. Updating backends') + + with DS_LOCK: + NexusTileService._update_datasets() + + return {'success': True} + + # Add dataset + backend + @staticmethod + def user_ds_add(name, path, config, type='zarr'): + solr = NexusTileService._get_datasets_store() + + docs = solr.search(f'dataset_s:{name}').docs + + if len(docs) > 0: + raise ValueError(f'Dataset {name} already exists') + + config_dict = { + 'path': path, + 'config': config + } + + solr.add([{ + 'id': name, + 'dataset_s': name, + 'latest_update_l': int(datetime.now().timestamp()), + 'store_type_s': type, + 'config': json.dumps(config_dict), + 'source_s': 'user_added' + }]) + solr.commit() + + logger.info(f'Added dataset {name} to Solr. Updating backends') + + with DS_LOCK: + NexusTileService._update_datasets() + + return {'success': True} + + # Delete dataset backend (error if it's a hardcoded one) + @staticmethod + def user_ds_delete(name): + solr = NexusTileService._get_datasets_store() - if not skipMetadatastore: - metadatastore = self._config.get("metadatastore", "store", fallback='solr') - if metadatastore == "solr": - self._metadatastore = SolrProxy.SolrProxy(self._config) - elif metadatastore == "elasticsearch": - self._metadatastore = ElasticsearchProxy.ElasticsearchProxy(self._config) + docs = solr.search(f'dataset_s:{name}').docs + + if len(docs) != 1: + raise ValueError(f'Given name must match exactly one existing dataset; matched {len(docs)}') + + ds = docs[0] + + if 'source_s' not in ds or ds['source_s'] == 'collection_config': + raise ValueError('Provided dataset is source_s in collection config and cannot be deleted') + + solr.delete(id=ds['id']) + solr.commit() + + logger.info(f'Removed dataset {name} from Solr. 
Updating backends') + + with DS_LOCK: + NexusTileService._update_datasets() + + return {'success': True} + + @staticmethod + def get_metadata_for_dataset(ds_name): + try: + backend = NexusTileService._get_backend(ds_name) + return backend.get_metadata(ds_name) + except: + return None def override_config(self, config): for section in config.sections(): @@ -113,109 +386,90 @@ def override_config(self, config): for option in config.options(section): if config.get(section, option) is not None: self._config.set(section, option, config.get(section, option)) + if NexusTileService.ds_config.has_section(section): # only override preexisting section, ignores the other + for option in config.options(section): + if config.get(section, option) is not None: + NexusTileService.ds_config.set(section, option, config.get(section, option)) def get_dataseries_list(self, simple=False): - if simple: - return self._metadatastore.get_data_series_list_simple() - else: - return self._metadatastore.get_data_series_list() + datasets = [] + for backend in set([b['backend'] for b in NexusTileService.backends.values() if b['up']]): + datasets.extend(backend.get_dataseries_list(simple)) + + return datasets + @tile_data() + @catch_not_implemented def find_tile_by_id(self, tile_id, **kwargs): - return self._metadatastore.find_tile_by_id(tile_id) + tile = URL(tile_id) + + if tile.scheme == 'nts': + return NexusTileService._get_backend(tile.path).find_tile_by_id(tile_id) + else: + return NexusTileService._get_backend('__nexusproto__').find_tile_by_id(tile_id) @tile_data() + @catch_not_implemented def find_tiles_by_id(self, tile_ids, ds=None, **kwargs): - return self._metadatastore.find_tiles_by_id(tile_ids, ds=ds, **kwargs) + if ds is None: + return [self.find_tile_by_id(tid, **kwargs, fetch_data=False) for tid in tile_ids] + return NexusTileService._get_backend(ds).find_tiles_by_id(tile_ids, ds=ds, **kwargs) + @catch_not_implemented def find_days_in_range_asc(self, min_lat, max_lat, min_lon, max_lon, dataset, start_time, end_time, metrics_callback=None, **kwargs): - start = datetime.now() - result = self._metadatastore.find_days_in_range_asc(min_lat, max_lat, min_lon, max_lon, dataset, start_time, - end_time, - **kwargs) - duration = (datetime.now() - start).total_seconds() - if metrics_callback: - metrics_callback(solr=duration) - return result + return NexusTileService._get_backend(dataset).find_days_in_range_asc(min_lat, max_lat, min_lon, max_lon, + dataset, start_time, end_time, + metrics_callback, **kwargs) @tile_data() + @catch_not_implemented def find_tile_by_polygon_and_most_recent_day_of_year(self, bounding_polygon, ds, day_of_year, **kwargs): - """ - Given a bounding polygon, dataset, and day of year, find tiles in that dataset with the same bounding - polygon and the closest day of year. 
- - For example: - given a polygon minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; and day of year=32 - search for first tile in MY_DS with identical bbox and day_of_year <= 32 (sorted by day_of_year desc) - - Valid matches: - minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; day of year = 32 - minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; day of year = 30 - - Invalid matches: - minx=1, miny=0, maxx=2, maxy=1; dataset=MY_DS; day of year = 32 - minx=0, miny=0, maxx=1, maxy=1; dataset=MY_OTHER_DS; day of year = 32 - minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; day of year = 30 if minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; day of year = 32 also exists - - :param bounding_polygon: The exact bounding polygon of tiles to search for - :param ds: The dataset name being searched - :param day_of_year: Tile day of year to search for, tile nearest to this day (without going over) will be returned - :return: List of one tile from ds with bounding_polygon on or before day_of_year or raise NexusTileServiceException if no tile found - """ - try: - tile = self._metadatastore.find_tile_by_polygon_and_most_recent_day_of_year(bounding_polygon, ds, - day_of_year) - except IndexError: - raise NexusTileServiceException("No tile found.").with_traceback(sys.exc_info()[2]) - - return tile + return NexusTileService._get_backend(ds).find_tile_by_polygon_and_most_recent_day_of_year( + bounding_polygon, ds, day_of_year, **kwargs + ) @tile_data() + @catch_not_implemented def find_all_tiles_in_box_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs): - return self._metadatastore.find_all_tiles_in_box_at_time(min_lat, max_lat, min_lon, max_lon, dataset, time, - rows=5000, - **kwargs) + return NexusTileService._get_backend(dataset).find_all_tiles_in_box_at_time( + min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs + ) @tile_data() + @catch_not_implemented def find_all_tiles_in_polygon_at_time(self, bounding_polygon, dataset, time, **kwargs): - return self._metadatastore.find_all_tiles_in_polygon_at_time(bounding_polygon, dataset, time, rows=5000, - **kwargs) + return NexusTileService._get_backend(dataset).find_all_tiles_in_polygon_at_time( + bounding_polygon, dataset, time, **kwargs + ) @tile_data() + @catch_not_implemented def find_tiles_in_box(self, min_lat, max_lat, min_lon, max_lon, ds=None, start_time=0, end_time=-1, **kwargs): # Find tiles that fall in the given box in the Solr index if type(start_time) is datetime: start_time = (start_time - EPOCH).total_seconds() if type(end_time) is datetime: end_time = (end_time - EPOCH).total_seconds() - return self._metadatastore.find_all_tiles_in_box_sorttimeasc(min_lat, max_lat, min_lon, max_lon, ds, start_time, - end_time, **kwargs) + + return NexusTileService._get_backend(ds).find_tiles_in_box( + min_lat, max_lat, min_lon, max_lon, ds, start_time, end_time, **kwargs + ) @tile_data() + @catch_not_implemented def find_tiles_in_polygon(self, bounding_polygon, ds=None, start_time=0, end_time=-1, **kwargs): - # Find tiles that fall within the polygon in the Solr index - if 'sort' in list(kwargs.keys()): - tiles = self._metadatastore.find_all_tiles_in_polygon(bounding_polygon, ds, start_time, end_time, **kwargs) - else: - tiles = self._metadatastore.find_all_tiles_in_polygon_sorttimeasc(bounding_polygon, ds, start_time, - end_time, - **kwargs) - return tiles + return NexusTileService._get_backend(ds).find_tiles_in_polygon( + bounding_polygon, ds, start_time, end_time, **kwargs + ) @tile_data() + @catch_not_implemented def 
find_tiles_by_metadata(self, metadata, ds=None, start_time=0, end_time=-1, **kwargs): - """ - Return list of tiles whose metadata matches the specified metadata, start_time, end_time. - :param metadata: List of metadata values to search for tiles e.g ["river_id_i:1", "granule_s:granule_name"] - :param ds: The dataset name to search - :param start_time: The start time to search for tiles - :param end_time: The end time to search for tiles - :return: A list of tiles - """ - tiles = self._metadatastore.find_all_tiles_by_metadata(metadata, ds, start_time, end_time, **kwargs) - - return tiles + return NexusTileService._get_backend(ds).find_tiles_by_metadata( + metadata, ds, start_time, end_time, **kwargs + ) def get_tiles_by_metadata(self, metadata, ds=None, start_time=0, end_time=-1, **kwargs): """ @@ -233,6 +487,7 @@ def get_tiles_by_metadata(self, metadata, ds=None, start_time=0, end_time=-1, ** return tiles @tile_data() + @catch_not_implemented def find_tiles_by_exact_bounds(self, bounds, ds, start_time, end_time, **kwargs): """ The method will return tiles with the exact given bounds within the time range. It differs from @@ -246,16 +501,16 @@ def find_tiles_by_exact_bounds(self, bounds, ds, start_time, end_time, **kwargs) :param kwargs: fetch_data: True/False = whether or not to retrieve tile data :return: """ - tiles = self._metadatastore.find_tiles_by_exact_bounds(bounds[0], bounds[1], bounds[2], bounds[3], ds, - start_time, - end_time) - return tiles + return NexusTileService._get_backend(ds).find_tiles_by_exact_bounds( + bounds, ds, start_time, end_time, **kwargs + ) @tile_data() + @catch_not_implemented def find_all_boundary_tiles_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs): - return self._metadatastore.find_all_boundary_tiles_at_time(min_lat, max_lat, min_lon, max_lon, dataset, time, - rows=5000, - **kwargs) + return NexusTileService._get_backend(dataset).find_all_boundary_tiles_at_time( + min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs + ) def get_tiles_bounded_by_box(self, min_lat, max_lat, min_lon, max_lon, ds=None, start_time=0, end_time=-1, **kwargs): @@ -275,13 +530,15 @@ def get_tiles_bounded_by_polygon(self, polygon, ds=None, start_time=0, end_time= return tiles + @catch_not_implemented def get_min_max_time_by_granule(self, ds, granule_name): - start_time, end_time = self._metadatastore.find_min_max_date_from_granule(ds, granule_name) - - return start_time, end_time + return NexusTileService._get_backend(ds).get_min_max_time_by_granule( + ds, granule_name + ) + @catch_not_implemented def get_dataset_overall_stats(self, ds): - return self._metadatastore.get_data_series_stats(ds) + return NexusTileService._get_backend(ds).get_dataset_overall_stats(ds) def get_tiles_bounded_by_box_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs): tiles = self.find_all_tiles_in_box_at_time(min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs) @@ -301,24 +558,19 @@ def get_boundary_tiles_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset return tiles + @catch_not_implemented def get_stats_within_box_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs): - tiles = self._metadatastore.find_all_tiles_within_box_at_time(min_lat, max_lat, min_lon, max_lon, dataset, time, - **kwargs) - - return tiles + return NexusTileService.get_stats_within_box_at_time( + min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs + ) - def get_bounding_box(self, tile_ids): + def get_bounding_box(self, tile_ids, 
ds=None): """ Retrieve a bounding box that encompasses all of the tiles represented by the given tile ids. :param tile_ids: List of tile ids :return: shapely.geometry.Polygon that represents the smallest bounding box that encompasses all of the tiles """ - tiles = self.find_tiles_by_id(tile_ids, fl=['tile_min_lat', 'tile_max_lat', 'tile_min_lon', 'tile_max_lon'], - fetch_data=False, rows=len(tile_ids)) - polys = [] - for tile in tiles: - polys.append(box(tile.bbox.min_lon, tile.bbox.min_lat, tile.bbox.max_lon, tile.bbox.max_lat)) - return box(*MultiPolygon(polys).bounds) + return NexusTileService._get_backend(ds).get_bounding_box(tile_ids) def get_min_time(self, tile_ids, ds=None): """ @@ -327,8 +579,7 @@ def get_min_time(self, tile_ids, ds=None): :param ds: Filter by a specific dataset. Defaults to None (queries all datasets) :return: long time in seconds since epoch """ - min_time = self._metadatastore.find_min_date_from_tiles(tile_ids, ds=ds) - return int((min_time - EPOCH).total_seconds()) + return NexusTileService._get_backend(ds).get_min_time(tile_ids, ds) def get_max_time(self, tile_ids, ds=None): """ @@ -337,8 +588,7 @@ def get_max_time(self, tile_ids, ds=None): :param ds: Filter by a specific dataset. Defaults to None (queries all datasets) :return: long time in seconds since epoch """ - max_time = self._metadatastore.find_max_date_from_tiles(tile_ids, ds=ds) - return int((max_time - EPOCH).total_seconds()) + return int(NexusTileService._get_backend(ds).get_max_time(tile_ids)) def get_distinct_bounding_boxes_in_polygon(self, bounding_polygon, ds, start_time, end_time): """ @@ -352,8 +602,19 @@ def get_distinct_bounding_boxes_in_polygon(self, bounding_polygon, ds, start_tim bounds = self._metadatastore.find_distinct_bounding_boxes_in_polygon(bounding_polygon, ds, start_time, end_time) return [box(*b) for b in bounds] - def mask_tiles_to_bbox(self, min_lat, max_lat, min_lon, max_lon, tiles): + def get_tile_count(self, ds, bounding_polygon=None, start_time=0, end_time=-1, metadata=None, **kwargs): + """ + Return number of tiles that match search criteria. + :param ds: The dataset name to search + :param bounding_polygon: The polygon to search for tiles + :param start_time: The start time to search for tiles + :param end_time: The end time to search for tiles + :param metadata: List of metadata values to search for tiles e.g ["river_id_i:1", "granule_s:granule_name"] + :return: number of tiles that match search criteria + """ + return self._metadatastore.get_tile_count(ds, bounding_polygon, start_time, end_time, metadata, **kwargs) + def mask_tiles_to_bbox(self, min_lat, max_lat, min_lon, max_lon, tiles): for tile in tiles: tile.latitudes = ma.masked_outside(tile.latitudes, min_lat, max_lat) tile.longitudes = ma.masked_outside(tile.longitudes, min_lon, max_lon) @@ -438,45 +699,12 @@ def mask_tiles_to_time_range(self, start_time, end_time, tiles): return tiles - def get_tile_count(self, ds, bounding_polygon=None, start_time=0, end_time=-1, metadata=None, **kwargs): - """ - Return number of tiles that match search criteria. 
- :param ds: The dataset name to search - :param bounding_polygon: The polygon to search for tiles - :param start_time: The start time to search for tiles - :param end_time: The end time to search for tiles - :param metadata: List of metadata values to search for tiles e.g ["river_id_i:1", "granule_s:granule_name"] - :return: number of tiles that match search criteria - """ - return self._metadatastore.get_tile_count(ds, bounding_polygon, start_time, end_time, metadata, **kwargs) - def fetch_data_for_tiles(self, *tiles): + dataset = tiles[0].dataset - nexus_tile_ids = set([tile.tile_id for tile in tiles]) - matched_tile_data = self._datastore.fetch_nexus_tiles(*nexus_tile_ids) - - tile_data_by_id = {str(a_tile_data.tile_id): a_tile_data for a_tile_data in matched_tile_data} - - missing_data = nexus_tile_ids.difference(list(tile_data_by_id.keys())) - if len(missing_data) > 0: - raise Exception("Missing data for tile_id(s) %s." % missing_data) - - for a_tile in tiles: - lats, lons, times, data, meta, is_multi_var = tile_data_by_id[a_tile.tile_id].get_lat_lon_time_data_meta() - - a_tile.latitudes = lats - a_tile.longitudes = lons - a_tile.times = times - a_tile.data = data - a_tile.meta_data = meta - a_tile.is_multi = is_multi_var - - del (tile_data_by_id[a_tile.tile_id]) - - return tiles + return NexusTileService._get_backend(dataset).fetch_data_for_tiles(*tiles) def _metadata_store_docs_to_tiles(self, *store_docs): - tiles = [] for store_doc in store_docs: tile = Tile() @@ -573,7 +801,6 @@ def _metadata_store_docs_to_tiles(self, *store_docs): except KeyError: pass - if 'tile_var_name_ss' in store_doc: tile.variables = [] for var_name in store_doc['tile_var_name_ss']: @@ -588,13 +815,6 @@ def _metadata_store_docs_to_tiles(self, *store_docs): return tiles - def pingSolr(self): - status = self._metadatastore.ping() - if status and status["status"] == "OK": - return True - else: - return False - @staticmethod def _get_config_files(filename): log = logging.getLogger(__name__) diff --git a/data-access/requirements.txt b/data-access/requirements.txt index f91f1803..05b78947 100644 --- a/data-access/requirements.txt +++ b/data-access/requirements.txt @@ -21,4 +21,11 @@ urllib3==1.26.2 requests nexusproto Shapely -numpy==1.24.3 +s3fs==2022.5.0 +fsspec==2022.5.0 +botocore==1.24.21 +aiohttp==3.8.1 +xarray~=2022.3.0 +zarr>=2.11.3 +pandas<2.1.0rc0 # Temporary restriction because 2.1.0rc0 fails to build + diff --git a/data-access/setup.py b/data-access/setup.py index ab0248f0..e539e1e0 100644 --- a/data-access/setup.py +++ b/data-access/setup.py @@ -12,11 +12,14 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
- +import setuptools from setuptools import setup -with open('../VERSION.txt', 'r') as f: - __version__ = f.read() +try: + with open('../VERSION.txt', 'r') as f: + __version__ = f.read() +except Exception: + __version__ = None with open('requirements.txt') as f: @@ -32,8 +35,13 @@ description="NEXUS API.", long_description=open('README.md').read(), - packages=['nexustiles', 'nexustiles.model', 'nexustiles.dao'], - package_data={'nexustiles': ['config/datastores.ini.default', 'config/datastores.ini']}, + packages=setuptools.find_packages(), # ['nexustiles', 'nexustiles.model', 'nexustiles.dao'], + package_data={ + 'nexustiles': + ['config/datasets.ini.default', 'config/datasets.ini'], + 'nexustiles.backends.nexusproto': + ['config/datastores.ini.default', 'config/datastores.ini'] + }, platforms='any', python_requires='~=3.8', install_requires=pip_requirements,
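Taken together, the pieces above let Zarr datasets be registered at runtime rather than ingested: user_ds_add() writes a document to the datasets Solr core configured in datasets.ini, and the refresh loop opens a ZarrBackend for it on the next update. A minimal sketch of such a registration, assuming the tile service has already been initialized (so the datasets Solr connection is configured) and reusing the made-up names from the earlier config sketch:

    from nexustiles.nexustiles import NexusTileService

    NexusTileService.user_ds_add(
        name='MY_ZARR_DS',
        path='s3://my-bucket/my-dataset.zarr',
        config={  # same shape as the ZarrBackend config sketched earlier
            'variable': 'analysed_sst',
            'coords': {'latitude': 'lat', 'longitude': 'lon', 'time': 'time'},
            'aws': {'public': True}
        },
        type='zarr'
    )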