diff --git a/optimizerapi/openapi/specification.yml b/optimizerapi/openapi/specification.yml
index b50292f..6648378 100644
--- a/optimizerapi/openapi/specification.yml
+++ b/optimizerapi/openapi/specification.yml
@@ -13,7 +13,7 @@ paths:
         - oauth2: []
         - apikey: []
       description: Run optimizer with the specified parameters
-      operationId: optimizerapi.optimizer.run
+      operationId: optimizerapi.optimizer_handler.run
       responses:
         "200":
           description: Result of running the optimizer with the specified parameters
diff --git a/optimizerapi/optimizer.py b/optimizerapi/optimizer.py
index 511ba1f..69ac65c 100644
--- a/optimizerapi/optimizer.py
+++ b/optimizerapi/optimizer.py
@@ -1,26 +1,17 @@
-"""ProcessOptimizer web request handler
+"""ProcessOptimizer executor
 
-This file contains the main HTTP request handlers for exposing the ProcessOptimizer API.
-The handler functions are mapped to the OpenAPI specification through the "operationId" field
-in the specification.yml file found in the folder "openapi" in the root of this project.
+This file contains the central logic for executing the optimizer requests.
+It should only depend on ProcessOptimizer specifics and json related features.
 """
 
 import os
 import platform
-import time
 from time import strftime
 import base64
 import io
 import json
 import subprocess
-import traceback
-import hashlib
 import json_tricks
-from rq import Queue
-from rq.job import Job
-from rq.exceptions import NoSuchJobError
-from rq.command import send_stop_job_command
-from redis import Redis
 from ProcessOptimizer import Optimizer, expected_minimum
 from ProcessOptimizer.plots import (
     plot_objective,
@@ -32,93 +23,14 @@ from ProcessOptimizer.space.constraints import SumEquals
 import matplotlib.pyplot as plt
 import numpy
-import connexion
 
 from .securepickle import pickleToString, get_crypto
 
 numpy.random.seed(42)
-if "REDIS_URL" in os.environ:
-    REDIS_URL = os.environ["REDIS_URL"]
-else:
-    REDIS_URL = "redis://localhost:6379"
-print("Connecting to" + REDIS_URL)
-redis = Redis.from_url(REDIS_URL)
-if "REDIS_TTL" in os.environ:
-    TTL = int(os.environ["REDIS_TTL"])
-else:
-    TTL = 500
-if "WORKER_TIMEOUT" in os.environ:
-    WORKER_TIMEOUT = os.environ["WORKER_TIMEOUT"]
-else:
-    WORKER_TIMEOUT = "180"
-
-queue = Queue(connection=redis)
 
 plt.switch_backend("Agg")
 
 
-def run(body) -> dict:
-    """Executes the ProcessOptimizer
-
-    Returns
-    -------
-    dict
-        a JSON encodable dictionary representation of the result.
-    """
-    try:
-        if "waitress.client_disconnected" in connexion.request.environ:
-            disconnect_check = connexion.request.environ["waitress.client_disconnected"]
-        else:
-            def disconnect_check():
-                return False
-
-    except RuntimeError:
-
-        def disconnect_check():
-            return False
-
-    if "USE_WORKER" in os.environ and os.environ["USE_WORKER"]:
-        body_hash = hashlib.new("sha256")
-        body_hash.update(json.dumps(body).encode())
-        job_id = body_hash.hexdigest()
-        try:
-            job = Job.fetch(job_id, connection=redis)
-
-            print("Found existing job")
-        except NoSuchJobError:
-            print(f"Creating new job (WORKER_TIMEOUT={WORKER_TIMEOUT})")
-            job = queue.enqueue(do_run_work, body, job_id=job_id, result_ttl=TTL, job_timeout=WORKER_TIMEOUT)
-        while job.return_value() is None:
-            if disconnect_check():
-                try:
-                    print(f"Client disconnected, cancelling job {job.id}")
-                    job.cancel()
-                    send_stop_job_command(redis, job.id)
-                    job.delete()
-                except Exception:
-                    pass
-                return {}
-            time.sleep(0.2)
-        return job.return_value()
-    return do_run_work(body)
-
-
-def do_run_work(body) -> dict:
-    """ "Handle the run request"""
-    try:
-        return __handle_run(body)
-    except IOError as err:
-        return ({"message": "I/O error", "error": str(err)}, 400)
-    except TypeError as err:
-        return ({"message": "Type error", "error": str(err)}, 400)
-    except ValueError as err:
-        return ({"message": "Validation error", "error": str(err)}, 400)
-    except Exception as err:
-        # Log unknown exceptions to support debugging
-        traceback.print_exc()
-        return ({"message": "Unknown error", "error": str(err)}, 500)
-
-
-def __handle_run(body) -> dict:
+def run(body) -> dict:
     """ "Handle the run request"""
     data = [(run["xi"], run["yi"]) for run in body["data"]]
     cfg = body["optimizerConfig"]
diff --git a/optimizerapi/optimizer_handler.py b/optimizerapi/optimizer_handler.py
new file mode 100644
index 0000000..ee496e8
--- /dev/null
+++ b/optimizerapi/optimizer_handler.py
@@ -0,0 +1,105 @@
+"""ProcessOptimizer web request handler
+
+This file contains the main HTTP request handlers for exposing the ProcessOptimizer API.
+The handler functions are mapped to the OpenAPI specification through the "operationId" field
+in the specification.yml file found in the folder "openapi" in the root of this project.
+"""
+
+import os
+import time
+import json
+import traceback
+import hashlib
+from rq import Queue
+from rq.job import Job
+from rq.exceptions import NoSuchJobError
+from rq.command import send_stop_job_command
+from redis import Redis
+import connexion
+from .optimizer import run as handle_run
+
+if "REDIS_URL" in os.environ:
+    REDIS_URL = os.environ["REDIS_URL"]
+else:
+    REDIS_URL = "redis://localhost:6379"
+print("Connecting to" + REDIS_URL)
+redis = Redis.from_url(REDIS_URL)
+if "REDIS_TTL" in os.environ:
+    TTL = int(os.environ["REDIS_TTL"])
+else:
+    TTL = 500
+if "WORKER_TIMEOUT" in os.environ:
+    WORKER_TIMEOUT = os.environ["WORKER_TIMEOUT"]
+else:
+    WORKER_TIMEOUT = "180"
+
+queue = Queue(connection=redis)
+
+
+def run(body) -> dict:
+    """Executes the ProcessOptimizer
+
+    Returns
+    -------
+    dict
+        a JSON encodable dictionary representation of the result.
+    """
+    try:
+        if "waitress.client_disconnected" in connexion.request.environ:
+            disconnect_check = connexion.request.environ["waitress.client_disconnected"]
+        else:
+
+            def disconnect_check():
+                return False
+
+    except RuntimeError:
+
+        def disconnect_check():
+            return False
+
+    if "USE_WORKER" in os.environ and os.environ["USE_WORKER"]:
+        body_hash = hashlib.new("sha256")
+        body_hash.update(json.dumps(body).encode())
+        job_id = body_hash.hexdigest()
+        try:
+            job = Job.fetch(job_id, connection=redis)
+
+            print("Found existing job")
+        except NoSuchJobError:
+            print(f"Creating new job (WORKER_TIMEOUT={WORKER_TIMEOUT})")
+            job = queue.enqueue(
+                do_run_work,
+                body,
+                job_id=job_id,
+                result_ttl=TTL,
+                job_timeout=WORKER_TIMEOUT,
+            )
+        while job.return_value() is None:
+            if disconnect_check():
+                try:
+                    print(f"Client disconnected, cancelling job {job.id}")
+                    job.cancel()
+                    send_stop_job_command(redis, job.id)
+                    job.delete()
+                except Exception:
+                    pass
+                return {}
+            time.sleep(0.2)
+        return job.return_value()
+    return do_run_work(body)
+
+
+def do_run_work(body) -> dict:
+    """ "Handle the run request"""
+    try:
+        return handle_run(body)
+    except IOError as err:
+        return ({"message": "I/O error", "error": str(err)}, 400)
+    except TypeError as err:
+        return ({"message": "Type error", "error": str(err)}, 400)
+    except ValueError as err:
+        return ({"message": "Validation error", "error": str(err)}, 400)
+    except Exception as err:
+        # Log unknown exceptions to support debugging
+        traceback.print_exc()
+        return ({"message": "Unknown error", "error": str(err)}, 500)
diff --git a/tests/test_optimizer.py b/tests/test_optimizer.py
index fd4fdbc..94a6c81 100644
--- a/tests/test_optimizer.py
+++ b/tests/test_optimizer.py
@@ -5,7 +5,7 @@ from unittest.mock import patch
 import copy
 import collections.abc
 
-from optimizerapi import optimizer
+from optimizerapi import optimizer_handler as optimizer
 
 # {'data': [{'xi': [651, 56, 722, 'Ræv'], 'yi': 1}, {'xi': [651, 42, 722, 'Ræv'], 'yi': 0.2}], 'optimizerConfig': {'baseEstimator': 'GP', 'acqFunc': 'gp_hedge', 'initialPoints': 5, 'kappa': 1.96, 'xi': 0.012, 'space': [{'type': 'numeric', 'name': 'Sukker', 'from': 0, 'to': 1000}, {'type': 'numeric', 'name': 'Peber', 'from': 0, 'to': 1000}, {'type': 'numeric', 'name': 'Hvedemel', 'from': 0, 'to': 1000}, {'type': 'category', 'name': 'Kunde', 'categories': ['Mus', 'Ræv']}]}}
 # 'data': [{'xi': [0, 5, 'Rød'], 'yi': 10}, {'xi': [5, 8.33, 'Hvid'], 'yi': 3}, {'xi': [10, 1.66, 'Rød'], 'yi': 5}],