1 change: 0 additions & 1 deletion .gitignore
@@ -2,6 +2,5 @@
*.code-workspace
*.idea
*.DS_Store
analysis/webservice/algorithms/doms/domsconfig.ini
data-access/nexustiles/config/datastores.ini
venv/
3 changes: 2 additions & 1 deletion analysis/README.md
@@ -14,10 +14,11 @@ Python module that exposes NEXUS analytical capabilities via an HTTP webservice.
conda activate nexus-analysis
````

2. Install conda dependencies
2. Install conda and pip dependencies

````
cd analysis
pip install asyncio # for asynchronous job management
conda install pyspark
conda install -c conda-forge --file conda-requirements.txt
#conda install numpy matplotlib mpld3 scipy netCDF4 basemap gdal pyproj=1.9.5.1 libnetcdf=4.3.3.1
1 change: 1 addition & 0 deletions analysis/conda-requirements.txt
@@ -15,3 +15,4 @@ gdal==3.0.2
mock==2.0.0
singledispatch==3.4.0.3


3 changes: 1 addition & 2 deletions analysis/setup.py
@@ -50,8 +50,7 @@
    #     'webservice.nexus_tornado.request.renderers'
    # ],
    package_data={
        'webservice': ['config/web.ini', 'config/algorithms.ini'],
        'webservice.algorithms.doms': ['domsconfig.ini.default']
        'webservice': ['config/web.ini', 'config/algorithms.ini']
    },
    data_files=[
        ('static', ['static/index.html'])
14 changes: 11 additions & 3 deletions analysis/webservice/NexusHandler.py
@@ -16,8 +16,11 @@

import logging
import types
from functools import partial

AVAILABLE_HANDLERS = []
AVAILABLE_LEGACY_HANDLERS = []
AVAILABLE_RESTAPI_HANDLERS = []
AVAILABLE_WPS_HANDLERS = []
AVAILABLE_INITIALIZERS = []


@@ -32,17 +35,22 @@ def nexus_initializer(clazz):
    return clazz


def nexus_handler(clazz):
def nexus_handler(clazz, handler_list=AVAILABLE_LEGACY_HANDLERS):
    log = logging.getLogger(__name__)
    try:
        clazz.validate()
        log.info("Adding algorithm module '%s' with path '%s' (%s)" % (clazz.name, clazz.path, clazz))
        AVAILABLE_HANDLERS.append(clazz)
        handler_list.append(clazz)
    except Exception as ex:
        log.warn("Handler '%s' is invalid and will be skipped (reason: %s)" % (clazz, ex.message), exc_info=True)
    return clazz


nexus_restapi_handler = partial(nexus_handler, handler_list=AVAILABLE_RESTAPI_HANDLERS)
nexus_wps_handler = partial(nexus_handler, handler_list=AVAILABLE_WPS_HANDLERS)



DEFAULT_PARAMETERS_SPEC = {
    "ds": {
        "name": "Dataset",
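The registration flow after this change, as a minimal sketch (`ExampleHandler` and its attributes are hypothetical; only the decorators and handler lists come from this diff). Stacking the decorators registers one class in several registries:

```python
from webservice.NexusHandler import (nexus_handler, nexus_restapi_handler,
                                     AVAILABLE_LEGACY_HANDLERS,
                                     AVAILABLE_RESTAPI_HANDLERS)


@nexus_handler          # appends to AVAILABLE_LEGACY_HANDLERS (the default list)
@nexus_restapi_handler  # appends to AVAILABLE_RESTAPI_HANDLERS via functools.partial
class ExampleHandler(object):
    # Hypothetical algorithm class; real handlers subclass NexusCalcHandler.
    name = "Example"
    path = "/example"
    description = "Illustration only"
    params = {}

    @classmethod
    def validate(cls):
        pass  # nexus_handler calls clazz.validate() before registering


assert ExampleHandler in AVAILABLE_LEGACY_HANDLERS
assert ExampleHandler in AVAILABLE_RESTAPI_HANDLERS
```

Because `nexus_handler` returns the class unchanged, the decorator variants compose freely; `validate()` simply runs once per registration.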
4 changes: 2 additions & 2 deletions analysis/webservice/algorithms/Capabilities.py
@@ -16,7 +16,7 @@

import json

from webservice.NexusHandler import nexus_handler, AVAILABLE_HANDLERS
from webservice.NexusHandler import nexus_handler, AVAILABLE_LEGACY_HANDLERS
from webservice.algorithms.NexusCalcHandler import NexusCalcHandler
from webservice.webmodel import NexusResults

@@ -32,7 +32,7 @@ class CapabilitiesListCalcHandlerImpl(NexusCalcHandler):
    def calc(self, computeOptions, **args):
        capabilities = []

        for capability in AVAILABLE_HANDLERS:
        for capability in AVAILABLE_LEGACY_HANDLERS:
            capabilityDef = {
                "name": capability.name,
                "path": capability.path,
37 changes: 15 additions & 22 deletions analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
@@ -16,11 +16,12 @@
from datetime import datetime
from functools import partial

import uuid
import numpy as np
import shapely.geometry
from pytz import timezone

from webservice.NexusHandler import nexus_handler
from webservice.NexusHandler import nexus_handler, nexus_restapi_handler
from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler
from webservice.webmodel import NexusResults, NexusProcessingException, NoDataException

@@ -29,6 +30,7 @@


@nexus_handler
@nexus_restapi_handler
class TimeAvgMapNexusSparkHandlerImpl(NexusCalcSparkHandler):
    # __singleton_lock = threading.Lock()
    # __singleton_instance = None
@@ -67,19 +69,6 @@ class TimeAvgMapNexusSparkHandlerImpl(NexusCalcSparkHandler):
    }
    singleton = True

    # @classmethod
    # def instance(cls, algorithm_config=None, sc=None):
    #     with cls.__singleton_lock:
    #         if not cls.__singleton_instance:
    #             try:
    #                 singleton_instance = cls()
    #                 singleton_instance.set_config(algorithm_config)
    #                 singleton_instance.set_spark_context(sc)
    #                 cls.__singleton_instance = singleton_instance
    #             except AttributeError:
    #                 pass
    #     return cls.__singleton_instance

    def parse_arguments(self, request):
        # Parse input arguments
        self.log.debug("Parsing arguments")
@@ -118,7 +107,8 @@ def parse_arguments(self, request):

        return ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, nparts_requested

    def calc(self, compute_options, **args):
    def calc(self, compute_options,
             **args):
        """

        :param compute_options: StatsComputeOptions
@@ -130,6 +120,7 @@ def calc(self, compute_options, **args):
        metrics_record = self._create_metrics_record()

        ds, bbox, start_time, end_time, nparts_requested = self.parse_arguments(compute_options)

        self._setQueryParams(ds,
                             (float(bbox.bounds[1]),
                              float(bbox.bounds[3]),
@@ -147,13 +138,13 @@
        print('Found {} tiles'.format(len(nexus_tiles)))

        daysinrange = self._get_tile_service().find_days_in_range_asc(bbox.bounds[1],
                                                                bbox.bounds[3],
                                                                bbox.bounds[0],
                                                                bbox.bounds[2],
                                                                ds,
                                                                start_time,
                                                                end_time,
                                                                metrics_callback=metrics_record.record_metrics)
                                                                      bbox.bounds[3],
                                                                      bbox.bounds[0],
                                                                      bbox.bounds[2],
                                                                      ds,
                                                                      start_time,
                                                                      end_time,
                                                                      metrics_callback=metrics_record.record_metrics)
        ndays = len(daysinrange)
        if ndays == 0:
            raise NoDataException(reason="No data found for selected timeframe")
@@ -262,6 +253,8 @@ def calc(self, compute_options, **args):
                                maxLon=bbox.bounds[2], ds=ds, startTime=start_time,
                                endTime=end_time)



    @staticmethod
    def _map(tile_service_factory, metrics_callback, tile_in_spark):
        tile_bounds = tile_in_spark[0]
3 changes: 2 additions & 1 deletion analysis/webservice/algorithms_spark/TimeSeriesSpark.py
@@ -32,7 +32,7 @@
from pytz import timezone
from scipy import stats
from webservice import Filtering as filtering
from webservice.NexusHandler import nexus_handler
from webservice.NexusHandler import nexus_handler, nexus_restapi_handler, nexus_wps_handler
from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler
from webservice.webmodel import NexusResults, NoDataException, NexusProcessingException

@@ -43,6 +43,7 @@


@nexus_handler
@nexus_restapi_handler
class TimeSeriesSparkHandlerImpl(NexusCalcSparkHandler):
    name = "Time Series Spark"
    path = "/timeSeriesSpark"
9 changes: 1 addition & 8 deletions analysis/webservice/algorithms_spark/__init__.py
@@ -20,13 +20,11 @@
import CorrMapSpark
import DailyDifferenceAverageSpark
import HofMoellerSpark
import Matchup
import MaximaMinimaSpark
import NexusCalcSparkHandler
import TimeAvgMapSpark
import TimeSeriesSpark
import VarianceSpark

import NexusCalcSparkHandler

log = logging.getLogger(__name__)

Expand All @@ -46,11 +44,6 @@ def module_exists(module_name):
except ImportError:
    pass

try:
    import Matchup
except ImportError:
    pass

try:
    import TimeAvgMapSpark
except ImportError:
8 changes: 8 additions & 0 deletions analysis/webservice/config/spark_pools.xml
@@ -0,0 +1,8 @@
<?xml version="1.0"?>
<allocations>
    <pool name="default">
        <schedulingMode>FAIR</schedulingMode>
        <weight>1</weight>
        <minShare>2</minShare>
    </pool>
</allocations>
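The pool definition above only takes effect if Spark is pointed at the file; a minimal pyspark sketch of that wiring (the deploy path and app name are assumptions, not shown in this PR):

```python
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("nexus-analysis")  # name assumed
        .set("spark.scheduler.mode", "FAIR")
        # Path assumed; point Spark at the allocations file added above.
        .set("spark.scheduler.allocation.file",
             "analysis/webservice/config/spark_pools.xml"))
sc = SparkContext(conf=conf)

# Work submitted from this thread now runs in the FAIR "default" pool.
sc.setLocalProperty("spark.scheduler.pool", "default")
```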
2 changes: 1 addition & 1 deletion analysis/webservice/config/web.ini
@@ -14,4 +14,4 @@ static_enabled=true
static_dir=static

[modules]
module_dirs=webservice.algorithms,webservice.algorithms_spark,webservice.algorithms.doms
module_dirs=webservice.algorithms,webservice.algorithms_spark
1 change: 1 addition & 0 deletions analysis/webservice/jobs/__init__.py
@@ -0,0 +1 @@
from .job import Job
13 changes: 13 additions & 0 deletions analysis/webservice/jobs/job.py
@@ -0,0 +1,13 @@
from datetime import datetime


class Job(object):
    def __init__(self):
        self.request = None  # NexusRequestObject
        self.result_future = None  # tornado.gen.Future
        self.time_created = datetime.now()
        self.time_done = None




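For context, a sketch of how a `Job` pairs a request with a Tornado future (the `submit_job`/`complete_job` helpers are hypothetical; only the `Job` fields come from this diff):

```python
from datetime import datetime

import tornado.gen

from webservice.jobs import Job


def submit_job(job_pool, job_id, request):
    # Hypothetical wiring: pair the request with a future that the
    # computation resolves later; the GET handler polls this future.
    job = Job()
    job.request = request
    job.result_future = tornado.gen.Future()
    job_pool[job_id] = job
    return job


def complete_job(job, result):
    # Resolving the future lets NexusAsyncJobHandler.get() render the
    # result; time_done lets the cleaner expire the job 12 hours later.
    job.result_future.set_result(result)
    job.time_done = datetime.now()
```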
@@ -0,0 +1,81 @@
import logging
import json
import uuid
from datetime import datetime, timedelta
import tornado.web
import tornado.ioloop
from webservice.nexus_tornado.request.renderers import NexusRendererFactory


class NexusAsyncJobHandler(tornado.web.RequestHandler):

    _job_pool = {}
    __logger = logging.getLogger('nexus')

    obsolete_after = timedelta(hours=12)
    clean_obsolete_every = timedelta(minutes=15)

    @classmethod
    def get_job_pool(cls):
        return cls._job_pool

    @classmethod
    def start_jobs_cleaner(cls):

        def clean():
            now = datetime.now()
            # Iterate over a copy: deleting from the pool while iterating
            # the live dict would raise a RuntimeError; also skip jobs that
            # have not finished yet (time_done is still None).
            for key, job in list(cls._job_pool.items()):
                if job.time_done is not None and now - job.time_done > cls.obsolete_after:
                    cls.__logger.info("clean job {}".format(key))
                    del cls._job_pool[key]
            # Re-arm the callback so the pool is swept periodically.
            tornado.ioloop.IOLoop.current().call_later(cls.clean_obsolete_every.seconds, clean)

        tornado.ioloop.IOLoop.current().call_later(cls.clean_obsolete_every.seconds, clean)

    def get(self, job_id):
        self.__logger.info("get job among {}".format(self._job_pool))
        if job_id in self._job_pool:
            job = self._job_pool[job_id]
            if job.result_future.done():
                renderer = NexusRendererFactory.get_renderer(job.request)
                renderer.render(self, job.result_future.result())
            else:
                self._non_completed_job_callback(job_id)
        else:
            self._non_existing_job_callback(job_id)

    def _non_existing_job_callback(self, job_id, code=404):
        message = "Job {} does not exist".format(job_id)
        self._error_callback(message, code)

    def _non_completed_job_callback(self, job_id, code=202):
        message = "Job {} is being processed".format(job_id)
        self._error_callback(message, code)

    def _error_callback(self, message, code):
        self.__logger.info(message, exc_info=True)

        self.set_header("Content-Type", "application/json")
        self.set_header("Cache-Control", "no-cache, no-store, must-revalidate")
        self.set_header("Pragma", "no-cache")
        self.set_header("Expires", 0)
        self.set_status(code)

        response = {
            "error": message,
            "code": code
        }

        self.write(json.dumps(response, indent=5))
        self.finish()

    def data_received(self, chunk):
        pass

    @classmethod
    def get_short_job_id(cls):
        while True:
            job_id = str(uuid.uuid4())[:6]
            if job_id not in cls._job_pool:
                return job_id


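A sketch of how the handler might be mounted in the Tornado application (the module path, route pattern, and port are assumptions; the PR's wiring is not shown in this view):

```python
import tornado.ioloop
import tornado.web

# Module path assumed; the new file's location is truncated in this diff view.
from webservice.nexus_tornado.request.handlers.NexusAsyncJobHandler import NexusAsyncJobHandler

application = tornado.web.Application([
    # Polling endpoint: 404 for unknown ids, 202 while the job is still
    # running, and the rendered result once the job's future is done.
    (r"/job/(?P<job_id>\w+)", NexusAsyncJobHandler),
])

if __name__ == "__main__":
    application.listen(8083)  # port assumed
    NexusAsyncJobHandler.start_jobs_cleaner()  # arm the periodic pool sweep
    tornado.ioloop.IOLoop.current().start()
```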