diff --git a/src/ocrd/cli/network.py b/src/ocrd/cli/network.py
index 72ecefae49..116f51cac8 100644
--- a/src/ocrd/cli/network.py
+++ b/src/ocrd/cli/network.py
@@ -12,7 +12,6 @@
     client_cli,
     processing_server_cli,
     processing_worker_cli,
-    processor_server_cli,
 )
 
 
@@ -27,4 +26,3 @@ def network_cli():
 network_cli.add_command(client_cli)
 network_cli.add_command(processing_server_cli)
 network_cli.add_command(processing_worker_cli)
-network_cli.add_command(processor_server_cli)
diff --git a/src/ocrd/decorators/__init__.py b/src/ocrd/decorators/__init__.py
index 553b6fa57d..7e0e1815aa 100644
--- a/src/ocrd/decorators/__init__.py
+++ b/src/ocrd/decorators/__init__.py
@@ -41,7 +41,6 @@ def ocrd_cli_wrap_processor(
     list_resources=False,
     # ocrd_network params start #
     subcommand=None,
-    address=None,
     queue=None,
     log_filename=None,
     database=None,
@@ -88,9 +87,8 @@ def ocrd_cli_wrap_processor(
     if list_resources:
         processor.list_resources()
         sys.exit()
-    if subcommand or address or queue or database:
-        # Used for checking/starting network agents for the WebAPI architecture
-        check_and_run_network_agent(processorClass, subcommand, address, database, queue)
+    if subcommand == "worker" or queue or database:
+        check_and_run_processing_worker(processorClass, database, queue)
 
     if 'parameter' in kwargs:
         # Disambiguate parameter file/literal, and resolve file
@@ -160,54 +158,26 @@ def goexit():
     run_processor(processorClass, mets_url=mets, workspace=workspace, **kwargs)
 
 
-def check_and_run_network_agent(ProcessorClass, subcommand: str, address: str, database: str, queue: str):
+def check_and_run_processing_worker(ProcessorClass, database: str, queue: str):
+    """ Check/start Processing Worker for the WebAPI architecture
     """
-    """
-    from ocrd_network import ProcessingWorker, ProcessorServer, AgentType
-    SUBCOMMANDS = [AgentType.PROCESSING_WORKER, AgentType.PROCESSOR_SERVER]
-
-    if not subcommand:
-        raise ValueError("Subcommand options --address --queue and --database "
-                         f"are only valid for subcommands: {SUBCOMMANDS}")
-    if subcommand not in SUBCOMMANDS:
-        raise ValueError(f"SUBCOMMAND can only be one of {SUBCOMMANDS}")
+    from ocrd_network import ProcessingWorker
     if not database:
-        raise ValueError(f"Option '--database' is invalid for subcommand {subcommand}")
-
-    if subcommand == AgentType.PROCESSOR_SERVER:
-        if not address:
-            raise ValueError(f"Option '--address' required for subcommand {subcommand}")
-        if queue:
-            raise ValueError(f"Option '--queue' invalid for subcommand {subcommand}")
-    if subcommand == AgentType.PROCESSING_WORKER:
-        if address:
-            raise ValueError(f"Option '--address' invalid for subcommand {subcommand}")
-        if not queue:
-            raise ValueError(f"Option '--queue' required for subcommand {subcommand}")
+        raise ValueError("Option '--database' is required for the Processing Worker")
+    if not queue:
+        raise ValueError("Option '--queue' is required for the Processing Worker")
 
     processor = ProcessorClass(workspace=None)
-    if subcommand == AgentType.PROCESSING_WORKER:
-        processing_worker = ProcessingWorker(
-            rabbitmq_addr=queue,
-            mongodb_addr=database,
-            processor_name=processor.ocrd_tool['executable'],
-            ocrd_tool=processor.ocrd_tool,
-            processor_class=ProcessorClass,
-        )
-        # The RMQConsumer is initialized and a connection to the RabbitMQ is performed
-        processing_worker.connect_consumer()
-        # Start consuming from the queue with name `processor_name`
-        processing_worker.start_consuming()
-    elif subcommand == AgentType.PROCESSOR_SERVER:
-        # TODO: Better validate that inside the ProcessorServer itself
-        host, port = address.split(':')
-        processor_server = ProcessorServer(
-            mongodb_addr=database,
-            processor_name=processor.ocrd_tool['executable'],
-            processor_class=ProcessorClass,
-        )
-        processor_server.run_server(host=host, port=int(port))
-    else:
-        raise ValueError(f"Unknown network agent type, must be one of: {SUBCOMMANDS}")
+    processing_worker = ProcessingWorker(
+        rabbitmq_addr=queue,
+        mongodb_addr=database,
+        processor_name=processor.ocrd_tool['executable'],
+        ocrd_tool=processor.ocrd_tool,
+        processor_class=ProcessorClass,
+    )
+    # The RMQConsumer is initialized and a connection to the RabbitMQ is performed
+    processing_worker.connect_consumer()
+    # Start consuming from the queue with name `processor_name`
+    processing_worker.start_consuming()
 
     sys.exit(0)
diff --git a/src/ocrd/decorators/ocrd_cli_options.py b/src/ocrd/decorators/ocrd_cli_options.py
index e8c3d86854..cf676ad0b1 100644
--- a/src/ocrd/decorators/ocrd_cli_options.py
+++ b/src/ocrd/decorators/ocrd_cli_options.py
@@ -1,12 +1,10 @@
 import click
 from click import option, Path, argument
 from ocrd_utils import DEFAULT_METS_BASENAME
-from ocrd_network import AgentType
 from .parameter_option import parameter_option, parameter_override_option
 from .loglevel_option import loglevel_option
 from ocrd_network import (
     DatabaseParamType,
-    ServerAddressParamType,
     QueueServerParamType
 )
 
@@ -40,7 +38,6 @@ def cli(**kwargs):
         parameter_override_option,
         loglevel_option,
         option('--log-filename', default=None),
-        option('--address', type=ServerAddressParamType()),
         option('--queue', type=QueueServerParamType()),
         option('--database', type=DatabaseParamType()),
         option('-R', '--resolve-resource'),
@@ -50,13 +47,12 @@ def cli(**kwargs):
         option('-D', '--dump-module-dir', is_flag=True, default=False),
         option('-h', '--help', is_flag=True, default=False),
         option('-V', '--version', is_flag=True, default=False),
-        # Subcommand, only used for 'worker'/'server'. Cannot be handled in
+        # Subcommand, only used for 'worker'. Cannot be handled in
         # click because processors use the @command decorator and even if they
         # were using `group`, you cannot combine have a command with
        # subcommands. So we have to work around that by creating a
        # pseudo-subcommand handled in ocrd_cli_wrap_processor
-        argument('subcommand', nargs=1, required=False,
-                 type=click.Choice(list(map(str, AgentType)))),
+        argument('subcommand', nargs=1, required=False, type=click.Choice(["worker"])),
     ]
     for param in params:
         param(f)
diff --git a/src/ocrd/lib.bash b/src/ocrd/lib.bash
index 52bde30258..20c3228d30 100644
--- a/src/ocrd/lib.bash
+++ b/src/ocrd/lib.bash
@@ -183,30 +183,23 @@ ocrd__parse_argv () {
             -V|--version) ocrd ocrd-tool "$OCRD_TOOL_JSON" version; exit ;;
             --queue) ocrd__worker_queue="$2" ; shift ;;
             --database) ocrd__worker_database="$2" ; shift ;;
-            --address) ocrd__worker_address="$2" ; shift ;;
             *) ocrd__raise "Unknown option '$1'" ;;
         esac
         shift
     done
 
-    if [ -v ocrd__worker_queue -o -v ocrd__worker_database -o -v ocrd__subcommand -o -v ocrd__worker_address ]; then
+    if [ -v ocrd__worker_queue -o -v ocrd__worker_database -o -v ocrd__subcommand ]; then
         if ! [ -v ocrd__subcommand ] ; then
-            ocrd__raise "Provide subcommand 'worker' or 'server' for Processing Worker / Processor Server"
+            ocrd__raise "Provide subcommand 'worker' for Processing Worker"
         elif ! [ -v ocrd__worker_database ]; then
-            ocrd__raise "For the Processing Worker / Processor Server --database is required"
+            ocrd__raise "For the Processing Worker --database is required"
+        elif ! [ -v ocrd__worker_queue ]; then
+            ocrd__raise "For the Processing Worker --queue is required"
         fi
         if [ ${ocrd__subcommand} = "worker" ]; then
-            if ! [ -v ocrd__worker_queue ]; then
-                ocrd__raise "For the Processing Worker --queue is required"
-            fi
             ocrd network processing-worker $OCRD_TOOL_NAME --queue "${ocrd__worker_queue}" --database "${ocrd__worker_database}"
-        elif [ ${ocrd__subcommand} = "server" ]; then
-            if ! [ -v ocrd__worker_address ]; then
-                ocrd__raise "For the Processor Server --address is required"
-            fi
-            ocrd network processor-server $OCRD_TOOL_NAME --database "${ocrd__worker_database}" --address "${ocrd__worker_address}"
         else
-            ocrd__raise "subcommand must be either 'worker' or 'server' not '${ocrd__subcommand}'"
+            ocrd__raise "subcommand must be 'worker' not '${ocrd__subcommand}'"
         fi
         exit
     fi
diff --git a/src/ocrd/processor/base.py b/src/ocrd/processor/base.py
index 8b46b1f49b..1fd810d1cd 100644
--- a/src/ocrd/processor/base.py
+++ b/src/ocrd/processor/base.py
@@ -1209,7 +1209,7 @@ def generate_processor_help(ocrd_tool, processor_instance=None, subcommand=None)
         ocrd_tool (dict): this processor's ``tools`` section of the module's ``ocrd-tool.json``
         processor_instance (object, optional): the processor implementation
             (for adding any module/class/function docstrings)
-        subcommand (string): 'worker' or 'server'
+        subcommand (string, optional): 'worker'
     """
     doc_help = ''
     if processor_instance:
@@ -1235,7 +1235,6 @@ def generate_processor_help(ocrd_tool, processor_instance=None, subcommand=None)
                               preserve_paragraphs=True)
     subcommands = '''\
 worker      Start a processing worker rather than do local processing
-server      Start a processor server rather than do local processing
 '''
 
     processing_worker_options = '''\
@@ -1250,8 +1249,6 @@ def generate_processor_help(ocrd_tool, processor_instance=None, subcommand=None)
 '''
 
     processing_server_options = '''\
-  --address             The Processor server address in format
-                        "{host}:{port}"
   --database            The MongoDB server address in format
                         "mongodb://{host}:{port}" [mongodb://localhost:27018]
 
@@ -1296,8 +1293,8 @@ def generate_processor_help(ocrd_tool, processor_instance=None, subcommand=None)
        parameter_help = ' NONE\n'
     else:
         def wrap(s):
-            return wrap_text(s, initial_indent=' '*3,
-                             subsequent_indent=' '*4,
+            return wrap_text(s, initial_indent=' ' * 3,
+                             subsequent_indent=' ' * 4,
                              width=72, preserve_paragraphs=True)
         for param_name, param in ocrd_tool['parameters'].items():
             parameter_help += wrap('"%s" [%s%s]' % (
@@ -1335,17 +1332,6 @@ def wrap(s):
 
 Options:
 {processing_worker_options}
-'''
-    elif subcommand == 'server':
-        return f'''\
-Usage: {ocrd_tool['executable']} server [OPTIONS]
-
-  Run {ocrd_tool['executable']} as a processor sever.
-
-  {ocrd_tool['description']}{doc_help}
-
-Options:
-{processing_server_options}
 '''
     else:
         pass
diff --git a/src/ocrd/processor/helpers.py b/src/ocrd/processor/helpers.py
index 188e627e4f..431cb6e452 100644
--- a/src/ocrd/processor/helpers.py
+++ b/src/ocrd/processor/helpers.py
@@ -66,8 +66,7 @@ def run_processor(
     when a match occurs - as long as the program is being run.
     They only get deleted (and their resources freed) when as many as
     :py:data:`~ocrd_utils.config.OCRD_MAX_PROCESSOR_CACHE` instances have already been cached while this particular parameter set was re-used
-    least frequently. (See :py:class:`~ocrd_network.ProcessingWorker` and
-    :py:class:`~ocrd_network.ProcessorServer` for use-cases.)
+    least frequently. (See :py:class:`~ocrd_network.ProcessingWorker` for use-cases.)
Args: processorClass (object): Python class of the module processor. diff --git a/src/ocrd_network/__init__.py b/src/ocrd_network/__init__.py index 189a48100a..e7f472fd10 100644 --- a/src/ocrd_network/__init__.py +++ b/src/ocrd_network/__init__.py @@ -1,7 +1,6 @@ from .client import Client -from .constants import AgentType, JobState +from .constants import JobState from .processing_server import ProcessingServer from .processing_worker import ProcessingWorker -from .processor_server import ProcessorServer from .param_validators import DatabaseParamType, ServerAddressParamType, QueueServerParamType from .server_cache import CacheLockedPages, CacheProcessingRequests diff --git a/src/ocrd_network/cli/__init__.py b/src/ocrd_network/cli/__init__.py index 1704b2aaf7..281eea77e4 100644 --- a/src/ocrd_network/cli/__init__.py +++ b/src/ocrd_network/cli/__init__.py @@ -1,11 +1,9 @@ from .client import client_cli from .processing_server import processing_server_cli from .processing_worker import processing_worker_cli -from .processor_server import processor_server_cli __all__ = [ 'client_cli', 'processing_server_cli', 'processing_worker_cli', - 'processor_server_cli' ] diff --git a/src/ocrd_network/cli/client.py b/src/ocrd_network/cli/client.py index b75dcbd580..3cd8c8ce60 100644 --- a/src/ocrd_network/cli/client.py +++ b/src/ocrd_network/cli/client.py @@ -49,7 +49,7 @@ def discovery_cli(): @click.option('--address', type=URL, help=ADDRESS_HELP) def check_deployed_processors(address: Optional[str]): """ - Get a list of deployed processing workers/processor servers. + Get a list of deployed processing workers. Each processor is shown only once regardless of the amount of deployed instances. """ client = Client(server_addr_processing=address) @@ -113,7 +113,6 @@ def check_processing_job_status(address: Optional[str], processing_job_id: str): @parameter_override_option @click.option('--result-queue-name') @click.option('--callback-url') -@click.option('--agent-type', default='worker') @click.option('-b', '--block', default=False, is_flag=True, help='If set, the client will block till job timeout, fail or success.') @click.option('-p', '--print-state', default=False, is_flag=True, @@ -129,9 +128,6 @@ def send_processing_job_request( parameter_override: List[Tuple[str, str]], result_queue_name: Optional[str], callback_url: Optional[str], - # TODO: This is temporally available to toggle - # between the ProcessingWorker/ProcessorServer - agent_type: Optional[str], block: Optional[bool], print_state: Optional[bool] ): @@ -142,7 +138,6 @@ def send_processing_job_request( "path_to_mets": mets, "description": "OCR-D Network client request", "input_file_grps": input_file_grp.split(','), - "agent_type": agent_type } if output_file_grp: req_params["output_file_grps"] = output_file_grp.split(',') diff --git a/src/ocrd_network/cli/processing_server.py b/src/ocrd_network/cli/processing_server.py index 50a42887c6..5fb611c0c5 100644 --- a/src/ocrd_network/cli/processing_server.py +++ b/src/ocrd_network/cli/processing_server.py @@ -12,8 +12,7 @@ def processing_server_cli(path_to_config, address: str): """ Start the Processing Server - (proxy between the user and the - Processing Worker(s) / Processor Server(s)) + (proxy between the user and the Processing Worker(s)) """ # Note, the address is already validated with the type field diff --git a/src/ocrd_network/cli/processor_server.py b/src/ocrd_network/cli/processor_server.py deleted file mode 100644 index 50529adda3..0000000000 --- 
a/src/ocrd_network/cli/processor_server.py +++ /dev/null @@ -1,31 +0,0 @@ -import click -from ocrd_network import DatabaseParamType, ProcessorServer, ServerAddressParamType - - -@click.command('processor-server') -@click.argument('processor_name', required=True, type=click.STRING) -@click.option('-a', '--address', - help='The URL of the processor server, format: host:port', - type=ServerAddressParamType(), - required=True) -@click.option('-d', '--database', - default="mongodb://localhost:27018", - help='The URL of the MongoDB, format: mongodb://host:port', - type=DatabaseParamType(), - required=True) -def processor_server_cli(processor_name: str, address: str, database: str): - """ - Start Processor Server - (standalone REST API OCR-D processor) - """ - try: - # Note, the address is already validated with the type field - host, port = address.split(':') - processor_server = ProcessorServer( - mongodb_addr=database, - processor_name=processor_name, - processor_class=None # For readability purposes assigned here - ) - processor_server.run_server(host=host, port=int(port)) - except Exception as e: - raise Exception("Processor server has failed with error") from e diff --git a/src/ocrd_network/constants.py b/src/ocrd_network/constants.py index 089e321dff..00661801d8 100644 --- a/src/ocrd_network/constants.py +++ b/src/ocrd_network/constants.py @@ -16,11 +16,6 @@ def __str__(self): return self.value -class AgentType(StrEnum): - PROCESSING_WORKER = "worker" - PROCESSOR_SERVER = "server" - - class DeployType(StrEnum): # Deployed by the Processing Server config file DOCKER = "docker" @@ -40,7 +35,7 @@ class JobState(StrEnum): failed = "FAILED" # The processing job is queued inside the RabbitMQ queued = "QUEUED" - # Processing job is currently running in a Worker or Processor Server + # Processing job is currently running on a Worker running = "RUNNING" # Processing job finished successfully success = "SUCCESS" @@ -53,7 +48,6 @@ class NetworkLoggingDirs(StrEnum): PROCESSING_JOBS = "processing_jobs" PROCESSING_SERVERS = "processing_servers" PROCESSING_WORKERS = "processing_workers" - PROCESSOR_SERVERS = "processor_servers" class ServerApiTags(StrEnum): diff --git a/src/ocrd_network/logging_utils.py b/src/ocrd_network/logging_utils.py index 2b9bffa1d0..bf979b0d86 100644 --- a/src/ocrd_network/logging_utils.py +++ b/src/ocrd_network/logging_utils.py @@ -2,7 +2,7 @@ from pathlib import Path from ocrd_utils import config, LOG_FORMAT, safe_filename -from .constants import AgentType, NetworkLoggingDirs +from .constants import NetworkLoggingDirs def configure_file_handler_with_formatter(logger: Logger, log_file: Path, mode: str = "a") -> None: @@ -54,10 +54,5 @@ def get_processing_server_logging_file_path(pid: int) -> Path: def get_processing_worker_logging_file_path(processor_name: str, pid: int) -> Path: - log_file: str = f"{AgentType.PROCESSING_WORKER}.{pid}.{processor_name}.log" + log_file: str = f"worker.{pid}.{processor_name}.log" return Path(get_root_logging_dir(NetworkLoggingDirs.PROCESSING_WORKERS), log_file) - - -def get_processor_server_logging_file_path(processor_name: str, pid: int) -> Path: - log_file: str = f"{AgentType.PROCESSOR_SERVER}.{pid}.{processor_name}.log" - return Path(get_root_logging_dir(NetworkLoggingDirs.PROCESSOR_SERVERS), log_file) diff --git a/src/ocrd_network/models/__init__.py b/src/ocrd_network/models/__init__.py index 774f8aa130..052900e57d 100644 --- a/src/ocrd_network/models/__init__.py +++ b/src/ocrd_network/models/__init__.py @@ -10,13 +10,11 @@ 'DBWorkflowScript', 
'PYJobInput', 'PYJobOutput', - 'PYOcrdTool', 'PYResultMessage', 'PYWorkflowJobOutput' ] from .job import DBProcessorJob, DBWorkflowJob, PYJobInput, PYJobOutput, PYWorkflowJobOutput from .messages import PYResultMessage -from .ocrd_tool import PYOcrdTool from .workspace import DBWorkspace from .workflow import DBWorkflowScript diff --git a/src/ocrd_network/models/job.py b/src/ocrd_network/models/job.py index efc6750c48..04c57c8e9c 100644 --- a/src/ocrd_network/models/job.py +++ b/src/ocrd_network/models/job.py @@ -2,7 +2,7 @@ from datetime import datetime from pydantic import BaseModel from typing import Dict, List, Optional -from ..constants import AgentType, JobState +from ..constants import JobState class PYJobInput(BaseModel): @@ -18,9 +18,7 @@ class PYJobInput(BaseModel): parameters: dict = {} # Always set to empty dict when None, otherwise it fails ocr-d-validation result_queue_name: Optional[str] = None callback_url: Optional[str] = None - # Used to toggle between sending requests to different network agents - agent_type: AgentType = AgentType.PROCESSING_WORKER - # Auto generated by the Processing Server when forwarding to the Processor Server + # Auto generated by the Processing Server when forwarding to the Processing-Worker job_id: Optional[str] = None # If set, specifies a list of job ids this job depends on depends_on: Optional[List[str]] = None @@ -32,7 +30,6 @@ class Config: 'description': 'The description of this execution', 'input_file_grps': ['DEFAULT'], 'output_file_grps': ['OCR-D-BIN'], - 'agent_type': AgentType.PROCESSING_WORKER, 'page_id': 'PHYS_0001..PHYS_0003', 'parameters': {} } diff --git a/src/ocrd_network/models/ocrd_tool.py b/src/ocrd_network/models/ocrd_tool.py deleted file mode 100644 index b3e2ceaea8..0000000000 --- a/src/ocrd_network/models/ocrd_tool.py +++ /dev/null @@ -1,12 +0,0 @@ -from pydantic import BaseModel -from typing import List, Optional - - -class PYOcrdTool(BaseModel): - executable: str - categories: List[str] - description: str - input_file_grp: List[str] - output_file_grp: Optional[List[str]] - steps: List[str] - parameters: Optional[dict] = None diff --git a/src/ocrd_network/models/workspace.py b/src/ocrd_network/models/workspace.py index 670cb14b58..f323b1b424 100644 --- a/src/ocrd_network/models/workspace.py +++ b/src/ocrd_network/models/workspace.py @@ -17,7 +17,7 @@ class DBWorkspace(Document): key-value-pairs which are saved here deleted the document is deleted if set, however, the record is still preserved pages_locked a data structure that holds output `fileGrp`s and their respective locked `page_id` - that are currently being processed by an OCR-D processor (server or worker). + that are currently being processed by an OCR-D Processing-Worker. If no `page_id` field is set, an identifier "all_pages" will be used. 
mets_server_url If set, the reading from and writing to the mets file happens through the METS Server """ diff --git a/src/ocrd_network/processing_server.py b/src/ocrd_network/processing_server.py index ba90bf86a0..bdc16144dd 100644 --- a/src/ocrd_network/processing_server.py +++ b/src/ocrd_network/processing_server.py @@ -10,7 +10,7 @@ from ocrd.task_sequence import ProcessorTask from ocrd_utils import initLogging, getLogger -from .constants import AgentType, JobState, ServerApiTags +from .constants import JobState, ServerApiTags from .database import ( initiate_database, db_get_processing_job, @@ -34,14 +34,13 @@ from .rabbitmq_utils import ( check_if_queue_exists, connect_rabbitmq_publisher, - create_message_queues, + get_message_queues, OcrdProcessingMessage ) from .server_cache import CacheLockedPages, CacheProcessingRequests from .server_utils import ( create_processing_message, create_workspace_if_not_exists, - forward_job_to_processor_server, _get_processor_job, _get_processor_job_log, get_page_ids_list, @@ -51,7 +50,6 @@ kill_mets_server_zombies, parse_workflow_tasks, raise_http_exception, - request_processor_server_tool_json, validate_and_return_mets_path, validate_first_task_input_file_groups_existence, validate_job_input, @@ -104,7 +102,7 @@ def __init__(self, config_path: str, host: str, port: int) -> None: self.mets_server_proxy = MetsServerProxy() self.use_tcp_mets = self.deployer.use_tcp_mets # If set, all Mets Server UDS requests are multiplexed over TCP - # Used by processing workers and/or processor servers to report back the results + # Used by processing workers to report back the results if self.deployer.internal_callback_url: host = self.deployer.internal_callback_url self.internal_job_callback_url = f"{host.rstrip('/')}/result_callback" @@ -153,16 +151,10 @@ def start(self) -> None: # The RMQPublisher is initialized and a connection to the RabbitMQ is performed self.rmq_publisher = connect_rabbitmq_publisher(self.log, self.rmq_data, enable_acks=True) - queue_names = self.deployer.find_matching_network_agents( - worker_only=True, str_names_only=True, unique_only=True - ) - self.log.info(f"Creating message queues on RabbitMQ instance url: {self.rabbitmq_url}") - create_message_queues(logger=self.log, rmq_publisher=self.rmq_publisher, queue_names=queue_names) - - self.deployer.deploy_network_agents(mongodb_url=self.mongodb_url, rabbitmq_url=self.rabbitmq_url) + self.deployer.deploy_workers(mongodb_url=self.mongodb_url, rabbitmq_url=self.rabbitmq_url) except Exception as error: self.log.exception(f"Failed to start the Processing Server, error: {error}") - self.log.warning("Trying to stop previously deployed services and network agents.") + self.log.warning("Trying to stop previously deployed services and workers.") self.deployer.stop_all() raise uvicorn_run(self, host=self.hostname, port=int(self.port)) @@ -225,7 +217,7 @@ def add_api_routes_processing(self): ) processing_router.add_api_route( path="/processor/info/{processor_name}", - endpoint=self.get_network_agent_ocrd_tool, + endpoint=self.get_worker_ocrd_tool, methods=["GET"], tags=[ServerApiTags.PROCESSING, ServerApiTags.DISCOVERY], status_code=status.HTTP_200_OK, @@ -233,7 +225,7 @@ def add_api_routes_processing(self): ) processing_router.add_api_route( path="/processor/run/{processor_name}", - endpoint=self.validate_and_forward_job_to_network_agent, + endpoint=self.validate_and_forward_job_to_worker, methods=["POST"], tags=[ServerApiTags.PROCESSING], status_code=status.HTTP_200_OK, @@ -267,7 +259,7 @@ def 
add_api_routes_processing(self): methods=["POST"], tags=[ServerApiTags.PROCESSING], status_code=status.HTTP_200_OK, - summary="Callback used by a worker or processor server for reporting result of a processing request" + summary="Callback used by a worker for reporting result of a processing request" ) self.include_router(processing_router) @@ -351,68 +343,38 @@ async def home_page(self): async def stop_deployed_agents(self) -> None: self.deployer.stop_all() - def query_ocrd_tool_json_from_server(self, processor_name: str) -> Dict: - processor_server_base_url = self.deployer.resolve_processor_server_url(processor_name) - if processor_server_base_url == '': - message = f"Processor Server URL of '{processor_name}' not found" - raise_http_exception(self.log, status.HTTP_404_NOT_FOUND, message=message) - return request_processor_server_tool_json(self.log, processor_server_base_url=processor_server_base_url) - - async def get_network_agent_ocrd_tool( - self, processor_name: str, agent_type: AgentType = AgentType.PROCESSING_WORKER - ) -> Dict: + async def get_worker_ocrd_tool(self, processor_name: str) -> Dict: ocrd_tool = {} - error_message = f"Network agent of type '{agent_type}' for processor '{processor_name}' not found." - if agent_type != AgentType.PROCESSING_WORKER and agent_type != AgentType.PROCESSOR_SERVER: - message = f"Unknown agent type: {agent_type}, {type(agent_type)}" - raise_http_exception(self.log, status_code=status.HTTP_501_NOT_IMPLEMENTED, message=message) - if agent_type == AgentType.PROCESSING_WORKER: - ocrd_tool = self.ocrd_all_tool_json.get(processor_name, None) - if agent_type == AgentType.PROCESSOR_SERVER: - ocrd_tool = self.query_ocrd_tool_json_from_server(processor_name) + ocrd_tool = self.ocrd_all_tool_json.get(processor_name, None) if not ocrd_tool: - raise_http_exception(self.log, status.HTTP_404_NOT_FOUND, error_message) + raise_http_exception(self.log, status.HTTP_404_NOT_FOUND, + f"Processing Worker '{processor_name}' not found.") return ocrd_tool - def network_agent_exists_server(self, processor_name: str) -> bool: - processor_server_url = self.deployer.resolve_processor_server_url(processor_name) - return bool(processor_server_url) - - def network_agent_exists_worker(self, processor_name: str) -> bool: + def exists_worker(self, processor_name: str) -> bool: # TODO: Reconsider and refactor this. # Added ocrd-dummy by default if not available for the integration tests. - # A proper Processing Worker / Processor Server registration endpoint - # is needed on the Processing Server side + # A proper Processing Worker registration endpoint is needed on the Processing Server side if processor_name == 'ocrd-dummy': return True return bool(check_if_queue_exists(self.log, self.rmq_data, processor_name=processor_name)) - def validate_agent_type_and_existence(self, processor_name: str, agent_type: AgentType) -> None: - agent_exists = False - if agent_type == AgentType.PROCESSOR_SERVER: - agent_exists = self.network_agent_exists_server(processor_name=processor_name) - elif agent_type == AgentType.PROCESSING_WORKER: - agent_exists = self.network_agent_exists_worker(processor_name=processor_name) - else: - message = f"Unknown agent type: {agent_type}, {type(agent_type)}" - raise_http_exception(self.log, status_code=status.HTTP_501_NOT_IMPLEMENTED, message=message) - if not agent_exists: - message = f"Network agent of type '{agent_type}' for processor '{processor_name}' not found." 
+ def validate_worker_existence(self, processor_name: str) -> None: + worker_exists = self.exists_worker(processor_name=processor_name) + if not worker_exists: + message = f"Processing Worker '{processor_name}' not found." raise_http_exception(self.log, status.HTTP_422_UNPROCESSABLE_ENTITY, message) - async def validate_and_forward_job_to_network_agent(self, processor_name: str, data: PYJobInput) -> PYJobOutput: + async def validate_and_forward_job_to_worker(self, processor_name: str, data: PYJobInput) -> PYJobOutput: # Append the processor name to the request itself data.processor_name = processor_name - self.validate_agent_type_and_existence(processor_name=data.processor_name, agent_type=data.agent_type) + self.validate_worker_existence(processor_name=data.processor_name) if data.job_id: message = f"Processing request job id field is set but must not be: {data.job_id}" raise_http_exception(self.log, status.HTTP_422_UNPROCESSABLE_ENTITY, message) # Generate processing job id data.job_id = generate_id() - ocrd_tool = await self.get_network_agent_ocrd_tool( - processor_name=data.processor_name, - agent_type=data.agent_type - ) + ocrd_tool = await self.get_worker_ocrd_tool(processor_name=data.processor_name) validate_job_input(self.log, data.processor_name, ocrd_tool, data) if data.workspace_id: @@ -492,19 +454,13 @@ async def validate_and_forward_job_to_network_agent(self, processor_name: str, d ) await db_queued_job.insert() self.cache_processing_requests.update_request_counter(workspace_key=workspace_key, by_value=1) - job_output = await self.push_job_to_network_agent(data=data, db_job=db_queued_job) + job_output = await self.push_job_to_worker(data=data, db_job=db_queued_job) return job_output - async def push_job_to_network_agent(self, data: PYJobInput, db_job: DBProcessorJob) -> PYJobOutput: - if data.agent_type != AgentType.PROCESSING_WORKER and data.agent_type != AgentType.PROCESSOR_SERVER: - message = f"Unknown agent type: {data.agent_type}, {type(data.agent_type)}" - raise_http_exception(self.log, status_code=status.HTTP_501_NOT_IMPLEMENTED, message=message) + async def push_job_to_worker(self, data: PYJobInput, db_job: DBProcessorJob) -> PYJobOutput: job_output = None - self.log.debug(f"Pushing to {data.agent_type}: {data.processor_name}, {data.page_id}, {data.job_id}") - if data.agent_type == AgentType.PROCESSING_WORKER: - job_output = await self.push_job_to_processing_queue(db_job=db_job) - if data.agent_type == AgentType.PROCESSOR_SERVER: - job_output = await self.push_job_to_processor_server(job_input=data) + self.log.debug(f"Pushing to Processing Worker: {data.processor_name}, {data.page_id}, {data.job_id}") + job_output = await self.push_job_to_processing_queue(db_job=db_job) if not job_output: message = f"Failed to create job output for job input: {data}" raise_http_exception(self.log, status.HTTP_500_INTERNAL_SERVER_ERROR, message) @@ -526,12 +482,6 @@ async def push_job_to_processing_queue(self, db_job: DBProcessorJob) -> PYJobOut raise_http_exception(self.log, status.HTTP_500_INTERNAL_SERVER_ERROR, message, error) return db_job.to_job_output() - async def push_job_to_processor_server(self, job_input: PYJobInput) -> PYJobOutput: - processor_server_base_url = self.deployer.resolve_processor_server_url(job_input.processor_name) - return await forward_job_to_processor_server( - self.log, job_input=job_input, processor_server_base_url=processor_server_base_url - ) - async def get_processor_job(self, job_id: str) -> PYJobOutput: return await _get_processor_job(self.log, 
job_id) @@ -557,7 +507,7 @@ async def _unlock_pages_of_workspace( page_ids=page_ids ) - async def push_cached_jobs_to_agents(self, processing_jobs: List[PYJobInput]) -> None: + async def push_cached_jobs_to_workers(self, processing_jobs: List[PYJobInput]) -> None: if not len(processing_jobs): self.log.debug("No processing jobs were consumed from the requests cache") return @@ -574,7 +524,7 @@ async def push_cached_jobs_to_agents(self, processing_jobs: List[PYJobInput]) -> ) self.cache_processing_requests.update_request_counter(workspace_key=workspace_key, by_value=1) - job_output = await self.push_job_to_network_agent(data=data, db_job=db_consumed_job) + job_output = await self.push_job_to_worker(data=data, db_job=db_consumed_job) if not job_output: self.log.exception(f"Failed to create job output for job input data: {data}") @@ -654,22 +604,16 @@ async def remove_job_from_request_cache(self, result_message: PYResultMessage): consumed_cached_jobs = await self._consume_cached_jobs_of_workspace( workspace_key=workspace_key, mets_server_url=mets_server_url, path_to_mets=path_to_mets ) - await self.push_cached_jobs_to_agents(processing_jobs=consumed_cached_jobs) + await self.push_cached_jobs_to_workers(processing_jobs=consumed_cached_jobs) async def list_processors(self) -> List[str]: - # There is no caching on the Processing Server side - processor_names_list = self.deployer.find_matching_network_agents( - docker_only=False, native_only=False, worker_only=False, server_only=False, - str_names_only=True, unique_only=True, sort=True - ) - return processor_names_list + return get_message_queues(self.log, self.rmq_data) async def task_sequence_to_processing_jobs( self, tasks: List[ProcessorTask], mets_path: str, page_id: str, - agent_type: AgentType = AgentType.PROCESSING_WORKER ) -> List[PYJobOutput]: temp_file_group_cache = {} responses = [] @@ -688,10 +632,9 @@ async def task_sequence_to_processing_jobs( output_file_grps=task.output_file_grps, page_id=page_id, parameters=task.parameters, - agent_type=agent_type, depends_on=dependent_jobs, ) - response = await self.validate_and_forward_job_to_network_agent( + response = await self.validate_and_forward_job_to_worker( processor_name=job_input_data.processor_name, data=job_input_data ) @@ -700,18 +643,18 @@ async def task_sequence_to_processing_jobs( responses.append(response) return responses - def validate_tasks_agents_existence(self, tasks: List[ProcessorTask], agent_type: AgentType) -> None: - missing_agents = [] + def validate_tasks_worker_existence(self, tasks: List[ProcessorTask]) -> None: + missing_workers = [] for task in tasks: try: - self.validate_agent_type_and_existence(processor_name=task.executable, agent_type=agent_type) + self.validate_worker_existence(processor_name=task.executable) except HTTPException: # catching the error is not relevant here - missing_agents.append({task.executable, agent_type}) - if missing_agents: + missing_workers.append({task.executable}) + if missing_workers: message = ( - "Workflow validation has failed. The desired network agents not found. " - f"Missing processing agents: {missing_agents}" + "Workflow validation has failed. The desired Processing Worker was not found. 
" + f"Missing Processing Workers: {missing_workers}" ) raise_http_exception(self.log, status.HTTP_406_NOT_ACCEPTABLE, message) @@ -720,7 +663,6 @@ async def run_workflow( mets_path: str, workflow: Union[UploadFile, str, None] = File(None), workflow_id: str = None, - agent_type: AgentType = AgentType.PROCESSING_WORKER, page_id: str = None, page_wise: bool = False, workflow_callback_url: str = None @@ -732,9 +674,9 @@ async def run_workflow( # Validate the input file groups of the first task in the workflow validate_first_task_input_file_groups_existence(self.log, mets_path, processing_tasks[0].input_file_grps) - # Validate existence of agents (processing workers/processor servers) + # Validate existence of Processing Workers # for the ocr-d processors referenced inside tasks - self.validate_tasks_agents_existence(processing_tasks, agent_type) + self.validate_tasks_worker_existence(processing_tasks) # for page_wise mode, we need to expand the list of pages # for the database, it's better to keep a short string @@ -746,7 +688,6 @@ async def run_workflow( tasks=processing_tasks, mets_path=mets_path, page_id=page_id, - agent_type=agent_type ) processing_job_ids = [response.job_id for response in responses] db_workflow_job = DBWorkflowJob( @@ -766,7 +707,6 @@ async def run_workflow( tasks=processing_tasks, mets_path=mets_path, page_id=current_page, - agent_type=agent_type ) processing_job_ids = [response.job_id for response in responses] all_pages_job_ids[current_page] = processing_job_ids diff --git a/src/ocrd_network/processor_server.py b/src/ocrd_network/processor_server.py deleted file mode 100644 index f873d2857a..0000000000 --- a/src/ocrd_network/processor_server.py +++ /dev/null @@ -1,255 +0,0 @@ -from datetime import datetime -from os import getpid -from subprocess import run as subprocess_run, PIPE -from uvicorn import run - -from fastapi import APIRouter, BackgroundTasks, FastAPI, status -from fastapi.responses import FileResponse - -from ocrd_utils import ( - initLogging, - get_ocrd_tool_json, - getLogger, - parse_json_string_with_comments -) -from .constants import JobState, ServerApiTags -from .database import ( - DBProcessorJob, - db_get_workspace, - db_update_processing_job, - db_get_processing_job, - initiate_database -) -from .logging_utils import ( - configure_file_handler_with_formatter, - get_processor_server_logging_file_path, - get_processing_job_logging_file_path -) -from .models import PYJobInput, PYJobOutput, PYOcrdTool -from .process_helpers import invoke_processor -from .rabbitmq_utils import OcrdResultMessage -from .server_utils import ( - _get_processor_job, - _get_processor_job_log, - raise_http_exception, - validate_and_return_mets_path, - validate_job_input -) -from .utils import calculate_execution_time, post_to_callback_url, generate_id - - -class ProcessorServer(FastAPI): - def __init__(self, mongodb_addr: str, processor_name: str = "", processor_class=None): - if not (processor_name or processor_class): - raise ValueError("Either 'processor_name' or 'processor_class' must be provided") - super().__init__( - on_startup=[self.on_startup], - on_shutdown=[self.on_shutdown], - title="Network agent - Processor Server", - description="Network agent - Processor Server" - ) - initLogging() - self.log = getLogger("ocrd_network.processor_server") - log_file = get_processor_server_logging_file_path(processor_name=processor_name, pid=getpid()) - configure_file_handler_with_formatter(self.log, log_file=log_file, mode="a") - - self.db_url = mongodb_addr - self.processor_name 
= processor_name - self.processor_class = processor_class - self.ocrd_tool = None - self.version = None - - self.version = self.get_version() - self.ocrd_tool = self.get_ocrd_tool() - - if not self.ocrd_tool: - raise Exception("The ocrd_tool is empty or missing") - - if not self.processor_name: - self.processor_name = self.ocrd_tool["executable"] - - self.add_api_routes_processing() - self.log.info(f"Initialized processor server: {processor_name}") - - async def on_startup(self): - await initiate_database(db_url=self.db_url) - - async def on_shutdown(self) -> None: - """ - TODO: Perform graceful shutdown operations here - """ - pass - - def add_api_routes_processing(self): - processing_router = APIRouter() - processing_router.add_api_route( - path="/info", - endpoint=self.get_processor_info, - methods=["GET"], - tags=[ServerApiTags.PROCESSING], - status_code=status.HTTP_200_OK, - summary="Get information about this processor.", - response_model=PYOcrdTool, - response_model_exclude_unset=True, - response_model_exclude_none=True - ) - processing_router.add_api_route( - path="/run", - endpoint=self.create_processor_task, - methods=["POST"], - tags=[ServerApiTags.PROCESSING], - status_code=status.HTTP_202_ACCEPTED, - summary="Submit a job to this processor.", - response_model=PYJobOutput, - response_model_exclude_unset=True, - response_model_exclude_none=True - ) - processing_router.add_api_route( - path="/job/{job_id}", - endpoint=self.get_processor_job, - methods=["GET"], - tags=[ServerApiTags.PROCESSING], - status_code=status.HTTP_200_OK, - summary="Get information about a job based on its ID", - response_model=PYJobOutput, - response_model_exclude_unset=True, - response_model_exclude_none=True - ) - processing_router.add_api_route( - path="/log/{job_id}", - endpoint=self.get_processor_job_log, - methods=["GET"], - tags=[ServerApiTags.PROCESSING], - status_code=status.HTTP_200_OK, - summary="Get the log file of a job id" - ) - - async def get_processor_info(self): - if not self.ocrd_tool: - message = "Empty or missing ocrd tool json." 
- raise_http_exception(self.log, status.HTTP_500_INTERNAL_SERVER_ERROR, message) - return self.ocrd_tool - - # Note: The Processing server pushes to a queue, while - # the Processor Server creates (pushes to) a background task - async def create_processor_task(self, job_input: PYJobInput, background_tasks: BackgroundTasks): - validate_job_input(self.log, self.processor_name, self.ocrd_tool, job_input) - job_input.path_to_mets = await validate_and_return_mets_path(self.log, job_input) - - # The request is not forwarded from the Processing Server, assign a job_id - if not job_input.job_id: - job_id = generate_id() - # Create a DB entry - job = DBProcessorJob( - **job_input.dict(exclude_unset=True, exclude_none=True), - job_id=job_id, - processor_name=self.processor_name, - state=JobState.queued - ) - await job.insert() - else: - job = await db_get_processing_job(job_input.job_id) - # await self.run_processor_task(job=job) - background_tasks.add_task(self.run_processor_task, job) - return job.to_job_output() - - async def run_processor_task(self, job: DBProcessorJob): - execution_failed = False - start_time = datetime.now() - job_log_file = get_processing_job_logging_file_path(job_id=job.job_id) - await db_update_processing_job( - job_id=job.job_id, - state=JobState.running, - start_time=start_time, - log_file_path=job_log_file - ) - - mets_server_url = (await db_get_workspace(workspace_mets_path=job.path_to_mets)).mets_server_url - try: - invoke_processor( - processor_class=self.processor_class, - executable=self.processor_name, - abs_path_to_mets=job.path_to_mets, - input_file_grps=job.input_file_grps, - output_file_grps=job.output_file_grps, - page_id=job.page_id, - parameters=job.parameters, - mets_server_url=mets_server_url, - log_filename=job_log_file, - ) - except Exception as error: - self.log.debug(f"processor_name: {self.processor_name}, path_to_mets: {job.path_to_mets}, " - f"input_grps: {job.input_file_grps}, output_file_grps: {job.output_file_grps}, " - f"page_id: {job.page_id}, parameters: {job.parameters}") - self.log.exception(error) - execution_failed = True - end_time = datetime.now() - exec_duration = calculate_execution_time(start_time, end_time) - job_state = JobState.success if not execution_failed else JobState.failed - await db_update_processing_job( - job_id=job.job_id, - state=job_state, - end_time=end_time, - exec_time=f"{exec_duration} ms" - ) - result_message = OcrdResultMessage( - job_id=job.job_id, - state=job_state.value, - path_to_mets=job.path_to_mets, - # May not be always available - workspace_id=job.workspace_id if job.workspace_id else '' - ) - self.log.info(f"Result message: {result_message}") - if job.callback_url: - # If the callback_url field is set, - # post the result message (callback to a user defined endpoint) - post_to_callback_url(self.log, job.callback_url, result_message) - if job.internal_callback_url: - # If the internal callback_url field is set, - # post the result message (callback to Processing Server endpoint) - post_to_callback_url(self.log, job.internal_callback_url, result_message) - - def get_ocrd_tool(self): - if self.ocrd_tool: - return self.ocrd_tool - if self.processor_class: - # The way of accessing ocrd tool like in the line below may be problematic - # ocrd_tool = self.processor_class(workspace=None, version=True).ocrd_tool - ocrd_tool = parse_json_string_with_comments( - subprocess_run( - [self.processor_name, "--dump-json"], - stdout=PIPE, - check=True, - universal_newlines=True - ).stdout - ) - else: - ocrd_tool = 
get_ocrd_tool_json(self.processor_name) - return ocrd_tool - - def get_version(self) -> str: - if self.version: - return self.version - - """ - if self.processor_class: - # The way of accessing the version like in the line below may be problematic - # version_str = self.processor_class(workspace=None, version=True).version - return version_str - """ - version_str = subprocess_run( - [self.processor_name, "--version"], - stdout=PIPE, - check=True, - universal_newlines=True - ).stdout - return version_str - - def run_server(self, host, port): - run(self, host=host, port=port) - - async def get_processor_job(self, job_id: str) -> PYJobOutput: - return await _get_processor_job(self.log, job_id) - - async def get_processor_job_log(self, job_id: str) -> FileResponse: - return await _get_processor_job_log(self.log, job_id) diff --git a/src/ocrd_network/rabbitmq_utils/__init__.py b/src/ocrd_network/rabbitmq_utils/__init__.py index 93a8249ef6..71df04317b 100644 --- a/src/ocrd_network/rabbitmq_utils/__init__.py +++ b/src/ocrd_network/rabbitmq_utils/__init__.py @@ -3,6 +3,7 @@ "connect_rabbitmq_consumer", "connect_rabbitmq_publisher", "create_message_queues", + "get_message_queues", "verify_and_parse_mq_uri", "verify_rabbitmq_available", "RMQConsumer", @@ -19,6 +20,7 @@ connect_rabbitmq_consumer, connect_rabbitmq_publisher, create_message_queues, + get_message_queues, verify_and_parse_mq_uri, verify_rabbitmq_available ) diff --git a/src/ocrd_network/rabbitmq_utils/helpers.py b/src/ocrd_network/rabbitmq_utils/helpers.py index 5dc6dae779..f5e2a538a3 100644 --- a/src/ocrd_network/rabbitmq_utils/helpers.py +++ b/src/ocrd_network/rabbitmq_utils/helpers.py @@ -4,6 +4,9 @@ from re import match as re_match from time import sleep from typing import Dict, List, Union +from requests import get +from requests.auth import HTTPBasicAuth +from requests.exceptions import RequestException, HTTPError from .constants import RABBITMQ_URI_PATTERN, RECONNECT_TRIES, RECONNECT_WAIT from .consumer import RMQConsumer @@ -68,12 +71,6 @@ def check_if_queue_exists(logger: Logger, rmq_data: Dict, processor_name: str) - def create_message_queues(logger: Logger, rmq_publisher: RMQPublisher, queue_names: List[str]) -> None: - # TODO: Reconsider and refactor this. - # Added ocrd-dummy by default if not available for the integration tests. - # A proper Processing Worker / Processor Server registration endpoint is needed on the Processing Server side - if "ocrd-dummy" not in queue_names: - queue_names.append("ocrd-dummy") - for queue_name in queue_names: # The existence/validity of the worker.name is not tested. # Even if an ocr-d processor does not exist, the queue is created @@ -81,6 +78,26 @@ def create_message_queues(logger: Logger, rmq_publisher: RMQPublisher, queue_nam rmq_publisher.create_queue(queue_name=queue_name) +def get_message_queues(logger: Logger, rmq_data: Dict) -> List: + try: + response = get( + f"http://{rmq_data['host']}:{15672}/api/queues", + auth=HTTPBasicAuth(rmq_data["username"], rmq_data["password"]) + ) + response.raise_for_status() + queues = response.json() + return [queue['name'] for queue in queues] + except HTTPError: + logger.warn( + f"Error requesting all queue-names from rabbitmq. Status code: {response.status_code}. 
" + f"Response-Text: {response.text}" + ) + return [] + except RequestException as e: + logger.warn(f"Error querying RabbitMQ API: {e}") + return [] + + def verify_and_parse_mq_uri(rabbitmq_address: str): """ Check the full list of available parameters in the docs here: diff --git a/src/ocrd_network/runtime_data/__init__.py b/src/ocrd_network/runtime_data/__init__.py index e43be7ae3c..1e658c1305 100644 --- a/src/ocrd_network/runtime_data/__init__.py +++ b/src/ocrd_network/runtime_data/__init__.py @@ -5,10 +5,9 @@ "DataNetworkAgent", "DataRabbitMQ", "DataProcessingWorker", - "DataProcessorServer" ] from .deployer import Deployer from .hosts import DataHost -from .network_agents import DataNetworkAgent, DataProcessingWorker, DataProcessorServer +from .network_agents import DataNetworkAgent, DataProcessingWorker from .network_services import DataMongoDB, DataRabbitMQ diff --git a/src/ocrd_network/runtime_data/deployer.py b/src/ocrd_network/runtime_data/deployer.py index 4578e7eb85..9b85571c15 100644 --- a/src/ocrd_network/runtime_data/deployer.py +++ b/src/ocrd_network/runtime_data/deployer.py @@ -9,7 +9,7 @@ from __future__ import annotations from pathlib import Path import psutil -from typing import Dict, List, Union +from typing import Dict, List from ocrd import OcrdMetsServer from ocrd_utils import getLogger @@ -33,89 +33,15 @@ def __init__(self, config_path: str) -> None: self.mets_servers_paths: Dict = {} # {"ws_dir_path": "mets_server_url"} self.use_tcp_mets = ps_config.get("use_tcp_mets", False) - # TODO: Reconsider this. - def find_matching_network_agents( - self, worker_only: bool = False, server_only: bool = False, docker_only: bool = False, - native_only: bool = False, str_names_only: bool = False, unique_only: bool = False, sort: bool = False - ) -> Union[List[str], List[object]]: - """Finds and returns a list of matching data objects of type: - `DataProcessingWorker` and `DataProcessorServer`. - - :py:attr:`worker_only` match only worker network agents (DataProcessingWorker) - :py:attr:`server_only` match only server network agents (DataProcessorServer) - :py:attr:`docker_only` match only docker network agents (DataProcessingWorker and DataProcessorServer) - :py:attr:`native_only` match only native network agents (DataProcessingWorker and DataProcessorServer) - :py:attr:`str_names_only` returns the processor_name filed instead of the Data* object - :py:attr:`unique_only` remove duplicate names from the matches - :py:attr:`sort` sort the result - - `worker_only` and `server_only` are mutually exclusive to each other - `docker_only` and `native_only` are mutually exclusive to each other - `unique_only` is allowed only together with `str_names_only` - """ - - if worker_only and server_only: - msg = "Only 'worker_only' or 'server_only' is allowed, not both." - self.log.exception(msg) - raise ValueError(msg) - if docker_only and native_only: - msg = "Only 'docker_only' or 'native_only' is allowed, not both." 
- self.log.exception(msg) - raise ValueError(msg) - if not str_names_only and unique_only: - msg = "Value 'unique_only' is allowed only together with 'str_names_only'" - self.log.exception(msg) - raise ValueError(msg) - if sort and not str_names_only: - msg = "Value 'sort' is allowed only together with 'str_names_only'" - self.log.exception(msg) - raise ValueError(msg) - - # Find all matching objects of type DataProcessingWorker or DataProcessorServer - matched_objects = [] - for data_host in self.data_hosts: - if not server_only: - if not docker_only: - for data_worker in data_host.network_agents_worker_native: - matched_objects.append(data_worker) - if not native_only: - for data_worker in data_host.network_agents_worker_docker: - matched_objects.append(data_worker) - if not worker_only: - if not docker_only: - for data_server in data_host.network_agents_server_native: - matched_objects.append(data_server) - if not native_only: - for data_server in data_host.network_agents_server_docker: - matched_objects.append(data_server) - if not str_names_only: - return matched_objects - # Gets only the processor names of the matched objects - matched_names = [match.processor_name for match in matched_objects] - if not unique_only: - return matched_names - list_matched = list(dict.fromkeys(matched_names)) - if not sort: - # Removes any duplicate entries from matched names - return list_matched - list_matched.sort() - return list_matched - - def resolve_processor_server_url(self, processor_name) -> str: - processor_server_url = '' - for data_host in self.data_hosts: - processor_server_url = data_host.resolve_processor_server_url(processor_name=processor_name) - return processor_server_url - - def deploy_network_agents(self, mongodb_url: str, rabbitmq_url: str) -> None: - self.log.debug("Deploying processing workers/processor servers...") + def deploy_workers(self, mongodb_url: str, rabbitmq_url: str) -> None: + self.log.debug("Deploying processing workers...") for host_data in self.data_hosts: - host_data.deploy_network_agents(logger=self.log, mongodb_url=mongodb_url, rabbitmq_url=rabbitmq_url) + host_data.deploy_workers(logger=self.log, mongodb_url=mongodb_url, rabbitmq_url=rabbitmq_url) - def stop_network_agents(self) -> None: - self.log.debug("Stopping processing workers/processor servers...") + def stop_workers(self) -> None: + self.log.debug("Stopping processing workers...") for host_data in self.data_hosts: - host_data.stop_network_agents(logger=self.log) + host_data.stop_workers(logger=self.log) def deploy_rabbitmq(self) -> str: self.data_queue.deploy_rabbitmq(self.log) @@ -137,7 +63,7 @@ def stop_all(self) -> None: If RabbitMQ server is stopped before stopping Processing Workers that may have a bad outcome and leave Processing Workers in an unpredictable state. 
""" - self.stop_network_agents() + self.stop_workers() self.stop_mongodb() self.stop_rabbitmq() diff --git a/src/ocrd_network/runtime_data/hosts.py b/src/ocrd_network/runtime_data/hosts.py index 176afb991d..033521aad4 100644 --- a/src/ocrd_network/runtime_data/hosts.py +++ b/src/ocrd_network/runtime_data/hosts.py @@ -1,9 +1,9 @@ from logging import Logger from time import sleep -from typing import Dict, List, Union +from typing import Dict, List from .connection_clients import create_docker_client, create_ssh_client -from .network_agents import AgentType, DataNetworkAgent, DataProcessingWorker, DataProcessorServer, DeployType +from .network_agents import DataProcessingWorker, DeployType class DataHost: @@ -24,68 +24,39 @@ def __init__( self.ssh_client = None self.docker_client = None - # Time to wait between deploying agents - self.wait_between_agent_deploys: float = 0.3 + # Time to wait between deploying single workers + self.wait_between_deploys: float = 0.3 - # Lists of network agents based on their agent and deployment type - self.network_agents_worker_native = [] - self.network_agents_worker_docker = [] - self.network_agents_server_native = [] - self.network_agents_server_docker = [] + # Lists of Processing Workers based on their deployment type + self.workers_native = [] + self.workers_docker = [] if not workers: workers = [] if not servers: servers = [] - self.__parse_network_agents_workers(processing_workers=workers) - self.__parse_network_agents_servers(processor_servers=servers) + self.__parse_workers(processing_workers=workers) - # Used for caching deployed Processor Servers' ports on the current host - # Key: processor_name, Value: list of ports - self.processor_servers_ports: dict = {} + def __append_workers_to_lists(self, worker_data: DataProcessingWorker) -> None: + if worker_data.deploy_type != DeployType.DOCKER and worker_data.deploy_type != DeployType.NATIVE: + raise ValueError(f"Processing Worker deploy type is unknown: {worker_data.deploy_type}") - def __add_deployed_agent_server_port_to_cache(self, processor_name: str, port: int) -> None: - if processor_name not in self.processor_servers_ports: - self.processor_servers_ports[processor_name] = [port] - return - self.processor_servers_ports[processor_name] = self.processor_servers_ports[processor_name].append(port) - - def __append_network_agent_to_lists(self, agent_data: DataNetworkAgent) -> None: - if agent_data.deploy_type != DeployType.DOCKER and agent_data.deploy_type != DeployType.NATIVE: - raise ValueError(f"Network agent deploy type is unknown: {agent_data.deploy_type}") - if agent_data.agent_type != AgentType.PROCESSING_WORKER and agent_data.agent_type != AgentType.PROCESSOR_SERVER: - raise ValueError(f"Network agent type is unknown: {agent_data.agent_type}") - - if agent_data.deploy_type == DeployType.NATIVE: + if worker_data.deploy_type == DeployType.NATIVE: self.needs_ssh_connector = True - if agent_data.agent_type == AgentType.PROCESSING_WORKER: - self.network_agents_worker_native.append(agent_data) - if agent_data.agent_type == AgentType.PROCESSOR_SERVER: - self.network_agents_server_native.append(agent_data) - if agent_data.deploy_type == DeployType.DOCKER: + self.workers_native.append(worker_data) + if worker_data.deploy_type == DeployType.DOCKER: self.needs_docker_connector = True - if agent_data.agent_type == AgentType.PROCESSING_WORKER: - self.network_agents_worker_docker.append(agent_data) - if agent_data.agent_type == AgentType.PROCESSOR_SERVER: - self.network_agents_server_docker.append(agent_data) 
- - def __parse_network_agents_servers(self, processor_servers: List[Dict]): - for server in processor_servers: - server_data = DataProcessorServer( - processor_name=server["name"], deploy_type=server["deploy_type"], host=self.host, - port=int(server["port"]), init_by_config=True, pid=None - ) - self.__append_network_agent_to_lists(agent_data=server_data) + self.workers_docker.append(worker_data) - def __parse_network_agents_workers(self, processing_workers: List[Dict]): + def __parse_workers(self, processing_workers: List[Dict]): for worker in processing_workers: worker_data = DataProcessingWorker( - processor_name=worker["name"], deploy_type=worker["deploy_type"], host=self.host, - init_by_config=True, pid=None + processor_name=worker["name"], deploy_type=worker.get("deploy_type", "native"), + host=self.host, init_by_config=True, pid=None ) for _ in range(int(worker["number_of_instance"])): - self.__append_network_agent_to_lists(agent_data=worker_data) + self.__append_workers_to_lists(worker_data=worker_data) def create_connection_client(self, client_type: str): if client_type not in ["docker", "ssh"]: @@ -97,15 +68,14 @@ def create_connection_client(self, client_type: str): self.docker_client = create_docker_client(self.host, self.username, self.password, self.keypath) return self.docker_client - def __deploy_network_agent( - self, logger: Logger, agent_data: Union[DataProcessorServer, DataProcessingWorker], + def __deploy_single_worker( + self, logger: Logger, worker_data: DataProcessingWorker, mongodb_url: str, rabbitmq_url: str ) -> None: - deploy_type = agent_data.deploy_type - agent_type = agent_data.agent_type - name = agent_data.processor_name - agent_info = f"network agent: {agent_type}, deploy: {deploy_type}, name: {name}, host: {self.host}" - logger.info(f"Deploying {agent_info}") + deploy_type = worker_data.deploy_type + name = worker_data.processor_name + worker_info = f"Processing Worker, deploy: {deploy_type}, name: {name}, host: {self.host}" + logger.info(f"Deploying {worker_info}") connection_client = None if deploy_type == DeployType.NATIVE: @@ -115,44 +85,29 @@ def __deploy_network_agent( assert self.docker_client, "Docker client connection missing." 
connection_client = self.docker_client - if agent_type == AgentType.PROCESSING_WORKER: - agent_data.deploy_network_agent(logger, connection_client, mongodb_url, rabbitmq_url) - if agent_type == AgentType.PROCESSOR_SERVER: - agent_data.deploy_network_agent(logger, connection_client, mongodb_url) - - sleep(self.wait_between_agent_deploys) + worker_data.deploy_network_agent(logger, connection_client, mongodb_url, rabbitmq_url) + sleep(self.wait_between_deploys) - def __deploy_network_agents_workers(self, logger: Logger, mongodb_url: str, rabbitmq_url: str): + def __deploy_all_workers(self, logger: Logger, mongodb_url: str, rabbitmq_url: str): logger.info(f"Deploying processing workers on host: {self.host}") - amount_workers = len(self.network_agents_worker_native) + len(self.network_agents_worker_docker) + amount_workers = len(self.workers_native) + len(self.workers_docker) if not amount_workers: logger.info("No processing workers found to be deployed") - for data_worker in self.network_agents_worker_native: - self.__deploy_network_agent(logger, data_worker, mongodb_url, rabbitmq_url) - for data_worker in self.network_agents_worker_docker: - self.__deploy_network_agent(logger, data_worker, mongodb_url, rabbitmq_url) - - def __deploy_network_agents_servers(self, logger: Logger, mongodb_url: str, rabbitmq_url: str): - logger.info(f"Deploying processor servers on host: {self.host}") - amount_servers = len(self.network_agents_server_native) + len(self.network_agents_server_docker) - if not amount_servers: - logger.info("No processor servers found to be deployed") - for data_server in self.network_agents_server_native: - self.__deploy_network_agent(logger, data_server, mongodb_url, rabbitmq_url) - self.__add_deployed_agent_server_port_to_cache(data_server.processor_name, data_server.port) - for data_server in self.network_agents_server_docker: - self.__deploy_network_agent(logger, data_server, mongodb_url, rabbitmq_url) - self.__add_deployed_agent_server_port_to_cache(data_server.processor_name, data_server.port) - - def deploy_network_agents(self, logger: Logger, mongodb_url: str, rabbitmq_url: str) -> None: + for data_worker in self.workers_native: + self.__deploy_single_worker(logger, data_worker, mongodb_url, rabbitmq_url) + for data_worker in self.workers_docker: + self.__deploy_single_worker(logger, data_worker, mongodb_url, rabbitmq_url) + + def deploy_workers(self, logger: Logger, mongodb_url: str, rabbitmq_url: str) -> None: if self.needs_ssh_connector and not self.ssh_client: logger.debug("Creating missing ssh connector before deploying") self.ssh_client = self.create_connection_client(client_type="ssh") if self.needs_docker_connector: logger.debug("Creating missing docker connector before deploying") self.docker_client = self.create_connection_client(client_type="docker") - self.__deploy_network_agents_workers(logger=logger, mongodb_url=mongodb_url, rabbitmq_url=rabbitmq_url) - self.__deploy_network_agents_servers(logger=logger, mongodb_url=mongodb_url, rabbitmq_url=rabbitmq_url) + + self.__deploy_all_workers(logger=logger, mongodb_url=mongodb_url, rabbitmq_url=rabbitmq_url) + if self.ssh_client: self.ssh_client.close() self.ssh_client = None @@ -160,13 +115,13 @@ def deploy_network_agents(self, logger: Logger, mongodb_url: str, rabbitmq_url: self.docker_client.close() self.docker_client = None - def __stop_network_agent(self, logger: Logger, name: str, deploy_type: DeployType, agent_type: AgentType, pid: str): - agent_info = f"network agent: {agent_type}, deploy: {deploy_type}, name: 
{name}" + def __stop_worker(self, logger: Logger, name: str, deploy_type: DeployType, pid: str): + worker_info = f"Processing Worker: deploy: {deploy_type}, name: {name}" if not pid: - logger.warning(f"No pid was passed for {agent_info}") + logger.warning(f"No pid was passed for {worker_info}") return - agent_info += f", pid: {pid}" - logger.info(f"Stopping {agent_info}") + worker_info += f", pid: {pid}" + logger.info(f"Stopping {worker_info}") if deploy_type == DeployType.NATIVE: assert self.ssh_client, "SSH client connection missing" self.ssh_client.exec_command(f"kill {pid}") @@ -174,52 +129,28 @@ def __stop_network_agent(self, logger: Logger, name: str, deploy_type: DeployTyp assert self.docker_client, "Docker client connection missing" self.docker_client.containers.get(pid).stop() - def __stop_network_agents_workers(self, logger: Logger): - logger.info(f"Stopping processing workers on host: {self.host}") - amount_workers = len(self.network_agents_worker_native) + len(self.network_agents_worker_docker) - if not amount_workers: - logger.warning("No active processing workers to be stopped.") - for worker in self.network_agents_worker_native: - self.__stop_network_agent(logger, worker.processor_name, worker.deploy_type, worker.agent_type, worker.pid) - self.network_agents_worker_native = [] - for worker in self.network_agents_worker_docker: - self.__stop_network_agent(logger, worker.processor_name, worker.deploy_type, worker.agent_type, worker.pid) - self.network_agents_worker_docker = [] - - def __stop_network_agents_servers(self, logger: Logger): - logger.info(f"Stopping processor servers on host: {self.host}") - amount_servers = len(self.network_agents_server_native) + len(self.network_agents_server_docker) - if not amount_servers: - logger.warning("No active processor servers to be stopped.") - for server in self.network_agents_server_native: - self.__stop_network_agent(logger, server.processor_name, server.deploy_type, server.agent_type, server.pid) - self.network_agents_server_native = [] - for server in self.network_agents_server_docker: - self.__stop_network_agent(logger, server.processor_name, server.deploy_type, server.agent_type, server.pid) - self.network_agents_server_docker = [] - - def stop_network_agents(self, logger: Logger): + def stop_workers(self, logger: Logger): if self.needs_ssh_connector and not self.ssh_client: logger.debug("Creating missing ssh connector before stopping") self.ssh_client = self.create_connection_client(client_type="ssh") if self.needs_docker_connector and not self.docker_client: logger.debug("Creating missing docker connector before stopping") self.docker_client = self.create_connection_client(client_type="docker") - self.__stop_network_agents_workers(logger=logger) - self.__stop_network_agents_servers(logger=logger) + + logger.info(f"Stopping processing workers on host: {self.host}") + amount_workers = len(self.workers_native) + len(self.workers_docker) + if not amount_workers: + logger.warning("No active processing workers to be stopped.") + for worker in self.workers_native: + self.__stop_worker(logger, worker.processor_name, worker.deploy_type, worker.pid) + self.workers_native = [] + for worker in self.workers_docker: + self.__stop_worker(logger, worker.processor_name, worker.deploy_type, worker.pid) + self.workers_docker = [] + if self.ssh_client: self.ssh_client.close() self.ssh_client = None if self.docker_client: self.docker_client.close() self.docker_client = None - - def resolve_processor_server_url(self, processor_name: str) -> str: - 
processor_server_url = '' - for data_server in self.network_agents_server_docker: - if data_server.processor_name == processor_name: - processor_server_url = f"http://{self.host}:{data_server.port}/" - for data_server in self.network_agents_server_native: - if data_server.processor_name == processor_name: - processor_server_url = f"http://{self.host}:{data_server.port}/" - return processor_server_url diff --git a/src/ocrd_network/runtime_data/network_agents.py b/src/ocrd_network/runtime_data/network_agents.py index 742f30309d..735b3c2c1d 100644 --- a/src/ocrd_network/runtime_data/network_agents.py +++ b/src/ocrd_network/runtime_data/network_agents.py @@ -2,14 +2,15 @@ from typing import Any from re import search as re_search -from ..constants import AgentType, DeployType +from ..constants import DeployType # TODO: Find appropriate replacement for the hack def deploy_agent_native_get_pid_hack(logger: Logger, ssh_client, start_cmd: str): channel = ssh_client.invoke_shell() stdin, stdout = channel.makefile("wb"), channel.makefile("rb") - logger.debug(f"Executing command: {start_cmd}") + # TODO: set back to debug + logger.info(f"Executing command: {start_cmd}") # TODO: This hack should still be fixed # Note left from @joschrew @@ -40,14 +41,13 @@ def deploy_agent_docker_template(logger: Logger, docker_client, start_cmd: str): class DataNetworkAgent: def __init__( - self, processor_name: str, deploy_type: DeployType, agent_type: AgentType, + self, processor_name: str, deploy_type: DeployType, host: str, init_by_config: bool, pid: Any = None ) -> None: self.processor_name = processor_name self.deploy_type = deploy_type self.host = host self.deployed_by_config = init_by_config - self.agent_type = agent_type # The id is assigned when the agent is deployed self.pid = pid @@ -69,13 +69,13 @@ def __init__( self, processor_name: str, deploy_type: DeployType, host: str, init_by_config: bool, pid: Any = None ) -> None: super().__init__( - processor_name=processor_name, host=host, deploy_type=deploy_type, agent_type=AgentType.PROCESSING_WORKER, + processor_name=processor_name, host=host, deploy_type=deploy_type, init_by_config=init_by_config, pid=pid ) def deploy_network_agent(self, logger: Logger, connector_client, database_url: str, queue_url: str): if self.deploy_type == DeployType.NATIVE: - start_cmd = f"{self.processor_name} {self.agent_type} --database {database_url} --queue {queue_url} &" + start_cmd = f"{self.processor_name} --database {database_url} --queue {queue_url} &" self.pid = self._start_native_instance(logger, connector_client, start_cmd) return self.pid if self.deploy_type == DeployType.DOCKER: @@ -84,27 +84,3 @@ def deploy_network_agent(self, logger: Logger, connector_client, database_url: s self.pid = self._start_docker_instance(logger, connector_client, start_cmd) return self.pid raise RuntimeError(f"Unknown deploy type of {self.__dict__}") - - -class DataProcessorServer(DataNetworkAgent): - def __init__( - self, processor_name: str, deploy_type: DeployType, host: str, port: int, init_by_config: bool, pid: Any = None - ) -> None: - super().__init__( - processor_name=processor_name, host=host, deploy_type=deploy_type, agent_type=AgentType.PROCESSOR_SERVER, - init_by_config=init_by_config, pid=pid - ) - self.port = port - - def deploy_network_agent(self, logger: Logger, connector_client, database_url: str): - agent_address = f"{self.host}:{self.port}" - if self.deploy_type == DeployType.NATIVE: - start_cmd = f"{self.processor_name} {self.agent_type} --address {agent_address} --database 
{database_url} &" - self.pid = self._start_native_instance(logger, connector_client, start_cmd) - return self.pid - if self.deploy_type == DeployType.DOCKER: - # TODO: add real command to start processor server in docker here - start_cmd = "" - self.pid = self._start_docker_instance(logger, connector_client, start_cmd) - return self.pid - raise RuntimeError(f"Unknown deploy type of {self.__dict__}") diff --git a/src/ocrd_network/server_utils.py b/src/ocrd_network/server_utils.py index 7485a65dc2..0f12988571 100644 --- a/src/ocrd_network/server_utils.py +++ b/src/ocrd_network/server_utils.py @@ -124,50 +124,6 @@ async def _get_processor_job_log(logger: Logger, job_id: str) -> FileResponse: return FileResponse(path=log_file_path, filename=log_file_path.name) -def request_processor_server_tool_json(logger: Logger, processor_server_base_url: str) -> Dict: - # Request the ocrd tool json from the Processor Server - try: - response = requests_get( - urljoin(base=processor_server_base_url, url="info"), - headers={"Content-Type": "application/json"} - ) - except Exception as error: - message = f"Failed to retrieve ocrd tool json from: {processor_server_base_url}" - raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error) - if response.status_code != 200: - message = f"Failed to retrieve tool json from: {processor_server_base_url}, code: {response.status_code}" - raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message) - return response.json() - - -async def forward_job_to_processor_server( - logger: Logger, job_input: PYJobInput, processor_server_base_url: str -) -> PYJobOutput: - try: - json_data = dumps(job_input.dict(exclude_unset=True, exclude_none=True)) - except Exception as error: - message = f"Failed to json dump the PYJobInput: {job_input}" - raise_http_exception(logger, status.HTTP_500_INTERNAL_SERVER_ERROR, message, error) - - # TODO: The amount of pages should come as a request input - # TODO: cf https://github.com/OCR-D/core/pull/1030/files#r1152551161 - # currently, use 200 as a default - request_timeout = calculate_processing_request_timeout(amount_pages=200, timeout_per_page=20.0) - - # Post a processing job to the Processor Server asynchronously - async with AsyncClient(timeout=Timeout(timeout=request_timeout, connect=30.0)) as client: - response = await client.post( - urljoin(base=processor_server_base_url, url="run"), - headers={"Content-Type": "application/json"}, - json=loads(json_data) - ) - if response.status_code != 202: - message = f"Failed to post '{job_input.processor_name}' job to: {processor_server_base_url}" - raise_http_exception(logger, status.HTTP_500_INTERNAL_SERVER_ERROR, message) - job_output = response.json() - return job_output - - async def get_workflow_content(logger: Logger, workflow_id: str, workflow: Union[UploadFile, str, None]) -> str: if not workflow and not workflow_id: message = "Either 'workflow' must be uploaded as a file or 'workflow_id' must be provided. Both are missing." 
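For clarity, a short sketch of the native start command that DataProcessingWorker.deploy_network_agent now builds: the subcommand and the --address option from the old command are gone, and only --database and --queue remain. The processor name and URLs are assumed examples.

# Mirrors the f-string in DataProcessingWorker.deploy_network_agent (native case).
processor_name = "ocrd-dummy"                       # assumed example processor
database_url = "mongodb://localhost:27017"          # MongoDB, passed as --database
queue_url = "amqp://admin:admin@localhost:5672"     # RabbitMQ, passed as --queue
start_cmd = f"{processor_name} --database {database_url} --queue {queue_url} &"
print(start_cmd)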
diff --git a/src/ocrd_utils/config.py b/src/ocrd_utils/config.py index 402644af4a..c1aeb00c8e 100644 --- a/src/ocrd_utils/config.py +++ b/src/ocrd_utils/config.py @@ -142,15 +142,14 @@ def raw_value(self, name): config.add('OCRD_MAX_PROCESSOR_CACHE', description="Maximum number of processor instances (for each set of parameters) to be kept in memory " - "(including loaded models) for processing workers or processor servers.", + "(including loaded models) for processing workers.", parser=int, default=(True, 128)) config.add('OCRD_MAX_PARALLEL_PAGES', - description="Maximum number of processor workers for page-parallel processing " - "(within each Processor's selected page range, independent of the number " - "of Processing Workers or Processor Servers). If set >1, then a METS Server " - "must be used for METS synchronisation.", + description="Maximum number of processor workers for page-parallel processing (within " + "each Processor's selected page range, independent of the number of Processing " + "Workers). If set >1, then a METS Server must be used for METS synchronisation.", parser=int, default=(True, 1)) diff --git a/src/ocrd_validators/processing_server_config.schema.yml b/src/ocrd_validators/processing_server_config.schema.yml index 934ce0c132..d90404d6a3 100644 --- a/src/ocrd_validators/processing_server_config.schema.yml +++ b/src/ocrd_validators/processing_server_config.schema.yml @@ -68,16 +68,12 @@ properties: required: - address - username + - workers oneOf: - required: - password - required: - path_to_privkey - anyOf: - - required: - - workers - - required: - - servers properties: address: description: The IP address or domain name of the target machine @@ -118,34 +114,6 @@ properties: - native - docker default: native - servers: - description: List of processor servers that will be deployed - type: array - minItems: 1 - items: - type: object - additionalProperties: false - required: - - name - - port - properties: - name: - description: Name of the processor - type: string - pattern: "^ocrd-.*$" - examples: - - ocrd-cis-ocropy-binarize - - ocrd-olena-binarize - deploy_type: - description: Should the processor server be deployed natively or with Docker - type: string - enum: - - native - - docker - default: native - port: - description: The port number to be deployed on the host - $ref: "#/$defs/port" $defs: address: diff --git a/tests/network/config.py b/tests/network/config.py index 611ad63821..97ffc63c0d 100644 --- a/tests/network/config.py +++ b/tests/network/config.py @@ -15,8 +15,8 @@ test_config.add( name='OCRD_MAX_PROCESSOR_CACHE', description=""" - Maximum number of processor instances (for each set of parameters) to be kept in memory (including loaded models) - for processing workers or processor servers. + Maximum number of processor instances (for each set of parameters) to be kept in memory (including loaded models) + for processing workers """, parser=int, default=(True, 128) @@ -97,7 +97,7 @@ test_config.add( name="OCRD_NETWORK_RABBITMQ_HEARTBEAT", description=""" - Controls AMQP heartbeat timeout (in seconds) negotiation during connection tuning. An integer value always overrides the value + Controls AMQP heartbeat timeout (in seconds) negotiation during connection tuning. An integer value always overrides the value proposed by broker. Use 0 to deactivate heartbeat. 
""", parser=int, diff --git a/tests/network/fixtures_processing_requests.py b/tests/network/fixtures_processing_requests.py index 3afaf74711..9a8141f812 100644 --- a/tests/network/fixtures_processing_requests.py +++ b/tests/network/fixtures_processing_requests.py @@ -1,5 +1,4 @@ from pytest import fixture -from src.ocrd_network.constants import AgentType from src.ocrd_network.models import PYJobInput @@ -10,7 +9,6 @@ def fixture_processing_request_1() -> PYJobInput: path_to_mets=workspace_key, input_file_grps=["DEFAULT"], output_file_grps=["OCR-D-BIN"], - agent_type=AgentType.PROCESSING_WORKER, page_id="PHYS_0001..PHYS_0003", parameters={} ) diff --git a/tests/network/test_integration_5_processing_server.py b/tests/network/test_integration_5_processing_server.py index bf5fadee3c..48ae86af92 100644 --- a/tests/network/test_integration_5_processing_server.py +++ b/tests/network/test_integration_5_processing_server.py @@ -3,7 +3,7 @@ from src.ocrd_network.client_utils import ( poll_job_status_till_timeout_fail_or_success, poll_wf_status_till_timeout_fail_or_success, post_ps_processing_request, post_ps_workflow_request) -from src.ocrd_network.constants import AgentType, JobState +from src.ocrd_network.constants import JobState from src.ocrd_network.logging_utils import get_processing_job_logging_file_path from tests.base import assets from tests.network.config import test_config @@ -20,14 +20,12 @@ def test_processing_server_connectivity(): assert message.startswith("The home page of"), f"Processing server home page message is corrupted" -# TODO: The processing workers are still not registered when deployed separately. -# Fix that by extending the processing server. def test_processing_server_deployed_processors(): test_url = f"{PROCESSING_SERVER_URL}/processor" response = request_get(test_url) processors = response.json() assert response.status_code == 200, f"Processing server: {test_url}, {response.status_code}" - assert processors == [], f"Mismatch in deployed processors" + assert "ocrd-dummy" in processors def test_processing_server_processing_request(): @@ -39,7 +37,6 @@ def test_processing_server_processing_request(): "path_to_mets": path_to_mets, "input_file_grps": [input_file_grp], "output_file_grps": [output_file_grp], - "agent_type": AgentType.PROCESSING_WORKER, "parameters": {} } test_processor = "ocrd-dummy" diff --git a/tests/network/test_integration_6_client.py b/tests/network/test_integration_6_client.py index 1a693ed0b1..83ff010238 100644 --- a/tests/network/test_integration_6_client.py +++ b/tests/network/test_integration_6_client.py @@ -1,5 +1,5 @@ from pathlib import Path -from src.ocrd_network.constants import AgentType, JobState +from src.ocrd_network.constants import JobState from tests.base import assets from tests.network.config import test_config from ocrd_network.client import Client @@ -19,7 +19,6 @@ def test_client_processing_processor(): "input_file_grps": ["OCR-D-IMG"], "output_file_grps": ["OCR-D-DUMMY-TEST-CLIENT"], "parameters": {}, - "agent_type": AgentType.PROCESSING_WORKER } processing_job_id = client.send_processing_job_request(processor_name="ocrd-dummy", req_params=req_params) assert processing_job_id diff --git a/tests/network/test_modules_logging_utils.py b/tests/network/test_modules_logging_utils.py index 530b501e05..c53d4d864d 100644 --- a/tests/network/test_modules_logging_utils.py +++ b/tests/network/test_modules_logging_utils.py @@ -18,10 +18,6 @@ def test_root_logging_dir_mets_servers(): 
root_logging_dir(module_name=NetworkLoggingDirs.METS_SERVERS) -def test_root_logging_dir_processor_servers(): - root_logging_dir(module_name=NetworkLoggingDirs.PROCESSOR_SERVERS) - - def test_root_logging_dir_processing_workers(): root_logging_dir(module_name=NetworkLoggingDirs.PROCESSING_WORKERS)
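Finally, a hedged sketch of what a processing request looks like after the removal of the agent_type field, mirroring the payload and client call used in the integration tests above. The METS path and file groups are assumed examples, and construction of the Client instance is omitted.

# Hypothetical request parameters; note there is no "agent_type" key anymore,
# since every job is handled by a Processing Worker.
req_params = {
    "path_to_mets": "/data/workspace/mets.xml",   # assumed example path
    "input_file_grps": ["OCR-D-IMG"],
    "output_file_grps": ["OCR-D-BIN"],
    "parameters": {},
}
# `client` is assumed to be an ocrd_network.client.Client pointed at the
# Processing Server (construction not shown here).
processing_job_id = client.send_processing_job_request(processor_name="ocrd-dummy", req_params=req_params)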