Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,13 @@ async def _call_callback_for_all_granules(self, collections: List[Collection]):

def _get_files_at_path(self, path: str) -> List[str]:
if os.path.isfile(path):
logger.info("process collections path as file")
return [path]
elif os.path.isdir(path):
logger.info("process collection path as directory")
return [f for f in glob(path + '/**', recursive=True) if os.path.isfile(f)]
else:
logger.info("process collection path as file path regex")
return [f for f in glob(path, recursive=True) if os.path.isfile(f)]

async def _reload_and_reschedule(self):
Expand Down
4 changes: 3 additions & 1 deletion granule_ingester/docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,6 @@ RUN pip install boto3==1.16.10

RUN apk del .build-deps

ENTRYPOINT ["/bin/sh", "/entrypoint.sh"]
USER 1001
ENV OPENBLAS_NUM_THREADS=1
ENTRYPOINT ["/bin/sh", "/entrypoint.sh"]
5 changes: 4 additions & 1 deletion granule_ingester/docker/entrypoint.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#!/bin/sh

[[ ! -z "$MAX_THREADS" ]] && export MAX_THREADS_INT=`echo $MAX_THREADS | sed -e 's/^"//' -e 's/"$//'`

python /sdap/granule_ingester/main.py \
$([[ ! -z "$RABBITMQ_HOST" ]] && echo --rabbitmq-host=$RABBITMQ_HOST) \
$([[ ! -z "$RABBITMQ_USERNAME" ]] && echo --rabbitmq-username=$RABBITMQ_USERNAME) \
Expand All @@ -11,4 +13,5 @@ python /sdap/granule_ingester/main.py \
$([[ ! -z "$CASSANDRA_PASSWORD" ]] && echo --cassandra-password=$CASSANDRA_PASSWORD) \
$([[ ! -z "$SOLR_HOST_AND_PORT" ]] && echo --solr-host-and-port=$SOLR_HOST_AND_PORT) \
$([[ ! -z "$ZK_HOST_AND_PORT" ]] && echo --zk_host_and_port=$ZK_HOST_AND_PORT) \
$([[ ! -z "$MAX_THREADS" ]] && echo --max-threads=$MAX_THREADS)
$([[ ! -z "$MAX_THREADS_INT" ]] && echo --max-threads=$MAX_THREADS_INT) \
--verbose
8 changes: 7 additions & 1 deletion granule_ingester/granule_ingester/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,10 @@ The custom code file would be copied into the ingestion pods via the helm chart
Example: `KelvinToCelsiusProcessor`
This processor checks the units of the saved variable. If it is some form of Kelvin, it automatically converts all of the temperature measurements to Celsius by subtracting 273.15 from each data point. The transformed data then replaces the default (untransformed) values and the processor returns the modified tile.

#### TODO Add configuration option for unusual representations of temperature units.
#### TODO Add configuration option for unusual representations of temperature units.


## Building the Docker image
From `incubator-sdap-ingester`, run:

$ docker build . -f granule_ingester/docker/Dockerfile -t nexusjpl/granule-ingester
8 changes: 5 additions & 3 deletions granule_ingester/granule_ingester/pipeline/Pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ def _init_worker(processor_list, dataset, data_store_factory, metadata_store_fac


async def _process_tile_in_worker(serialized_input_tile: str):
logger.debug("start to process tile in worker")
try:
input_tile = nexusproto.NexusTile.FromString(serialized_input_tile)
processed_tile = _recurse(_worker_processor_list, _worker_dataset, input_tile)
Expand All @@ -82,6 +83,7 @@ def _recurse(processor_list: List[TileProcessor],
input_tile: nexusproto.NexusTile) -> nexusproto.NexusTile:
if len(processor_list) == 0:
return input_tile
logger.debug("start processor %s", processor_list[0].__class__.__name__)
output_tile = processor_list[0].process(tile=input_tile, dataset=dataset)
return _recurse(processor_list[1:], dataset, output_tile) if output_tile else None

Expand All @@ -99,7 +101,7 @@ def __init__(self,
self._slicer = slicer
self._data_store_factory = data_store_factory
self._metadata_store_factory = metadata_store_factory
self._max_concurrency = max_concurrency
self._max_concurrency: int = max_concurrency

# Create a SyncManager so that we can to communicate exceptions from the
# worker processes back to the main process.
Expand Down Expand Up @@ -186,8 +188,8 @@ async def run(self):
self._data_store_factory,
self._metadata_store_factory,
shared_memory),
maxtasksperchild=self._max_concurrency,
childconcurrency=self._max_concurrency) as pool:
maxtasksperchild=int(self._max_concurrency),
childconcurrency=int(self._max_concurrency)) as pool:
serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in
self._slicer.generate_tiles(dataset, granule_name)]
# aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that
Expand Down
26 changes: 20 additions & 6 deletions granule_ingester/granule_ingester/writers/CassandraStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import asyncio
import logging
import uuid
import time

from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster, Session, NoHostAvailable
Expand All @@ -27,6 +28,7 @@

from granule_ingester.exceptions import CassandraFailedHealthCheckError, CassandraLostConnectionError
from granule_ingester.writers.DataStore import DataStore
from granule_ingester.writers.CassandraStoreConnectionRetryPolicy import CassandraStoreConnectionRetryPolicy

logging.getLogger('cassandra').setLevel(logging.INFO)
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -64,12 +66,16 @@ def _get_session(self) -> Session:

cluster = Cluster(contact_points=self._contact_points,
port=self._port,
# load_balancing_policy=
#load_balancing_policy=DCAwareRoundRobinPolicy("dc1"),
protocol_version=4,
reconnection_policy=ConstantReconnectionPolicy(delay=5.0),
default_retry_policy=RetryPolicy(),
default_retry_policy=CassandraStoreConnectionRetryPolicy(),
auth_provider=auth_provider)

session = cluster.connect()
session.set_keyspace('nexustiles')
session.default_timeout = 60

return session

def connect(self):
Expand All @@ -79,19 +85,27 @@ def __del__(self):
if self._session:
self._session.shutdown()

async def save_data(self, tile: NexusTile) -> None:
async def save_data(self, tile: NexusTile, max_num_try=6, num_try=0) -> None:
try:
tile_id = uuid.UUID(tile.summary.tile_id)
serialized_tile_data = TileData.SerializeToString(tile.tile)
prepared_query = self._session.prepare("INSERT INTO sea_surface_temp (tile_id, tile_blob) VALUES (?, ?)")
logger.debug("starting to updload tile %s data on cassandra", tile_id)
await self._execute_query_async(self._session, prepared_query,
[tile_id, bytearray(serialized_tile_data)])
except NoHostAvailable:
raise CassandraLostConnectionError(f"Lost connection to Cassandra, and cannot save tiles.")
except Exception as e:
if max_num_try >= num_try:
time.sleep(2**num_try)
logger.warning("exception while uploading tile data on cassandra %s, retry once more", e)
await self.save_data(tile, max_num_try=max_num_try, num_try=num_try+1)
else:
logger.error("exception while uploading tile data on cassandra %s, second attempt", e)
raise CassandraLostConnectionError(f"Lost connection to Cassandra, and cannot save tiles.")


@staticmethod
async def _execute_query_async(session: Session, query, parameters=None):
cassandra_future = session.execute_async(query, parameters)
cassandra_future = session.execute_async(query, parameters, timeout=6000)
asyncio_future = asyncio.Future()
cassandra_future.add_callbacks(asyncio_future.set_result, asyncio_future.set_exception)
return await asyncio_future
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import logging
from cassandra.policies import RetryPolicy
from cassandra import WriteType as WT

WriteType = WT

logging.getLogger('cassandra').setLevel(logging.DEBUG)
logger = logging.getLogger(__name__)


class CassandraStoreConnectionRetryPolicy(RetryPolicy):

def on_write_timeout(self, query, consistency, write_type,
required_responses, received_responses, retry_num):
"""
By default, failed write operations will retried at most once, and
they will only be retried if the `write_type` was
:attr:`~.WriteType.BATCH_LOG or SIMPLE`.
"""
logger.debug("Write timeout policy applied num retry %i, write_type %i", retry_num, write_type)
if retry_num != 0:
return self.RETHROW, None
elif write_type == WriteType.BATCH_LOG or write_type == WriteType.SIMPLE:
return self.RETRY, consistency
else:
return self.RETHROW, None





51 changes: 44 additions & 7 deletions granule_ingester/granule_ingester/writers/SolrStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import functools
import json
import logging
import time
from asyncio import AbstractEventLoop
from datetime import datetime
from pathlib import Path
Expand Down Expand Up @@ -50,13 +51,42 @@ def __init__(self, solr_url=None, zk_url=None):
self.log.setLevel(logging.DEBUG)
self._solr = None

def _get_collections(self, zk, parent_nodes):
"""
try to get list of collection from zookeper, on a list of candidate nodes,
return the first successful request result
"""

try:
logger.debug("getting solr configuration from zookeeper, node '%s'", parent_nodes[0])
return parent_nodes[0], zk.zk.get_children(parent_nodes[0])
except NoNodeError:
logger.debug("solr configuration not found in node '%s'", parent_nodes[0])
if len(parent_nodes)>1:
return self._get_collections(zk, parent_nodes[1:])
else:
raise

def _set_solr_status(self, zk):
""" because of something not working right between zookeeper and solr
we need to manually update the solr status on zookeeper
see https://github.com/django-haystack/pysolr/issues/189
"""
collections = {}
parent_node, zk_collections = self._get_collections(zk,
['collections',
'solr/collections']
# with bitnami/solr 0.3.3 helm chart deployment
)

for c in zk_collections:
collections.update(json.loads(zk.zk.get(f"{parent_node}/{c}/state.json")[0].decode("utf-8")))
zk.collections = collections

def _get_connection(self) -> pysolr.Solr:
if self._zk_url:
zk = pysolr.ZooKeeper(f"{self._zk_url}")
collections = {}
for c in zk.zk.get_children("collections"):
collections.update(json.loads(zk.zk.get("collections/{}/state.json".format(c))[0].decode("ascii")))
zk.collections = collections
self._set_solr_status(zk)
return pysolr.SolrCloud(zk, self._collection, always_commit=True)
elif self._solr_url:
return pysolr.Solr(f'{self._solr_url}/solr/{self._collection}', always_commit=True)
Expand All @@ -82,11 +112,18 @@ async def save_metadata(self, nexus_tile: NexusTile) -> None:
await self._save_document(solr_doc)

@run_in_executor
def _save_document(self, doc: dict):
def _save_document(self, doc: dict, max_num_try=6, num_try=0):
try:
self._solr.add([doc])
except pysolr.SolrError:
raise SolrLostConnectionError("Lost connection to Solr, and cannot save tiles.")
except pysolr.SolrError as e:
if max_num_try >= num_try :
time.sleep(2**num_try)
logger.warning("Lost connection to Solr, %s, retry once more", e)
self._save_document(doc,
max_num_try=max_num_try,
num_try=num_try+1)
else:
raise SolrLostConnectionError("Lost connection to Solr, and cannot save tiles.")

def _build_solr_doc(self, tile: NexusTile) -> Dict:
summary: TileSummary = tile.summary
Expand Down
1 change: 1 addition & 0 deletions granule_ingester/granule_ingester/writers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from granule_ingester.writers.DataStore import DataStore
from granule_ingester.writers.MetadataStore import MetadataStore
from granule_ingester.writers.SolrStore import SolrStore
from granule_ingester.writers.CassandraStoreConnectionRetryPolicy import CassandraStoreConnectionRetryPolicy
from granule_ingester.writers.CassandraStore import CassandraStore