diff --git a/.gitignore b/.gitignore index 3e296266..4e4cf6ec 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,5 @@ *.code-workspace *.idea *.DS_Store -analysis/webservice/algorithms/doms/domsconfig.ini data-access/nexustiles/config/datastores.ini venv/ diff --git a/analysis/README.md b/analysis/README.md index a55841b2..eaea5309 100644 --- a/analysis/README.md +++ b/analysis/README.md @@ -14,10 +14,11 @@ Python module that exposes NEXUS analytical capabilities via a HTTP webservice. conda activate nexus-analysis ```` -2. Install conda dependencies +2. Install conda dependencies and other dependencies ```` cd analysis + pip install asyncio # for asynchronous job management conda install pyspark conda install -c conda-forge --file conda-requirements.txt #conda install numpy matplotlib mpld3 scipy netCDF4 basemap gdal pyproj=1.9.5.1 libnetcdf=4.3.3.1 diff --git a/analysis/conda-requirements.txt b/analysis/conda-requirements.txt index 6d9a35ed..83d6d2d0 100644 --- a/analysis/conda-requirements.txt +++ b/analysis/conda-requirements.txt @@ -15,3 +15,4 @@ gdal==3.0.2 mock==2.0.0 singledispatch==3.4.0.3 + diff --git a/analysis/setup.py b/analysis/setup.py index 62a68916..9a449ceb 100644 --- a/analysis/setup.py +++ b/analysis/setup.py @@ -50,8 +50,7 @@ # 'webservice.nexus_tornado.request.renderers' #], package_data={ - 'webservice': ['config/web.ini', 'config/algorithms.ini'], - 'webservice.algorithms.doms': ['domsconfig.ini.default'] + 'webservice': ['config/web.ini', 'config/algorithms.ini'] }, data_files=[ ('static', ['static/index.html']) diff --git a/analysis/webservice/NexusHandler.py b/analysis/webservice/NexusHandler.py index 42972ec9..e4d35d77 100644 --- a/analysis/webservice/NexusHandler.py +++ b/analysis/webservice/NexusHandler.py @@ -16,8 +16,11 @@ import logging import types +from functools import partial -AVAILABLE_HANDLERS = [] +AVAILABLE_LEGACY_HANDLERS = [] +AVAILABLE_RESTAPI_HANDLERS = [] +AVAILABLE_WPS_HANDLERS = [] AVAILABLE_INITIALIZERS = [] @@ -32,17 +35,22 @@ def nexus_initializer(clazz): return clazz -def nexus_handler(clazz): +def nexus_handler(clazz, handler_list=AVAILABLE_LEGACY_HANDLERS): log = logging.getLogger(__name__) try: clazz.validate() log.info("Adding algorithm module '%s' with path '%s' (%s)" % (clazz.name, clazz.path, clazz)) - AVAILABLE_HANDLERS.append(clazz) + handler_list.append(clazz) except Exception as ex: log.warn("Handler '%s' is invalid and will be skipped (reason: %s)" % (clazz, ex.message), exc_info=True) return clazz +nexus_restapi_handler = partial(nexus_handler, handler_list=AVAILABLE_RESTAPI_HANDLERS) +nexus_wps_handler = partial(nexus_handler, handler_list=AVAILABLE_WPS_HANDLERS) + + + DEFAULT_PARAMETERS_SPEC = { "ds": { "name": "Dataset", diff --git a/analysis/webservice/algorithms/Capabilities.py b/analysis/webservice/algorithms/Capabilities.py index fa85a7c0..57d05009 100644 --- a/analysis/webservice/algorithms/Capabilities.py +++ b/analysis/webservice/algorithms/Capabilities.py @@ -16,7 +16,7 @@ import json -from webservice.NexusHandler import nexus_handler, AVAILABLE_HANDLERS +from webservice.NexusHandler import nexus_handler, AVAILABLE_LEGACY_HANDLERS from webservice.algorithms.NexusCalcHandler import NexusCalcHandler from webservice.webmodel import NexusResults @@ -32,7 +32,7 @@ class CapabilitiesListCalcHandlerImpl(NexusCalcHandler): def calc(self, computeOptions, **args): capabilities = [] - for capability in AVAILABLE_HANDLERS: + for capability in AVAILABLE_LEGACY_HANDLERS: capabilityDef = { "name": capability.name, "path": 
capability.path, diff --git a/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py b/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py index 6231873b..f7077820 100644 --- a/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py +++ b/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py @@ -16,11 +16,12 @@ from datetime import datetime from functools import partial +import uuid import numpy as np import shapely.geometry from pytz import timezone -from webservice.NexusHandler import nexus_handler +from webservice.NexusHandler import nexus_handler, nexus_restapi_handler from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler from webservice.webmodel import NexusResults, NexusProcessingException, NoDataException @@ -29,6 +30,7 @@ @nexus_handler +@nexus_restapi_handler class TimeAvgMapNexusSparkHandlerImpl(NexusCalcSparkHandler): # __singleton_lock = threading.Lock() # __singleton_instance = None @@ -67,19 +69,6 @@ class TimeAvgMapNexusSparkHandlerImpl(NexusCalcSparkHandler): } singleton = True - # @classmethod - # def instance(cls, algorithm_config=None, sc=None): - # with cls.__singleton_lock: - # if not cls.__singleton_instance: - # try: - # singleton_instance = cls() - # singleton_instance.set_config(algorithm_config) - # singleton_instance.set_spark_context(sc) - # cls.__singleton_instance = singleton_instance - # except AttributeError: - # pass - # return cls.__singleton_instance - def parse_arguments(self, request): # Parse input arguments self.log.debug("Parsing arguments") @@ -118,7 +107,8 @@ def parse_arguments(self, request): return ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, nparts_requested - def calc(self, compute_options, **args): + def calc(self, compute_options, + **args): """ :param compute_options: StatsComputeOptions @@ -130,6 +120,7 @@ def calc(self, compute_options, **args): metrics_record = self._create_metrics_record() ds, bbox, start_time, end_time, nparts_requested = self.parse_arguments(compute_options) + self._setQueryParams(ds, (float(bbox.bounds[1]), float(bbox.bounds[3]), @@ -147,13 +138,13 @@ def calc(self, compute_options, **args): print('Found {} tiles'.format(len(nexus_tiles))) daysinrange = self._get_tile_service().find_days_in_range_asc(bbox.bounds[1], - bbox.bounds[3], - bbox.bounds[0], - bbox.bounds[2], - ds, - start_time, - end_time, - metrics_callback=metrics_record.record_metrics) + bbox.bounds[3], + bbox.bounds[0], + bbox.bounds[2], + ds, + start_time, + end_time, + metrics_callback=metrics_record.record_metrics) ndays = len(daysinrange) if ndays == 0: raise NoDataException(reason="No data found for selected timeframe") @@ -262,6 +253,8 @@ def calc(self, compute_options, **args): maxLon=bbox.bounds[2], ds=ds, startTime=start_time, endTime=end_time) + + @staticmethod def _map(tile_service_factory, metrics_callback, tile_in_spark): tile_bounds = tile_in_spark[0] diff --git a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py index 43f7f6da..83d79ff8 100644 --- a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py +++ b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py @@ -32,7 +32,7 @@ from pytz import timezone from scipy import stats from webservice import Filtering as filtering -from webservice.NexusHandler import nexus_handler +from webservice.NexusHandler import nexus_handler, nexus_restapi_handler, nexus_wps_handler from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler from 
webservice.webmodel import NexusResults, NoDataException, NexusProcessingException @@ -43,6 +43,7 @@ @nexus_handler +@nexus_restapi_handler class TimeSeriesSparkHandlerImpl(NexusCalcSparkHandler): name = "Time Series Spark" path = "/timeSeriesSpark" diff --git a/analysis/webservice/algorithms_spark/__init__.py b/analysis/webservice/algorithms_spark/__init__.py index d6ed83f5..c8e3fd15 100644 --- a/analysis/webservice/algorithms_spark/__init__.py +++ b/analysis/webservice/algorithms_spark/__init__.py @@ -20,13 +20,11 @@ import CorrMapSpark import DailyDifferenceAverageSpark import HofMoellerSpark -import Matchup import MaximaMinimaSpark -import NexusCalcSparkHandler import TimeAvgMapSpark import TimeSeriesSpark import VarianceSpark - +import NexusCalcSparkHandler log = logging.getLogger(__name__) @@ -46,11 +44,6 @@ def module_exists(module_name): except ImportError: pass - try: - import Matchup - except ImportError: - pass - try: import TimeAvgMapSpark except ImportError: diff --git a/analysis/webservice/config/spark_pools.xml b/analysis/webservice/config/spark_pools.xml new file mode 100644 index 00000000..50906ad8 --- /dev/null +++ b/analysis/webservice/config/spark_pools.xml @@ -0,0 +1,8 @@ + + + + FAIR + 1 + 2 + + \ No newline at end of file diff --git a/analysis/webservice/config/web.ini b/analysis/webservice/config/web.ini index 2644ade2..a1ecb2c2 100644 --- a/analysis/webservice/config/web.ini +++ b/analysis/webservice/config/web.ini @@ -14,4 +14,4 @@ static_enabled=true static_dir=static [modules] -module_dirs=webservice.algorithms,webservice.algorithms_spark,webservice.algorithms.doms \ No newline at end of file +module_dirs=webservice.algorithms,webservice.algorithms_spark \ No newline at end of file diff --git a/analysis/webservice/jobs/__init__.py b/analysis/webservice/jobs/__init__.py new file mode 100644 index 00000000..a7a02cef --- /dev/null +++ b/analysis/webservice/jobs/__init__.py @@ -0,0 +1 @@ +from .job import Job \ No newline at end of file diff --git a/analysis/webservice/jobs/job.py b/analysis/webservice/jobs/job.py new file mode 100644 index 00000000..fad0a68d --- /dev/null +++ b/analysis/webservice/jobs/job.py @@ -0,0 +1,13 @@ +from datetime import datetime + + +class Job(): + def __init__(self): + self.request = None # NexusRequestObject + self.result_future = None # tornado.gen.Future + self.time_created = datetime.now() + self.time_done = None + + + + diff --git a/analysis/webservice/nexus_tornado/request/handlers/NexusAsyncJobHandler.py b/analysis/webservice/nexus_tornado/request/handlers/NexusAsyncJobHandler.py new file mode 100644 index 00000000..8a2b33d9 --- /dev/null +++ b/analysis/webservice/nexus_tornado/request/handlers/NexusAsyncJobHandler.py @@ -0,0 +1,81 @@ +import logging +import json +import uuid +from datetime import datetime, timedelta +import tornado.web +import tornado.ioloop +from webservice.nexus_tornado.request.renderers import NexusRendererFactory + + +class NexusAsyncJobHandler(tornado.web.RequestHandler): + + _job_pool = {} + __logger = logging.getLogger('nexus') + + obsolete_after = timedelta(hours=12) + clean_obsolete_every = timedelta(minutes=15) + + @classmethod + def get_job_pool(cls): + return cls._job_pool + + @classmethod + def start_jobs_cleaner(cls): + + def clean(): + for key, job in cls._job_pool.iteritems(): + if datetime.now() - job.time_done > cls.obsolete_after: + cls.__logger.info("clean job {}".format(key)) + del cls._job_pool[key] + + tornado.ioloop.IOLoop.current().call_later(cls.clean_obsolete_every.seconds, clean) + + 
def get(self, job_id): + self.__logger.info("get job among {}".format(self._job_pool)) + if job_id in self._job_pool: + job = self._job_pool[job_id] + if job.result_future.done(): + renderer = NexusRendererFactory.get_renderer(job.request) + renderer.render(self, job.result_future.result()) + else: + self._non_completed_job_callback(job_id) + + else: + self._non_existing_job_callback(job_id) + + def _non_existing_job_callback(self, job_id, code=404): + message = "Job {} does not exist".format(job_id) + self._error_callback(message, code) + + def _non_completed_job_callback(self, job_id, code=202): + message = "Job {} is being processed".format(job_id) + self._error_callback(message, code) + + def _error_callback(self, message, code): + self.__logger.info(message, exc_info=True) + + self.set_header("Content-Type", "application/json") + self.set_header("Cache-Control", "no-cache, no-store, must-revalidate") + self.set_header("Pragma", "no-cache") + self.set_header("Expires", 0) + self.set_status(code) + + response = { + "error": message, + "code": code + } + + self.write(json.dumps(response, indent=5)) + self.finish() + + def data_received(self, chunk): + pass + + @classmethod + def get_short_job_id(cls): + while True: + job_id = str(uuid.uuid4())[:6] + if job_id not in cls._job_pool: + return job_id + + diff --git a/analysis/webservice/nexus_tornado/request/handlers/NexusHandlerManager.py b/analysis/webservice/nexus_tornado/request/handlers/NexusHandlerManager.py new file mode 100644 index 00000000..67121221 --- /dev/null +++ b/analysis/webservice/nexus_tornado/request/handlers/NexusHandlerManager.py @@ -0,0 +1,105 @@ +import os +import logging +import sys +import importlib +import pkg_resources +import tornado.web +from webservice import NexusHandler +from webservice.nexus_tornado.request.handlers import NexusRequestHandler +from webservice.nexus_tornado.request.handlers import NexusAsyncJobHandler +import webservice.algorithms_spark.NexusCalcSparkHandler + +logging.basicConfig( + level=logging.DEBUG, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + datefmt="%Y-%m-%dT%H:%M:%S", stream=sys.stdout) +logger = logging.getLogger(__name__) + + +class VersionHandler(tornado.web.RequestHandler): + def get(self): + self.write(pkg_resources.get_distribution("nexusanalysis").version) + + +class NexusHandlerManager(object): + _spark_context = None + + def __init__(self, module_dirs, + algorithm_config, tile_service_factory, + max_request_threads=1, + static_dir=None): + + for moduleDir in module_dirs: + logger.info("Loading modules from %s" % moduleDir) + importlib.import_module(moduleDir) + + logger.info("Running Nexus Initializers") + NexusHandler.executeInitializers(algorithm_config) + + self._tile_service_factory = tile_service_factory + + logger.info("Initializing request ThreadPool to %s" % max_request_threads) + self._request_thread_pool = tornado.concurrent.futures.ThreadPoolExecutor(max_request_threads) + + self._static_dir = static_dir + + def get_handlers(self): + handlers = self._get_legacy_handlers() + handlers.extend(self._get_restapi_algorithm_handlers()) + + handlers.append((r"/version", VersionHandler)) + + NexusAsyncJobHandler.start_jobs_cleaner() + handlers.append((r"/jobs/(.*)", NexusAsyncJobHandler)) + + if self._static_dir: + handlers.append( + (r'/(.*)', tornado.web.StaticFileHandler, {'path': self._static_dir, "default_filename": "index.html"})) + + return handlers + + def _get_legacy_handlers(self): + return 
self.__get_tornado_handlers(NexusHandler.AVAILABLE_LEGACY_HANDLERS, lambda x: x) + + def _get_restapi_algorithm_handlers(self): + + def path_spark_to_restapi(s): + i_spark = s.find('Spark') + return '/algorithms' + s[:i_spark] + + return self.__get_tornado_handlers(NexusHandler.AVAILABLE_RESTAPI_HANDLERS, path_spark_to_restapi) + + def __get_tornado_handlers(self, wrappers, path_func): + handlers = [] + + for clazzWrapper in wrappers: + path = path_func(clazzWrapper.path) + logger.info("adding request handler for class {} on path {}".format(clazzWrapper, path)) + if issubclass(clazzWrapper, webservice.algorithms_spark.NexusCalcSparkHandler.NexusCalcSparkHandler): + spark_context = self._get_spark_context() + handlers.append((path, + NexusRequestHandler, + dict(clazz=clazzWrapper, + tile_service_factory=self._tile_service_factory, + sc=spark_context, + thread_pool=self._request_thread_pool))) + else: + handlers.append((path, + NexusRequestHandler, + dict(clazz=clazzWrapper, + tile_service_factory=self._tile_service_factory, + thread_pool=self._request_thread_pool))) + + return handlers + + def _get_spark_context(self): + if self._spark_context is None: + from pyspark.sql import SparkSession + + spark = SparkSession.builder.appName("nexus-analysis") \ + .config("spark.scheduler.mode", "FAIR") \ + .config("spark.scheduler.allocation.file", os.path.abspath("webservice/config/spark_pools.xml")) \ + .getOrCreate() + self._spark_context = spark.sparkContext + + return self._spark_context diff --git a/analysis/webservice/nexus_tornado/request/handlers/NexusRequestHandler.py b/analysis/webservice/nexus_tornado/request/handlers/NexusRequestHandler.py index 210c1f31..66643537 100644 --- a/analysis/webservice/nexus_tornado/request/handlers/NexusRequestHandler.py +++ b/analysis/webservice/nexus_tornado/request/handlers/NexusRequestHandler.py @@ -1,18 +1,28 @@ import json +import datetime import logging +import functools import tornado.gen import tornado.ioloop +import tornado.util +from datetime import datetime, timedelta +from webservice.jobs import Job from webservice.nexus_tornado.request.renderers import NexusRendererFactory +from webservice.nexus_tornado.request.handlers.NexusAsyncJobHandler import NexusAsyncJobHandler from webservice.webmodel import NexusRequestObjectTornadoFree, NexusRequestObject, NexusProcessingException class NexusRequestHandler(tornado.web.RequestHandler): + def initialize(self, thread_pool, clazz=None, **kargs): self.logger = logging.getLogger('nexus') self.executor = thread_pool self.__clazz = clazz - self._clazz_init_args = kargs # 'algorithm_config', 'sc' for spark handler + self.__synchronous_time_out_seconds = timedelta(seconds=30) + self._clazz_init_args = kargs # 'algorithm_config', 'sc' for spark handler + + @tornado.gen.coroutine def get(self): @@ -32,7 +42,14 @@ def get(self): try: # process the request asynchronously on a different thread, # the current tornado handler is still available to get other user requests - results = yield tornado.ioloop.IOLoop.current().run_in_executor(self.executor, instance.calc, request) + result_future = tornado.ioloop.IOLoop.current().run_in_executor(self.executor, + instance.calc, + request) + if self.__synchronous_time_out_seconds: + results = yield tornado.gen.with_timeout(self.__synchronous_time_out_seconds, + result_future) + else: + results = yield result_future try: self.set_status(results.status_code) @@ -42,12 +59,19 @@ def get(self): renderer = NexusRendererFactory.get_renderer(request) renderer.render(self, results) + 
except tornado.gen.TimeoutError as e: + self.logger.info("synchronous time out reached, switch to async mode") + + self._switch_to_async(request, result_future) + except NexusProcessingException as e: self.async_onerror_callback(e.reason, e.code) except Exception as e: self.async_onerror_callback(str(e), 500) + + def async_onerror_callback(self, reason, code=500): self.logger.error("Error processing request", exc_info=True) @@ -60,4 +84,37 @@ def async_onerror_callback(self, reason, code=500): } self.write(json.dumps(response, indent=5)) - self.finish() \ No newline at end of file + self.finish() + + def _switch_to_async(self, request, result_future): + job = Job() + job.request = request + + def set_job_done_datetime(job, future): + job.time_done = datetime.now() + + result_future.add_done_callback(functools.partial(set_job_done_datetime, job)) + job.result_future = result_future + job_id = NexusAsyncJobHandler.get_short_job_id() + NexusAsyncJobHandler.get_job_pool()[job_id] = job + self.async_onsynctimeout_callback(job_id) + + + def async_onsynctimeout_callback(self, job_id, code=202): + message = "Processing request is taking more than {} s, switch to async mode, check status at /jobs/{}".format( + self.__synchronous_time_out_seconds, job_id) + self.logger.info(message, + exc_info=True) + + self.set_header("Content-Type", "application/json") + self.set_status(code) + + response = { + "error": message, + "code": code, + "job_id": job_id + } + + self.write(json.dumps(response, indent=5)) + self.finish() + diff --git a/analysis/webservice/nexus_tornado/request/handlers/__init__.py b/analysis/webservice/nexus_tornado/request/handlers/__init__.py index 7c6b1f4e..0b6ad53c 100644 --- a/analysis/webservice/nexus_tornado/request/handlers/__init__.py +++ b/analysis/webservice/nexus_tornado/request/handlers/__init__.py @@ -1 +1,3 @@ -from .NexusRequestHandler import NexusRequestHandler \ No newline at end of file +from .NexusRequestHandler import NexusRequestHandler +from .NexusHandlerManager import NexusHandlerManager +from .NexusAsyncJobHandler import NexusAsyncJobHandler diff --git a/analysis/webservice/nexus_tornado/request/renderers/NexusRendererFactory.py b/analysis/webservice/nexus_tornado/request/renderers/NexusRendererFactory.py index 9fc06e31..54dd273e 100644 --- a/analysis/webservice/nexus_tornado/request/renderers/NexusRendererFactory.py +++ b/analysis/webservice/nexus_tornado/request/renderers/NexusRendererFactory.py @@ -14,3 +14,5 @@ def get_renderer(cls, request): + + diff --git a/analysis/webservice/webapp.py b/analysis/webservice/webapp.py index d1ada7f5..bf7fd488 100644 --- a/analysis/webservice/webapp.py +++ b/analysis/webservice/webapp.py @@ -17,6 +17,7 @@ import importlib import logging import sys +import os from functools import partial import pkg_resources @@ -27,6 +28,7 @@ from nexustiles.nexustiles import NexusTileService from webservice import NexusHandler from webservice.nexus_tornado.request.handlers import NexusRequestHandler +from nexus_tornado.request.handlers import NexusHandlerManager def inject_args_in_config(args, config): @@ -78,10 +80,7 @@ def inject_args_in_config(args, config): parse_command_line() algorithm_config = inject_args_in_config(options, algorithm_config) - moduleDirs = webconfig.get("modules", "module_dirs").split(",") - for moduleDir in moduleDirs: - log.info("Loading modules from %s" % moduleDir) - importlib.import_module(moduleDir) + module_dirs = webconfig.get("modules", "module_dirs").split(",") staticDir = webconfig.get("static", 
"static_dir") staticEnabled = webconfig.get("static", "static_enabled") == "true" @@ -94,47 +93,16 @@ def inject_args_in_config(args, config): else: log.info("Static resources disabled") - handlers = [] - - log.info("Running Nexus Initializers") - NexusHandler.executeInitializers(algorithm_config) - max_request_threads = webconfig.getint("global", "server.max_simultaneous_requests") - log.info("Initializing request ThreadPool to %s" % max_request_threads) - request_thread_pool = tornado.concurrent.futures.ThreadPoolExecutor(max_request_threads) tile_service_factory = partial(NexusTileService, False, False, algorithm_config) - spark_context = None - for clazzWrapper in NexusHandler.AVAILABLE_HANDLERS: - if issubclass(clazzWrapper, webservice.algorithms_spark.NexusCalcSparkHandler.NexusCalcSparkHandler): - if spark_context is None: - from pyspark.sql import SparkSession - - spark = SparkSession.builder.appName("nexus-analysis").getOrCreate() - spark_context = spark.sparkContext - - handlers.append((clazzWrapper.path, - NexusRequestHandler, - dict(clazz=clazzWrapper, - tile_service_factory=tile_service_factory, - sc=spark_context, - thread_pool=request_thread_pool))) - else: - handlers.append((clazzWrapper.path, - NexusRequestHandler, - dict(clazz=clazzWrapper, - tile_service_factory=tile_service_factory, - thread_pool=request_thread_pool))) - - class VersionHandler(tornado.web.RequestHandler): - def get(self): - self.write(pkg_resources.get_distribution("nexusanalysis").version) - - handlers.append((r"/version", VersionHandler)) + job_pool = {} - if staticEnabled: - handlers.append( - (r'/(.*)', tornado.web.StaticFileHandler, {'path': staticDir, "default_filename": "index.html"})) + nexus_handler_manager = NexusHandlerManager(module_dirs, + algorithm_config, tile_service_factory, + max_request_threads=max_request_threads, + static_dir=staticDir) + handlers = nexus_handler_manager.get_handlers() app = tornado.web.Application( handlers, diff --git a/analysis/webservice/webmodel/Exceptions.py b/analysis/webservice/webmodel/Exceptions.py index c07174e6..d8594513 100644 --- a/analysis/webservice/webmodel/Exceptions.py +++ b/analysis/webservice/webmodel/Exceptions.py @@ -16,4 +16,4 @@ def __init__(self, reason="No data found for the selected timeframe"): class DatasetNotFoundException(NexusProcessingException): def __init__(self, reason="Dataset not found"): - NexusProcessingException.__init__(self, StandardNexusErrors.DATASET_MISSING, reason, code=404) \ No newline at end of file + NexusProcessingException.__init__(self, StandardNexusErrors.DATASET_MISSING, reason, code=404) diff --git a/data-access/nexustiles/config/datastores.ini.default b/data-access/nexustiles/config/datastores.ini.default index ed40068e..2faae536 100644 --- a/data-access/nexustiles/config/datastores.ini.default +++ b/data-access/nexustiles/config/datastores.ini.default @@ -1,5 +1,5 @@ [cassandra] -host=localhost +host=sdap-cassandra port=9042 keyspace=nexustiles local_datacenter=datacenter1 diff --git a/data-access/nexustiles/dao/CassandraProxy.py b/data-access/nexustiles/dao/CassandraProxy.py index a8a4e6e6..9a2d6ee6 100644 --- a/data-access/nexustiles/dao/CassandraProxy.py +++ b/data-access/nexustiles/dao/CassandraProxy.py @@ -17,6 +17,7 @@ import uuid from ConfigParser import NoOptionError +from cassandra.auth import PlainTextAuthProvider import nexusproto.DataTile_pb2 as nexusproto import numpy as np from cassandra.auth import PlainTextAuthProvider @@ -161,6 +162,9 @@ def __init__(self, config): 
self.__cass_protocol_version = config.getint("cassandra", "protocol_version") self.__cass_dc_policy = config.get("cassandra", "dc_policy") + logger.info("Setting cassandra host to " + self.__cass_url) + logger.info("Setting cassandra username to " + self.__cass_username) + try: self.__cass_port = config.getint("cassandra", "port") except NoOptionError: diff --git a/data-access/tests/config/datastores.ini b/data-access/tests/config/datastores.ini deleted file mode 100644 index 194760cb..00000000 --- a/data-access/tests/config/datastores.ini +++ /dev/null @@ -1,9 +0,0 @@ -[cassandra] -host=127.0.0.1 -keyspace=nexustiles -local_datacenter=datacenter1 -protocol_version=3 - -[solr] -host=localhost:8983 -core=nexustiles \ No newline at end of file diff --git a/helm/requirements.yaml b/helm/requirements.yaml index 7970f294..78cc52ed 100644 --- a/helm/requirements.yaml +++ b/helm/requirements.yaml @@ -6,6 +6,13 @@ dependencies: - name: rabbitmq version: 7.1.0 repository: https://charts.bitnami.com/bitnami - condition: ingestion.enabled - + condition: rabbitmq.enabled + - name: solr + version: 1.5.2 + repository: http://storage.googleapis.com/kubernetes-charts-incubator + condition: solr.enabled + - name: cassandra + version: 5.5.3 + repository: https://charts.bitnami.com/bitnami + condition: cassandra.enabled diff --git a/helm/templates/_helpers.tpl b/helm/templates/_helpers.tpl index b697c179..a016b2fa 100644 --- a/helm/templates/_helpers.tpl +++ b/helm/templates/_helpers.tpl @@ -45,3 +45,10 @@ The data volume mount which is used in both the Collection Manager and the Granu mountPath: {{ .Values.ingestion.granules.mountPath }} {{- end -}} +{{- define "nexus.urls.solr" -}} +{{ .Values.external.solrHostAndPort | default (print "http://" .Release.Name "-solr-svc:8983") }} +{{- end -}} + +{{- define "nexus.urls.zookeeper" -}} +{{ .Values.external.zookeeperHostAndPort | default (print .Release.Name "-zookeeper:2181") }} +{{- end -}} \ No newline at end of file diff --git a/helm/templates/cassandra.yml b/helm/templates/cassandra.yml deleted file mode 100644 index 6023e55e..00000000 --- a/helm/templates/cassandra.yml +++ /dev/null @@ -1,107 +0,0 @@ -apiVersion: v1 -kind: Service -metadata: - name: sdap-cassandra -spec: - clusterIP: None - ports: - - name: cql - port: 9042 - targetPort: cql - selector: - app: sdap-cassandra - ---- - -apiVersion: apps/v1 -kind: StatefulSet -metadata: - name: cassandra-set -spec: - serviceName: sdap-cassandra - replicas: {{ .Values.cassandra.replicas }} - selector: - matchLabels: - app: sdap-cassandra - template: - metadata: - labels: - app: sdap-cassandra - spec: - terminationGracePeriodSeconds: 120 - {{ if .Values.cassandra.tolerations }} - tolerations: -{{ .Values.cassandra.tolerations | toYaml | indent 6 }} - {{ end }} - {{ if .Values.cassandra.nodeSelector }} - nodeSelector: -{{ .Values.cassandra.nodeSelector | toYaml | indent 8 }} - {{ end }} - affinity: - podAntiAffinity: - # Prefer spreading over all hosts - preferredDuringSchedulingIgnoredDuringExecution: - - weight: 100 - podAffinityTerm: - labelSelector: - matchExpressions: - - key: "app" - operator: In - values: - - sdap-cassandra - topologyKey: "kubernetes.io/hostname" - containers: - - name: cassandra - image: nexusjpl/cassandra:1.0.0-rc1 - imagePullPolicy: Always - ports: - - containerPort: 7000 - name: intra-node - - containerPort: 7001 - name: tls-intra-node - - containerPort: 7199 - name: jmx - - containerPort: 9042 - name: cql - resources: - requests: - cpu: {{ .Values.cassandra.requests.cpu }} - memory: 
{{ .Values.cassandra.requests.memory }} - limits: - cpu: {{ .Values.cassandra.limits.cpu }} - memory: {{ .Values.cassandra.limits.memory }} - securityContext: - capabilities: - add: - - IPC_LOCK - lifecycle: - preStop: - exec: - command: - - /bin/sh - - -c - - nodetool drain - env: - - name: MAX_HEAP_SIZE - value: 2G - - name: HEAP_NEWSIZE - value: 200M - - name: CASSANDRA_SEEDS - value: "cassandra-set-0.sdap-cassandra" - - name: POD_IP - valueFrom: - fieldRef: - fieldPath: status.podIP - volumeMounts: - - name: cassandra-data - mountPath: /var/lib/cassandra - - volumeClaimTemplates: - - metadata: - name: cassandra-data - spec: - accessModes: [ "ReadWriteOnce" ] - storageClassName: {{ .Values.storageClass }} - resources: - requests: - storage: {{ .Values.cassandra.storage }} diff --git a/helm/templates/collection-manager.yml b/helm/templates/collection-manager.yml index 6708b133..e2815264 100644 --- a/helm/templates/collection-manager.yml +++ b/helm/templates/collection-manager.yml @@ -19,7 +19,7 @@ spec: spec: containers: - image: {{ .Values.ingestion.collectionManager.image }} - imagePullPolicy: Always + imagePullPolicy: IfNotPresent name: collection-manager env: - name: RABBITMQ_USERNAME @@ -30,9 +30,9 @@ spec: value: {{ .Values.rabbitmq.fullnameOverride }} - name: COLLECTIONS_PATH value: {{ include "nexus.collectionsConfig.mountPath" . }}/collections.yml - {{- if $history.url }} + {{- if $history.solrEnabled }} - name: HISTORY_URL - value: {{ .Values.ingestion.history.url}} + value: {{ include "nexus.urls.solr" . }} {{- else }} - name: HISTORY_PATH value: {{ include "nexus.history.mountPath" . }} @@ -46,7 +46,7 @@ spec: memory: {{ .Values.ingestion.collectionManager.memory }} volumeMounts: {{ include "nexus.ingestion.dataVolumeMount" . | indent 12 }} - {{- if not $history.url }} + {{- if not $history.solrEnabled }} - name: history-volume mountPath: {{ include "nexus.history.mountPath" . }} {{- end }} @@ -57,7 +57,7 @@ spec: - name: collections-config-volume configMap: name: {{ include "nexus.collectionsConfig.configmapName" . }} - {{- if not $history.url }} + {{- if not $history.solrEnabled }} - name: history-volume persistentVolumeClaim: claimName: history-volume-claim diff --git a/helm/templates/config-operator-rbac.yml b/helm/templates/config-operator-rbac.yml index 54064d51..6626b0ba 100644 --- a/helm/templates/config-operator-rbac.yml +++ b/helm/templates/config-operator-rbac.yml @@ -6,7 +6,7 @@ metadata: --- apiVersion: rbac.authorization.k8s.io/v1 -kind: RoleBinding +kind: ClusterRoleBinding metadata: name: config-operator-role-binding roleRef: @@ -16,4 +16,6 @@ roleRef: subjects: - kind: ServiceAccount name: config-operator + namespace: {{ .Release.Namespace }} + diff --git a/helm/templates/granule-ingester.yml b/helm/templates/granule-ingester.yml index 2ce03b68..bb616ad6 100644 --- a/helm/templates/granule-ingester.yml +++ b/helm/templates/granule-ingester.yml @@ -17,6 +17,7 @@ spec: spec: containers: - image: {{ .Values.ingestion.granuleIngester.image }} + imagePullPolicy: IfNotPresent name: granule-ingester env: - name: RABBITMQ_USERNAME @@ -26,9 +27,17 @@ spec: - name: RABBITMQ_HOST value: {{ .Values.rabbitmq.fullnameOverride }} - name: CASSANDRA_CONTACT_POINTS - value: sdap-cassandra - - name: SOLR_HOST_AND_PORT - value: http://sdap-solr:8983 + value: {{ .Release.Name }}-cassandra + - name: CASSANDRA_USERNAME + value: cassandra + - name: CASSANDRA_PASSWORD + value: cassandra + - name: ZK_HOST_AND_PORT + value: {{ include "nexus.urls.zookeeper" . 
}} + {{ if .Values.ingestion.granuleIngester.maxConcurrency }} + - name: MAX_CONCURRENCY + value: "{{ .Values.ingestion.granuleIngester.maxConcurrency }}" + {{ end }} resources: requests: cpu: {{ .Values.ingestion.granuleIngester.cpu }} diff --git a/helm/templates/history-pvc.yml b/helm/templates/history-pvc.yml index 3ecabe9d..ed18f767 100644 --- a/helm/templates/history-pvc.yml +++ b/helm/templates/history-pvc.yml @@ -2,6 +2,8 @@ apiVersion: v1 kind: PersistentVolumeClaim metadata: name: history-volume-claim + annotations: + helm.sh/resource-policy: "keep" spec: accessModes: - ReadWriteOnce diff --git a/helm/templates/init-cassandra-configmap.yml b/helm/templates/init-cassandra-configmap.yml new file mode 100644 index 00000000..3e7ed3cc --- /dev/null +++ b/helm/templates/init-cassandra-configmap.yml @@ -0,0 +1,13 @@ +apiVersion: v1 +data: + init.cql: | + CREATE KEYSPACE IF NOT EXISTS nexustiles WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': 1 }; + + CREATE TABLE IF NOT EXISTS nexustiles.sea_surface_temp ( + tile_id uuid PRIMARY KEY, + tile_blob blob + ); +kind: ConfigMap +metadata: + name: init-cassandra + namespace: {{ .Release.Namespace }} diff --git a/helm/templates/solr-create-collection.yml b/helm/templates/solr-create-collection.yml new file mode 100644 index 00000000..7ecb2e3a --- /dev/null +++ b/helm/templates/solr-create-collection.yml @@ -0,0 +1,34 @@ +{{ if .Values.solrInitEnabled }} +apiVersion: apps/v1 +kind: Deployment +metadata: + name: solr-create-collection +spec: + selector: + matchLabels: + app: solr-create-collection # has to match .spec.template.metadata.labels + replicas: 1 + template: + metadata: + labels: + app: solr-create-collection + spec: + containers: + - name: solr-create-collection + imagePullPolicy: Always + image: nexusjpl/solr-cloud-init:1.0.1 + resources: + requests: + memory: "0.5Gi" + cpu: "0.25" + env: + - name: MINIMUM_NODES + value: "{{ .Values.solr.replicaCount }}" + - name: SDAP_SOLR_URL + value: {{ include "nexus.urls.solr" . }}/solr/ + - name: SDAP_ZK_SOLR + value: {{ include "nexus.urls.zookeeper" . 
}}/solr + - name: CREATE_COLLECTION_PARAMS + value: "name=nexustiles&numShards=$(MINIMUM_NODES)&waitForFinalState=true" + restartPolicy: Always +{{ end }} \ No newline at end of file diff --git a/helm/templates/solr.yml b/helm/templates/solr.yml deleted file mode 100644 index c8d0f9b0..00000000 --- a/helm/templates/solr.yml +++ /dev/null @@ -1,129 +0,0 @@ -apiVersion: v1 -kind: Service -metadata: - name: sdap-solr -spec: - ports: - - port: 8983 - clusterIP: None - selector: - app: sdap-solr - ---- - -apiVersion: apps/v1 -kind: StatefulSet -metadata: - name: solr-set -spec: - selector: - matchLabels: - app: sdap-solr # has to match .spec.template.metadata.labels - serviceName: "sdap-solr" - replicas: {{.Values.solr.replicas }} # by default is 1 - podManagementPolicy: Parallel - template: - metadata: - labels: - app: sdap-solr # has to match .spec.selector.matchLabels - spec: - terminationGracePeriodSeconds: 10 - {{ if .Values.solr.tolerations }} - tolerations: -{{ .Values.solr.tolerations | toYaml | indent 6 }} - {{ end }} - {{ if .Values.solr.nodeSelector }} - nodeSelector: -{{ .Values.solr.nodeSelector | toYaml | indent 8 }} - {{ end }} - affinity: - podAntiAffinity: - # Prefer spreading over all hosts - preferredDuringSchedulingIgnoredDuringExecution: - - weight: 100 - podAffinityTerm: - labelSelector: - matchExpressions: - - key: "app" - operator: In - values: - - sdap-solr - topologyKey: "kubernetes.io/hostname" - securityContext: - runAsUser: 8983 - fsGroup: 8983 - containers: - - name: solr-create-collection - imagePullPolicy: Always - image: nexusjpl/solr-cloud-init:1.0.0-rc1 - resources: - requests: - memory: "1Gi" - cpu: "0.25" - env: - - name: MINIMUM_NODES - value: "2" # MINIMUM_NODES should be the same as spec.replicas - - name: SOLR_HOST - valueFrom: - fieldRef: - fieldPath: status.podIP - - name: SDAP_SOLR_URL - value: http://$(SOLR_HOST):8983/solr/ - - name: SDAP_ZK_SOLR - value: "zk-hs:2181/solr" - - name: CREATE_COLLECTION_PARAMS - value: "name=nexustiles&collection.configName=nexustiles&numShards=$(MINIMUM_NODES)&waitForFinalState=true" - - name: solr-cloud - imagePullPolicy: Always - image: nexusjpl/solr-cloud:1.0.0-rc1 - resources: - requests: - memory: {{ .Values.solr.requests.memory }} - cpu: {{ .Values.solr.requests.cpu }} - limits: - memory: {{ .Values.solr.limits.memory }} - cpu: {{ .Values.solr.limits.cpu }} - env: - - name: SOLR_HEAP - value: {{ .Values.solr.heap }} - - name: SOLR_HOST - valueFrom: - fieldRef: - fieldPath: status.podIP - - name: SDAP_ZK_SERVICE_HOST - value: "zk-hs" - ports: - - containerPort: 8983 - name: http - volumeMounts: - - name: solr-data - mountPath: /opt/solr/server/solr/ - readinessProbe: - exec: - command: - - solr - - healthcheck - - -c - - nexustiles - - -z - - zk-hs:2181/solr - initialDelaySeconds: 10 - timeoutSeconds: 5 - livenessProbe: - exec: - command: - - solr - - assert - - -s - - http://localhost:8983/solr/ - initialDelaySeconds: 10 - timeoutSeconds: 5 - volumeClaimTemplates: - - metadata: - name: solr-data - spec: - accessModes: [ "ReadWriteOnce" ] - storageClassName: {{ .Values.storageClass }} - resources: - requests: - storage: {{ .Values.solr.storage }} diff --git a/helm/templates/webapp.yml b/helm/templates/webapp.yml index d77496f1..e4e2adf3 100644 --- a/helm/templates/webapp.yml +++ b/helm/templates/webapp.yml @@ -9,8 +9,13 @@ spec: pythonVersion: "2" mode: cluster image: {{ .Values.webapp.distributed.image }} - imagePullPolicy: Always + imagePullPolicy: IfNotPresent mainApplicationFile: 
local:///incubator-sdap-nexus/analysis/webservice/webapp.py + arguments: + - --cassandra-host={{ .Release.Name }}-cassandra + - --cassandra-username=cassandra + - --cassandra-password=cassandra + - --solr-host={{ include "nexus.urls.solr" . }} sparkVersion: "2.4.4" restartPolicy: type: OnFailure diff --git a/helm/templates/zookeeper.yml b/helm/templates/zookeeper.yml deleted file mode 100644 index bdc39258..00000000 --- a/helm/templates/zookeeper.yml +++ /dev/null @@ -1,144 +0,0 @@ -apiVersion: v1 -kind: Service -metadata: - name: zk-hs - labels: - app: zk -spec: - ports: - - port: 2888 - name: server - - port: 3888 - name: leader-election - clusterIP: None - selector: - app: zk ---- -apiVersion: v1 -kind: Service -metadata: - name: zk-cs - labels: - app: zk -spec: - ports: - - port: 2181 - name: client - selector: - app: zk ---- -apiVersion: policy/v1beta1 -kind: PodDisruptionBudget -metadata: - name: zk-pdb -spec: - selector: - matchLabels: - app: zk - maxUnavailable: 1 ---- -apiVersion: apps/v1 -kind: StatefulSet -metadata: - name: zk -spec: - selector: - matchLabels: - app: zk - serviceName: zk-hs - replicas: {{ .Values.zookeeper.replicas }} - updateStrategy: - type: RollingUpdate - podManagementPolicy: Parallel - template: - metadata: - labels: - app: zk - spec: - {{ if .Values.zookeeper.tolerations }} - tolerations: -{{ .Values.zookeeper.tolerations | toYaml | indent 6 }} - {{ end }} - {{ if .Values.zookeeper.nodeSelector }} - nodeSelector: -{{ .Values.zookeeper.nodeSelector | toYaml | indent 8 }} - {{ end }} - affinity: - podAntiAffinity: - preferredDuringSchedulingIgnoredDuringExecution: - - weight: 100 - podAffinityTerm: - labelSelector: - matchExpressions: - - key: "app" - operator: In - values: - - zk - topologyKey: "kubernetes.io/hostname" - containers: - - name: kubernetes-zookeeper - imagePullPolicy: Always - image: "k8s.gcr.io/kubernetes-zookeeper:1.0-3.4.10" - resources: - requests: - memory: {{ .Values.zookeeper.memory }} - cpu: {{ .Values.zookeeper.cpu }} - ports: - - containerPort: 2181 - name: client - - containerPort: 2888 - name: server - - containerPort: 3888 - name: leader-election - command: - - sh - - -c - - "start-zookeeper \ - --servers={{ .Values.zookeeper.replicas }} \ - --data_dir=/var/lib/zookeeper/data \ - --data_log_dir=/var/lib/zookeeper/data/log \ - --conf_dir=/opt/zookeeper/conf \ - --client_port=2181 \ - --election_port=3888 \ - --server_port=2888 \ - --tick_time=2000 \ - --init_limit=10 \ - --sync_limit=5 \ - --heap=512M \ - --max_client_cnxns=60 \ - --snap_retain_count=3 \ - --purge_interval=12 \ - --max_session_timeout=40000 \ - --min_session_timeout=4000 \ - --log_level=INFO" - readinessProbe: - exec: - command: - - sh - - -c - - "zookeeper-ready 2181" - initialDelaySeconds: 10 - timeoutSeconds: 5 - livenessProbe: - exec: - command: - - sh - - -c - - "zookeeper-ready 2181" - initialDelaySeconds: 10 - timeoutSeconds: 5 - volumeMounts: - - name: zkdatadir - mountPath: /var/lib/zookeeper - securityContext: - runAsUser: 1000 - fsGroup: 1000 - volumeClaimTemplates: - - metadata: - name: zkdatadir - spec: - accessModes: [ "ReadWriteOnce" ] - storageClassName: {{ .Values.storageClass }} - resources: - requests: - storage: {{ .Values.zookeeper.storage }} diff --git a/helm/values.yaml b/helm/values.yaml index c012e6e1..657dfe9b 100644 --- a/helm/values.yaml +++ b/helm/values.yaml @@ -31,7 +31,7 @@ ingestion: granuleIngester: replicas: 2 - image: nexusjpl/granule-ingester:0.0.1 + image: nexusjpl/granule-ingester:0.0.3 ## cpu refers to both request and limit 
cpu: 1 @@ -40,7 +40,7 @@ ingestion: memory: 1Gi collectionManager: - image: nexusjpl/collection-manager:0.0.2 + image: nexusjpl/collection-manager:0.0.3 ## cpu refers to both request and limit cpu: 0.5 @@ -97,34 +97,38 @@ ingestion: ## Defaults to a using a history directory, stored on a PVC using the storageClass defined in this file above history: ## Store ingestion history in a solr database instead of a filesystem directory - # url: http://history-solr + solrEnabled: true -cassandra: - replicas: 2 - storage: 13Gi - requests: - cpu: 1 - memory: 3Gi - limits: - cpu: 1 - memory: 3Gi +external: + solrHostAndPort: + zookeeperHostAndPort: -solr: - replicas: 2 - storage: 10Gi - heap: 4g - requests: - memory: 5Gi - cpu: 1 - limits: - memory: 5Gi - cpu: 1 +solrInitEnabled: true -zookeeper: - replicas: 3 - memory: 1Gi - cpu: 0.5 - storage: 8Gi +solr: + enabled: true + replicaCount: 3 + volumeClaimTemplates: + storageClassName: hostpath + storageSize: 10Gi + resources: + requests: + memory: 2Gi + cpu: 1 + limits: + memory: 2Gi + cpu: 1 + zookeeper: + replicaCount: 3 + persistence: + storageClass: hostpath + resources: + limits: + memory: 1Gi + cpu: 0.5 + requests: + memory: 1Gi + cpu: 0.5 ingressEnabled: false @@ -150,10 +154,32 @@ nginx-ingress: rabbitmq: ## fullnameOverride sets the name of the RabbitMQ service ## with which the ingestion components will communicate. + enabled: true + persistence: + storageClass: hostpath fullnameOverride: rabbitmq replicaCount: 1 auth: username: guest password: guest ingress: - enabled: true \ No newline at end of file + enabled: true + +cassandra: + enabled: true + initDBConfigMap: init-cassandra + dbUser: + user: cassandra + password: cassandra + cluster: + replicaCount: 1 + persistence: + storageClass: hostpath + size: 8Gi + resources: + requests: + cpu: 1 + memory: 8Gi + limits: + cpu: 1 + memory: 8Gi diff --git a/tools/doms/README.md b/tools/doms/README.md deleted file mode 100644 index c49fa4ab..00000000 --- a/tools/doms/README.md +++ /dev/null @@ -1,66 +0,0 @@ -# doms_reader.py -The functions in doms_reader.py read a DOMS netCDF file into memory, assemble a list of matches of satellite and in situ data, and optionally output the matches to a CSV file. Each matched pair contains one satellite data record and one in situ data record. - -The DOMS netCDF files hold satellite data and in situ data in different groups (`SatelliteData` and `InsituData`). The `matchIDs` netCDF variable contains pairs of IDs (matches) which reference a satellite data record and an in situ data record in their respective groups. These records have a many-to-many relationship; one satellite record may match to many in situ records, and one in situ record may match to many satellite records. The `assemble_matches` function assembles the individual data records into pairs based on their `dim` group dimension IDs as paired in the `matchIDs` variable. - -## Requirements -This tool was developed and tested with Python 2.7.5 and 3.7.0a0. -Imported packages: -* argparse -* netcdf4 -* sys -* datetime -* csv -* collections -* logging - - -## Functions -### Function: `assemble_matches(filename)` -Read a DOMS netCDF file into memory and return a list of matches from the file. - -#### Parameters -- `filename` (str): the DOMS netCDF file name. - -#### Returns -- `matches` (list): List of matches. 
- -Each list element in `matches` is a dictionary organized as follows: - For match `m`, netCDF group `GROUP` ('SatelliteData' or 'InsituData'), and netCDF group variable `VARIABLE`: - -`matches[m][GROUP]['matchID']`: netCDF `MatchedRecords` dimension ID for the match -`matches[m][GROUP]['GROUPID']`: GROUP netCDF `dim` dimension ID for the record -`matches[m][GROUP][VARIABLE]`: variable value - -For example, to access the timestamps of the satellite data and the in situ data of the first match in the list, along with the `MatchedRecords` dimension ID and the groups' `dim` dimension ID: -```python -matches[0]['SatelliteData']['time'] -matches[0]['InsituData']['time'] -matches[0]['SatelliteData']['matchID'] -matches[0]['SatelliteData']['SatelliteDataID'] -matches[0]['InsituData']['InsituDataID'] -``` - - -### Function: `matches_to_csv(matches, csvfile)` -Write the DOMS matches to a CSV file. Include a header of column names which are based on the group and variable names from the netCDF file. - -#### Parameters: -- `matches` (list): the list of dictionaries containing the DOMS matches as returned from the `assemble_matches` function. -- `csvfile` (str): the name of the CSV output file. - -## Usage -For example, to read some DOMS netCDF file called `doms_file.nc`: -### Command line -The main function for `doms_reader.py` takes one `filename` parameter (`doms_file.nc` argument in this example) for the DOMS netCDF file to read, calls the `assemble_matches` function, then calls the `matches_to_csv` function to write the matches to a CSV file `doms_matches.csv`. -``` -python doms_reader.py doms_file.nc -``` -``` -python3 doms_reader.py doms_file.nc -``` -### Importing `assemble_matches` -```python -from doms_reader import assemble_matches -matches = assemble_matches('doms_file.nc') -``` diff --git a/tools/doms/doms_reader.py b/tools/doms/doms_reader.py deleted file mode 100644 index c8229c48..00000000 --- a/tools/doms/doms_reader.py +++ /dev/null @@ -1,144 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import argparse -from netCDF4 import Dataset, num2date -import sys -import datetime -import csv -from collections import OrderedDict -import logging - -LOGGER = logging.getLogger("doms_reader") - -def assemble_matches(filename): - """ - Read a DOMS netCDF file and return a list of matches. - - Parameters - ---------- - filename : str - The DOMS netCDF file name. - - Returns - ------- - matches : list - List of matches. Each list element is a dictionary. 
- For match m, netCDF group GROUP (SatelliteData or InsituData), and - group variable VARIABLE: - matches[m][GROUP]['matchID']: MatchedRecords dimension ID for the match - matches[m][GROUP]['GROUPID']: GROUP dim dimension ID for the record - matches[m][GROUP][VARIABLE]: variable value - """ - - try: - # Open the netCDF file - with Dataset(filename, 'r') as doms_nc: - # Check that the number of groups is consistent w/ the MatchedGroups - # dimension - assert len(doms_nc.groups) == doms_nc.dimensions['MatchedGroups'].size,\ - ("Number of groups isn't the same as MatchedGroups dimension.") - - matches = [] - matched_records = doms_nc.dimensions['MatchedRecords'].size - - # Loop through the match IDs to assemble matches - for match in range(0, matched_records): - match_dict = OrderedDict() - # Grab the data from each platform (group) in the match - for group_num, group in enumerate(doms_nc.groups): - match_dict[group] = OrderedDict() - match_dict[group]['matchID'] = match - ID = doms_nc.variables['matchIDs'][match][group_num] - match_dict[group][group + 'ID'] = ID - for var in doms_nc.groups[group].variables.keys(): - match_dict[group][var] = doms_nc.groups[group][var][ID] - - # Create a UTC datetime field from timestamp - dt = num2date(match_dict[group]['time'], - doms_nc.groups[group]['time'].units) - match_dict[group]['datetime'] = dt - LOGGER.info(match_dict) - matches.append(match_dict) - - return matches - except (OSError, IOError) as err: - LOGGER.exception("Error reading netCDF file " + filename) - raise err - -def matches_to_csv(matches, csvfile): - """ - Write the DOMS matches to a CSV file. Include a header of column names - which are based on the group and variable names from the netCDF file. - - Parameters - ---------- - matches : list - The list of dictionaries containing the DOMS matches as returned from - assemble_matches. - csvfile : str - The name of the CSV output file. - """ - # Create a header for the CSV. Column names are GROUP_VARIABLE or - # GROUP_GROUPID. - header = [] - for key, value in matches[0].items(): - for otherkey in value.keys(): - header.append(key + "_" + otherkey) - - try: - # Write the CSV file - with open(csvfile, 'w') as output_file: - csv_writer = csv.writer(output_file) - csv_writer.writerow(header) - for match in matches: - row = [] - for group, data in match.items(): - for value in data.values(): - row.append(value) - csv_writer.writerow(row) - except (OSError, IOError) as err: - LOGGER.exception("Error writing CSV file " + csvfile) - raise err - -if __name__ == '__main__': - """ - Execution: - python doms_reader.py filename - OR - python3 doms_reader.py filename - """ - logging.basicConfig(format='%(asctime)s %(levelname)-8s %(message)s', - level=logging.INFO, - datefmt='%Y-%m-%d %H:%M:%S') - - p = argparse.ArgumentParser() - p.add_argument('filename', help='DOMS netCDF file to read') - args = p.parse_args() - - doms_matches = assemble_matches(args.filename) - - matches_to_csv(doms_matches, 'doms_matches.csv') - - - - - - - - - - - \ No newline at end of file
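
Reviewer note: below is a minimal client-side sketch of the asynchronous flow this patch introduces (the 30 s synchronous timeout in `NexusRequestHandler`, the job pool served by `NexusAsyncJobHandler` under `/jobs/`). The base URL, dataset short name, bounding box and time range are placeholder assumptions, not values from this change; only the status codes, the `job_id` field and the `/jobs/{id}` path come from the handlers added above.

```python
# Sketch of polling the new async job endpoint with the `requests` library.
# Assumed values: host/port, dataset name, bounding box, time range.
import time
import requests

BASE_URL = "http://localhost:8083"  # placeholder webapp host/port

params = {
    "ds": "MY_DATASET",            # placeholder dataset short name
    "b": "-150,45,-120,60",        # placeholder bounding box
    "startTime": "2015-01-01T00:00:00Z",
    "endTime": "2015-12-31T23:59:59Z",
}

resp = requests.get(BASE_URL + "/timeSeriesSpark", params=params)

if resp.status_code == 202:
    # Synchronous timeout was reached; the handler switched to async mode.
    job_id = resp.json()["job_id"]
    while True:
        poll = requests.get("{}/jobs/{}".format(BASE_URL, job_id))
        if poll.status_code == 202:        # job still being processed
            time.sleep(10)
        elif poll.status_code == 404:      # unknown or expired job id
            raise RuntimeError(poll.json()["error"])
        else:                              # job done: rendered result
            print(poll.text)
            break
else:
    print(resp.text)                       # finished within the synchronous window
```

Because finished jobs live in the handler's in-process `_job_pool` and are cleaned roughly 12 hours after completion, polling has to reach the same webservice instance that accepted the original request.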