diff --git a/ocrd/ocrd/cli/__init__.py b/ocrd/ocrd/cli/__init__.py
index c982261e8b..1a1142ed2e 100644
--- a/ocrd/ocrd/cli/__init__.py
+++ b/ocrd/ocrd/cli/__init__.py
@@ -25,6 +25,7 @@ def get_help(self, ctx):
 from ocrd.cli.ocrd_tool import ocrd_tool_cli
 from ocrd.cli.workspace import workspace_cli
 from ocrd.cli.process import process_cli
+from ocrd.cli.workflow import workflow_cli
 from ocrd.cli.bashlib import bashlib_cli
 from ocrd.cli.validate import validate_cli
 from ocrd.cli.resmgr import resmgr_cli
@@ -43,6 +44,7 @@ def cli(**kwargs): # pylint: disable=unused-argument
 cli.add_command(ocrd_tool_cli)
 cli.add_command(workspace_cli)
 cli.add_command(process_cli)
+cli.add_command(workflow_cli)
 cli.add_command(bashlib_cli)
 cli.add_command(zip_cli)
 cli.add_command(validate_cli)
diff --git a/ocrd/ocrd/cli/process.py b/ocrd/ocrd/cli/process.py
index 0e01294394..9f3e551929 100644
--- a/ocrd/ocrd/cli/process.py
+++ b/ocrd/ocrd/cli/process.py
@@ -9,7 +9,7 @@
 import click
 
 from ocrd_utils import getLogger, initLogging
-from ocrd.task_sequence import run_tasks
+from ocrd.task_sequence import run_tasks, parse_tasks
 
 from ..decorators import ocrd_loglevel
 
@@ -24,9 +24,10 @@
 @click.argument('tasks', nargs=-1, required=True)
 def process_cli(log_level, mets, page_id, tasks, overwrite):
     """
-    Process a series of tasks
+    Run processor CLIs in a series of tasks
     """
     initLogging()
     log = getLogger('ocrd.cli.process')
+    tasks = parse_tasks(tasks)
     run_tasks(mets, log_level, page_id, tasks, overwrite)
     log.info("Finished")
diff --git a/ocrd/ocrd/cli/server.py b/ocrd/ocrd/cli/server.py
new file mode 100644
index 0000000000..3ee86aecf6
--- /dev/null
+++ b/ocrd/ocrd/cli/server.py
@@ -0,0 +1,201 @@
+"""
+Flask application for the uwsgi workflow server
+
+(This is not meant to be imported directly, but loaded from uwsgi.)
+"""
+import base64
+import os
+import signal
+import json
+import flask
+import uwsgi # added to module path by uwsgi runner
+
+from io import BytesIO
+from ocrd_modelfactory import page_from_file
+from ocrd_utils import getLogger, initLogging, pushd_popd
+from ocrd.task_sequence import run_tasks, parse_tasks
+from ocrd.resolver import Resolver
+from PIL import Image
+from tempfile import TemporaryDirectory
+
+
+# unwrap user-defined workflow:
+tasks = json.loads(uwsgi.opt["tasks"])
+loglevel = uwsgi.opt["loglevel"].decode()
+timeout_per_page = int(uwsgi.opt["timeout_per_page"])
+workers = uwsgi.numproc
+where = "GPU" # by default, workers may use the GPU; setup_where() demotes surplus workers to "CPU"
+if "CUDA_WORKERS" in os.environ:
+    gpu_workers = int(os.environ["CUDA_WORKERS"])
+    assert gpu_workers <= workers, \
+        "CUDA_WORKERS[%d] <= workers[%d] violated" % (gpu_workers, workers)
+else:
+    gpu_workers = workers
+
+initLogging()
+res = Resolver()
+app = flask.Flask(__name__)
+log = getLogger('ocrd.workflow.server')
+if loglevel:
+    log.setLevel(loglevel)
+
+def setup_where():
+    global where
+    log.debug("Setup for worker %d", uwsgi.worker_id())
+    if uwsgi.worker_id() > gpu_workers:
+        # avoid GPU
+        os.environ["CUDA_VISIBLE_DEVICES"] = ""
+        where = 'CPU'
+
+def setup():
+    global tasks
+    setup_where()
+    log.info("Parsing and instantiating %d tasks (on %s)", len(tasks), where)
+    tasks = parse_tasks(tasks) # raises exception if invalid (causing worker to exit)
+    for task in tasks:
+        task.instantiate() # returns False if impossible (causing CLI fallback below)
+
+@app.route('/process')
+def process(): # pylint: disable=unused-variable
+    log.debug("Processing request: %s", str(flask.request))
+    if flask.request.args.get("mets"):
+        mets = flask.request.args["mets"]
+    else:
+        return 'Error: No METS', 400
+    # prevent multiple concurrent requests to the same workspace/METS
+    if not lock(mets):
+        return 'Error: Locked METS', 423
+    if flask.request.args.get('page_id'):
+        page_id = flask.request.args["page_id"]
+    else:
+        page_id = ''
+    if flask.request.args.get('log_level'):
+        log_level = flask.request.args["log_level"]
+    else:
+        log_level = None
+    if flask.request.args.get('overwrite'):
+        overwrite = flask.request.args["overwrite"] in ["True", "true", "1"]
+    else:
+        overwrite = False
+    try:
+        _process(mets, page_id, log_level, overwrite)
+    except Exception as e:
+        log.exception("Request '%s' failed", str(flask.request.args))
+        unlock(mets)
+        return 'Failed: %s' % str(e), 500
+    unlock(mets)
+    return 'Finished'
+
+def _process(mets, page_id='', log_level=None, overwrite=False):
+    if page_id:
+        npages = len(page_id.split(','))
+    else:
+        workspace = res.workspace_from_url(mets)
+        npages = len(workspace.mets.physical_pages)
+    timeout = timeout_per_page * npages
+    log.info("Processing %d tasks on %d pages (timeout=%ds)", len(tasks), npages, timeout)
+    # allow no more than timeout_per_page before restarting worker:
+    uwsgi.set_user_harakiri(timeout) # go, go, go!
+    # run the workflow
+    run_tasks(mets, log_level, page_id, tasks, overwrite)
+    uwsgi.set_user_harakiri(0) # take a breath!
+
+@app.route('/list-tasks')
+def list_tasks(): # pylint: disable=unused-variable
+    seq = ''
+    for task in tasks:
+        seq += '\n' + str(task)
+    return seq
+
+@app.route('/shutdown')
+def shutdown(): # pylint: disable=unused-variable
+    log.debug("Shutting down")
+    # does not work ("error managing signal 2 on worker 1"):
+    # uwsgi.signal(signal.SIGINT)
+    os.kill(uwsgi.masterpid(), signal.SIGINT)
+    return 'Stopped'
+
+def lock(mets):
+    uwsgi.lock()
+    try:
+        log.debug("locking '%s'", mets)
+        if uwsgi.cache_exists(mets):
+            granted = False
+        else:
+            uwsgi.cache_set(mets, b'running')
+            granted = True
+    finally:
+        uwsgi.unlock()
+    return granted
+
+def unlock(mets):
+    uwsgi.lock()
+    try:
+        log.debug("unlocking '%s'", mets)
+        uwsgi.cache_del(mets)
+    finally:
+        uwsgi.unlock()
+
+@app.route('/process_images', methods=["POST"])
+def process_images(): # pylint: disable=unused-variable
+    log.debug(f"Processing request: {flask.request}")
+    if flask.request.is_json:
+        req = flask.request.get_json()
+
+        pages = {}
+        if "pages" in req:
+            for k, v in req["pages"].items():
+                pages[k] = v
+        elif "PAGES" in req:
+            for k, v in req["PAGES"].items():
+                pages[k] = v
+        else:
+            return 'Missing "pages" param.', 400
+
+        try:
+            work_dir = TemporaryDirectory()
+            ws = res.workspace_from_nothing(directory=work_dir.name)
+
+            for k, v in pages.items():
+                img = Image.open(BytesIO(base64.b64decode(v)))
+                if img.mode != "RGB":
+                    img = img.convert("RGB")
+                ws.save_image_file(img, k, "OCR-D-IMG", page_id=k, mimetype='image/png')
+            ws.save_mets()
+            ws.reload_mets()
+        except Exception as e:
+            work_dir.cleanup()
+            return f"An error occurred while decoding image(s) and creating mets.xml. {e}", 400
+
+        try:
+            _process(ws.mets_target)
+            ws.reload_mets()
+            for k in pages.keys():
+                pages[k] = {"img": None, "page": None}
+
+                page_file = next(ws.mets.find_files(
+                    pageId=k,
+                    fileGrp=tasks[-1].output_file_grps[0],
+                ), None)
+                with pushd_popd(ws.directory):
+                    if page_file and os.path.exists(page_file.local_filename):
+                        with open(page_file.local_filename, "r", encoding="utf8") as f:
+                            pages[k]["page"] = f.read()
+                        img_path = page_from_file(
+                            page_file
+                        ).get_Page().get_AlternativeImage()[-1].get_filename()
+                        if img_path and os.path.exists(img_path):
+                            img = Image.open(img_path)
+                            img_file = BytesIO()
+                            img.save(img_file, format="PNG")
+                            pages[k]["img"] = base64.b64encode(img_file.getvalue()).decode("utf8")
+        except Exception as e:
+            return f"Failed: {e}", 500
+        finally:
+            work_dir.cleanup()
+
+        return flask.json.jsonify(pages)
+    else:
+        return "Request was not JSON.", 400
+
+setup()
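For orientation, this is how the two endpoints above can be exercised from Python (a sketch, not part of the diff; server address, workspace path and page ID are made up, and a server must already be running):

```python
import base64
import requests

# run the configured workflow on an existing workspace
resp = requests.get('http://127.0.0.1:5000/process',
                    params={'mets': '/data/ws1/mets.xml', 'page_id': 'PHYS_0001'})
print(resp.status_code, resp.text)  # 200 'Finished' on success, 423 if the METS is locked

# run it on ad-hoc images instead (a workspace is created server-side on the fly)
with open('page1.png', 'rb') as f:
    b64 = base64.b64encode(f.read()).decode('utf8')
resp = requests.post('http://127.0.0.1:5000/process_images',
                     json={'pages': {'PHYS_0001': b64}})
result = resp.json()['PHYS_0001']  # dict with 'page' (PAGE-XML) and 'img' (base64 PNG)
```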
diff --git a/ocrd/ocrd/cli/workflow.py b/ocrd/ocrd/cli/workflow.py
new file mode 100644
index 0000000000..e7c21ef8e8
--- /dev/null
+++ b/ocrd/ocrd/cli/workflow.py
@@ -0,0 +1,205 @@
+"""
+CLI for task_sequence
+"""
+import sys
+import os
+import json
+import subprocess
+import click
+import requests
+
+from ocrd_utils import getLogger, initLogging
+
+from ..decorators import ocrd_loglevel
+from .process import process_cli
+
+@click.group("workflow")
+def workflow_cli():
+    """
+    Process a series of tasks
+    """
+    initLogging()
+
+# ----------------------------------------------------------------------
+# ocrd workflow process
+# ----------------------------------------------------------------------
+@workflow_cli.command('process')
+@ocrd_loglevel
+@click.option('-m', '--mets', help="METS to process", default="mets.xml")
+@click.option('-g', '--page-id', help="ID(s) of the pages to process")
+@click.option('--overwrite', is_flag=True, default=False, help="Remove output pages/images if they already exist")
+@click.argument('tasks', nargs=-1, required=True)
+def process_cli_alias(log_level, mets, page_id, tasks, overwrite):
+    """
+    Run processor CLIs in a series of tasks
+
+    (alias for ``ocrd process``)
+    """
+    process_cli(log_level, mets, page_id, tasks, overwrite)
+
+# ----------------------------------------------------------------------
+# ocrd workflow server
+# ----------------------------------------------------------------------
+@workflow_cli.command('server')
+@ocrd_loglevel
+@click.option('-t', '--timeout', help="maximum processing time (in sec per page) before reloading worker (0 to disable)", default=0)
+@click.option('-j', '--processes', help="number of parallel workers to spawn", default=1)
+@click.option('-h', '--host', help="host name/IP to listen at", default='127.0.0.1')
+@click.option('-p', '--port', help="TCP port to listen at", default=5000, type=click.IntRange(min=1024))
+@click.argument('tasks', nargs=-1, required=True)
+def server_cli(log_level, timeout, processes, host, port, tasks):
+    """
+    Start a server which runs a series of tasks (processor CLIs or APIs) on workspaces
+
+    Parse the given tasks and try to instantiate all Pythonic
+    processors among them with the given parameters.
+    Open a web server that listens on the given ``host`` and ``port``
+    and queues GET requests to ``/process`` into ``processes``
+    worker processes, with the following (URL-encoded) arguments:
+
+        mets (string): Path name (relative to the server's CWD,
+        or absolute) of the workspace to process
+
+        page_id (string): Comma-separated list of page IDs to process
+
+        log_level (int): Override all logger levels during processing
+
+        overwrite (bool): Remove output pages/images if they already exist
+
+    The server will handle each request by running the tasks
+    on the given workspace. Pythonic processors will be run via API
+    (on those same instances). Non-Pythonic processors (or those
+    not directly accessible in the current venv) will be run via CLI
+    normally, instantiating anew each time. Moreover, within each
+    contiguous chain of Pythonic tasks in the series, no METS
+    de/serialization will be performed.
+
+    If processing does not finish before ``timeout`` seconds per page,
+    the request will fail and the respective worker will be reloaded.
+
+    To see the server's workflow configuration, send a GET request
+    to ``/list-tasks``.
+
+    Stop the server by sending SIGINT (e.g. via ctrl+c on the
+    terminal), or a GET request to ``/shutdown``.
+    """
+    log = getLogger('ocrd.workflow.server')
+    log.debug("Running server with %d workers on http://%s:%d", processes, host, port)
+    result = subprocess.run(["uwsgi", "--http-socket", "%s:%d" % (host, port),
+                             "--wsgi-file", os.path.join(os.path.dirname(__file__), 'server.py'),
+                             "--callable", "app", "--need-app",
+                             # "--disable-logging", # OCR-D logging is enough
+                             # "--http-keepalive", "true",
+                             # "--add-header", "Connection: Keep-Alive",
+                             "--processes", "%d" % processes,
+                             "--master", "--single-interpreter",
+                             # "--daemonize2", # return from workflow CLI
+                             "--lazy-apps", # fork before loading app
+                             "--no-orphans", # kill workers if master dies
+                             "--die-on-term", # do not reload on SIGTERM
+                             "--reload-on-exception", # reload failed workers
+                             "--enable-threads", # for multithreading in Numpy, TF, ...
+                             "--cache2", "name=workspace_lock,items=100",
+                             # wrap in JSON to retain list/quotes (not possible with pyargv):
+                             "--set", "tasks=%s" % json.dumps(tasks),
+                             # server log level:
+                             "--set", "loglevel=%s" % (log_level or ''),
+                             # worker timeout per page:
+                             "--set", "timeout_per_page=%d" % timeout,
+                             "--buffer-size", "102400"])
+    return result.returncode
+
+# ----------------------------------------------------------------------
+# ocrd workflow client
+# ----------------------------------------------------------------------
+@workflow_cli.group('client')
+@click.option('-h', '--host', help="host name/IP to request from", default='127.0.0.1')
+@click.option('-p', '--port', help="TCP port to request from", default=5000, type=click.IntRange(min=1024))
+@click.pass_context
+def client_cli(ctx, host, port):
+    """
+    Have the workflow server run commands
+    """
+    url = 'http://' + host + ':' + str(port) + '/'
+    ctx.ensure_object(dict)
+    ctx.obj['URL'] = url
+    ctx.obj['log'] = getLogger('ocrd.workflow.client')
+
+@client_cli.command('process')
+@ocrd_loglevel
+@click.option('-m', '--mets', help="METS to process", default="mets.xml")
+@click.option('-g', '--page-id', help="ID(s) of the pages to process")
+@click.option('--overwrite', is_flag=True, default=False, help="Remove output pages/images if they already exist")
+@click.pass_context
+def client_process_cli(ctx, log_level, mets, page_id, overwrite):
+    """
+    Have the workflow server process another workspace
+    """
+    url = ctx.obj['URL'] + 'process'
+    params = {'mets': mets,
+              'page_id': page_id,
+              'log_level': log_level,
+              'overwrite': str(overwrite)
+    }
+    try:
+        response = requests.get(url, params=params)
+        response.raise_for_status()
+        print(response.text)
+        if response.text == 'Finished':
+            sys.exit(0)
+        else:
+            sys.exit(1)
+    except requests.exceptions.HTTPError as err:
+        ctx.obj['log'].error("Server error: %s", err.response.text)
+    except requests.exceptions.ConnectionError as err:
+        ctx.obj['log'].error("Connection error: %s", err)
+    except requests.exceptions.Timeout as err:
+        ctx.obj['log'].error("Timeout error: %s", err)
+    except requests.exceptions.RequestException as err:
+        ctx.obj['log'].error("Unknown error: %s", err)
+    sys.exit(2)
+
+@client_cli.command('list-tasks')
+@click.pass_context
+def client_list_tasks_cli(ctx):
+    """
+    Have the workflow server print the configured task sequence
+    """
+    url = ctx.obj['URL'] + 'list-tasks'
+    try:
+        response = requests.get(url)
+        response.raise_for_status()
+        print(response.text)
+        sys.exit(0)
+    except requests.exceptions.HTTPError as err:
+        ctx.obj['log'].error("Server error: %s", err.response.text)
+    except requests.exceptions.ConnectionError as err:
+        ctx.obj['log'].error("Connection error: %s", err)
+    except requests.exceptions.Timeout as err:
+        ctx.obj['log'].error("Timeout error: %s", err)
+    except requests.exceptions.RequestException as err:
+        ctx.obj['log'].error("Unknown error: %s", err)
+    sys.exit(2)
+
+@client_cli.command('shutdown')
+@click.pass_context
+def client_shutdown_cli(ctx):
+    """
+    Have the workflow server shut down gracefully
+    """
+    url = ctx.obj['URL'] + 'shutdown'
+    try:
+        response = requests.get(url)
+        print(response.text)
+        sys.exit(0)
+    except requests.exceptions.HTTPError as err:
+        ctx.obj['log'].error("Server error: %s", err.response.text)
+    except requests.exceptions.ConnectionError as err:
+        ctx.obj['log'].error("Connection error: %s", err)
+    except requests.exceptions.Timeout as err:
+        ctx.obj['log'].error("Timeout error: %s", err)
+    except requests.exceptions.RequestException as err:
+        ctx.obj['log'].error("Unknown error: %s", err)
+    sys.exit(2)
+
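Taken together, a typical session with the new CLI might look as follows (a sketch; processor names and paths are placeholders for whatever is installed):

```sh
# start a workflow server with 4 workers on port 5000
ocrd workflow server -j 4 -p 5000 \
    "cis-ocropy-binarize -I OCR-D-IMG -O OCR-D-BIN" \
    "tesserocr-recognize -I OCR-D-BIN -O OCR-D-OCR" &

# show the configured task sequence
ocrd workflow client -p 5000 list-tasks

# process one workspace (path relative to the server's CWD, or absolute)
ocrd workflow client -p 5000 process -m /data/ws1/mets.xml -g PHYS_0001

# stop the server
ocrd workflow client -p 5000 shutdown
```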
diff --git a/ocrd/ocrd/decorators/__init__.py b/ocrd/ocrd/decorators/__init__.py
index dd0877bc32..6153105fcf 100644
--- a/ocrd/ocrd/decorators/__init__.py
+++ b/ocrd/ocrd/decorators/__init__.py
@@ -14,6 +14,7 @@
 
 from ..resolver import Resolver
 from ..processor.base import run_processor
+from ..server import ProcessingServer
 from .loglevel_option import ocrd_loglevel
 from .parameter_option import parameter_option, parameter_override_option
@@ -21,18 +22,19 @@
 from .mets_find_options import mets_find_options
 
 def ocrd_cli_wrap_processor(
-    processorClass,
-    ocrd_tool=None,
-    mets=None,
-    working_dir=None,
-    dump_json=False,
-    help=False, # pylint: disable=redefined-builtin
-    version=False,
-    overwrite=False,
-    show_resource=None,
-    list_resources=False,
-    **kwargs
-):
+        processorClass,
+        ocrd_tool=None,
+        mets=None,
+        working_dir=None,
+        server=None,
+        log_level=None,
+        dump_json=False,
+        help=False, # pylint: disable=redefined-builtin
+        version=False,
+        overwrite=False,
+        show_resource=None,
+        list_resources=False,
+        **kwargs):
     if not sys.argv[1:]:
         processorClass(workspace=None, show_help=True)
         sys.exit(1)
@@ -46,6 +48,23 @@ def ocrd_cli_wrap_processor(
             list_resources=list_resources
         )
         sys.exit()
+    elif server:
+        initLogging()
+        LOG = getLogger('ocrd_cli_wrap_processor')
+        # Merge parameter overrides and parameters
+        if 'parameter_override' in kwargs:
+            set_json_key_value_overrides(kwargs['parameter'], *kwargs['parameter_override'])
+        # instantiate processor without workspace
+        processorArgs = dict()
+        for param in kwargs:
+            if param in ['parameter', 'input_file_grp', 'output_file_grp', 'page_timeout']:
+                processorArgs[param] = kwargs[param]
+        host, port, workers = server
+        options = {'bind': '%s:%s' % (host, port),
+                   'workers': workers,
+                   'loglevel': log_level}
+        server = ProcessingServer(processorClass, processorArgs, options)
+        server.run()
     else:
         initLogging()
         LOG = getLogger('ocrd_cli_wrap_processor')
diff --git a/ocrd/ocrd/decorators/ocrd_cli_options.py b/ocrd/ocrd/decorators/ocrd_cli_options.py
index 9f7f8cafa9..57be3fd78e 100644
--- a/ocrd/ocrd/decorators/ocrd_cli_options.py
+++ b/ocrd/ocrd/decorators/ocrd_cli_options.py
@@ -26,6 +26,7 @@ def cli(mets_url):
         option('-O', '--output-file-grp', help='File group(s) used as output.', default='OUTPUT'),
         option('-g', '--page-id', help="ID(s) of the pages to process"),
         option('--overwrite', help="Overwrite the output file group or a page range (--page-id)", is_flag=True, default=False),
+        option('-s', '--server', help='Run web server instead of one-shot processing (shifts mets/working-dir/page-id options to HTTP request arguments); pass network interface to bind to, TCP port, number of worker processes', nargs=3),
         option('-C', '--show-resource', help='Dump the content of processor resource RESNAME', metavar='RESNAME'),
         option('-L', '--list-resources', is_flag=True, default=False, help='List names of processor resources'),
         parameter_option,
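With the new option, any Pythonic processor CLI can also serve itself. A sketch (using ocrd-dummy as a stand-in for a real processor; address and paths are made up):

```sh
# bind to 127.0.0.1:5001 with 2 gunicorn workers
ocrd-dummy --server 127.0.0.1 5001 2 -I OCR-D-IMG -O OCR-D-DUMMY &

# mets/page-id now arrive as HTTP request arguments
curl 'http://127.0.0.1:5001/process?mets=/data/ws1/mets.xml&page_id=PHYS_0001'
curl 'http://127.0.0.1:5001/list-tasks'
curl 'http://127.0.0.1:5001/shutdown'
```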
diff --git a/ocrd/ocrd/processor/__init__.py b/ocrd/ocrd/processor/__init__.py
index f01e2b3c91..ed3e7e1cb3 100644
--- a/ocrd/ocrd/processor/__init__.py
+++ b/ocrd/ocrd/processor/__init__.py
@@ -2,6 +2,7 @@
     Processor,
 )
 from .helpers import (
+    run_api,
     run_cli,
     run_processor,
     generate_processor_help
diff --git a/ocrd/ocrd/processor/base.py b/ocrd/ocrd/processor/base.py
index 5e7ab6e9bb..d07a2e90af 100644
--- a/ocrd/ocrd/processor/base.py
+++ b/ocrd/ocrd/processor/base.py
@@ -33,7 +33,7 @@
 from ocrd_models.ocrd_page import MetadataItemType, LabelType, LabelsType
 
 # XXX imports must remain for backwards-compatibilty
-from .helpers import run_cli, run_processor, generate_processor_help # pylint: disable=unused-import
+from .helpers import run_api, run_cli, run_processor, generate_processor_help # pylint: disable=unused-import
 
 class Processor():
     """
@@ -50,12 +50,10 @@ def __init__(
             workspace,
             ocrd_tool=None,
             parameter=None,
-            # TODO OCR-D/core#274
-            # input_file_grp=None,
-            # output_file_grp=None,
-            input_file_grp="INPUT",
-            output_file_grp="OUTPUT",
+            input_file_grp=None,
+            output_file_grp=None,
             page_id=None,
+            server=None,
             show_resource=None,
             list_resources=False,
             show_help=False,
@@ -139,6 +137,7 @@ def __init__(
         # FIXME HACK would be better to use pushd_popd(self.workspace.directory)
         # but there is no way to do that in process here since it's an
         # overridden method. chdir is almost always an anti-pattern.
+        self.old_pwd = getcwd()
         if self.workspace:
             self.old_pwd = getcwd()
             os.chdir(self.workspace.directory)
diff --git a/ocrd/ocrd/processor/helpers.py b/ocrd/ocrd/processor/helpers.py
index 41eeb0638c..a7c344c076 100644
--- a/ocrd/ocrd/processor/helpers.py
+++ b/ocrd/ocrd/processor/helpers.py
@@ -2,15 +2,17 @@
 Helper methods for running and documenting processors
 """
 from time import perf_counter, process_time
+import os
 import json
 import inspect
-from subprocess import run, PIPE
+from subprocess import run
 
 from click import wrap_text
 from ocrd_utils import getLogger
 
 __all__ = [
     'generate_processor_help',
+    'run_api',
     'run_cli',
     'run_processor'
 ]
@@ -68,16 +70,42 @@ def run_processor(
         mets_url,
         working_dir
     )
-    log = getLogger('ocrd.processor.helpers.run_processor')
-    log.debug("Running processor %s", processorClass)
     processor = processorClass(
         workspace,
         ocrd_tool=ocrd_tool,
-        page_id=page_id,
         input_file_grp=input_file_grp,
         output_file_grp=output_file_grp,
+        page_id=page_id,
         parameter=parameter
     )
+    error = run_api(processor)
+    if error:
+        raise error
+    workspace.save_mets()
+    return processor
+
+def run_api(processor,
+            workspace=None,
+            page_id=None,
+            input_file_grp=None,
+            output_file_grp=None
+): # pylint: disable=too-many-locals
+    """
+    Run an (already instantiated) processor through its Python API,
+    optionally rebinding its workspace, page_id and fileGrps first
+
+    Args:
+        processor (object): Processor instance
+    """
+    log = getLogger('ocrd.processor.helpers.run_processor')
+    log.debug("Running processor %s", processor.__class__.__name__)
+    if workspace:
+        processor.workspace = workspace
+    if page_id:
+        processor.page_id = page_id
+    if input_file_grp:
+        processor.input_file_grp = input_file_grp
+    if output_file_grp:
+        processor.output_file_grp = output_file_grp
     ocrd_tool = processor.ocrd_tool
     name = '%s v%s' % (ocrd_tool['executable'], processor.version)
     otherrole = ocrd_tool['steps'][0]
@@ -85,31 +113,37 @@
     log.debug("Processor instance %s (%s doing %s)", processor, name, otherrole)
     t0_wall = perf_counter()
     t0_cpu = process_time()
-    processor.process()
+    oldcwd = os.getcwd()
+    try:
+        os.chdir(processor.workspace.directory)
+        processor.process()
+    except Exception as err:
+        log.exception("Failure in processor '%s'" % ocrd_tool['executable'])
+        return err
+    finally:
+        os.chdir(oldcwd)
     t1_wall = perf_counter() - t0_wall
     t1_cpu = process_time() - t0_cpu
     logProfile.info("Executing processor '%s' took %fs (wall) %fs (CPU)( [--input-file-grp='%s' --output-file-grp='%s' --parameter='%s' --page-id='%s']" % (
         ocrd_tool['executable'],
         t1_wall,
         t1_cpu,
-        input_file_grp or '',
-        output_file_grp or '',
-        json.dumps(parameter) or '',
-        page_id or ''
+        processor.input_file_grp or '',
+        processor.output_file_grp or '',
+        json.dumps(processor.parameter or {}),
+        processor.page_id or ''
     ))
-    workspace.mets.add_agent(
+    processor.workspace.mets.add_agent(
         name=name,
         _type='OTHER',
         othertype='SOFTWARE',
         role='OTHER',
         otherrole=otherrole,
-        notes=[({'option': 'input-file-grp'}, input_file_grp or ''),
-               ({'option': 'output-file-grp'}, output_file_grp or ''),
-               ({'option': 'parameter'}, json.dumps(parameter or '')),
-               ({'option': 'page-id'}, page_id or '')]
+        notes=[({'option': 'input-file-grp'}, processor.input_file_grp or ''),
+               ({'option': 'output-file-grp'}, processor.output_file_grp or ''),
+               ({'option': 'parameter'}, json.dumps(processor.parameter or {})),
+               ({'option': 'page-id'}, processor.page_id or '')]
     )
-    workspace.save_mets()
-    return processor
 
 def run_cli(
     executable,
@@ -218,6 +252,10 @@ def wrap(s):
                                     or JSON file path
   -P, --param-override KEY VAL    Override a single JSON object key-value pair,
                                     taking precedence over --parameter
+  -s, --server HOST PORT WORKERS  Run web server instead of one-shot processing
+                                    (shifts mets/working-dir/page-id options to
+                                    HTTP request arguments); pass network interface
+                                    to bind to, TCP port, number of worker processes
   -m, --mets URL-PATH             URL or file path of METS to process
   -w, --working-dir PATH          Working directory of local workspace
   -l, --log-level [OFF|ERROR|WARN|INFO|DEBUG|TRACE]
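The net effect of this refactoring: run_processor keeps its one-shot semantics, while run_api allows re-using one processor instance across many workspaces. A minimal sketch (DummyProcessor stands in for any concrete Processor subclass; the import path is hypothetical):

```python
from ocrd import Resolver
from ocrd.processor import run_api
from ocrd_dummy import DummyProcessor  # hypothetical import path

resolver = Resolver()
# instantiate once, without a workspace (fileGrps no longer default to INPUT/OUTPUT)
processor = DummyProcessor(None, parameter={},
                           input_file_grp='OCR-D-IMG',
                           output_file_grp='OCR-D-DUMMY')
for mets in ['/data/ws1/mets.xml', '/data/ws2/mets.xml']:
    workspace = resolver.workspace_from_url(mets)
    error = run_api(processor, workspace)  # re-binds the workspace, then runs process()
    if error:
        raise error
    workspace.save_mets()  # run_api no longer saves METS; the caller does
```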
diff --git a/ocrd/ocrd/server.py b/ocrd/ocrd/server.py
new file mode 100644
index 0000000000..b3496ee84b
--- /dev/null
+++ b/ocrd/ocrd/server.py
@@ -0,0 +1,186 @@
+"""
+Flask application and gunicorn processing server for Processor
+"""
+import os
+import signal
+import multiprocessing as mp
+import atexit
+import json
+import flask
+import gunicorn.app.base
+
+from ocrd_validators import WorkspaceValidator
+from ocrd_utils import getLogger
+from ocrd.task_sequence import ProcessorTask
+from .processor import run_api
+from . import Resolver
+
+class ProcessingServer(gunicorn.app.base.BaseApplication):
+
+    def __init__(self, processorClass, processorArgs, options=None):
+        # happens in pre-fork context
+        self.options = options or {'bind': '127.0.0.1:5000', 'workers': 1}
+        # TODOs:
+        # - add 'CUDA_VISIBLE_DEVICES' to 'raw_env' in options (server level instead of worker level)
+        # - customize 'errorlog' (over stdout) in options
+        # - customize 'accesslog' (over None) in options
+        self.options['accesslog'] = '-'
+        self.options['access_log_format'] = '%(t)s "%(r)s" %(s)s %(b)s "%(T)s"'
+        # - customize 'logger_class' in options
+        # - customize 'logconfig' or 'logconfig_dict' in options
+        # - customize 'access_log_format' in options
+        self.options['timeout'] = 0 # disable (timeout managed by workers on request level)
+        self.options['preload_app'] = False # instantiate workers independently
+        self.options['pre_fork'] = pre_fork # see below
+        self.options['post_fork'] = post_fork # see below
+        self.options['pre_request'] = pre_request # see below
+        self.options['post_request'] = post_request # see below
+        self.options['worker_abort'] = worker_abort # see below
+        self.processor_cls = processorClass
+        self.processor_opt = processorArgs
+        self.master_pid = os.getpid()
+        manager = mp.Manager()
+        self.master_lock = manager.Lock()
+        self.master_cache = manager.dict()
+        # (Manager creates an additional mp.Process on __enter__,
+        # and registers an atexit handler joining that in __exit__,
+        # but our forked workers inherit this. To prevent attempting
+        # to join a non-child, we need to remove that in post_fork.)
+        super().__init__()
+
+    def load_config(self):
+        config = {key: value for key, value in self.options.items()
+                  if key in self.cfg.settings and value is not None}
+        for key, value in config.items():
+            self.cfg.set(key.lower(), value)
+
+    def load(self):
+        # happens in (forked) worker context (because preload_app=False)
+        # instantiate processor
+        self.obj = self.processor_cls(None, **self.processor_opt)
+        self.exe = self.obj.ocrd_tool['executable']
+        self.res = Resolver()
+        self.log = getLogger('ocrd.processor.server')
+        self.app = flask.Flask(self.exe)
+        # add routes
+        self.app.add_url_rule('/process', None, self.process)
+        self.app.add_url_rule('/list-tasks', None, self.list_tasks)
+        self.app.add_url_rule('/shutdown', None, self.shutdown)
+        return self.app
+
+    def process(self):
+        self.log.debug("Processing request: %s", str(flask.request))
+        if flask.request.args.get("mets"):
+            mets = flask.request.args["mets"]
+        else:
+            return 'Error: No METS', 400
+        # prevent multiple concurrent requests to the same workspace/METS
+        if not self.lock(mets):
+            return 'Error: Locked METS', 423
+        if flask.request.args.get('page_id'):
+            page_id = flask.request.args["page_id"]
+        else:
+            page_id = ''
+        # if flask.request.args.get('log_level'):
+        #     log_level = flask.request.args["log_level"]
+        # else:
+        #     log_level = None
+        if flask.request.args.get('overwrite'):
+            overwrite = flask.request.args["overwrite"] in ["True", "true", "1"]
+        else:
+            overwrite = False
+        try:
+            workspace = self.res.workspace_from_url(mets)
+            workspace.overwrite_mode = overwrite
+            report = WorkspaceValidator.check_file_grp(
+                workspace,
+                self.obj.input_file_grp,
+                '' if overwrite else self.obj.output_file_grp,
+                page_id)
+            if not report.is_valid:
+                raise Exception("Invalid input/output file grps:\n\t%s" % '\n\t'.join(report.errors))
+            if page_id:
+                npages = len(page_id.split(','))
+            else:
+                npages = len(workspace.mets.physical_pages)
+            # allow no more than page_timeout before restarting worker:
+            timeout = getattr(self.obj, 'page_timeout', 0)
+            timeout *= npages
+            self.log.info("Processing %s on %d pages of '%s' (timeout=%ds)", self.exe, npages, mets, timeout)
+            with Timeout(timeout, "processing %s on %s cancelled after %d seconds on %d pages" % (
+                    self.exe, mets, timeout, npages)):
+                # run the workflow
+                error = run_api(self.obj, workspace, page_id)
+                if error:
+                    raise error
+                workspace.save_mets()
+        except Exception as e:
+            self.log.exception("Request '%s' failed", str(flask.request.args))
+            self.unlock(mets)
+            return 'Failed: %s' % str(e), 500
+        self.unlock(mets)
+        return 'Finished'
+
+    def list_tasks(self):
+        task = ProcessorTask(self.exe, [self.obj.input_file_grp], [self.obj.output_file_grp], self.obj.parameter)
+        return str(task) + '\n'
+
+    def shutdown(self):
+        self.log.debug("Shutting down")
+        os.kill(self.master_pid, signal.SIGTERM)
+        return 'Stopped'
+
+    def lock(self, resource):
+        with self.master_lock:
+            if resource in self.master_cache:
+                return False
+            self.master_cache[resource] = True
+            return True
+
+    def unlock(self, resource):
+        with self.master_lock:
+            del self.master_cache[resource]
+
+class Timeout:
+    def __init__(self, seconds, message):
+        self.seconds = seconds
+        self.message = message
+    def _handler(self, signum, stack):
+        raise TimeoutError(self.message)
+    def __enter__(self):
+        signal.signal(signal.SIGALRM, self._handler)
+        signal.alarm(self.seconds)
+    def __exit__(self, *args):
+        signal.alarm(0)
+
+def pre_fork(server, worker):
+    # happens when worker (but not app/processor) was instantiated (but not forked yet)
+    worker.num_workers = server.num_workers # nominal value
+    worker.worker_id = len(server.WORKERS) + 1 # actual value
+
+def post_fork(server, worker):
+    # happens when worker (but not app/processor) was instantiated (and forked)
+    # remove atexit handler for multiprocessing.Manager process
+    atexit.unregister(mp.util._exit_function)
+    # differentiate GPU workers from CPU workers via envvar
+    if "CUDA_WORKERS" in os.environ:
+        cuda_workers = int(os.environ["CUDA_WORKERS"])
+        assert cuda_workers <= worker.num_workers, \
+            "CUDA_WORKERS[%d] <= workers[%d] violated" % (cuda_workers, worker.num_workers)
+    else:
+        cuda_workers = worker.num_workers
+    if worker.worker_id > cuda_workers:
+        worker.log.debug("Setup for worker %d (non-CUDA)", worker.worker_id)
+        os.environ["CUDA_VISIBLE_DEVICES"] = "" # avoid GPU
+    else:
+        worker.log.debug("Setup for worker %d (normal)", worker.worker_id)
+
+def pre_request(worker, req):
+    worker.log.debug("%s %s at worker %d" % (req.method, req.path, worker.worker_id))
+
+def post_request(worker, req, env, res):
+    worker.log.debug("%s %s at worker %d: %s" % (req.method, req.path, worker.worker_id, res))
+
+def worker_abort(worker):
+    worker.log.debug("aborting worker %s", worker)
+    # FIXME: skip/fallback remaining pages, save_mets, signalling ...
+    # worker.app.obj.clean_up()
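The Timeout helper in ocrd/server.py above is SIGALRM-based, so it only works in a worker's main thread and with whole-second granularity (and a value of 0 disables the alarm). A usage sketch:

```python
import time
from ocrd.server import Timeout  # as defined above

with Timeout(2, "took longer than 2s"):
    time.sleep(1)       # finishes in time; the alarm is cleared on __exit__
try:
    with Timeout(2, "took longer than 2s"):
        time.sleep(5)   # interrupted by SIGALRM after 2 seconds
except TimeoutError as err:
    print(err)          # "took longer than 2s"
```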
diff --git a/ocrd/ocrd/task_sequence.py b/ocrd/ocrd/task_sequence.py
index 79c4fdbd32..d10ff9ce5f 100644
--- a/ocrd/ocrd/task_sequence.py
+++ b/ocrd/ocrd/task_sequence.py
@@ -1,15 +1,27 @@
 import json
+import re
+import sys
 from shlex import split as shlex_split
 from distutils.spawn import find_executable as which # pylint: disable=import-error,no-name-in-module
 from subprocess import run, PIPE
 
-from ocrd_utils import getLogger, parse_json_string_or_file, set_json_key_value_overrides
+# workaround venvs created for Python>=3.8
+from pkg_resources import load_entry_point # pylint: disable=unused-import
+
+from ocrd_utils import (
+    getLogger,
+    setOverrideLogLevel,
+    parse_json_string_or_file,
+    set_json_key_value_overrides
+)
 # from collections import Counter
 
-from ocrd.processor.base import run_cli
+from ocrd.processor.base import run_cli, run_api
 from ocrd.resolver import Resolver
 from ocrd_validators import ParameterValidator, WorkspaceValidator
 from ocrd_models import ValidationReport
 
+_processor_class = None # for exec in ProcessorTask.instantiate
+
 class ProcessorTask():
 
     @classmethod
@@ -44,6 +56,7 @@ def __init__(self, executable, input_file_grps, output_file_grps, parameters):
         self.output_file_grps = output_file_grps
         self.parameters = parameters
         self._ocrd_tool_json = None
+        self.instance = None # for API (instead of CLI) integration
 
     @property
     def ocrd_tool_json(self):
@@ -78,6 +91,60 @@ def validate(self):
                 raise Exception("Processor requires output_file_grp but none was provided.")
         return report
 
+    def instantiate(self):
+        from ocrd import decorators
+        logger = getLogger('ocrd.task_sequence.ProcessorTask')
+        program = which(self.executable)
+        if not program:
+            logger.warning("Cannot find processor '%s' in PATH", self.executable)
+            return False
+        # run CLI merely to do imports and fetch class
+        with open(program) as f:
+            # check shebang in first line of CLI file for Python
+            line = f.readline().strip()
+            if not re.fullmatch('[#][!].*/python[0-9.]*', line):
+                logger.info("Non-Pythonic processor '%s' breaks the chain", self.executable)
+                return False
+            # compile Python processor from CLI file
+            try:
+                code = compile(f.read(), program, 'exec')
+            except (TypeError, SyntaxError, ValueError) as e:
+                logger.warning("Cannot compile and instantiate processor '%s': %s",
+                               self.executable, str(e))
+                return False
+            # temporarily monkey-patch entry point and sys.exit/sys.argv
+            def ignore(anything): # pylint: disable=unused-argument
+                return
+            global _processor_class
+            _processor_class = None
+            def get_processor_class(cls, **kwargs):
+                global _processor_class
+                _processor_class = cls
+            wrap_processor = decorators.ocrd_cli_wrap_processor
+            decorators.ocrd_cli_wrap_processor = get_processor_class
+            sys_exit = sys.exit
+            sys.exit = ignore
+            sys_argv = sys.argv
+            sys.argv = [self.executable]
+            # run Python processor from CLI file
+            __name__ = '__main__'
+            try:
+                exec(code)
+                logger.info("Instantiating %s for processor '%s'",
+                            _processor_class.__name__, self.executable)
+                # instantiate processor without workspace
+                self.instance = _processor_class(None, parameter=self.parameters)
+                # circumvent calling CLI to get .ocrd_tool_json
+                self._ocrd_tool_json = self.instance.ocrd_tool
+            except Exception as e:
+                logger.warning("Cannot exec and instantiate processor '%s': %s",
+                               self.executable, str(e))
+            # reset modules
+            sys.argv = sys_argv
+            sys.exit = sys_exit
+            decorators.ocrd_cli_wrap_processor = wrap_processor
+        return bool(self.instance)
+
     def __str__(self):
         ret = '%s -I %s -O %s' % (
             self.executable.replace('ocrd-', '', 1),
@@ -117,43 +184,76 @@ def validate_tasks(tasks, workspace, page_id=None, overwrite=False):
 
     return report
 
-def run_tasks(mets, log_level, page_id, task_strs, overwrite=False):
+def parse_tasks(task_strs):
+    return [ProcessorTask.parse(task_str) for task_str in task_strs]
+
+def run_tasks(mets, log_level, page_id, tasks, overwrite=False):
     resolver = Resolver()
     workspace = resolver.workspace_from_url(mets)
+    if overwrite:
+        workspace.overwrite_mode = True
     log = getLogger('ocrd.task_sequence.run_tasks')
-    tasks = [ProcessorTask.parse(task_str) for task_str in task_strs]
+    if log_level:
+        setOverrideLogLevel(log_level)
 
     validate_tasks(tasks, workspace, page_id, overwrite)
 
     # Run the tasks
+    is_first = True
+    last_is_instance = False
     for task in tasks:
-        log.info("Start processing task '%s'", task)
-
-        # execute cli
-        returncode = run_cli(
-            task.executable,
-            mets,
-            resolver,
-            workspace,
-            log_level=log_level,
-            page_id=page_id,
-            overwrite=overwrite,
-            input_file_grp=','.join(task.input_file_grps),
-            output_file_grp=','.join(task.output_file_grps),
-            parameter=json.dumps(task.parameters)
-        )
-
-        # check return code
-        if returncode != 0:
-            raise Exception("%s exited with non-zero return value %s." % (task.executable, returncode))
+        is_instance = bool(task.instance)
+        log.info("Start processing %s task '%s'",
+                 "API" if is_instance else "CLI", task)
 
-        log.info("Finished processing task '%s'", task)
+        if (not is_first and
+            not is_instance and
+            last_is_instance):
+            workspace.save_mets()
+
+        if is_instance:
+            # execute API
+            error = run_api(
+                task.instance,
+                workspace,
+                page_id=page_id,
+                input_file_grp=','.join(task.input_file_grps),
+                output_file_grp=','.join(task.output_file_grps)
+            )
 
-        # reload mets
-        workspace.reload_mets()
+            if error:
+                raise error
+        else:
+            # execute cli
+            returncode = run_cli(
+                task.executable,
+                mets,
+                resolver,
+                workspace,
+                log_level=log_level,
+                page_id=page_id,
+                overwrite=overwrite,
+                input_file_grp=','.join(task.input_file_grps),
+                output_file_grp=','.join(task.output_file_grps),
+                parameter=json.dumps(task.parameters)
+            )
+
+            # check return code
+            if returncode != 0:
+                raise Exception("%s exited with non-zero return value %s." % (task.executable, returncode))
+
+            workspace.reload_mets()
 
         # check output file groups are in mets
         for output_file_grp in task.output_file_grps:
             if not output_file_grp in workspace.mets.file_groups:
                 raise Exception("Invalid state: expected output file group '%s' not in METS (despite processor success)" % output_file_grp)
+
+        log.info("Finished processing task '%s'", task)
+
+        is_first = False
+        last_is_instance = is_instance
+
+    if last_is_instance:
+        workspace.save_mets()
diff --git a/ocrd/requirements.txt b/ocrd/requirements.txt
index 2da0163b74..fd49a05a33 100644
--- a/ocrd/requirements.txt
+++ b/ocrd/requirements.txt
@@ -5,6 +5,8 @@ requests
 lxml
 opencv-python-headless
 Flask
+gunicorn
+uwsgi
 jsonschema
 pyyaml
 Deprecated == 1.2.0
diff --git a/tests/test_task_sequence.py b/tests/test_task_sequence.py
index e33da7c5f7..40d8b5de75 100644
--- a/tests/test_task_sequence.py
+++ b/tests/test_task_sequence.py
@@ -12,7 +12,7 @@
 from ocrd_utils import pushd_popd, MIMETYPE_PAGE
 from ocrd.resolver import Resolver
 
-from ocrd.task_sequence import run_tasks, validate_tasks, ProcessorTask
+from ocrd.task_sequence import parse_tasks, run_tasks, validate_tasks, ProcessorTask
 
 class TestOcrdWfStep(TestCase):
 
@@ -141,10 +141,11 @@ def test_task_run(self):
         ws.add_file('GRP0', content='', local_filename='GRP0/foo', ID='file0', mimetype=MIMETYPE_PAGE, pageId=None)
         ws.save_mets()
         files_before = len(ws.mets.find_all_files())
-        run_tasks('mets.xml', 'DEBUG', None, [
+        tasks = parse_tasks([
             "dummy -I OCR-D-IMG -O GRP1",
             "dummy -I GRP1 -O GRP2",
         ])
+        run_tasks('mets.xml', 'DEBUG', None, tasks)
         ws.reload_mets()
         # step 1: 2 images in OCR-D-IMG -> 2 images 2 PAGEXML in GRP1
         # step 2: 2 images and 2 PAGEXML in GRP1 -> process just the PAGEXML
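Finally, a recap of the changed task_sequence API from Python (a sketch mirroring the test above; the workspace path is made up, and ocrd-dummy must be installed):

```python
from ocrd.task_sequence import parse_tasks, run_tasks

# parsing is now the caller's job (formerly done inside run_tasks)
tasks = parse_tasks([
    "dummy -I OCR-D-IMG -O OCR-D-COPY",
    "dummy -I OCR-D-COPY -O OCR-D-COPY2",
])
# optional: pre-instantiate Pythonic processors so run_tasks uses
# run_api (in-memory METS between tasks) instead of spawning CLIs
for task in tasks:
    task.instantiate()  # returns False -> CLI fallback for that task
run_tasks('/data/ws1/mets.xml', 'INFO', None, tasks)
```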