diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index b713f2d..e0cbe56 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -123,10 +123,13 @@ async def _call_callback_for_all_granules(self, collections: List[Collection]):
 
     def _get_files_at_path(self, path: str) -> List[str]:
         if os.path.isfile(path):
+            logger.info("processing collection path as a single file")
             return [path]
         elif os.path.isdir(path):
+            logger.info("processing collection path as a directory")
             return [f for f in glob(path + '/**', recursive=True) if os.path.isfile(f)]
         else:
+            logger.info("processing collection path as a glob pattern")
             return [f for f in glob(path, recursive=True) if os.path.isfile(f)]
 
     async def _reload_and_reschedule(self):
diff --git a/granule_ingester/docker/Dockerfile b/granule_ingester/docker/Dockerfile
index 1e7aedd..6d7414b 100644
--- a/granule_ingester/docker/Dockerfile
+++ b/granule_ingester/docker/Dockerfile
@@ -22,4 +22,6 @@ RUN pip install boto3==1.16.10
 
 RUN apk del .build-deps
 
-ENTRYPOINT ["/bin/sh", "/entrypoint.sh"]
\ No newline at end of file
+USER 1001
+ENV OPENBLAS_NUM_THREADS=1
+ENTRYPOINT ["/bin/sh", "/entrypoint.sh"]
diff --git a/granule_ingester/docker/entrypoint.sh b/granule_ingester/docker/entrypoint.sh
index 662bd3d..9a5a046 100644
--- a/granule_ingester/docker/entrypoint.sh
+++ b/granule_ingester/docker/entrypoint.sh
@@ -1,5 +1,8 @@
 #!/bin/sh
 
+# Strip surrounding double quotes from MAX_THREADS (helm may pass the value quoted)
+[[ ! -z "$MAX_THREADS" ]] && export MAX_THREADS_INT=`echo $MAX_THREADS | sed -e 's/^"//' -e 's/"$//'`
+
 python /sdap/granule_ingester/main.py \
   $([[ ! -z "$RABBITMQ_HOST" ]] && echo --rabbitmq-host=$RABBITMQ_HOST) \
   $([[ ! -z "$RABBITMQ_USERNAME" ]] && echo --rabbitmq-username=$RABBITMQ_USERNAME) \
@@ -11,4 +14,5 @@ python /sdap/granule_ingester/main.py \
   $([[ ! -z "$CASSANDRA_PASSWORD" ]] && echo --cassandra-password=$CASSANDRA_PASSWORD) \
   $([[ ! -z "$SOLR_HOST_AND_PORT" ]] && echo --solr-host-and-port=$SOLR_HOST_AND_PORT) \
   $([[ ! -z "$ZK_HOST_AND_PORT" ]] && echo --zk_host_and_port=$ZK_HOST_AND_PORT) \
-  $([[ ! -z "$MAX_THREADS" ]] && echo --max-threads=$MAX_THREADS)
+  $([[ ! -z "$MAX_THREADS_INT" ]] && echo --max-threads=$MAX_THREADS_INT) \
+  --verbose
diff --git a/granule_ingester/granule_ingester/README.md b/granule_ingester/granule_ingester/README.md
index 881461a..aace983 100644
--- a/granule_ingester/granule_ingester/README.md
+++ b/granule_ingester/granule_ingester/README.md
@@ -8,4 +8,19 @@ The custom code file would be copied into the ingestion pods via the helm chart
 Example: `KelvinToCelsiusProcessor`
 This processor checks the units of the saved variable. If it is some form of Kelvin, it automatically converts all of the temperature measurements to Celsius by subtracting 273.15 from each data point. The transformed data then replaces the default (untransformed) values and the processor returns the modified tile.
 
-#### TODO Add configuration option for unusual representations of temperature units.
\ No newline at end of file
+#### TODO Add configuration option for unusual representations of temperature units.
+
+
+## Building the Docker image
+From the root of `incubator-sdap-ingester`, run:
+
+    $ docker build . -f granule_ingester/docker/Dockerfile -t nexusjpl/granule-ingester
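+
+## Running the Docker image
+A minimal sketch of a local run; the hostnames and values below are placeholders, and only a few of the supported variables are shown (see `docker/entrypoint.sh` for the full list):
+
+    $ docker run -it --rm \
+        -e RABBITMQ_HOST=my-rabbitmq-host \
+        -e SOLR_HOST_AND_PORT=http://my-solr-host:8983 \
+        -e MAX_THREADS=4 \
+        nexusjpl/granule-ingester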
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index 86bf9c8..6c32bf6 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -64,6 +64,7 @@ def _init_worker(processor_list, dataset, data_store_factory, metadata_store_fac
 
 
 async def _process_tile_in_worker(serialized_input_tile: str):
+    logger.debug("starting to process tile in worker")
     try:
         input_tile = nexusproto.NexusTile.FromString(serialized_input_tile)
         processed_tile = _recurse(_worker_processor_list, _worker_dataset, input_tile)
@@ -82,6 +83,7 @@ def _recurse(processor_list: List[TileProcessor],
              input_tile: nexusproto.NexusTile) -> nexusproto.NexusTile:
     if len(processor_list) == 0:
         return input_tile
+    logger.debug("starting processor %s", processor_list[0].__class__.__name__)
     output_tile = processor_list[0].process(tile=input_tile, dataset=dataset)
     return _recurse(processor_list[1:], dataset, output_tile) if output_tile else None
 
@@ -99,7 +101,7 @@ def __init__(self,
         self._slicer = slicer
         self._data_store_factory = data_store_factory
         self._metadata_store_factory = metadata_store_factory
-        self._max_concurrency = max_concurrency
+        self._max_concurrency: int = max_concurrency
 
         # Create a SyncManager so that we can communicate exceptions from the
         # worker processes back to the main process.
@@ -186,8 +188,9 @@ async def run(self):
                                           self._data_store_factory,
                                           self._metadata_store_factory,
                                           shared_memory),
-                          maxtasksperchild=self._max_concurrency,
-                          childconcurrency=self._max_concurrency) as pool:
+                          # max_concurrency may arrive as a string (e.g. from CLI args or helm values), so cast defensively
+                          maxtasksperchild=int(self._max_concurrency),
+                          childconcurrency=int(self._max_concurrency)) as pool:
             serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for
                                 tile in self._slicer.generate_tiles(dataset, granule_name)]
             # aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that
diff --git a/granule_ingester/granule_ingester/writers/CassandraStore.py b/granule_ingester/granule_ingester/writers/CassandraStore.py
index cb5232b..505b56a 100644
--- a/granule_ingester/granule_ingester/writers/CassandraStore.py
+++ b/granule_ingester/granule_ingester/writers/CassandraStore.py
@@ -27,6 +27,7 @@
 
 from granule_ingester.exceptions import CassandraFailedHealthCheckError, CassandraLostConnectionError
 from granule_ingester.writers.DataStore import DataStore
+from granule_ingester.writers.CassandraStoreConnectionRetryPolicy import CassandraStoreConnectionRetryPolicy
 
 logging.getLogger('cassandra').setLevel(logging.INFO)
 logger = logging.getLogger(__name__)
@@ -64,12 +65,17 @@ def _get_session(self) -> Session:
         cluster = Cluster(contact_points=self._contact_points,
                           port=self._port,
-                          # load_balancing_policy=
+                          # load_balancing_policy=DCAwareRoundRobinPolicy("dc1"),
+                          protocol_version=4,
                           reconnection_policy=ConstantReconnectionPolicy(delay=5.0),
-                          default_retry_policy=RetryPolicy(),
+                          default_retry_policy=CassandraStoreConnectionRetryPolicy(),
                           auth_provider=auth_provider)
+
         session = cluster.connect()
         session.set_keyspace('nexustiles')
+        # default timeout, in seconds, for requests that do not set one explicitly
+        session.default_timeout = 60
+
         return session
 
     def connect(self):
         self._session = self._get_session()
@@ -79,19 +85,32 @@ def __del__(self):
         if self._session:
             self._session.shutdown()
 
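+    # save_data retries failed writes itself (in addition to the driver-level
+    # CassandraStoreConnectionRetryPolicy), backing off exponentially between
+    # attempts and raising CassandraLostConnectionError once retries are exhausted.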
-    async def save_data(self, tile: NexusTile) -> None:
+    async def save_data(self, tile: NexusTile, max_num_try=6, num_try=0) -> None:
         try:
             tile_id = uuid.UUID(tile.summary.tile_id)
             serialized_tile_data = TileData.SerializeToString(tile.tile)
             prepared_query = self._session.prepare("INSERT INTO sea_surface_temp (tile_id, tile_blob) VALUES (?, ?)")
+            logger.debug("starting to upload tile %s data to cassandra", tile_id)
             await self._execute_query_async(self._session, prepared_query,
                                             [tile_id, bytearray(serialized_tile_data)])
-        except NoHostAvailable:
-            raise CassandraLostConnectionError(f"Lost connection to Cassandra, and cannot save tiles.")
+        except Exception as e:
+            if num_try < max_num_try:
+                # back off exponentially (1s, 2s, 4s, ...) without blocking the worker's event loop
+                await asyncio.sleep(2 ** num_try)
+                logger.warning("exception while uploading tile data to cassandra: %s, retrying (attempt %i of %i)", e, num_try + 1, max_num_try)
+                await self.save_data(tile, max_num_try=max_num_try, num_try=num_try + 1)
+            else:
+                logger.error("exception while uploading tile data to cassandra: %s, giving up after %i retries", e, max_num_try)
+                raise CassandraLostConnectionError(f"Lost connection to Cassandra, and cannot save tiles.")
+
 
     @staticmethod
     async def _execute_query_async(session: Session, query, parameters=None):
-        cassandra_future = session.execute_async(query, parameters)
+        # note: the driver's per-request timeout is given in seconds
+        cassandra_future = session.execute_async(query, parameters, timeout=6000)
         asyncio_future = asyncio.Future()
         cassandra_future.add_callbacks(asyncio_future.set_result, asyncio_future.set_exception)
         return await asyncio_future
diff --git a/granule_ingester/granule_ingester/writers/CassandraStoreConnectionRetryPolicy.py b/granule_ingester/granule_ingester/writers/CassandraStoreConnectionRetryPolicy.py
new file mode 100644
index 0000000..4318dc2
--- /dev/null
+++ b/granule_ingester/granule_ingester/writers/CassandraStoreConnectionRetryPolicy.py
@@ -0,0 +1,34 @@
+import logging
+
+from cassandra import WriteType
+from cassandra.policies import RetryPolicy
+
+logging.getLogger('cassandra').setLevel(logging.DEBUG)
+logger = logging.getLogger(__name__)
+
+
+class CassandraStoreConnectionRetryPolicy(RetryPolicy):
+
+    def on_write_timeout(self, query, consistency, write_type,
+                         required_responses, received_responses, retry_num):
+        """
+        Failed write operations are retried at most once, and only if the
+        `write_type` was :attr:`~.WriteType.BATCH_LOG` or :attr:`~.WriteType.SIMPLE`.
+        """
+        logger.debug("Write timeout policy applied, num retry %i, write_type %s", retry_num, write_type)
+        if retry_num != 0:
+            return self.RETHROW, None
+        elif write_type == WriteType.BATCH_LOG or write_type == WriteType.SIMPLE:
+            return self.RETRY, consistency
+        else:
+            return self.RETHROW, None
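+
+
+# Usage sketch: CassandraStore._get_session wires this policy into the driver, e.g.
+#
+#     cluster = Cluster(contact_points=contact_points,
+#                       reconnection_policy=ConstantReconnectionPolicy(delay=5.0),
+#                       default_retry_policy=CassandraStoreConnectionRetryPolicy())
+#
+# Every statement executed through the resulting session then inherits this
+# write-timeout behavior, unless a statement sets its own retry policy.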
+ """ + logger.debug("Write timeout policy applied num retry %i, write_type %i", retry_num, write_type) + if retry_num != 0: + return self.RETHROW, None + elif write_type == WriteType.BATCH_LOG or write_type == WriteType.SIMPLE: + return self.RETRY, consistency + else: + return self.RETHROW, None + + + + + diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py index b753404..3da6370 100644 --- a/granule_ingester/granule_ingester/writers/SolrStore.py +++ b/granule_ingester/granule_ingester/writers/SolrStore.py @@ -17,6 +17,7 @@ import functools import json import logging +import time from asyncio import AbstractEventLoop from datetime import datetime from pathlib import Path @@ -50,13 +51,42 @@ def __init__(self, solr_url=None, zk_url=None): self.log.setLevel(logging.DEBUG) self._solr = None + def _get_collections(self, zk, parent_nodes): + """ + try to get list of collection from zookeper, on a list of candidate nodes, + return the first successful request result + """ + + try: + logger.debug("getting solr configuration from zookeeper, node '%s'", parent_nodes[0]) + return parent_nodes[0], zk.zk.get_children(parent_nodes[0]) + except NoNodeError: + logger.debug("solr configuration not found in node '%s'", parent_nodes[0]) + if len(parent_nodes)>1: + return self._get_collections(zk, parent_nodes[1:]) + else: + raise + + def _set_solr_status(self, zk): + """ because of something not working right between zookeeper and solr + we need to manually update the solr status on zookeeper + see https://github.com/django-haystack/pysolr/issues/189 + """ + collections = {} + parent_node, zk_collections = self._get_collections(zk, + ['collections', + 'solr/collections'] + # with bitnami/solr 0.3.3 helm chart deployment + ) + + for c in zk_collections: + collections.update(json.loads(zk.zk.get(f"{parent_node}/{c}/state.json")[0].decode("utf-8"))) + zk.collections = collections + def _get_connection(self) -> pysolr.Solr: if self._zk_url: zk = pysolr.ZooKeeper(f"{self._zk_url}") - collections = {} - for c in zk.zk.get_children("collections"): - collections.update(json.loads(zk.zk.get("collections/{}/state.json".format(c))[0].decode("ascii"))) - zk.collections = collections + self._set_solr_status(zk) return pysolr.SolrCloud(zk, self._collection, always_commit=True) elif self._solr_url: return pysolr.Solr(f'{self._solr_url}/solr/{self._collection}', always_commit=True) @@ -82,11 +112,18 @@ async def save_metadata(self, nexus_tile: NexusTile) -> None: await self._save_document(solr_doc) @run_in_executor - def _save_document(self, doc: dict): + def _save_document(self, doc: dict, max_num_try=6, num_try=0): try: self._solr.add([doc]) - except pysolr.SolrError: - raise SolrLostConnectionError("Lost connection to Solr, and cannot save tiles.") + except pysolr.SolrError as e: + if max_num_try >= num_try : + time.sleep(2**num_try) + logger.warning("Lost connection to Solr, %s, retry once more", e) + self._save_document(doc, + max_num_try=max_num_try, + num_try=num_try+1) + else: + raise SolrLostConnectionError("Lost connection to Solr, and cannot save tiles.") def _build_solr_doc(self, tile: NexusTile) -> Dict: summary: TileSummary = tile.summary diff --git a/granule_ingester/granule_ingester/writers/__init__.py b/granule_ingester/granule_ingester/writers/__init__.py index 9323d8c..c9f30e0 100644 --- a/granule_ingester/granule_ingester/writers/__init__.py +++ b/granule_ingester/granule_ingester/writers/__init__.py @@ -1,4 +1,5 @@ from 
+        for c in zk_collections:
+            collections.update(json.loads(zk.zk.get(f"{parent_node}/{c}/state.json")[0].decode("utf-8")))
+        zk.collections = collections
+
     def _get_connection(self) -> pysolr.Solr:
         if self._zk_url:
             zk = pysolr.ZooKeeper(f"{self._zk_url}")
-            collections = {}
-            for c in zk.zk.get_children("collections"):
-                collections.update(json.loads(zk.zk.get("collections/{}/state.json".format(c))[0].decode("ascii")))
-            zk.collections = collections
+            self._set_solr_status(zk)
             return pysolr.SolrCloud(zk, self._collection, always_commit=True)
         elif self._solr_url:
             return pysolr.Solr(f'{self._solr_url}/solr/{self._collection}', always_commit=True)
@@ -82,11 +114,17 @@ async def save_metadata(self, nexus_tile: NexusTile) -> None:
         await self._save_document(solr_doc)
 
     @run_in_executor
-    def _save_document(self, doc: dict):
+    def _save_document(self, doc: dict, max_num_try=6, num_try=0):
         try:
             self._solr.add([doc])
-        except pysolr.SolrError:
-            raise SolrLostConnectionError("Lost connection to Solr, and cannot save tiles.")
+        except pysolr.SolrError as e:
+            if num_try < max_num_try:
+                # this runs in an executor thread, so a blocking exponential backoff is safe here
+                time.sleep(2 ** num_try)
+                logger.warning("Lost connection to Solr, %s, retrying (attempt %i of %i)", e, num_try + 1, max_num_try)
+                self._save_document(doc, max_num_try=max_num_try, num_try=num_try + 1)
+            else:
+                raise SolrLostConnectionError("Lost connection to Solr, and cannot save tiles.")
 
     def _build_solr_doc(self, tile: NexusTile) -> Dict:
         summary: TileSummary = tile.summary
diff --git a/granule_ingester/granule_ingester/writers/__init__.py b/granule_ingester/granule_ingester/writers/__init__.py
index 9323d8c..c9f30e0 100644
--- a/granule_ingester/granule_ingester/writers/__init__.py
+++ b/granule_ingester/granule_ingester/writers/__init__.py
@@ -1,4 +1,5 @@
 from granule_ingester.writers.DataStore import DataStore
 from granule_ingester.writers.MetadataStore import MetadataStore
 from granule_ingester.writers.SolrStore import SolrStore
+from granule_ingester.writers.CassandraStoreConnectionRetryPolicy import CassandraStoreConnectionRetryPolicy
 from granule_ingester.writers.CassandraStore import CassandraStore