From 168ab41f7789c939c0d4a20fdcc67c47ce4d265e Mon Sep 17 00:00:00 2001
From: Eamon Ford
Date: Mon, 10 Aug 2020 21:54:14 -0700
Subject: [PATCH 1/5] handle the error where a granule can't be opened

---
 .../granule_ingester/exceptions/Exceptions.py |  7 ++++++-
 .../granule_ingester/exceptions/__init__.py   | 19 ++++++++-----------
 .../granule_loaders/GranuleLoader.py          |  7 ++++++-
 3 files changed, 20 insertions(+), 13 deletions(-)

diff --git a/granule_ingester/granule_ingester/exceptions/Exceptions.py b/granule_ingester/granule_ingester/exceptions/Exceptions.py
index c648b99..fdd03e5 100644
--- a/granule_ingester/granule_ingester/exceptions/Exceptions.py
+++ b/granule_ingester/granule_ingester/exceptions/Exceptions.py
@@ -6,7 +6,11 @@ class PipelineRunningError(Exception):
     pass


-class TileProcessingError(Exception):
+class TileProcessingError(PipelineRunningError):
+    pass
+
+
+class GranuleLoadingError(PipelineRunningError):
     pass


@@ -21,6 +25,7 @@ class RabbitMQLostConnectionError(LostConnectionError):
 class CassandraLostConnectionError(LostConnectionError):
     pass

+
 class SolrLostConnectionError(LostConnectionError):
     pass

diff --git a/granule_ingester/granule_ingester/exceptions/__init__.py b/granule_ingester/granule_ingester/exceptions/__init__.py
index ea0969f..f2429b1 100644
--- a/granule_ingester/granule_ingester/exceptions/__init__.py
+++ b/granule_ingester/granule_ingester/exceptions/__init__.py
@@ -1,11 +1,8 @@
-from .Exceptions import CassandraFailedHealthCheckError
-from .Exceptions import CassandraLostConnectionError
-from .Exceptions import FailedHealthCheckError
-from .Exceptions import LostConnectionError
-from .Exceptions import PipelineBuildingError
-from .Exceptions import PipelineRunningError
-from .Exceptions import RabbitMQFailedHealthCheckError
-from .Exceptions import RabbitMQLostConnectionError
-from .Exceptions import SolrFailedHealthCheckError
-from .Exceptions import SolrLostConnectionError
-from .Exceptions import TileProcessingError
+from .Exceptions import (CassandraFailedHealthCheckError,
+                         CassandraLostConnectionError, FailedHealthCheckError,
+                         GranuleLoadingError, LostConnectionError,
+                         PipelineBuildingError, PipelineRunningError,
+                         RabbitMQFailedHealthCheckError,
+                         RabbitMQLostConnectionError,
+                         SolrFailedHealthCheckError, SolrLostConnectionError,
+                         TileProcessingError)
diff --git a/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py b/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py
index c28ffbb..0311f49 100644
--- a/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py
+++ b/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py
@@ -21,6 +21,8 @@
 import aioboto3
 import xarray as xr

+from granule_ingester.exceptions import GranuleLoadingError
+
 logger = logging.getLogger(__name__)


@@ -52,7 +54,10 @@ async def open(self) -> (xr.Dataset, str):
             raise RuntimeError("Granule path scheme '{}' is not supported.".format(resource_url.scheme))

         granule_name = os.path.basename(self._resource)
-        return xr.open_dataset(file_path, lock=False), granule_name
+        try:
+            return xr.open_dataset(file_path, lock=False), granule_name
+        except Exception:
+            raise GranuleLoadingError(f"The granule {self._resource} is not a valid NetCDF file.")

     @staticmethod
     async def _download_s3_file(url: str):

From 6937c5aff05d0219db5623f805c91b5c9df40bff Mon Sep 17 00:00:00 2001
From: Eamon Ford
Date: Tue, 11 Aug 2020 11:43:25 -0700
Subject: [PATCH 2/5] better error handling

---
 .../granule_ingester/slicers/SliceFileByStepSize.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
diff --git a/granule_ingester/granule_ingester/slicers/SliceFileByStepSize.py b/granule_ingester/granule_ingester/slicers/SliceFileByStepSize.py
index 6e03336..9cbea44 100644
--- a/granule_ingester/granule_ingester/slicers/SliceFileByStepSize.py
+++ b/granule_ingester/granule_ingester/slicers/SliceFileByStepSize.py
@@ -15,8 +15,9 @@

 import itertools
 import logging
-from typing import List, Dict
+from typing import Dict, List

+from granule_ingester.exceptions import TileProcessingError
 from granule_ingester.slicers.TileSlicer import TileSlicer

 logger = logging.getLogger(__name__)
@@ -33,7 +34,7 @@ def _generate_slices(self, dimension_specs: Dict[str, int]) -> List[str]:
         # make sure all provided dimensions are in dataset
         for dim_name in self._dimension_step_sizes.keys():
             if dim_name not in list(dimension_specs.keys()):
-                raise KeyError('Provided dimension "{}" not found in dataset'.format(dim_name))
+                raise TileProcessingError('Provided dimension "{}" not found in dataset'.format(dim_name))

         slices = self._generate_chunk_boundary_slices(dimension_specs)
         logger.info("Sliced granule into {} slices.".format(len(slices)))

From 6ebadcd87121a3b4286609bb8296ba84d9e8ac8c Mon Sep 17 00:00:00 2001
From: Eamon Ford
Date: Wed, 12 Aug 2020 00:36:13 -0700
Subject: [PATCH 3/5] wip: automatic reading processor detector

---
 .../granule_loaders/GranuleLoader.py          |  2 +
 .../granule_ingester/pipeline/Modules.py      | 13 +++--
 .../granule_ingester/pipeline/Pipeline.py     |  3 +
 .../processors/ReadingProcessorSelector.py    | 58 +++++++++++++++++++
 .../granule_ingester/processors/__init__.py   |  3 +-
 .../EccoReadingProcessor.py                   | 12 ++++
 .../GridReadingProcessor.py                   | 11 ++++
 .../SwathReadingProcessor.py                  |  8 +++
 .../TileReadingProcessor.py                   |  5 ++
 .../TimeSeriesReadingProcessor.py             | 10 ++++
 .../test_ReadingProcessorSelector.py          | 57 ++++++++++++++++++
 .../test_EccoReadingProcessor.py              | 11 ++++
 .../test_SwathReadingProcessor.py             | 11 ++++
 13 files changed, 198 insertions(+), 6 deletions(-)
 create mode 100644 granule_ingester/granule_ingester/processors/ReadingProcessorSelector.py
 create mode 100644 granule_ingester/tests/processors/test_ReadingProcessorSelector.py

diff --git a/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py b/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py
index 0311f49..6377de0 100644
--- a/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py
+++ b/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py
@@ -56,6 +56,8 @@ async def open(self) -> (xr.Dataset, str):
         granule_name = os.path.basename(self._resource)
         try:
             return xr.open_dataset(file_path, lock=False), granule_name
+        except FileNotFoundError:
+            raise GranuleLoadingError(f"The granule file {self._resource} does not exist.")
         except Exception:
             raise GranuleLoadingError(f"The granule {self._resource} is not a valid NetCDF file.")

diff --git a/granule_ingester/granule_ingester/pipeline/Modules.py b/granule_ingester/granule_ingester/pipeline/Modules.py
index 2cf2245..689b3b1 100644
--- a/granule_ingester/granule_ingester/pipeline/Modules.py
+++ b/granule_ingester/granule_ingester/pipeline/Modules.py
@@ -1,7 +1,10 @@
-from granule_ingester.processors import *
-from granule_ingester.processors.reading_processors import *
-from granule_ingester.slicers import *
-from granule_ingester.granule_loaders import *
+from granule_ingester.granule_loaders import GranuleLoader
+from granule_ingester.processors import (EmptyTileFilter, GenerateTileId,
+                                         KelvinToCelsius,
+                                         TileSummarizingProcessor)
+from granule_ingester.processors.reading_processors import (
+    EccoReadingProcessor, GridReadingProcessor)
+from granule_ingester.slicers import SliceFileByStepSize

 modules = {
     "granule": GranuleLoader,
@@ -11,5 +14,5 @@
     "GridReadingProcessor": GridReadingProcessor,
     "tileSummary": TileSummarizingProcessor,
     "emptyTileFilter": EmptyTileFilter,
-    "kelvinToCelsius": KelvinToCelsius
+    "kelvinToCelsius": KelvinToCelsius,
 }
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index dabca81..03da05f 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -26,6 +26,7 @@
 from aiomultiprocess.types import ProxyException
 from granule_ingester.exceptions import PipelineBuildingError
 from granule_ingester.granule_loaders import GranuleLoader
+from granule_ingester.processors import ReadingProcessorSelector
 from granule_ingester.pipeline.Modules import \
     modules as processor_module_mappings
 from granule_ingester.processors.TileProcessor import TileProcessor
@@ -142,6 +143,8 @@ def _build_pipeline(cls,
         slicer_config = config['slicer']
         slicer = cls._parse_module(slicer_config, module_mappings)

+        reading_processor_selector = ReadingProcessorSelector(**config['readingProcessorSelector'])
+
         tile_processors = []
         for processor_config in config['processors']:
             module = cls._parse_module(processor_config, module_mappings)
diff --git a/granule_ingester/granule_ingester/processors/ReadingProcessorSelector.py b/granule_ingester/granule_ingester/processors/ReadingProcessorSelector.py
new file mode 100644
index 0000000..2138c98
--- /dev/null
+++ b/granule_ingester/granule_ingester/processors/ReadingProcessorSelector.py
@@ -0,0 +1,58 @@
+import xarray as xr
+from typing import List
+import re
+
+from granule_ingester.processors.reading_processors import (TileReadingProcessor,
+                                                            GridReadingProcessor,
+                                                            EccoReadingProcessor,
+                                                            SwathReadingProcessor,
+                                                            TimeSeriesReadingProcessor)
+
+
+GRID_PROCESSORS = [GridReadingProcessor, EccoReadingProcessor, SwathReadingProcessor, TimeSeriesReadingProcessor]
+
+
+class ReadingProcessorSelector:
+    def __init__(self, dataset: xr.Dataset, variable: str, *args, **kwargs):
+        self._dataset = dataset
+        self._variable = variable
+
+    def get_reading_processor(self):
+        ...
+
+    def detect_grid_type(self, lat: str, lon: str, time: str, processor_types: List[TileReadingProcessor]):
+        bids = []
+        for processor_type in processor_types:
+            bid = processor_type.bid(dataset=self._dataset,
+                                     variable=self._variable,
+                                     lat=lat,
+                                     lon=lon,
+                                     time=time)
+            bids.append((processor_type, bid))
+        highest_bidder = max(bids, key=lambda bidder: bidder[1])
+
+        return highest_bidder[0]
+
+    def detect_dimensions(self):
+        lat_regex = r'((.*\s+)?latitude(.*\s+)?)|((.*\s+)?lat(\s+.*)?)'
+        lon_regex = r'((.*\s+)?longitude(.*\s+)?)|((.*\s+)?lon(\s+.*)?)'
+        time_regex = r'(.*\s+)?time(.*\s+)?'
+
+        lat = self._detect_dimension_name(lat_regex)
+        lon = self._detect_dimension_name(lon_regex)
+        time = self._detect_dimension_name(time_regex)
+
+        return (lat, lon, time)
+
+    def _detect_dimension_name(self, pattern: str) -> str:
+        candidates = []
+        for dim_name in self._dataset.data_vars:
+            long_name = self._dataset[dim_name].long_name
+            if re.match(pattern, long_name):
+                candidates.append(dim_name)
+        if len(candidates) > 1:
+            raise Exception(f"Found multiple possibilities for dimension with pattern {pattern}.")
+
+        if len(candidates) == 0:
+            return None
+        return candidates[0]
diff --git a/granule_ingester/granule_ingester/processors/__init__.py b/granule_ingester/granule_ingester/processors/__init__.py
index 592d8ea..a05673a 100644
--- a/granule_ingester/granule_ingester/processors/__init__.py
+++ b/granule_ingester/granule_ingester/processors/__init__.py
@@ -1,5 +1,6 @@
+from granule_ingester.processors.ReadingProcessorSelector import ReadingProcessorSelector
 from granule_ingester.processors.EmptyTileFilter import EmptyTileFilter
 from granule_ingester.processors.GenerateTileId import GenerateTileId
+from granule_ingester.processors.kelvintocelsius import KelvinToCelsius
 from granule_ingester.processors.TileProcessor import TileProcessor
 from granule_ingester.processors.TileSummarizingProcessor import TileSummarizingProcessor
-from granule_ingester.processors.kelvintocelsius import KelvinToCelsius
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/EccoReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/EccoReadingProcessor.py
index 1876013..7c81e83 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/EccoReadingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/EccoReadingProcessor.py
@@ -23,6 +23,18 @@ def __init__(self,
         self.time = time
         self.tile = tile

+    @staticmethod
+    def bid(dataset, variable, lat, lon, time):
+        bid = 0
+        if lat == 'YC' and lon == 'XC':
+            bid += 1
+        if lat not in dataset[variable].dims and lon not in dataset[variable].dims:
+            bid += 1
+        if 'tile' in dataset[variable].dims:
+            bid += 1
+
+        return bid / 3
+
     def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str, slice], input_tile):
         new_tile = nexusproto.EccoTile()
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py
index 4354f9e..f1bc309 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py
@@ -14,6 +14,17 @@ def __init__(self, variable_to_read, latitude, longitude, depth=None, time=None,
         self.depth = depth
         self.time = time

+    @staticmethod
+    def bid(dataset, variable, lat, lon, time):
+        bid = 0
+        if all(dimension_size > 2 for dimension_size in dataset[variable].sizes.values()):
+            bid += 1
+        if len(dataset[lat].dims) == 1 and len(dataset[lon].dims) == 1:
+            bid += 1
+        if len(set(dataset[variable].dims) - {time}) >= 2:
+            bid += 1
+        return bid / 3
+
     def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str, slice], input_tile):
         new_tile = nexusproto.GridTile()
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py
index fec28ca..fdc58f0 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py
@@ -14,6 +14,14 @@ def __init__(self, variable_to_read, latitude, longitude, time, depth=None, **kw
         self.depth = depth
         self.time = time

+    @staticmethod
+    def bid(dataset, variable, lat, lon, time):
+        bid = 0
+        if 2 in dataset[variable].sizes.values():
+            bid += 1
+
+        return bid / 1
+
     def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str, slice], input_tile):
         new_tile = nexusproto.SwathTile()
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
index 8b69ad2..4192c36 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
@@ -32,6 +32,11 @@ def __init__(self, variable_to_read: str, latitude: str, longitude: str, *args,
         self.latitude = latitude
         self.longitude = longitude

+    @staticmethod
+    @abstractmethod
+    def bid(dataset: xr.Dataset) -> bool:
+        pass
+
     def process(self, tile, dataset: xr.Dataset, *args, **kwargs):
         try:
             dimensions_to_slices = self._convert_spec_to_slices(tile.summary.section_spec)
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/TimeSeriesReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/TimeSeriesReadingProcessor.py
index 2831c0c..c10586e 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/TimeSeriesReadingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/TimeSeriesReadingProcessor.py
@@ -15,6 +15,16 @@ def __init__(self, variable_to_read, latitude, longitude, time, depth=None, **kw
         self.depth = depth
         self.time = time

+    @staticmethod
+    def bid(dataset, variable, lat, lon, time):
+        bid = 0
+        if len(dataset[variable].dims) == 2:
+            bid += 1
+        if time in dataset[variable].dims:
+            bid += 1
+
+        return bid / 2
+
     def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str, slice], input_tile):
         new_tile = nexusproto.TimeSeriesTile()
diff --git a/granule_ingester/tests/processors/test_ReadingProcessorSelector.py b/granule_ingester/tests/processors/test_ReadingProcessorSelector.py
new file mode 100644
index 0000000..c607071
--- /dev/null
+++ b/granule_ingester/tests/processors/test_ReadingProcessorSelector.py
@@ -0,0 +1,57 @@
+
+import unittest
+from os import path
+
+
+import xarray as xr
+
+
+from granule_ingester.processors import ReadingProcessorSelector
+from granule_ingester.processors.reading_processors import GridReadingProcessor, EccoReadingProcessor, TimeSeriesReadingProcessor, SwathReadingProcessor
+
+
+from granule_ingester.processors.ReadingProcessorSelector import GRID_PROCESSORS
+
+
+class TestReadingProcessorSelector(unittest.TestCase):
+
+    def test_detect_dimensions(self):
+        netcdf_path = path.join(path.dirname(__file__), '../granules/SMAP_L2B_SSS_04892_20160101T005507_R13080.h5')
+        with xr.open_dataset(netcdf_path, decode_cf=True) as dataset:
+            selector = ReadingProcessorSelector(dataset, 'smap_sss')
+            self.assertEqual(('lat', 'lon', 'row_time'), selector.detect_dimensions())
+
+    def test_detect_grid_type_smap(self):
+        netcdf_path = path.join(path.dirname(__file__), '../granules/SMAP_L2B_SSS_04892_20160101T005507_R13080.h5')
+        with xr.open_dataset(netcdf_path, decode_cf=True) as dataset:
+            selector = ReadingProcessorSelector(dataset, 'smap_sss')
+            processor = selector.detect_grid_type('lat', 'lon', 'time', GRID_PROCESSORS)
+            self.assertEqual(GridReadingProcessor, processor)
+
+    def test_detect_grid_type_ecco_native(self):
+        netcdf_path = path.join(path.dirname(__file__), '../granules/OBP_native_grid.nc')
+        with xr.open_dataset(netcdf_path, decode_cf=True) as dataset:
+            selector = ReadingProcessorSelector(dataset, 'OBP')
+            processor = selector.detect_grid_type('YC', 'XC', 'time', GRID_PROCESSORS)
+            self.assertEqual(EccoReadingProcessor, processor)
+
+    def test_detect_grid_type_ecco_interp(self):
+        netcdf_path = path.join(path.dirname(__file__), '../granules/OBP_2017_01.nc')
+        with xr.open_dataset(netcdf_path, decode_cf=True) as dataset:
+            selector = ReadingProcessorSelector(dataset, 'OBP')
+            processor = selector.detect_grid_type('latitude', 'longitude', 'time', GRID_PROCESSORS)
+            self.assertEqual(GridReadingProcessor, processor)
+
+    def test_detect_grid_type_time_series(self):
+        netcdf_path = path.join(path.dirname(__file__), '../granules/not_empty_wswm.nc')
+        with xr.open_dataset(netcdf_path, decode_cf=True) as dataset:
+            selector = ReadingProcessorSelector(dataset, 'Qout')
+            processor = selector.detect_grid_type('lat', 'lon', 'time', GRID_PROCESSORS)
+            self.assertEqual(TimeSeriesReadingProcessor, processor)
+
+    def test_detect_grid_type_swatch(self):
+        netcdf_path = path.join(path.dirname(__file__), '../granules/not_empty_smap.h5')
+        with xr.open_dataset(netcdf_path, decode_cf=True) as dataset:
+            selector = ReadingProcessorSelector(dataset, 'smap_sss')
+            processor = selector.detect_grid_type('lat', 'lon', 'row_time', GRID_PROCESSORS)
+            self.assertEqual(SwathReadingProcessor, processor)
diff --git a/granule_ingester/tests/reading_processors/test_EccoReadingProcessor.py b/granule_ingester/tests/reading_processors/test_EccoReadingProcessor.py
index f2e9f29..ec3311f 100644
--- a/granule_ingester/tests/reading_processors/test_EccoReadingProcessor.py
+++ b/granule_ingester/tests/reading_processors/test_EccoReadingProcessor.py
@@ -62,3 +62,14 @@ def test_generate_tile_with_dims_out_of_order(self):
         self.assertEqual(output_tile.tile.ecco_tile.variable_data.shape, [15, 7])
         self.assertEqual(output_tile.tile.ecco_tile.latitude.shape, [15, 7])
         self.assertEqual(output_tile.tile.ecco_tile.longitude.shape, [15, 7])
+
+    def test_bid(self):
+        netcdf_path = path.join(path.dirname(__file__), '../granules/OBP_native_grid.nc')
+        with xr.open_dataset(netcdf_path, decode_cf=True) as dataset:
+            bid = EccoReadingProcessor.bid(
+                dataset=dataset,
+                variable='OBP',
+                lat='YC',
+                lon='XC',
+                time='time')
+            self.assertEqual(1, bid)
diff --git a/granule_ingester/tests/reading_processors/test_SwathReadingProcessor.py b/granule_ingester/tests/reading_processors/test_SwathReadingProcessor.py
index 55ac4fc..c9e76c3 100644
--- a/granule_ingester/tests/reading_processors/test_SwathReadingProcessor.py
+++ b/granule_ingester/tests/reading_processors/test_SwathReadingProcessor.py
@@ -72,3 +72,14 @@ def test_read_not_empty_smap(self):
         self.assertEqual([38, 1], output_tile.tile.swath_tile.variable_data.shape)
         self.assertEqual([38, 1], output_tile.tile.swath_tile.latitude.shape)
         self.assertEqual([38, 1], output_tile.tile.swath_tile.longitude.shape)
+
+    def test_bid(self):
+        netcdf_path = path.join(path.dirname(__file__), '../granules/not_empty_ascatb.nc4')
+        with xr.open_dataset(netcdf_path, decode_cf=True) as dataset:
+            bid = SwathReadingProcessor.bid(
+                dataset=dataset,
+                variable='wind_speed',
+                lat='lat',
+                lon='lon',
+                time='time')
+            self.assertTrue(bid)

From e7b9a00f69ae01872dcae8e7d9be1fc2cbe53cdd Mon Sep 17 00:00:00 2001
From: Eamon Ford
Date: Wed, 12 Aug 2020 13:42:22 -0700
Subject: [PATCH 4/5] wip

---
 .../processors/ReadingProcessorSelector.py      | 10 ++++++++--
 .../reading_processors/EccoReadingProcessor.py  | 18 +++++++-----------
 .../reading_processors/GridReadingProcessor.py  | 15 ++++++---------
 .../SwathReadingProcessor.py                    |  8 ++------
 .../reading_processors/TileReadingProcessor.py  |  8 +++++++-
 .../TimeSeriesReadingProcessor.py               | 13 +++++--------
 .../test_ReadingProcessorSelector.py            |  9 ++++++++-
 7 files changed, 43 insertions(+), 38 deletions(-)

diff --git a/granule_ingester/granule_ingester/processors/ReadingProcessorSelector.py b/granule_ingester/granule_ingester/processors/ReadingProcessorSelector.py
index 2138c98..411b278 100644
--- a/granule_ingester/granule_ingester/processors/ReadingProcessorSelector.py
+++ b/granule_ingester/granule_ingester/processors/ReadingProcessorSelector.py
@@ -18,9 +18,15 @@ def __init__(self, dataset: xr.Dataset, variable: str, *args, **kwargs):
         self._variable = variable

     def get_reading_processor(self):
-        ...
+        lat, lon, time = self.detect_dimensions()
+        processor_class = self.detect_grid_type(lat=lat, lon=lon, time=time, processor_types=GRID_PROCESSORS)
+        return processor_class(variable_to_read=self._variable, latitude=lat, longitude=lon, time=time)

-    def detect_grid_type(self, lat: str, lon: str, time: str, processor_types: List[TileReadingProcessor]):
+    def detect_grid_type(self,
+                         lat: str,
+                         lon: str,
+                         time: str,
+                         processor_types: List[TileReadingProcessor]):
         bids = []
         for processor_type in processor_types:
             bid = processor_type.bid(dataset=self._dataset,
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/EccoReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/EccoReadingProcessor.py
index 7c81e83..0128d57 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/EccoReadingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/EccoReadingProcessor.py
@@ -13,7 +13,7 @@ def __init__(self,
                  variable_to_read,
                  latitude,
                  longitude,
-                 tile,
+                 tile='tile',
                  depth=None,
                  time=None,
                  **kwargs):
@@ -24,16 +24,12 @@ def __init__(self,
         self.tile = tile

     @staticmethod
-    def bid(dataset, variable, lat, lon, time):
-        bid = 0
-        if lat == 'YC' and lon == 'XC':
-            bid += 1
-        if lat not in dataset[variable].dims and lon not in dataset[variable].dims:
-            bid += 1
-        if 'tile' in dataset[variable].dims:
-            bid += 1
-
-        return bid / 3
+    def get_criteria(dataset: xr.Dataset, variable: str, lat: str, lon: str, time: str):
+        return [
+            lambda: lat == 'YC' and lon == 'XC',
+            lambda: lat not in dataset[variable].dims and lon not in dataset[variable].dims,
+            lambda: 'tile' in dataset[variable].dims
+        ]

     def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str, slice], input_tile):
         new_tile = nexusproto.EccoTile()
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py
index f1bc309..27f0b46 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py
@@ -15,15 +15,12 @@ def __init__(self, variable_to_read, latitude, longitude, depth=None, time=None,
         self.time = time

     @staticmethod
-    def bid(dataset, variable, lat, lon, time):
-        bid = 0
-        if all(dimension_size > 2 for dimension_size in dataset[variable].sizes.values()):
-            bid += 1
-        if len(dataset[lat].dims) == 1 and len(dataset[lon].dims) == 1:
-            bid += 1
-        if len(set(dataset[variable].dims) - {time}) >= 2:
-            bid += 1
-        return bid / 3
+    def get_criteria(dataset: xr.Dataset, variable: str, lat: str, lon: str, time: str):
+        return [
+            lambda: all(dimension_size > 2 for dimension_size in dataset[variable].sizes.values()),
+            lambda: len(dataset[lat].dims) == 1 and len(dataset[lon].dims) == 1,
+            lambda: len(set(dataset[variable].dims) - {time}) >= 2
+        ]

     def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str, slice], input_tile):
         new_tile = nexusproto.GridTile()
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py
index fdc58f0..4c6fc6e 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py
@@ -15,12 +15,8 @@ def __init__(self, variable_to_read, latitude, longitude, time, depth=None, **kw
         self.time = time

     @staticmethod
-    def bid(dataset, variable, lat, lon, time):
-        bid = 0
-        if 2 in dataset[variable].sizes.values():
-            bid += 1
-
-        return bid / 1
+    def get_criteria(dataset: xr.Dataset, variable: str, lat: str, lon: str, time: str):
+        return [lambda: 2 in dataset[variable].sizes.values()]

     def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str, slice], input_tile):
         new_tile = nexusproto.SwathTile()
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
index 4192c36..1d2048d 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
@@ -32,9 +32,15 @@ def __init__(self, variable_to_read: str, latitude: str, longitude: str, *args,
         self.latitude = latitude
         self.longitude = longitude

+    @classmethod
+    def bid(cls, dataset: xr.Dataset, variable: str, lat: str, lon: str, time: str) -> float:
+        criteria = cls.get_criteria(dataset, variable, lat, lon, time)
+        points = [1 if criterium() else 0 for criterium in criteria]
+        return sum(points) / len(criteria)
+
     @staticmethod
     @abstractmethod
-    def bid(dataset: xr.Dataset) -> bool:
+    def get_criteria(dataset: xr.Dataset, variable: str, lat: str, lon: str, time: str):
         pass

     def process(self, tile, dataset: xr.Dataset, *args, **kwargs):
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/TimeSeriesReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/TimeSeriesReadingProcessor.py
index c10586e..b84c08b 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/TimeSeriesReadingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/TimeSeriesReadingProcessor.py
@@ -16,14 +16,11 @@ def __init__(self, variable_to_read, latitude, longitude, time, depth=None, **kw
         self.time = time

     @staticmethod
-    def bid(dataset, variable, lat, lon, time):
-        bid = 0
-        if len(dataset[variable].dims) == 2:
-            bid += 1
-        if time in dataset[variable].dims:
-            bid += 1
-
-        return bid / 2
+    def get_criteria(dataset: xr.Dataset, variable: str, lat: str, lon: str, time: str):
+        return [
+            lambda: len(dataset[variable].dims) == 2,
+            lambda: time in dataset[variable].dims
+        ]

     def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str, slice], input_tile):
         new_tile = nexusproto.TimeSeriesTile()
diff --git a/granule_ingester/tests/processors/test_ReadingProcessorSelector.py b/granule_ingester/tests/processors/test_ReadingProcessorSelector.py
index c607071..558d214 100644
--- a/granule_ingester/tests/processors/test_ReadingProcessorSelector.py
+++ b/granule_ingester/tests/processors/test_ReadingProcessorSelector.py
@@ -49,9 +49,16 @@ def test_detect_grid_type_time_series(self):
             processor = selector.detect_grid_type('lat', 'lon', 'time', GRID_PROCESSORS)
             self.assertEqual(TimeSeriesReadingProcessor, processor)

-    def test_detect_grid_type_swatch(self):
+    def test_detect_grid_type_swath(self):
         netcdf_path = path.join(path.dirname(__file__), '../granules/not_empty_smap.h5')
         with xr.open_dataset(netcdf_path, decode_cf=True) as dataset:
             selector = ReadingProcessorSelector(dataset, 'smap_sss')
             processor = selector.detect_grid_type('lat', 'lon', 'row_time', GRID_PROCESSORS)
             self.assertEqual(SwathReadingProcessor, processor)
+
+    def test_get_reading_processor(self):
+        netcdf_path = path.join(path.dirname(__file__), '../granules/SMAP_L2B_SSS_04892_20160101T005507_R13080.h5')
+        with xr.open_dataset(netcdf_path, decode_cf=True) as dataset:
+            selector = ReadingProcessorSelector(dataset, 'smap_sss')
+            processor = selector.get_reading_processor()
+            self.assertEqual(GridReadingProcessor, type(processor))

From 7f8693e8b9573e7889431f86257cf407ae8f72b2 Mon Sep 17 00:00:00 2001
From: Eamon Ford
Date: Tue, 18 Aug 2020 11:27:54 -0700
Subject: [PATCH 5/5] wip: auto slicing

---
 .../resources/dataset_config_template.yml     |  6 +--
 .../granule_ingester/pipeline/Pipeline.py     | 15 ++++++--
 .../processors/ReadingProcessorSelector.py    | 37 +++++++++++++++----
 .../TileReadingProcessor.py                   |  5 ++-
 .../slicers/SliceFileByStepSize.py            | 14 ++++---
 .../granule_ingester/slicers/TileSlicer.py    |  6 ++-
 .../test_ReadingProcessorSelector.py          | 14 +++++++
 7 files changed, 72 insertions(+), 25 deletions(-)

diff --git a/collection_manager/collection_manager/resources/dataset_config_template.yml b/collection_manager/collection_manager/resources/dataset_config_template.yml
index d35a527..70a7643 100644
--- a/collection_manager/collection_manager/resources/dataset_config_template.yml
+++ b/collection_manager/collection_manager/resources/dataset_config_template.yml
@@ -1,5 +1,6 @@
 granule:
   resource: {{granule}}
+variable: {{variable}}
 slicer:
   name: sliceFileByStepSize
   dimension_step_sizes:
@@ -7,11 +8,6 @@ slicer:
     lat: 30
     lon: 30
 processors:
-  - name: GridReadingProcessor
-    latitude: lat
-    longitude: lon
-    time: time
-    variable_to_read: {{variable}}
   - name: emptyTileFilter
   - name: kelvinToCelsius
   - name: tileSummary
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index 03da05f..86bc617 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -91,6 +91,7 @@ class Pipeline:
     def __init__(self,
                  granule_loader: GranuleLoader,
                  slicer: TileSlicer,
+                 variable_name: str,
                  data_store_factory,
                  metadata_store_factory,
                  tile_processors: List[TileProcessor],
@@ -98,6 +99,7 @@ def __init__(self,
         self._granule_loader = granule_loader
         self._tile_processors = tile_processors
         self._slicer = slicer
+        self._variable_name = variable_name
         self._data_store_factory = data_store_factory
         self._metadata_store_factory = metadata_store_factory
         self._max_concurrency = max_concurrency
@@ -142,8 +144,7 @@ def _build_pipeline(cls,
         slicer_config = config['slicer']
         slicer = cls._parse_module(slicer_config, module_mappings)
-
-        reading_processor_selector = ReadingProcessorSelector(**config['readingProcessorSelector'])
+        variable_name = config['variable']

         tile_processors = []
         for processor_config in config['processors']:
@@ -152,6 +153,7 @@ def _build_pipeline(cls,

         return cls(granule_loader,
                    slicer,
+                   variable_name,
                    data_store_factory,
                    metadata_store_factory,
                    tile_processors,
@@ -177,9 +179,14 @@ async def run(self):
         async with self._granule_loader as (dataset, granule_name):
             start = time.perf_counter()

+            reading_processor = ReadingProcessorSelector(dataset, self._variable_name).get_reading_processor()
+            tile_processors = [reading_processor, *self._tile_processors]
+            logger.info(f"Using {type(reading_processor)} to process granule {granule_name}.")
+
             shared_memory = self._manager.Namespace()
+
             async with Pool(initializer=_init_worker,
-                            initargs=(self._tile_processors,
+                            initargs=(tile_processors,
                                       dataset,
                                       self._data_store_factory,
                                       self._metadata_store_factory,
@@ -187,7 +194,7 @@ async def run(self):
                             maxtasksperchild=self._max_concurrency,
                             childconcurrency=self._max_concurrency) as pool:
                 serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in
-                                    self._slicer.generate_tiles(dataset, granule_name)]
+                                    self._slicer.generate_tiles(dataset, self._variable_name, granule_name)]
                 # aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that
                 # a queue can't have more than 2**15-1 tasks. So, we have to batch it.
                 for chunk in self._chunk_list(serialized_tiles, MAX_CHUNK_SIZE):
diff --git a/granule_ingester/granule_ingester/processors/ReadingProcessorSelector.py b/granule_ingester/granule_ingester/processors/ReadingProcessorSelector.py
index 411b278..a99f8be 100644
--- a/granule_ingester/granule_ingester/processors/ReadingProcessorSelector.py
+++ b/granule_ingester/granule_ingester/processors/ReadingProcessorSelector.py
@@ -44,17 +44,21 @@ def detect_dimensions(self):
         lon_regex = r'((.*\s+)?longitude(.*\s+)?)|((.*\s+)?lon(\s+.*)?)'
         time_regex = r'(.*\s+)?time(.*\s+)?'

-        lat = self._detect_dimension_name(lat_regex)
-        lon = self._detect_dimension_name(lon_regex)
-        time = self._detect_dimension_name(time_regex)
+        dims = self._dataset.data_vars
+        lat = self._find_dimension_in_list(lat_regex, dims)
+        lon = self._find_dimension_in_list(lon_regex, dims)
+        time = self._find_dimension_in_list(time_regex, dims)

         return (lat, lon, time)

-    def _detect_dimension_name(self, pattern: str) -> str:
+    def _find_dimension_in_list(self, pattern: str, dims: List[str], use_long_name=True) -> str:
         candidates = []
-        for dim_name in self._dataset.data_vars:
-            long_name = self._dataset[dim_name].long_name
-            if re.match(pattern, long_name):
+        for dim_name in dims:
+            if use_long_name:
+                name = self._dataset[dim_name].long_name
+            else:
+                name = dim_name
+            if re.match(pattern, name):
                 candidates.append(dim_name)
         if len(candidates) > 1:
             raise Exception(f"Found multiple possibilities for dimension with pattern {pattern}.")
@@ -62,3 +66,22 @@ def _detect_dimension_name(self, pattern: str) -> str:
         if len(candidates) == 0:
             return None
         return candidates[0]
+
+    def _detect_step_sizes(self, dataset: xr.Dataset, variable_name, slice_time=True):
+        dimensions = dataset[variable_name].dims
+        time_dim = self._find_dimension_in_list(r'(.*)?time(.*)?', dimensions, use_long_name=False)
+
+        spatial_dims = set(dimensions[-2:]) - {time_dim}
+        other_dims = set(dimensions[:-2]) - {time_dim}
+
+        spatial_step_sizes = {dim_name: 30 for dim_name in spatial_dims}
+        other_step_sizes = {dim_name: 1 for dim_name in other_dims}
+        if time_dim:
+            if slice_time:
+                time_step_size = {time_dim: 1}
+            else:
+                time_step_size = {time_dim: dataset[variable_name].sizes[time_dim]}
+        else:
+            time_step_size = {}
+
+        return {**other_step_sizes, **spatial_step_sizes, **time_step_size}
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
index 1d2048d..b5d5105 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
@@ -23,7 +23,9 @@
 from granule_ingester.exceptions import TileProcessingError
 from granule_ingester.processors.TileProcessor import TileProcessor
+import logging

+logger = logging.getLogger(__name__)

 class TileReadingProcessor(TileProcessor, ABC):
@@ -52,7 +54,8 @@ def process(self, tile, dataset: xr.Dataset, *args, **kwargs):
             output_tile.summary.data_var_name = self.variable_to_read

             return self._generate_tile(dataset, dimensions_to_slices, output_tile)
-        except Exception:
+        except Exception as e:
+            logger.exception(e)
             raise TileProcessingError("Could not generate tiles from the granule.")
diff --git a/granule_ingester/granule_ingester/slicers/SliceFileByStepSize.py b/granule_ingester/granule_ingester/slicers/SliceFileByStepSize.py
index 9cbea44..2827c29 100644
--- a/granule_ingester/granule_ingester/slicers/SliceFileByStepSize.py
+++ b/granule_ingester/granule_ingester/slicers/SliceFileByStepSize.py
@@ -17,6 +17,8 @@
 import logging
 from typing import Dict, List

+import xarray as xr
+
 from granule_ingester.exceptions import TileProcessingError
 from granule_ingester.slicers.TileSlicer import TileSlicer

@@ -30,22 +32,22 @@ def __init__(self,
         super().__init__(*args, **kwargs)
         self._dimension_step_sizes = dimension_step_sizes

-    def _generate_slices(self, dimension_specs: Dict[str, int]) -> List[str]:
+    def _generate_slices(self, dimension_specs: Dict[str, int], step_sizes: Dict[str, int]) -> List[str]:
         # make sure all provided dimensions are in dataset
-        for dim_name in self._dimension_step_sizes.keys():
+        for dim_name in step_sizes.keys():
             if dim_name not in list(dimension_specs.keys()):
                 raise TileProcessingError('Provided dimension "{}" not found in dataset'.format(dim_name))

-        slices = self._generate_chunk_boundary_slices(dimension_specs)
+        slices = self._generate_chunk_boundary_slices(dimension_specs, step_sizes)
         logger.info("Sliced granule into {} slices.".format(len(slices)))

         return slices

-    def _generate_chunk_boundary_slices(self, dimension_specs) -> list:
+    def _generate_chunk_boundary_slices(self, dimension_specs, step_sizes) -> list:
         dimension_bounds = []
-        dim_step_keys = self._dimension_step_sizes.keys()
+        dim_step_keys = step_sizes.keys()
         for dim_name, dim_len in dimension_specs.items():
-            step_size = self._dimension_step_sizes[dim_name] if dim_name in dim_step_keys else dim_len
+            step_size = step_sizes[dim_name] if dim_name in dim_step_keys else dim_len
             bounds = []

             for i in range(0, dim_len, step_size):
diff --git a/granule_ingester/granule_ingester/slicers/TileSlicer.py b/granule_ingester/granule_ingester/slicers/TileSlicer.py
index 06cf094..cdf67c4 100644
--- a/granule_ingester/granule_ingester/slicers/TileSlicer.py
+++ b/granule_ingester/granule_ingester/slicers/TileSlicer.py
@@ -18,6 +18,7 @@

 import xarray as xr
 from nexusproto.DataTile_pb2 import NexusTile
+import re


 class TileSlicer(ABC):
@@ -44,10 +45,11 @@ def __next__(self) -> NexusTile:
             tile.summary.granule = self._granule_name
         return tile

-    def generate_tiles(self, dataset: xr.Dataset, granule_name: str = None):
+    def generate_tiles(self, dataset: xr.Dataset, variable_name: str, granule_name: str = None):
         self._granule_name = granule_name
         dimensions = dataset.dims
-        self._tile_spec_list = self._generate_slices(dimensions)
+        step_sizes = self._detect_step_sizes(dataset, variable_name)
+        self._tile_spec_list = self._generate_slices(dimensions, step_sizes)

         return self
diff --git a/granule_ingester/tests/processors/test_ReadingProcessorSelector.py b/granule_ingester/tests/processors/test_ReadingProcessorSelector.py
index 558d214..2d722e4 100644
--- a/granule_ingester/tests/processors/test_ReadingProcessorSelector.py
+++ b/granule_ingester/tests/processors/test_ReadingProcessorSelector.py
@@ -62,3 +62,17 @@ def test_get_reading_processor(self):
             selector = ReadingProcessorSelector(dataset, 'smap_sss')
             processor = selector.get_reading_processor()
             self.assertEqual(GridReadingProcessor, type(processor))
+
+    def test_detect_step_sizes_smap(self):
+        netcdf_path = path.join(path.dirname(__file__), '../granules/SMAP_L2B_SSS_04892_20160101T005507_R13080.h5')
+        with xr.open_dataset(netcdf_path, decode_cf=True) as dataset:
+            selector = ReadingProcessorSelector(dataset, 'smap_sss')
+            step_sizes = selector._detect_step_sizes(dataset, 'smap_sss')
+            self.assertEqual({'phony_dim_0': 30, 'phony_dim_1': 30}, step_sizes)
+
+    def test_detect_step_sizes_timeseries(self):
+        netcdf_path = path.join(path.dirname(__file__), '../granules/not_empty_wswm.nc')
+        with xr.open_dataset(netcdf_path, decode_cf=True) as dataset:
+            selector = ReadingProcessorSelector(dataset, 'Qout')
+            step_sizes = selector._detect_step_sizes(dataset, 'Qout', slice_time=False)
+            self.assertEqual({'phony_dim_0': 30, 'phony_dim_1': 30}, step_sizes)
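
For reference, the following is a minimal, self-contained sketch (not part of the patch series) of the criteria/bid scoring that PATCH 4 introduces: each reading processor exposes get_criteria() as a list of zero-argument checks, and TileReadingProcessor.bid() scores a dataset as the fraction of criteria that pass. The dataset, variable, and dimension names below are synthetic examples, not taken from the patches.

import numpy as np
import xarray as xr

# A synthetic, regular lat/lon grid with a small time axis (hypothetical data).
dataset = xr.Dataset(
    {"analysed_sst": (("time", "lat", "lon"), np.zeros((4, 90, 180)))},
    coords={"time": np.arange(4),
            "lat": np.linspace(-89.0, 89.0, 90),
            "lon": np.linspace(-179.0, 179.0, 180)})

variable, lat, lon, time = "analysed_sst", "lat", "lon", "time"

# Mirrors GridReadingProcessor.get_criteria: one zero-argument check per criterion.
grid_criteria = [
    lambda: all(size > 2 for size in dataset[variable].sizes.values()),
    lambda: len(dataset[lat].dims) == 1 and len(dataset[lon].dims) == 1,
    lambda: len(set(dataset[variable].dims) - {time}) >= 2,
]

# Mirrors TileReadingProcessor.bid: the fraction of criteria that hold.
bid = sum(1 for criterion in grid_criteria if criterion()) / len(grid_criteria)
print(bid)  # 1.0 -- a regular grid satisfies every GridReadingProcessor criterion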