diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py index 1226351..4debac7 100644 --- a/collection_manager/collection_manager/services/CollectionWatcher.py +++ b/collection_manager/collection_manager/services/CollectionWatcher.py @@ -30,7 +30,7 @@ def __init__(self, s3_bucket: Optional[str] = None, collections_refresh_interval: float = 30): if not os.path.isabs(collections_path): - raise RelativePathError("Collections config path must be an absolute path.") + raise RelativePathError("Collections config path must be an absolute path.") self._collections_path = collections_path self._granule_updated_callback = granule_updated_callback diff --git a/collection_manager/collection_manager/services/ZarrCollectionProcessor.py b/collection_manager/collection_manager/services/ZarrCollectionProcessor.py new file mode 100644 index 0000000..734e252 --- /dev/null +++ b/collection_manager/collection_manager/services/ZarrCollectionProcessor.py @@ -0,0 +1,112 @@ +import logging +import os.path +from glob import glob +from typing import Dict +from datetime import datetime + +import yaml +from collection_manager.entities import Collection +from collection_manager.services import MessagePublisher +from collection_manager.services.history_manager import (GranuleStatus, + IngestionHistory) +from collection_manager.services.history_manager.IngestionHistory import \ + IngestionHistoryBuilder + +logger = logging.getLogger(__name__) + +SUPPORTED_FILE_EXTENSIONS = ['.nc', '.nc4', '.h5'] + +# TODO generate tests () +class ZarrCollectionProcessor: + + def __init__(self, message_publisher: MessagePublisher, history_manager_builder: IngestionHistoryBuilder): + self._publisher = message_publisher + self._history_manager_builder = history_manager_builder + self._history_manager_cache: Dict[str, IngestionHistory] = {} + + async def process_granule(self, granule: str, modified_time: int, collection: Collection): + """ + Determine whether a granule needs to be ingested, and if so publish a RabbitMQ message for it. + :param granule: A path to a granule file + :param collection: A Collection against which to evaluate the granule + :return: None + """ + if not self._file_supported(granule): + return + + history_manager = self._get_history_manager(collection.dataset_id) + granule_status = await history_manager.get_granule_status(granule, + modified_time, + collection.date_from, + collection.date_to) + + if granule_status is GranuleStatus.DESIRED_FORWARD_PROCESSING: + logger.info(f"New granule '{granule}' detected for forward-processing ingestion " + f"in collection '{collection.dataset_id}'.") + if collection.forward_processing_priority is not None: + use_priority = collection.forward_processing_priority + else: + use_priority = collection.historical_priority + elif granule_status is GranuleStatus.DESIRED_HISTORICAL: + logger.info(f"New granule '{granule}' detected for historical ingestion in collection " + f"'{collection.dataset_id}'.") + use_priority = collection.historical_priority + else: + logger.debug(f"Granule '{granule}' detected but has already been ingested in " + f"collection '{collection.dataset_id}'. 
Skipping.")
+            return
+
+        dataset_config = self._generate_ingestion_message(granule, collection)
+        await self._publisher.publish_message(body=dataset_config, priority=use_priority)
+        await history_manager.push(granule, modified_time)
+
+    @staticmethod
+    def _file_supported(file_path: str):
+        ext = os.path.splitext(file_path)[-1]
+        return ext in SUPPORTED_FILE_EXTENSIONS
+
+    def _get_history_manager(self, dataset_id: str) -> IngestionHistory:
+        if dataset_id not in self._history_manager_cache:
+            self._history_manager_cache[dataset_id] = self._history_manager_builder.build(dataset_id=dataset_id)
+        return self._history_manager_cache[dataset_id]
+
+
+    @staticmethod
+    def _get_default_processors(collection: Collection):
+        processors = [
+            {
+                'name': collection.projection,
+                **dict(collection.dimension_names),
+            },
+            {'name': 'emptyTileFilter'},
+            {'name': 'subtract180FromLongitude'}
+        ]
+
+        if collection.projection == 'Grid':
+            processors.append({'name': 'forceAscendingLatitude'})
+        processors.append({'name': 'kelvinToCelsius'})
+        processors.append({
+            'name': 'tileSummary',
+            'dataset_name': collection.dataset_id
+        })
+        processors.append({'name': 'generateTileId'})
+
+        return processors
+
+
+    @staticmethod
+    def _generate_ingestion_message(granule_path: str, collection: Collection) -> str:
+
+        config_dict = {
+            'granule': {
+                'resource': granule_path
+            },
+            'slicer': {
+                'name': 'sliceFileByStepSize',
+                'dimension_step_sizes': dict(collection.slices)
+            },
+            'processors': ZarrCollectionProcessor._get_default_processors(collection)
+        }
+        config_str = yaml.dump(config_dict)
+        logger.debug(f"Templated dataset config:\n{config_str}")
+        return config_str
diff --git a/collection_manager/collection_manager/services/__init__.py b/collection_manager/collection_manager/services/__init__.py
index 553e1b7..f977400 100644
--- a/collection_manager/collection_manager/services/__init__.py
+++ b/collection_manager/collection_manager/services/__init__.py
@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
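For reference, a rough sketch of the ingestion message that `_generate_ingestion_message` above would publish for the AVHRR collection defined in `tests/resources/zarr_collections.yml` (assuming `Collection` passes the `dimensionNames` and `slices` mappings through unchanged; the granule path below is illustrative only):

    import yaml

    # Values mirror the AVHRR_OI-NCEI-L4-GLOB-v2.0 entry in zarr_collections.yml;
    # the granule file name is a made-up example.
    config_dict = {
        'granule': {'resource': '/opt/data/avhrr/example_granule.nc'},
        'slicer': {
            'name': 'sliceFileByStepSize',
            'dimension_step_sizes': {'time': 1, 'lat': 30, 'lon': 30},
        },
        'processors': [
            {'name': 'Grid', 'latitude': 'lat', 'longitude': 'lon', 'time': 'time', 'variable': 'analysed_sst'},
            {'name': 'emptyTileFilter'},
            {'name': 'subtract180FromLongitude'},
            {'name': 'forceAscendingLatitude'},
            {'name': 'kelvinToCelsius'},
            {'name': 'tileSummary', 'dataset_name': 'AVHRR_OI-NCEI-L4-GLOB-v2.0'},
            {'name': 'generateTileId'},
        ],
    }
    print(yaml.dump(config_dict))  # this YAML string is the RabbitMQ message body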
+from .CollectionProcessor import CollectionProcessor +from .ZarrCollectionProcessor import ZarrCollectionProcessor from .CollectionProcessor import CollectionProcessor from .CollectionWatcher import CollectionWatcher from .MessagePublisher import MessagePublisher diff --git a/collection_manager/collection_manager/services/history_manager/ZarrSolrIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/ZarrSolrIngestionHistory.py new file mode 100644 index 0000000..8e277f8 --- /dev/null +++ b/collection_manager/collection_manager/services/history_manager/ZarrSolrIngestionHistory.py @@ -0,0 +1,210 @@ +from gc import collect +import hashlib +from importlib.resources import path +import logging +from tkinter import W +from attr import s + +import pysolr +import requests +from collection_manager.services.history_manager.IngestionHistory import (IngestionHistory, IngestionHistoryBuilder) +from collection_manager.entities.Collection import Collection +from collections import defaultdict +from common.async_utils.AsyncUtils import run_in_executor +from typing import Awaitable, Callable, Dict, List, Optional, Set + +import yaml + +logging.getLogger("pysolr").setLevel(logging.WARNING) +logger = logging.getLogger(__name__) + + +def doc_key(dataset_id, file_name): + return hashlib.sha1(f'{dataset_id}{file_name}'.encode('utf-8')).hexdigest() + + +class ZarrSolrIngestionHistoryBuilder(IngestionHistoryBuilder): + def __init__(self, solr_url: str, collections_path: str, signature_fun=None): + self._solr_url = solr_url + self._signature_fun = signature_fun + self.collections_path = collections_path + + def build(self, dataset_id: str): + return ZarrSolrIngestionHistory(solr_url=self._solr_url, + dataset_id=dataset_id, + collections_path=self.collections_path, + signature_fun=self._signature_fun) + + +class ZarrSolrIngestionHistory(IngestionHistory): + _granule_collection_name = "zarrgranules" + _dataset_collection_name = "zarrdatasets" + _req_session = None + + def __init__(self, solr_url: str, dataset_id: str, collections_path: str, signature_fun=None): + try: + self._url_prefix = f"{solr_url.strip('/')}/solr" + # TODO check method + self._create_collection_if_needed() + self.collections_path = collections_path + self.collections_by_dir: Dict[collections_path, Set[Collection]] = defaultdict(set) + self._solr_granules = pysolr.Solr(f"{self._url_prefix}/{self._granule_collection_name}") + self._solr_datasets = pysolr.Solr(f"{self._url_prefix}/{self._dataset_collection_name}") + self._dataset_id = dataset_id + self._signature_fun = signature_fun + self._latest_ingested_file_update = self._get_latest_file_update() + self._solr_url = solr_url + self.collections_path = collections_path + except requests.exceptions.RequestException: + raise DatasetIngestionHistorySolrException(f"solr instance unreachable {solr_url}") + + def __del__(self): + self._req_session.close() + + @run_in_executor + def load_dataset_metadata(self): # retrieve metadata from respective dataset in config yaml + try: + with open(self._collections_path, 'r') as f: + collections_yaml = yaml.load(f, Loader=yaml.FullLoader) + self._collections_by_dir.clear() + for collection_dict in collections_yaml['collections']: + try: + collection = Collection.from_dict(collection_dict) + if collection['id'] == self.dataset_id: + return collection + except: + print("INNER LOOP ERROR") #TODO add error handling + except: + print("OUTER LOOP ERROR") #TODO add errors handling + + return None + + def retrieve_variable_lists(self, 
collection): # returns array of lists with variable and their fill values + var_arr = [{"name_s": collection['dimensionNames']['variable'], + "fill_d": collection['dimensionNames']['fill_value']}] + return var_arr + + def retrieve_chunk_size(self, collection): + chunkSize = [collection['slices']['time'], collection['slices']['lat'], + collection['slices']['lon']] + return chunkSize + + @run_in_executor + def _push_record(self, file_name, signature): # granule-level JSON entry + hash_id = doc_key(self._dataset_id, file_name) + self._solr_granules.delete(q=f"id:{hash_id}") + self._solr_granules.add([{ + 'id': hash_id, + 'dataset_s': self._dataset_id, + 'granule_s': file_name, + 'granule_signature_s': signature}]) + self._solr_granules.commit() + return None + + @run_in_executor + def _save_latest_timestamp(self): # dataset-level JSON entry + if self._solr_datasets: + collection = self.load_dataset_metadata + self._solr_datasets.delete(q=f"id:{self._dataset_id}") + chunkSize = [collection['slices']['time'], collection['slices']['lat'], + collection['slices']['lon']] + self._solr_datasets.add([{ + 'id': self._dataset_id, + 'latest_update_l': self._latest_ingested_file_update, + 'dataset_s': self._dataset_id, + 'variables': self.retrieve_variable_lists(collection), + 's3_url_s': collection['path'], + 'public_b': False, # TODO support for public buckets, make this dynamic + 'type_s': collection['projection'], + 'chunk_shape': self.retrieve_chunk_size(collection)}]) + self._solr_datasets.commit() + + def _get_latest_file_update(self): + results = self._solr_datasets.search(q=f"id:{self._dataset_id}") + if results: + return results.docs[0]['latest_update_l'] + else: + return None + + @run_in_executor + def _get_signature(self, file_name): + hash_id = doc_key(self._dataset_id, file_name) + results = self._solr_granules.search(q=f"id:{hash_id}") + if results: + return results.docs[0]['granule_signature_s'] + else: + return None + + def _create_collection_if_needed(self): + try: + if not self._req_session: + self._req_session = requests.session() + + payload = {'action': 'CLUSTERSTATUS'} + collections_endpoint = f"{self._url_prefix}/admin/collections" + result = self._req_session.get(collections_endpoint, params=payload) + response = result.json() + node_number = len(response['cluster']['live_nodes']) + + existing_collections = response['cluster']['collections'].keys() + + if self._granule_collection_name not in existing_collections: + # Create collection + payload = {'action': 'CREATE', + 'name': self._granule_collection_name, + 'numShards': node_number + } + result = self._req_session.get(collections_endpoint, params=payload) + response = result.json() + logger.info(f"solr collection created {response}") + + # Update schema + schema_endpoint = f"{self._url_prefix}/{self._granule_collection_name}/schema" + self._add_field(schema_endpoint, "dataset_s", "string") + self._add_field(schema_endpoint, "granule_s", "string") + self._add_field(schema_endpoint, "granule_signature_s", "string") + + if self._dataset_collection_name not in existing_collections: + # Create collection + payload = {'action': 'CREATE', + 'name': self._dataset_collection_name, + 'numShards': node_number + } + result = self._req_session.get(collections_endpoint, params=payload) + response = result.json() + logger.info(f"solr collection created {response}") + + # Update schema + schema_endpoint = f"{self._url_prefix}/{self._dataset_collection_name}/schema" + self._add_field(schema_endpoint, "latest_update_l", "TrieLongField") # TODO 
TrieLongField is depricated + self._add_field(schema_endpoint, "dataset_s", "string") + self._add_field(schema_endpoint, "variables", "list") + self._add_field(schema_endpoint, "s2_uri_s", "string") + self._add_field(schema_endpoint, "public_b", "bool") + self._add_field(schema_endpoint, "type_s", "string") + self._add_field(schema_endpoint, "chunk_shape", "list") + except requests.exceptions.RequestException as e: + logger.error(f"solr instance unreachable {self._solr_url}") + raise e + + def _add_field(self, schema_url, field_name, field_type): + """ + Helper to add a string field in a solr schema + :param schema_url: + :param field_name: + :param field_type + :return: + """ + add_field_payload = { + "add-field": { + "name": field_name, + "type": field_type, + "stored": False + } + } + return self._req_session.post(schema_url, data=str(add_field_payload).encode('utf-8')) + + +class DatasetIngestionHistorySolrException(Exception): + pass + \ No newline at end of file diff --git a/collection_manager/collection_manager/services/history_manager/__init__.py b/collection_manager/collection_manager/services/history_manager/__init__.py index 0f6133b..fc14b10 100644 --- a/collection_manager/collection_manager/services/history_manager/__init__.py +++ b/collection_manager/collection_manager/services/history_manager/__init__.py @@ -2,3 +2,4 @@ from .IngestionHistory import GranuleStatus from .IngestionHistory import IngestionHistory, md5sum_from_filepath from .SolrIngestionHistory import SolrIngestionHistory, SolrIngestionHistoryBuilder +from .ZarrSolrIngestionHistory import ZarrSolrIngestionHistory, ZarrSolrIngestionHistoryBuilder diff --git a/collection_manager/collection_manager/zarr_main.py b/collection_manager/collection_manager/zarr_main.py new file mode 100644 index 0000000..a85c8bf --- /dev/null +++ b/collection_manager/collection_manager/zarr_main.py @@ -0,0 +1,100 @@ +import argparse +import asyncio +import logging +import os + +from collection_manager.services import (CollectionProcessor, + CollectionWatcher, MessagePublisher) +from collection_manager.services.history_manager import ( + FileIngestionHistoryBuilder, ZarrSolrIngestionHistoryBuilder, + md5sum_from_filepath) + +logging.basicConfig(level=logging.DEBUG, format="%(asctime)s [%(levelname)s] [%(name)s::%(lineno)d] %(message)s") +logging.getLogger("pika").setLevel(logging.WARNING) +logger = logging.getLogger(__name__) + + +def check_path(path) -> str: + if not os.path.isabs(path): + raise argparse.ArgumentError("Paths must be absolute.") + return path + + +def get_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Watch the filesystem for new granules, and publish messages to " + "RabbitMQ whenever they become available.") + parser.add_argument("--collections-path", + help="Absolute path to collections configuration file", + metavar="PATH", + required=True) + history_group = parser.add_mutually_exclusive_group(required=True) + history_group.add_argument("--history-path", + metavar="PATH", + help="Absolute path to ingestion history local directory") + history_group.add_argument("--history-url", + metavar="URL", + help="URL to ingestion history solr database") + parser.add_argument('--rabbitmq-host', + default='localhost', + metavar='HOST', + help='RabbitMQ hostname to connect to. (Default: "localhost")') + parser.add_argument('--rabbitmq-username', + default='guest', + metavar='USERNAME', + help='RabbitMQ username. 
(Default: "guest")') + parser.add_argument('--rabbitmq-password', + default='guest', + metavar='PASSWORD', + help='RabbitMQ password. (Default: "guest")') + parser.add_argument('--rabbitmq-queue', + default="nexus", + metavar="QUEUE", + help='Name of the RabbitMQ queue to consume from. (Default: "nexus")') + parser.add_argument('--refresh', + default='30', + metavar="INTERVAL", + help='Number of seconds after which to reload the collections config file. (Default: 30)') + parser.add_argument('--s3-bucket', + metavar='S3-BUCKET', + help='Optional name of an AWS S3 bucket where granules are stored. If this option is set, then all collections to be scanned must have their granules on S3, not the local filesystem.') + + return parser.parse_args() + + +async def main(): + try: + options = get_args() + + signature_fun = None if options.s3_bucket else md5sum_from_filepath + + if options.history_path: + history_manager_builder = FileIngestionHistoryBuilder(history_path=options.history_path, + signature_fun=signature_fun) + else: + history_manager_builder = ZarrSolrIngestionHistoryBuilder(solr_url=options.history_url, + signature_fun=signature_fun) + async with MessagePublisher(host=options.rabbitmq_host, + username=options.rabbitmq_username, + password=options.rabbitmq_password, + queue=options.rabbitmq_queue) as publisher: + collection_processor = CollectionProcessor(message_publisher=publisher, + history_manager_builder=history_manager_builder) + collection_watcher = CollectionWatcher(collections_path=options.collections_path, + granule_updated_callback=collection_processor.process_granule, + collections_refresh_interval=int(options.refresh), + s3_bucket=options.s3_bucket) + + await collection_watcher.start_watching() + while True: + try: + await asyncio.sleep(1) + except KeyboardInterrupt: + return + + except Exception as e: + logger.exception(e) + return + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/collection_manager/requirements.txt b/collection_manager/requirements.txt index c4b6323..9e14605 100644 --- a/collection_manager/requirements.txt +++ b/collection_manager/requirements.txt @@ -1,9 +1,10 @@ PyYAML==5.3.1 -pystache==0.5.4 +pystache==0.6.0 pysolr==3.9.0 watchdog==0.10.2 requests==2.23.0 tenacity==6.2.0 aioboto3==8.0.5 aiohttp==3.7.2 -aio-pika==6.7.1 \ No newline at end of file +aio-pika==6.7.1 +#setuptools==57.5.0 \ No newline at end of file diff --git a/collection_manager/tests/resources/zarr_collections.yml b/collection_manager/tests/resources/zarr_collections.yml new file mode 100644 index 0000000..376245f --- /dev/null +++ b/collection_manager/tests/resources/zarr_collections.yml @@ -0,0 +1,50 @@ +collections: + - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND + path: /opt/data/grace/*land*.nc + priority: 1 + forward-processing-priority: 5 + projection: Grid + dimensionNames: + latitude: lat + longitude: lon + time: time + variable: lwe_thickness + fill_value: -99999 # need to change + slices: + time: 1 + lat: 30 + lon: 30 + + + - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_OCEAN + path: /opt/data/grace/*ocean*.nc + priority: 2 + forward-processing-priority: 6 + projection: Grid + dimensionNames: + latitude: lat + longitude: lon + time: time + variable: lwe_thickness + fill_value: -99999 # need to change + slices: + time: 1 + lat: 30 + lon: 30 + + + - id: AVHRR_OI-NCEI-L4-GLOB-v2.0 + path: /opt/data/avhrr/*.nc + priority: 1 + projection: Grid + dimensionNames: + latitude: lat + longitude: lon + time: time + variable: analysed_sst + fill_value: -32768 # need to 
change + slices: + time: 1 + lat: 30 + lon: 30 + diff --git a/collection_manager/tests/services/history_manager/test_SolrIngestionHistory.py b/collection_manager/tests/services/history_manager/test_SolrIngestionHistory.py deleted file mode 100644 index deab42d..0000000 --- a/collection_manager/tests/services/history_manager/test_SolrIngestionHistory.py +++ /dev/null @@ -1,44 +0,0 @@ -import unittest -from collection_manager.services.history_manager import SolrIngestionHistory - -SOLR_URL = "http://localhost:8984/solr" -DATASET_ID = "zobi_la_mouche" - - -# TODO: mock solr and fix these tests -class TestSolrIngestionHistory(unittest.TestCase): - @unittest.skip("does not work without a solr server for history_manager") - def test_get(self): - ingestion_history = SolrIngestionHistory(SOLR_URL, DATASET_ID) - - ingestion_history.push("blue", "12weeukrhbwerqu7wier") - - result = ingestion_history.get("blue") - - self.assertEqual(result.docs[0]['dataset_s'], "zobi_la_mouche") - self.assertEqual(result.docs[0]['granule_s'], "blue") - self.assertEqual(result.docs[0]['granule_md5sum_s'], "12weeukrhbwerqu7wier") - - @unittest.skip("does not work without a solr server for history_manager") - def test_get_md5sum(self): - ingestion_history = SolrIngestionHistory(SOLR_URL, DATASET_ID) - - ingestion_history.push("blue", "12weeukrhbwerqu7wier") - - result = ingestion_history.get_md5sum("blue") - - self.assertEqual(result, "12weeukrhbwerqu7wier") - - @unittest.skip("does not work without a solr server for history_manager") - def test_get_missing_md5sum(self): - ingestion_history = SolrIngestionHistory(SOLR_URL, DATASET_ID) - - ingestion_history.push("blue", "12weeukrhbwerqu7wier") - - result = ingestion_history.get_md5sum("green") - - self.assertEqual(result, None) - - -if __name__ == '__main__': - unittest.main() diff --git a/granule_ingester/README.md b/granule_ingester/README.md index 1339835..786224c 100644 --- a/granule_ingester/README.md +++ b/granule_ingester/README.md @@ -9,26 +9,32 @@ data to Cassandra and Solr. ## Prerequisites -Python 3.7 +Python 3.7 (```conda install -c anaconda python=3.7``` in conda env) ## Building the service From `incubator-sdap-ingester`, run: - $ cd common && python setup.py install - $ cd ../granule_ingester && python setup.py install - -## Launching the service -From `incubator-sdap-ingester`, run: + cd common && python setup.py install + cd ../granule_ingester && python setup.py install + +# Install nexusproto + +Clone repo + + git clone https://github.com/apache/incubator-sdap-nexusproto.git - $ python granule_ingester/granule_ingester/main.py -h +From `incubator-sdap-nexus-proto`, run: + + cd build/python/nexusproto && python setup.py install -## Running the tests +## Launching the service From `incubator-sdap-ingester`, run: - $ cd common && python setup.py install - $ cd ../granule_ingester && python setup.py install - $ pip install pytest && pytest - + python granule_ingester/granule_ingester/main.py -h + +In order to successfully run the service, you will need to have a Cassandra, +Solr, and RabbitMQ connection. Make sure to provide their respective credentials. 
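A quick connectivity sanity check before launching might look like the sketch below (hostnames, ports, credentials, and endpoints are placeholder local defaults, not values required by the service):

    import asyncio

    import aio_pika
    import requests
    from cassandra.cluster import Cluster

    # Solr reachable? (placeholder URL for a default local install)
    requests.get('http://localhost:8983/solr/admin/cores', timeout=10).raise_for_status()

    # Cassandra reachable?
    Cluster(['localhost'], port=9042).connect().shutdown()

    # RabbitMQ reachable?
    async def check_rabbitmq():
        connection = await aio_pika.connect_robust('amqp://guest:guest@localhost/')
        await connection.close()

    asyncio.run(check_rabbitmq())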
+ ## Building the Docker image From `incubator-sdap-ingester`, run: diff --git a/granule_ingester/conda-requirements.txt b/granule_ingester/conda-requirements.txt index a1e4206..98aec50 100644 --- a/granule_ingester/conda-requirements.txt +++ b/granule_ingester/conda-requirements.txt @@ -8,4 +8,6 @@ pyyaml==5.3.1 aiohttp==3.6.2 tenacity requests==2.27.1 - +pathtools==0.1.2 +numcodecs==0.9.1 +zarr diff --git a/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py b/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py index 6377de0..53519c1 100644 --- a/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py +++ b/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py @@ -16,6 +16,7 @@ import logging import os import tempfile +from typing import Tuple from urllib import parse import aioboto3 @@ -40,8 +41,8 @@ async def __aenter__(self): async def __aexit__(self, type, value, traceback): if self._granule_temp_file: self._granule_temp_file.close() - - async def open(self) -> (xr.Dataset, str): + # @TODO causes error. Test to see if this Tuple change works with code + async def open(self) -> Tuple[xr.Dataset, str]: resource_url = parse.urlparse(self._resource) if resource_url.scheme == 's3': # We need to save a reference to the temporary granule file so we can delete it when the context manager diff --git a/granule_ingester/granule_ingester/processors/ZarrProcessor.py b/granule_ingester/granule_ingester/processors/ZarrProcessor.py new file mode 100644 index 0000000..da5e673 --- /dev/null +++ b/granule_ingester/granule_ingester/processors/ZarrProcessor.py @@ -0,0 +1,114 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import datetime +import json +import logging +from abc import ABC, abstractmethod +from time import time +from typing import Dict, Union, List + +import numpy as np +import xarray as xr +from nexusproto import DataTile_pb2 as nexusproto + +from granule_ingester.exceptions import TileProcessingError +from granule_ingester.processors.TileProcessor import TileProcessor + +logger = logging.getLogger(__name__) + + +class ZarrProcessor(): + + def __init__(self, variable: Union[str, list], latitude: str, longitude: str, time=None, *args, **kwargs): + try: + # TODO variable in test cases are being passed in as just lists, and is not passable through json.loads() + self.variable = json.loads(variable) + except Exception as e: + logger.exception(f'failed to convert literal list to python list. 
using as a single variable: {variable}') + self.variable = variable + if isinstance(self.variable, list) and len(self.variable) < 1: + logger.error(f'variable list is empty: {self}') + raise RuntimeError(f'variable list is empty: {self.variable}') + self.latitude = latitude + self.longitude = longitude + self.time = time + + + def process(self, granule: xr.Dataset, process_list: List, *args, **kwargs): + for processes in process_list['processors']: + logger.debug(f'Reading Processor: {type(processes)}') + try: + # grab ingestion message's processors + processName = processes['name'] + # TODO process granule via methods in this class + except Exception as e: + raise TileProcessingError(f"Could not generate tiles from the granule because of the following error: {e}.") + # returns granule. Passed into writers, which then push into S3 + return granule + + @classmethod + def _parse_input(cls, the_input_tile, temp_dir): + specs = the_input_tile.summary.section_spec + tile_specifications = cls._convert_spec_to_slices(specs) + + file_path = the_input_tile.summary.granule + file_path = file_path[len('file:'):] if file_path.startswith('file:') else file_path + + return tile_specifications, file_path + + @staticmethod + def _slices_for_variable(variable: xr.DataArray, dimension_to_slice: Dict[str, slice]) -> Dict[str, slice]: + return {dim_name: dimension_to_slice[dim_name] for dim_name in variable.dims} + + @staticmethod + def _convert_spec_to_slices(spec): + dim_to_slice = {} + for dimension in spec.split(','): + name, start, stop = dimension.split(':') + dim_to_slice[name] = slice(int(start), int(stop)) + + return dim_to_slice + + @staticmethod + def _convert_to_timestamp(times: xr.DataArray) -> xr.DataArray: + if times.dtype == np.float32: + return times + epoch = np.datetime64(datetime.datetime(1970, 1, 1, 0, 0, 0)) + return ((times - epoch) / 1e9).astype(int) + + # TODO passing in granule with single variable or with all? 
NEEDS TESTING + def kelvinToCelsius(self, granule: xr.Dataset): + for dataVar in self.variable: + logger.debug(f'converting kelvin to celsius for variable {dataVar}') + data_var = granule[dataVar] + + var_celsius = data_var - 273.15 + var_attrs = data_var.attrs + var_celsius.attrs = var_attrs + var_celsius.attrs['units'] = 'celsius' + granule[dataVar] = var_celsius + + # TODO needs testing + def forceAscendingLatitude(self, granule: xr.Dataset): + granule = granule.sortby('lat', ascending=False) + + # TODO below methods needs implementation + def GenerateTileId(self, granule: xr.Dataset): + pass + + def Subtract180FromLongitude(self, granule: xr.Dataset): + pass + + diff --git a/granule_ingester/granule_ingester/processors/__init__.py b/granule_ingester/granule_ingester/processors/__init__.py index 174a833..8b0c2ac 100644 --- a/granule_ingester/granule_ingester/processors/__init__.py +++ b/granule_ingester/granule_ingester/processors/__init__.py @@ -5,3 +5,5 @@ from granule_ingester.processors.kelvintocelsius import KelvinToCelsius from granule_ingester.processors.Subtract180FromLongitude import Subtract180FromLongitude from granule_ingester.processors.ForceAscendingLatitude import ForceAscendingLatitude +from granule_ingester.processors.NetCDFProcessor import NetCDFProcessor +from granule_ingester.processors.ZarrProcessor import ZarrProcessor diff --git a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py index 68561e2..e3fa8ec 100644 --- a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py +++ b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py @@ -32,6 +32,7 @@ class TileReadingProcessor(TileProcessor, ABC): def __init__(self, variable: Union[str, list], latitude: str, longitude: str, *args, **kwargs): try: + # TODO variable in test cases are being passed in as just lists, and is not passable through json.loads() self.variable = json.loads(variable) except Exception as e: logger.exception(f'failed to convert literal list to python list. 
using as a single variable: {variable}') @@ -54,11 +55,7 @@ def process(self, tile, dataset: xr.Dataset, *args, **kwargs): return self._generate_tile(dataset, dimensions_to_slices, output_tile) except Exception as e: raise TileProcessingError(f"Could not generate tiles from the granule because of the following error: {e}.") - - @abstractmethod - def _generate_tile(self, dataset: xr.Dataset, dimensions_to_slices: Dict[str, slice], tile): - pass - + @classmethod def _parse_input(cls, the_input_tile, temp_dir): specs = the_input_tile.summary.section_spec diff --git a/granule_ingester/granule_ingester/writers/LocalStore.py b/granule_ingester/granule_ingester/writers/LocalStore.py new file mode 100644 index 0000000..e738e67 --- /dev/null +++ b/granule_ingester/granule_ingester/writers/LocalStore.py @@ -0,0 +1,41 @@ +# LocalStore +# Purpose: Writes netCDF4 file into a machine local directory +# Written by: Ricky Fok +from os import path +import asyncio +import logging +from typing import Tuple +#from msilib.schema import Error +import traceback +import logging +import xarray as xr +from numcodecs import Blosc + +from granule_ingester.exceptions import CassandraFailedHealthCheckError, CassandraLostConnectionError +from granule_ingester.writers import netCDF_Store + +logger = logging.getLogger(__name__) + +class LocalStore(netCDF_Store): + def __init__(self, store_path: str): + self.store_path = store_path + + #async def health_check(self) -> bool: + #try: + #session = self._get_session() + #session.shutdown() + #return True + #except Exception: + #raise CassandraFailedHealthCheckError("Cannot connect to Cassandra!") + + def save_data(self, ds: xr.Dataset, cname: str, clevel: int, shuffle: int, chunkShape: Tuple[int, int, int]) -> None: + compressor = Blosc(cname = cname, clevel = clevel, shuffle = shuffle) + # @TODO be able to customize encoding for each dava variable + encoding = {vname: {'compressor': compressor, 'chunks': chunkShape} for vname in ds.data_vars} + + try: + ds.to_zarr(self.store_path, encoding = encoding, consolidated=True , mode='w') + # @TODO update metadata in master Zarr and place error checks for invalid paths. + except Exception as e: + logging.error(traceback.format_exc()) + \ No newline at end of file diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py index 5c5f088..2ee1f81 100644 --- a/granule_ingester/granule_ingester/writers/SolrStore.py +++ b/granule_ingester/granule_ingester/writers/SolrStore.py @@ -16,6 +16,7 @@ import asyncio import functools import json +import netCDF4 import logging from asyncio import AbstractEventLoop from datetime import datetime @@ -111,6 +112,11 @@ async def save_metadata(self, nexus_tile: NexusTile) -> None: logger.debug(f'solr_doc: {solr_doc}') await self._save_document(solr_doc) + async def save_metadata_cdfDS(self, ds: netCDF4._netCDF4) -> None: + solr_doc = self._build_solr_doc(nexus_tile) + logger.debug(f'solr_doc: {solr_doc}') + await self._save_document(solr_doc) + @run_in_executor def _save_document(self, doc: dict): try: @@ -120,6 +126,22 @@ def _save_document(self, doc: dict): raise SolrLostConnectionError(f'Lost connection to Solr, and cannot save tiles. 
cause: {e}') def _build_solr_doc(self, tile: NexusTile) -> Dict: + + #{ + #"id": "MUR25-JPL-L4-GLOB-v04.2", + #"latest_update_l": 1637629358, + #"_version_": 1718445323844583426, + #"dataset_s": "MUR25-JPL-L4-GLOB-v04.2", + #"variables": [{ + #"name_s": "analysed_sst", + #"fill_d": -32768 + #}], + #"s3_uri_s": "s3://cdms-dev-zarr/MUR25-JPL-L4-GLOB-v04.2/", + #"public_b": false, + #"type_s": "gridded", + #"chunk_shape": [30, 120, 240] + #} + summary: TileSummary = tile.summary bbox: TileSummary.BBox = summary.bbox stats: TileSummary.DataStats = summary.stats diff --git a/granule_ingester/granule_ingester/writers/__init__.py b/granule_ingester/granule_ingester/writers/__init__.py index 9323d8c..9ab0b60 100644 --- a/granule_ingester/granule_ingester/writers/__init__.py +++ b/granule_ingester/granule_ingester/writers/__init__.py @@ -2,3 +2,6 @@ from granule_ingester.writers.MetadataStore import MetadataStore from granule_ingester.writers.SolrStore import SolrStore from granule_ingester.writers.CassandraStore import CassandraStore +from granule_ingester.writers.netCDF_Store import netCDF_Store +from granule_ingester.writers.LocalStore import LocalStore +from granule_ingester.writers.netCDF_Store import netCDF_Store \ No newline at end of file diff --git a/granule_ingester/granule_ingester/writers/netCDF_Store.py b/granule_ingester/granule_ingester/writers/netCDF_Store.py new file mode 100644 index 0000000..ede59c2 --- /dev/null +++ b/granule_ingester/granule_ingester/writers/netCDF_Store.py @@ -0,0 +1,13 @@ +from abc import ABC, abstractmethod + +import xarray as xr + +from granule_ingester.healthcheck import HealthCheck + + +class netCDF_Store(ABC): + + @abstractmethod + def save_data(self, ds: xr.Dataset) -> None: + pass + \ No newline at end of file diff --git a/granule_ingester/tests/granules/TROPOMI_methane_mixing_ratio_20200801.nc b/granule_ingester/tests/granules/TROPOMI_methane_mixing_ratio_20200801.nc new file mode 100644 index 0000000..43dd0da Binary files /dev/null and b/granule_ingester/tests/granules/TROPOMI_methane_mixing_ratio_20200801.nc differ diff --git a/granule_ingester/tests/granules/TROPOMI_methane_mixing_ratio_20200802.nc b/granule_ingester/tests/granules/TROPOMI_methane_mixing_ratio_20200802.nc new file mode 100644 index 0000000..00ef97b Binary files /dev/null and b/granule_ingester/tests/granules/TROPOMI_methane_mixing_ratio_20200802.nc differ diff --git a/granule_ingester/tests/writers/test_LocalStore.py b/granule_ingester/tests/writers/test_LocalStore.py new file mode 100644 index 0000000..da54960 --- /dev/null +++ b/granule_ingester/tests/writers/test_LocalStore.py @@ -0,0 +1,100 @@ +from tkinter import W +import unittest +from os import path +import xarray as xr +import netCDF4 +import json +from typing import Dict +from datetime import datetime +from pathlib import Path +#from nexusproto import DataTile_pb2 as nexusproto + +from granule_ingester.processors.reading_processors import LazyLoadProcessor +from granule_ingester.writers import LocalStore + +#def _build_solr_doc(self, ds: netCDF4._netCDF4) -> Dict: + #summary: TileSummary = tile.summary + #bbox: TileSummary.BBox = summary.bbox + #stats: TileSummary.DataStats = summary.stats + + #min_time = datetime.strftime(datetime.utcfromtimestamp(stats.min_time), self.iso) + #max_time = datetime.strftime(datetime.utcfromtimestamp(stats.max_time), self.iso) + #day_of_year = datetime.utcfromtimestamp(stats.min_time).timetuple().tm_yday + #geo = self.determine_geo(bbox) + + #granule_file_name: str = 
Path(summary.granule).name # get base filename + + #tile_type = tile.tile.WhichOneof("tile_type") + #tile_data = getattr(tile.tile, tile_type) + + #var_names = json.loads(summary.data_var_name) + #standard_names = [] + #if summary.standard_name: + #standard_names = json.loads(summary.standard_name) + #if not isinstance(var_names, list): + #var_names = [var_names] + #if not isinstance(standard_names, list): + #standard_names = [standard_names] + + #input_document = { + #'table_s': self.TABLE_NAME, + #'geo': geo, + #'id': summary.tile_id, + #'solr_id_s': '{ds_name}!{tile_id}'.format(ds_name=summary.dataset_name, tile_id=summary.tile_id), + #'sectionSpec_s': summary.section_spec, + #'dataset_s': summary.dataset_name, + #'granule_s': granule_file_name, + #'tile_var_name_ss': var_names, + #'day_of_year_i': day_of_year, + #'tile_min_lon': bbox.lon_min, + #'tile_max_lon': bbox.lon_max, + #'tile_min_lat': bbox.lat_min, + #'tile_max_lat': bbox.lat_max, + #'tile_depth': tile_data.depth, + #'tile_min_time_dt': min_time, + #'tile_max_time_dt': max_time, + #'tile_min_val_d': stats.min, + #'tile_max_val_d': stats.max, + #'tile_avg_val_d': stats.mean, + #'tile_count_i': int(stats.count) + #} + + #for var_name, standard_name in zip(var_names, standard_names): + #if standard_name: + #input_document[f'{var_name}.tile_standard_name_s'] = standard_name + + #ecco_tile_id = getattr(tile_data, 'tile', None) + #if ecco_tile_id: + #input_document['ecco_tile'] = ecco_tile_id + + #for attribute in summary.global_attributes: + #input_document[attribute.getName()] = attribute.getValues( + #0) if attribute.getValuesCount() == 1 else attribute.getValuesList() + + #return input_document + +# testing local store LLZA +def main(): + reading_processor_TROPOMI = LazyLoadProcessor(variable='[methane_mixing_ratio]', + latitude='lat', + longitude='lon', + time='time') + granule_path_TROPOMI = path.join(path.dirname(__file__), '../granules/TROPOMI_methane_mixing_ratio_20200801.nc') + #granule_path_OBP = path.join(path.dirname(__file__), '../granules/OBP_native_grid.nc') + + store_path = path.join(path.dirname(__file__), '../local_zarr_store/TROPOMI_MASTER.zarr') + zarr_writer_TROPOMI = LocalStore(store_path) + + cdfDS = xr.open_dataset(granule_path_TROPOMI) + chunkShape = (1,5,5) + processed_granule = reading_processor_TROPOMI.process(cdfDS) + zarr_writer_TROPOMI.save_data(ds=processed_granule, cname='blosclz', + clevel=9, shuffle=1, chunkShape=chunkShape) + print(xr.open_zarr(store_path)) + + # NOTE: zarr file is stored in test directory "local_zarr_store" + + +if __name__ == "__main__": + main() + \ No newline at end of file
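A minimal follow-up check (a sketch, not part of this change) to confirm that the chunk shape and Blosc compressor requested through `save_data()` actually made it into the written store; the variable name and store path come from `main()` above:

    from os import path

    import xarray as xr
    import zarr

    store_path = path.join(path.dirname(__file__), '../local_zarr_store/TROPOMI_MASTER.zarr')

    # xarray records the on-disk chunking and compressor in each variable's encoding
    ds = xr.open_zarr(store_path, consolidated=True)
    print(ds['methane_mixing_ratio'].encoding.get('chunks'))      # should be (1, 5, 5)
    print(ds['methane_mixing_ratio'].encoding.get('compressor'))  # should be Blosc(cname='blosclz', clevel=9, ...)

    # The same information is visible directly in the zarr metadata
    arr = zarr.open_group(store_path, mode='r')['methane_mixing_ratio']
    print(arr.chunks, arr.compressor)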