From 38e633f0f4f93261797e5fe3d0b4672a15cb80a1 Mon Sep 17 00:00:00 2001 From: Marco Melloni <98281551+marcomelloni@users.noreply.github.com> Date: Thu, 24 Jul 2025 11:15:59 +0200 Subject: [PATCH 1/2] Refactor MATLAB simulation into interface and implementations --- agents/matlab/matlab_agent/src/core/batch.py | 9 +++- .../matlab_agent/src/core/interactive.py | 43 ++++++++++++++++--- .../matlab_agent/src/core/matlab_simulator.py | 28 +++++++++--- .../matlab/matlab_agent/src/core/streaming.py | 28 ++++++++++++ .../test/unit/test_matlab_simulator.py | 10 ++--- 5 files changed, 100 insertions(+), 18 deletions(-) diff --git a/agents/matlab/matlab_agent/src/core/batch.py b/agents/matlab/matlab_agent/src/core/batch.py index 7a17fceb..80fca501 100644 --- a/agents/matlab/matlab_agent/src/core/batch.py +++ b/agents/matlab/matlab_agent/src/core/batch.py @@ -14,7 +14,11 @@ from ..utils.logger import get_logger from ..utils.create_response import create_response from ..comm.interfaces import IMessageBroker -from .matlab_simulator import MatlabSimulator, MatlabSimulationError +from .matlab_simulator import ( + BatchSimulator, + MatlabSimulationError, + MatlabSimulator, +) from ..utils.performance_monitor import PerformanceMonitor # Configure logger @@ -56,7 +60,8 @@ def handle_batch_simulation( sim_path = path_simulation inputs, outputs = _extract_io_specs(data) logger.info("Starting simulation '%s'", sim_file) - sim = MatlabSimulator(sim_path, sim_file, function_name) + sim: MatlabSimulator = BatchSimulator( + sim_path, sim_file, function_name) # Record MATLAB startup complete performance_monitor.record_matlab_startup_complete() _send_progress(rabbitmq_manager, diff --git a/agents/matlab/matlab_agent/src/core/interactive.py b/agents/matlab/matlab_agent/src/core/interactive.py index 954b3f87..abe00f36 100644 --- a/agents/matlab/matlab_agent/src/core/interactive.py +++ b/agents/matlab/matlab_agent/src/core/interactive.py @@ -17,6 +17,7 @@ from ..utils.create_response import create_response from ..utils.logger import get_logger from ..utils.performance_monitor import PerformanceMonitor +from .matlab_simulator import MatlabSimulator from ..utils.constants import ( ACCEPT_TIMEOUT, BUFFER_SIZE, @@ -74,7 +75,8 @@ def send(self, data: Dict[str, Any]) -> None: def recv_all(self) -> list[Dict[str, Any]]: if not self._conn or not select([self._conn], [], [], 0)[0]: return [] - # Read up to BUFFER_SIZE bytes from socket and append to internal buffer (self._buffer) + # Read up to BUFFER_SIZE bytes from socket and append to internal buffer + # (self._buffer) self._buffer += self._conn.recv(BUFFER_SIZE) # Messages are sent with a "\n" character as delimiter. # Split everything received into "lines". @@ -99,7 +101,8 @@ def recv_all(self) -> list[Dict[str, Any]]: logger.error("[INTERACTIVE] Invalid JSON: %s", exc) messages.append({"error": f"Invalid JSON: {str(exc)}"}) # Return the messages list, i.e., all complete decoded JSON objects (plus any error placeholders) available at that moment. - # The next call to recv_all() will resume from where it left off, because only the last incomplete fragment remains in the buffer. + # The next call to recv_all() will resume from where it left off, + # because only the last incomplete fragment remains in the buffer. return messages def close(self) -> None: @@ -241,16 +244,17 @@ def run(self, pm: PerformanceMonitor, msg_dict: Dict[str, Any]) -> None: try: while True: if self.out_srv.matlab_proc and self.out_srv.matlab_proc.poll() is not None: - logger.debug("[INTERACTIVE] MATLAB process ended, stopping loop") + logger.debug( + "[INTERACTIVE] MATLAB process ended, stopping loop") break - method, _ , body = ch.basic_get( + method, _, body = ch.basic_get( queue=qname, auto_ack=True) while method: frame = _parse_frame(body) if frame: # Send the inputs to MATLAB self.in_srv.send(self._only_inputs(frame)) - method, _ , body = ch.basic_get( + method, _, body = ch.basic_get( queue=qname, auto_ack=True) # Receive Responses from MATLAB @@ -276,7 +280,7 @@ def metadata(self) -> Dict[str, Any]: if self.start_time: meta["execution_time"] = time.time() - self.start_time meta["memory_usage"] = psutil.Process( - ).memory_info().rss // BYTES_IN_MB + ).memory_info().rss // BYTES_IN_MB return meta @@ -339,3 +343,30 @@ def handle_interactive_simulation( finally: pm.complete_operation() controller.close() + + +class InteractiveSimulator(MatlabSimulator): + """Adapter implementing the ``MatlabSimulator`` interface for interactive mode.""" + + def __init__(self, *args, **kwargs) -> None: + self._controller = MatlabInteractiveController(*args, **kwargs) + + def start(self) -> None: + pm = PerformanceMonitor() + self._pm = pm + pm.start_operation(self._controller.request_id) + pm.record_matlab_start() + self._controller.start(pm) + + def run(self, msg_dict: Dict[str, Any], outputs: list[str] | None = None + ) -> Dict[str, Any]: + self._controller.run(self._pm, msg_dict) + self._pm.record_matlab_stop() + return {"status": "completed"} + + def close(self) -> None: + self._pm.complete_operation() + self._controller.close() + + def get_metadata(self) -> Dict[str, Any]: + return self._controller.metadata() diff --git a/agents/matlab/matlab_agent/src/core/matlab_simulator.py b/agents/matlab/matlab_agent/src/core/matlab_simulator.py index d9c232c9..b854d96b 100644 --- a/agents/matlab/matlab_agent/src/core/matlab_simulator.py +++ b/agents/matlab/matlab_agent/src/core/matlab_simulator.py @@ -11,6 +11,7 @@ import os import time +from abc import ABC, abstractmethod from pathlib import Path from typing import Dict, Union, List, Optional, Any, Tuple @@ -27,11 +28,28 @@ class MatlabSimulationError(Exception): """Custom exception for MATLAB simulation errors.""" -class MatlabSimulator: - """ - Manages the lifecycle of a MATLAB simulation with proper resource management, - error handling and type conversions. - """ +class MatlabSimulator(ABC): + """Interface for MATLAB-based simulators.""" + + @abstractmethod + def start(self) -> None: + """Start the simulation.""" + + @abstractmethod + def run(self, inputs: Dict[str, Any], outputs: List[str]) -> Dict[str, Any]: + """Run the simulation and return the results.""" + + @abstractmethod + def close(self) -> None: + """Stop the simulation and release resources.""" + + def get_metadata(self) -> Dict[str, Any]: # pragma: no cover - optional + """Return optional simulation metadata.""" + return {} + + +class BatchSimulator(MatlabSimulator): + """Concrete simulator for batch MATLAB executions.""" def __init__( self, diff --git a/agents/matlab/matlab_agent/src/core/streaming.py b/agents/matlab/matlab_agent/src/core/streaming.py index 476b7340..e26f346a 100644 --- a/agents/matlab/matlab_agent/src/core/streaming.py +++ b/agents/matlab/matlab_agent/src/core/streaming.py @@ -20,6 +20,7 @@ from ..utils.create_response import create_response from ..utils.logger import get_logger from ..utils.performance_monitor import PerformanceMonitor +from .matlab_simulator import MatlabSimulator # Configure logger logger = get_logger() @@ -345,6 +346,33 @@ def close(self) -> None: self.connection.close() +class StreamingSimulator(MatlabSimulator): + """Adapter implementing the ``MatlabSimulator`` interface for streaming.""" + + def __init__(self, *args, **kwargs) -> None: + self._controller = MatlabStreamingController(*args, **kwargs) + + def start(self) -> None: + pm = PerformanceMonitor() + self._pm = pm + pm.start_operation(self._controller.request_id) + pm.record_matlab_start() + self._controller.start(pm) + + def run(self, inputs: Dict[str, Any], outputs: list[str] | None = None + ) -> Dict[str, Any]: + self._controller.run(inputs, performance_monitor=self._pm) + self._pm.record_matlab_stop() + return {"status": "completed"} + + def close(self) -> None: + self._pm.complete_operation() + self._controller.close() + + def get_metadata(self) -> Dict[str, Any]: + return self._controller.get_metadata() + + def _handle_streaming_error( sim_file: str, error: Exception, diff --git a/agents/matlab/matlab_agent/test/unit/test_matlab_simulator.py b/agents/matlab/matlab_agent/test/unit/test_matlab_simulator.py index 984ce34a..404519f7 100644 --- a/agents/matlab/matlab_agent/test/unit/test_matlab_simulator.py +++ b/agents/matlab/matlab_agent/test/unit/test_matlab_simulator.py @@ -5,7 +5,7 @@ from unittest.mock import Mock, patch, MagicMock from pathlib import Path -from src.core.matlab_simulator import MatlabSimulator, MatlabSimulationError +from src.core.matlab_simulator import BatchSimulator, MatlabSimulationError @pytest.fixture @@ -39,8 +39,8 @@ def patch_path_exists(): @pytest.fixture def simulator(sim_path, sim_file, patch_path_exists): - """Create a MatlabSimulator instance with mocked dependencies.""" - return MatlabSimulator(sim_path, sim_file) + """Create a BatchSimulator instance with mocked dependencies.""" + return BatchSimulator(sim_path, sim_file) @pytest.fixture @@ -63,14 +63,14 @@ def test_init_with_valid_path(self, simulator, sim_path, sim_file): def test_init_with_custom_function_name( self, sim_path, sim_file, patch_path_exists): """Test initialization with a custom function name.""" - simulator = MatlabSimulator(sim_path, sim_file, "custom_function") + simulator = BatchSimulator(sim_path, sim_file, "custom_function") assert simulator.function_name == "custom_function" def test_init_with_invalid_path(self, sim_file): """Test initialization with an invalid path raises FileNotFoundError.""" with patch('pathlib.Path.is_dir', return_value=False): with pytest.raises(FileNotFoundError, match="Simulation directory not found"): - MatlabSimulator('/invalid/path', sim_file) + BatchSimulator('/invalid/path', sim_file) class TestMatlabSimulatorOperations: From 846eb40971dfd0a117783bee9e18bc168d1224ef Mon Sep 17 00:00:00 2001 From: Marco Melloni <98281551+marcomelloni@users.noreply.github.com> Date: Thu, 24 Jul 2025 11:31:34 +0200 Subject: [PATCH 2/2] Add command-based stop mechanism --- .../src/comm/rabbitmq/message_handler.py | 97 +++++++++++++------ agents/matlab/matlab_agent/src/core/batch.py | 21 ++++ .../matlab_agent/src/core/interactive.py | 5 + .../matlab/matlab_agent/src/core/streaming.py | 5 + .../matlab/matlab_agent/src/utils/commands.py | 4 +- .../test/unit/test_message_handler.py | 6 ++ 6 files changed, 106 insertions(+), 32 deletions(-) diff --git a/agents/matlab/matlab_agent/src/comm/rabbitmq/message_handler.py b/agents/matlab/matlab_agent/src/comm/rabbitmq/message_handler.py index cdebb776..6a74d5e1 100644 --- a/agents/matlab/matlab_agent/src/comm/rabbitmq/message_handler.py +++ b/agents/matlab/matlab_agent/src/comm/rabbitmq/message_handler.py @@ -2,6 +2,7 @@ Message handler for processing incoming RabbitMQ messages. """ import uuid +import threading from typing import Any, Optional, Dict import yaml @@ -99,6 +100,7 @@ def __init__(self, agent_id: str, rabbitmq_manager: Any, self.response_templates = self.config.get( 'response_templates', {}) self.interactive_queues: Dict[str, queue.Queue] = {} + self._sim_thread: threading.Thread | None = None def get_agent_id(self) -> str: """ @@ -109,6 +111,39 @@ def get_agent_id(self) -> str: """ return self.agent_id + # ------------------------------------------------------------------ + def _run_simulation(self, sim_type: str, msg_dict: Dict[str, Any], + source: str) -> None: + """Run the requested simulation in a background thread.""" + try: + if sim_type == 'batch': + handle_batch_simulation( + msg_dict, + source, + self.rabbitmq_manager, + self.path_simulation, + self.response_templates) + elif sim_type == 'streaming': + tcp_settings = self.config.get('tcp', {}) + handle_streaming_simulation( + msg_dict, + source, + self.rabbitmq_manager, + self.path_simulation, + self.response_templates, + tcp_settings) + elif sim_type == 'interactive': + tcp_settings = self.config.get('tcp', {}) + handle_interactive_simulation( + msg_dict, + source, + self.rabbitmq_manager, + self.path_simulation, + self.response_templates, + tcp_settings) + finally: + self._sim_thread = None + def handle_message( self, ch: BlockingChannel, @@ -145,11 +180,16 @@ def handle_message( cmd = msg_dict['command'].upper() if cmd == 'STOP': # Signal to stop the current simulation - logger.info("Received STOP command, signaling to stop simulation") + logger.info( + "Received STOP command, signaling to stop simulation") CommandRegistry.stop() + if self._sim_thread and self._sim_thread.is_alive(): + logger.debug( + "Terminating running simulation thread") elif cmd == 'RUN': # Reset the stop event to allow running simulations - logger.info("Received RUN command, resetting stop event") + logger.info( + "Received RUN command, resetting stop event") CommandRegistry.reset() elif cmd == 'CHECK': # Check the current status of the command registry @@ -236,36 +276,31 @@ def handle_message( ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False) return logger.info("Received simulation type: %s", sim_type) - # Process based on simulation type - if sim_type == 'batch': - handle_batch_simulation( - msg_dict, - source, - self.rabbitmq_manager, - self.path_simulation, - self.response_templates) - ch.basic_ack(delivery_tag=method.delivery_tag) - elif sim_type == 'streaming': - ch.basic_ack(delivery_tag=method.delivery_tag) - tcp_settings = self.config.get( - 'tcp', {}) - handle_streaming_simulation( - msg_dict, source, - self.rabbitmq_manager, - self.path_simulation, - self.response_templates, - tcp_settings - ) - elif sim_type == 'interactive': - ch.basic_ack(delivery_tag=method.delivery_tag) - tcp_settings = self.config.get('tcp', {}) - handle_interactive_simulation( - msg_dict, source, - self.rabbitmq_manager, - self.path_simulation, - self.response_templates, - tcp_settings + if self._sim_thread and self._sim_thread.is_alive(): + logger.warning("Simulation already running") + error_response = create_response( + template_type='error', + sim_file=sim_file, + sim_type=sim_type, + response_templates={}, + bridge_meta=bridge_meta, + request_id=request_id, + error={ + 'message': 'Simulation already running', + 'type': 'simulation_in_progress' + } ) + self.rabbitmq_manager.send_result(source, error_response) + ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False) + return + + self._sim_thread = threading.Thread( + target=self._run_simulation, + args=(sim_type, msg_dict, source), + daemon=True + ) + self._sim_thread.start() + ch.basic_ack(delivery_tag=method.delivery_tag) else: logger.error("Unknown simulation type: %s", sim_type) error_response = create_response( diff --git a/agents/matlab/matlab_agent/src/core/batch.py b/agents/matlab/matlab_agent/src/core/batch.py index 80fca501..8956235a 100644 --- a/agents/matlab/matlab_agent/src/core/batch.py +++ b/agents/matlab/matlab_agent/src/core/batch.py @@ -19,6 +19,7 @@ MatlabSimulationError, MatlabSimulator, ) +from ..utils.commands import CommandRegistry, StopRequested from ..utils.performance_monitor import PerformanceMonitor # Configure logger @@ -50,6 +51,8 @@ def handle_batch_simulation( try: # Record MATLAB start performance_monitor.record_matlab_start() + if CommandRegistry.should_stop(): + raise StopRequested() data: Dict[str, Any] = msg_dict.get('simulation', {}) bridge_meta = data.get('bridge_meta', 'unknown') @@ -72,6 +75,8 @@ def handle_batch_simulation( bridge_meta, request_id) _start_matlab_with_retry(sim) + if CommandRegistry.should_stop(): + raise StopRequested() _send_progress(rabbitmq_manager, source, sim_file, @@ -79,6 +84,8 @@ def handle_batch_simulation( response_templates, bridge_meta, request_id) + if CommandRegistry.should_stop(): + raise StopRequested() results = sim.run(inputs, outputs) metadata = _get_metadata(sim) if response_templates.get( 'success', {}).get('include_metadata', False) else None @@ -99,6 +106,18 @@ def handle_batch_simulation( logger.info("Simulation '%s' completed successfully", sim_file) + except StopRequested: + logger.info("Stop requested - terminating batch simulation") + error_response = create_response( + 'error', + sim_file or 'unknown', + 'batch', + response_templates, + bridge_meta=bridge_meta, + request_id=request_id, + error={'message': 'Simulation stopped', 'type': 'stopped'} + ) + rabbitmq_manager.send_result(source, error_response) except Exception as e: # pylint: disable=broad-except _handle_error( e, @@ -137,6 +156,8 @@ def _start_matlab_with_retry( max_retries: int = 3) -> None: """Attempt to start MATLAB engine with retries.""" for attempt in range(1, max_retries + 1): + if CommandRegistry.should_stop(): + raise StopRequested() try: sim.start() return diff --git a/agents/matlab/matlab_agent/src/core/interactive.py b/agents/matlab/matlab_agent/src/core/interactive.py index abe00f36..bd5281cd 100644 --- a/agents/matlab/matlab_agent/src/core/interactive.py +++ b/agents/matlab/matlab_agent/src/core/interactive.py @@ -18,6 +18,7 @@ from ..utils.logger import get_logger from ..utils.performance_monitor import PerformanceMonitor from .matlab_simulator import MatlabSimulator +from ..utils.commands import CommandRegistry, StopRequested from ..utils.constants import ( ACCEPT_TIMEOUT, BUFFER_SIZE, @@ -243,6 +244,8 @@ def run(self, pm: PerformanceMonitor, msg_dict: Dict[str, Any]) -> None: try: while True: + if CommandRegistry.should_stop(): + raise StopRequested() if self.out_srv.matlab_proc and self.out_srv.matlab_proc.poll() is not None: logger.debug( "[INTERACTIVE] MATLAB process ended, stopping loop") @@ -267,6 +270,8 @@ def run(self, pm: PerformanceMonitor, msg_dict: Dict[str, Any]) -> None: self._relay(resp) except KeyboardInterrupt: # pragma: no cover - manual interruption logger.info("[INTERACTIVE] Interrupted by user") + except StopRequested: + logger.info("[INTERACTIVE] Stop requested - terminating") finally: pm.record_simulation_complete() diff --git a/agents/matlab/matlab_agent/src/core/streaming.py b/agents/matlab/matlab_agent/src/core/streaming.py index e26f346a..4cb1c2ed 100644 --- a/agents/matlab/matlab_agent/src/core/streaming.py +++ b/agents/matlab/matlab_agent/src/core/streaming.py @@ -21,6 +21,7 @@ from ..utils.logger import get_logger from ..utils.performance_monitor import PerformanceMonitor from .matlab_simulator import MatlabSimulator +from ..utils.commands import CommandRegistry, StopRequested # Configure logger logger = get_logger() @@ -301,6 +302,8 @@ def run(self, inputs: Dict[str, Any], performance_monitor) -> None: buffer = b"" sequence = 0 while True: + if CommandRegistry.should_stop(): + raise StopRequested() chunk = self.connection.connection.recv(4096) if not chunk: logger.debug("Connection closed") @@ -323,6 +326,8 @@ def run(self, inputs: Dict[str, Any], performance_monitor) -> None: except (ConnectionError, OSError) as e: logger.error("Connection error: %s", str(e)) raise MatlabStreamingError(f"Connection error: {str(e)}") from e + except StopRequested: + logger.info("Stop requested - terminating streaming simulation") def get_metadata(self) -> Dict[str, Any]: """Collect system resource metadata.""" diff --git a/agents/matlab/matlab_agent/src/utils/commands.py b/agents/matlab/matlab_agent/src/utils/commands.py index 68d78a96..673328fe 100644 --- a/agents/matlab/matlab_agent/src/utils/commands.py +++ b/agents/matlab/matlab_agent/src/utils/commands.py @@ -1,9 +1,11 @@ import threading + class StopRequested(Exception): """Raised to unwind the stack when a stop is requested.""" pass + class CommandRegistry: _stop_event = threading.Event() @@ -22,4 +24,4 @@ def should_stop(cls) -> bool: @classmethod def wait(cls, timeout: float) -> bool: """Block up to `timeout` seconds. Return True if stop requested.""" - return cls._stop_event.wait(timeout) \ No newline at end of file + return cls._stop_event.wait(timeout) diff --git a/agents/matlab/matlab_agent/test/unit/test_message_handler.py b/agents/matlab/matlab_agent/test/unit/test_message_handler.py index f3893e7b..5644a318 100644 --- a/agents/matlab/matlab_agent/test/unit/test_message_handler.py +++ b/agents/matlab/matlab_agent/test/unit/test_message_handler.py @@ -8,6 +8,7 @@ import uuid from unittest.mock import Mock, patch +import time import pytest import yaml from pika.spec import Basic, BasicProperties @@ -172,6 +173,7 @@ def test_handle_message_batch_simulation_success( self.mock_properties, b'test message body' ) + time.sleep(0.05) # Verify mock_yaml_load.assert_called_once_with(b'test message body') @@ -214,6 +216,7 @@ def test_handle_message_streaming_simulation_success( self.mock_properties, b'test message body' ) + time.sleep(0.05) # Verify mock_yaml_load.assert_called_once_with(b'test message body') @@ -502,6 +505,7 @@ def test_routing_key_extraction(self, mock_yaml_load): self.mock_properties, b'test message' ) + time.sleep(0.05) # Verify source is correctly extracted mock_batch.assert_called_once() @@ -576,6 +580,7 @@ def test_complete_batch_message_flow(self, mock_handle_batch): mock_properties, message_body ) + time.sleep(0.05) # Verify successful processing mock_handle_batch.assert_called_once() @@ -634,6 +639,7 @@ def test_complete_streaming_message_flow(self, mock_handle_streaming): mock_properties, message_body ) + time.sleep(0.05) # Verify successful processing mock_handle_streaming.assert_called_once()