Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion optimizerapi/openapi/specification.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ paths:
- oauth2: []
- apikey: []
description: Run optimizer with the specified parameters
operationId: optimizerapi.optimizer.run
operationId: optimizerapi.optimizer_handler.run
responses:
"200":
description: Result of running the optimizer with the specified parameters
Expand Down
96 changes: 4 additions & 92 deletions optimizerapi/optimizer.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,17 @@
"""ProcessOptimizer web request handler
"""ProcessOptimizer executor

This file contains the main HTTP request handlers for exposing the ProcessOptimizer API.
The handler functions are mapped to the OpenAPI specification through the "operationId" field
in the specification.yml file found in the folder "openapi" in the root of this project.
This file contains the central logic for executing the optimizer requests.
It should only depend on ProcessOptimizer specifics and json related features.
"""

import os
import platform
import time
from time import strftime
import base64
import io
import json
import subprocess
import traceback
import hashlib
import json_tricks
from rq import Queue
from rq.job import Job
from rq.exceptions import NoSuchJobError
from rq.command import send_stop_job_command
from redis import Redis
from ProcessOptimizer import Optimizer, expected_minimum
from ProcessOptimizer.plots import (
plot_objective,
Expand All @@ -32,93 +23,14 @@
from ProcessOptimizer.space.constraints import SumEquals
import matplotlib.pyplot as plt
import numpy
import connexion

from .securepickle import pickleToString, get_crypto

numpy.random.seed(42)
if "REDIS_URL" in os.environ:
REDIS_URL = os.environ["REDIS_URL"]
else:
REDIS_URL = "redis://localhost:6379"
print("Connecting to" + REDIS_URL)
redis = Redis.from_url(REDIS_URL)
if "REDIS_TTL" in os.environ:
TTL = int(os.environ["REDIS_TTL"])
else:
TTL = 500
if "WORKER_TIMEOUT" in os.environ:
WORKER_TIMEOUT = os.environ["WORKER_TIMEOUT"]
else:
WORKER_TIMEOUT = "180"

queue = Queue(connection=redis)
plt.switch_backend("Agg")

def run(body) -> dict:
"""Executes the ProcessOptimizer

Returns
-------
dict
a JSON encodable dictionary representation of the result.
"""
try:
if "waitress.client_disconnected" in connexion.request.environ:
disconnect_check = connexion.request.environ["waitress.client_disconnected"]
else:

def disconnect_check():
return False

except RuntimeError:

def disconnect_check():
return False

if "USE_WORKER" in os.environ and os.environ["USE_WORKER"]:
body_hash = hashlib.new("sha256")
body_hash.update(json.dumps(body).encode())
job_id = body_hash.hexdigest()
try:
job = Job.fetch(job_id, connection=redis)

print("Found existing job")
except NoSuchJobError:
print(f"Creating new job (WORKER_TIMEOUT={WORKER_TIMEOUT})")
job = queue.enqueue(do_run_work, body, job_id=job_id, result_ttl=TTL, job_timeout=WORKER_TIMEOUT)
while job.return_value() is None:
if disconnect_check():
try:
print(f"Client disconnected, cancelling job {job.id}")
job.cancel()
send_stop_job_command(redis, job.id)
job.delete()
except Exception:
pass
return {}
time.sleep(0.2)
return job.return_value()
return do_run_work(body)


def do_run_work(body) -> dict:
""" "Handle the run request"""
try:
return __handle_run(body)
except IOError as err:
return ({"message": "I/O error", "error": str(err)}, 400)
except TypeError as err:
return ({"message": "Type error", "error": str(err)}, 400)
except ValueError as err:
return ({"message": "Validation error", "error": str(err)}, 400)
except Exception as err:
# Log unknown exceptions to support debugging
traceback.print_exc()
return ({"message": "Unknown error", "error": str(err)}, 500)


def __handle_run(body) -> dict:
def run(body) -> dict:
""" "Handle the run request"""
data = [(run["xi"], run["yi"]) for run in body["data"]]
cfg = body["optimizerConfig"]
Expand Down
105 changes: 105 additions & 0 deletions optimizerapi/optimizer_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
"""ProcessOptimizer web request handler

This file contains the main HTTP request handlers for exposing the ProcessOptimizer API.
The handler functions are mapped to the OpenAPI specification through the "operationId" field
in the specification.yml file found in the folder "openapi" in the root of this project.
"""

import os
import time
import json
import traceback
import hashlib
from rq import Queue
from rq.job import Job
from rq.exceptions import NoSuchJobError
from rq.command import send_stop_job_command
from redis import Redis
import connexion
from .optimizer import run as handle_run

if "REDIS_URL" in os.environ:
REDIS_URL = os.environ["REDIS_URL"]
else:
REDIS_URL = "redis://localhost:6379"
print("Connecting to" + REDIS_URL)
redis = Redis.from_url(REDIS_URL)
if "REDIS_TTL" in os.environ:
TTL = int(os.environ["REDIS_TTL"])
else:
TTL = 500
if "WORKER_TIMEOUT" in os.environ:
WORKER_TIMEOUT = os.environ["WORKER_TIMEOUT"]
else:
WORKER_TIMEOUT = "180"

queue = Queue(connection=redis)


def run(body) -> dict:
"""Executes the ProcessOptimizer

Returns
-------
dict
a JSON encodable dictionary representation of the result.
"""
try:
if "waitress.client_disconnected" in connexion.request.environ:
disconnect_check = connexion.request.environ["waitress.client_disconnected"]
else:

def disconnect_check():
return False

except RuntimeError:

def disconnect_check():
return False

if "USE_WORKER" in os.environ and os.environ["USE_WORKER"]:
body_hash = hashlib.new("sha256")
body_hash.update(json.dumps(body).encode())
job_id = body_hash.hexdigest()
try:
job = Job.fetch(job_id, connection=redis)

print("Found existing job")
except NoSuchJobError:
print(f"Creating new job (WORKER_TIMEOUT={WORKER_TIMEOUT})")
job = queue.enqueue(
do_run_work,
body,
job_id=job_id,
result_ttl=TTL,
job_timeout=WORKER_TIMEOUT,
)
while job.return_value() is None:
if disconnect_check():
try:
print(f"Client disconnected, cancelling job {job.id}")
job.cancel()
send_stop_job_command(redis, job.id)
job.delete()
except Exception:
pass
return {}
time.sleep(0.2)
return job.return_value()
return do_run_work(body)


def do_run_work(body) -> dict:
""" "Handle the run request"""
try:
return handle_run(body)
except IOError as err:
return ({"message": "I/O error", "error": str(err)}, 400)
except TypeError as err:
return ({"message": "Type error", "error": str(err)}, 400)
except ValueError as err:
return ({"message": "Validation error", "error": str(err)}, 400)
except Exception as err:
# Log unknown exceptions to support debugging
traceback.print_exc()
return ({"message": "Unknown error", "error": str(err)}, 500)
2 changes: 1 addition & 1 deletion tests/test_optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from unittest.mock import patch
import copy
import collections.abc
from optimizerapi import optimizer
from optimizerapi import optimizer_handler as optimizer

# {'data': [{'xi': [651, 56, 722, 'Ræv'], 'yi': 1}, {'xi': [651, 42, 722, 'Ræv'], 'yi': 0.2}], 'optimizerConfig': {'baseEstimator': 'GP', 'acqFunc': 'gp_hedge', 'initialPoints': 5, 'kappa': 1.96, 'xi': 0.012, 'space': [{'type': 'numeric', 'name': 'Sukker', 'from': 0, 'to': 1000}, {'type': 'numeric', 'name': 'Peber', 'from': 0, 'to': 1000}, {'type': 'numeric', 'name': 'Hvedemel', 'from': 0, 'to': 1000}, {'type': 'category', 'name': 'Kunde', 'categories': ['Mus', 'Ræv']}]}}
# 'data': [{'xi': [0, 5, 'Rød'], 'yi': 10}, {'xi': [5, 8.33, 'Hvid'], 'yi': 3}, {'xi': [10, 1.66, 'Rød'], 'yi': 5}],
Expand Down
Loading