Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 66 additions & 31 deletions agents/matlab/matlab_agent/src/comm/rabbitmq/message_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Message handler for processing incoming RabbitMQ messages.
"""
import uuid
import threading
from typing import Any, Optional, Dict

import yaml
Expand Down Expand Up @@ -99,6 +100,7 @@ def __init__(self, agent_id: str, rabbitmq_manager: Any,
self.response_templates = self.config.get(
'response_templates', {})
self.interactive_queues: Dict[str, queue.Queue] = {}
self._sim_thread: threading.Thread | None = None

def get_agent_id(self) -> str:
"""
Expand All @@ -109,6 +111,39 @@ def get_agent_id(self) -> str:
"""
return self.agent_id

# ------------------------------------------------------------------
def _run_simulation(self, sim_type: str, msg_dict: Dict[str, Any],
source: str) -> None:
"""Run the requested simulation in a background thread."""
try:
if sim_type == 'batch':
handle_batch_simulation(
msg_dict,
source,
self.rabbitmq_manager,
self.path_simulation,
self.response_templates)
elif sim_type == 'streaming':
tcp_settings = self.config.get('tcp', {})
handle_streaming_simulation(
msg_dict,
source,
self.rabbitmq_manager,
self.path_simulation,
self.response_templates,
tcp_settings)
elif sim_type == 'interactive':
tcp_settings = self.config.get('tcp', {})
handle_interactive_simulation(
msg_dict,
source,
self.rabbitmq_manager,
self.path_simulation,
self.response_templates,
tcp_settings)
finally:
self._sim_thread = None

def handle_message(
self,
ch: BlockingChannel,
Expand Down Expand Up @@ -145,11 +180,16 @@ def handle_message(
cmd = msg_dict['command'].upper()
if cmd == 'STOP':
# Signal to stop the current simulation
logger.info("Received STOP command, signaling to stop simulation")
logger.info(
"Received STOP command, signaling to stop simulation")
CommandRegistry.stop()
if self._sim_thread and self._sim_thread.is_alive():
logger.debug(
"Terminating running simulation thread")
elif cmd == 'RUN':
# Reset the stop event to allow running simulations
logger.info("Received RUN command, resetting stop event")
logger.info(
"Received RUN command, resetting stop event")
CommandRegistry.reset()
elif cmd == 'CHECK':
# Check the current status of the command registry
Expand Down Expand Up @@ -236,36 +276,31 @@ def handle_message(
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
return
logger.info("Received simulation type: %s", sim_type)
# Process based on simulation type
if sim_type == 'batch':
handle_batch_simulation(
msg_dict,
source,
self.rabbitmq_manager,
self.path_simulation,
self.response_templates)
ch.basic_ack(delivery_tag=method.delivery_tag)
elif sim_type == 'streaming':
ch.basic_ack(delivery_tag=method.delivery_tag)
tcp_settings = self.config.get(
'tcp', {})
handle_streaming_simulation(
msg_dict, source,
self.rabbitmq_manager,
self.path_simulation,
self.response_templates,
tcp_settings
)
elif sim_type == 'interactive':
ch.basic_ack(delivery_tag=method.delivery_tag)
tcp_settings = self.config.get('tcp', {})
handle_interactive_simulation(
msg_dict, source,
self.rabbitmq_manager,
self.path_simulation,
self.response_templates,
tcp_settings
if self._sim_thread and self._sim_thread.is_alive():
logger.warning("Simulation already running")
error_response = create_response(
template_type='error',
sim_file=sim_file,
sim_type=sim_type,
response_templates={},
bridge_meta=bridge_meta,
request_id=request_id,
error={
'message': 'Simulation already running',
'type': 'simulation_in_progress'
}
)
self.rabbitmq_manager.send_result(source, error_response)
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
return

self._sim_thread = threading.Thread(
target=self._run_simulation,
args=(sim_type, msg_dict, source),
daemon=True
)
self._sim_thread.start()
ch.basic_ack(delivery_tag=method.delivery_tag)
else:
logger.error("Unknown simulation type: %s", sim_type)
error_response = create_response(
Expand Down
30 changes: 28 additions & 2 deletions agents/matlab/matlab_agent/src/core/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@
from ..utils.logger import get_logger
from ..utils.create_response import create_response
from ..comm.interfaces import IMessageBroker
from .matlab_simulator import MatlabSimulator, MatlabSimulationError
from .matlab_simulator import (
BatchSimulator,
MatlabSimulationError,
MatlabSimulator,
)
from ..utils.commands import CommandRegistry, StopRequested
from ..utils.performance_monitor import PerformanceMonitor

# Configure logger
Expand Down Expand Up @@ -46,6 +51,8 @@ def handle_batch_simulation(
try:
# Record MATLAB start
performance_monitor.record_matlab_start()
if CommandRegistry.should_stop():
raise StopRequested()

data: Dict[str, Any] = msg_dict.get('simulation', {})
bridge_meta = data.get('bridge_meta', 'unknown')
Expand All @@ -56,7 +63,8 @@ def handle_batch_simulation(
sim_path = path_simulation
inputs, outputs = _extract_io_specs(data)
logger.info("Starting simulation '%s'", sim_file)
sim = MatlabSimulator(sim_path, sim_file, function_name)
sim: MatlabSimulator = BatchSimulator(
sim_path, sim_file, function_name)
# Record MATLAB startup complete
performance_monitor.record_matlab_startup_complete()
_send_progress(rabbitmq_manager,
Expand All @@ -67,13 +75,17 @@ def handle_batch_simulation(
bridge_meta,
request_id)
_start_matlab_with_retry(sim)
if CommandRegistry.should_stop():
raise StopRequested()
_send_progress(rabbitmq_manager,
source,
sim_file,
50,
response_templates,
bridge_meta,
request_id)
if CommandRegistry.should_stop():
raise StopRequested()
results = sim.run(inputs, outputs)
metadata = _get_metadata(sim) if response_templates.get(
'success', {}).get('include_metadata', False) else None
Expand All @@ -94,6 +106,18 @@ def handle_batch_simulation(

logger.info("Simulation '%s' completed successfully", sim_file)

except StopRequested:
logger.info("Stop requested - terminating batch simulation")
error_response = create_response(
'error',
sim_file or 'unknown',
'batch',
response_templates,
bridge_meta=bridge_meta,
request_id=request_id,
error={'message': 'Simulation stopped', 'type': 'stopped'}
)
rabbitmq_manager.send_result(source, error_response)
except Exception as e: # pylint: disable=broad-except
_handle_error(
e,
Expand Down Expand Up @@ -132,6 +156,8 @@ def _start_matlab_with_retry(
max_retries: int = 3) -> None:
"""Attempt to start MATLAB engine with retries."""
for attempt in range(1, max_retries + 1):
if CommandRegistry.should_stop():
raise StopRequested()
try:
sim.start()
return
Expand Down
48 changes: 42 additions & 6 deletions agents/matlab/matlab_agent/src/core/interactive.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
from ..utils.create_response import create_response
from ..utils.logger import get_logger
from ..utils.performance_monitor import PerformanceMonitor
from .matlab_simulator import MatlabSimulator
from ..utils.commands import CommandRegistry, StopRequested
from ..utils.constants import (
ACCEPT_TIMEOUT,
BUFFER_SIZE,
Expand Down Expand Up @@ -74,7 +76,8 @@ def send(self, data: Dict[str, Any]) -> None:
def recv_all(self) -> list[Dict[str, Any]]:
if not self._conn or not select([self._conn], [], [], 0)[0]:
return []
# Read up to BUFFER_SIZE bytes from socket and append to internal buffer (self._buffer)
# Read up to BUFFER_SIZE bytes from socket and append to internal buffer
# (self._buffer)
self._buffer += self._conn.recv(BUFFER_SIZE)
# Messages are sent with a "\n" character as delimiter.
# Split everything received into "lines".
Expand All @@ -99,7 +102,8 @@ def recv_all(self) -> list[Dict[str, Any]]:
logger.error("[INTERACTIVE] Invalid JSON: %s", exc)
messages.append({"error": f"Invalid JSON: {str(exc)}"})
# Return the messages list, i.e., all complete decoded JSON objects (plus any error placeholders) available at that moment.
# The next call to recv_all() will resume from where it left off, because only the last incomplete fragment remains in the buffer.
# The next call to recv_all() will resume from where it left off,
# because only the last incomplete fragment remains in the buffer.
return messages

def close(self) -> None:
Expand Down Expand Up @@ -240,17 +244,20 @@ def run(self, pm: PerformanceMonitor, msg_dict: Dict[str, Any]) -> None:

try:
while True:
if CommandRegistry.should_stop():
raise StopRequested()
if self.out_srv.matlab_proc and self.out_srv.matlab_proc.poll() is not None:
logger.debug("[INTERACTIVE] MATLAB process ended, stopping loop")
logger.debug(
"[INTERACTIVE] MATLAB process ended, stopping loop")
break
method, _ , body = ch.basic_get(
method, _, body = ch.basic_get(
queue=qname, auto_ack=True)
while method:
frame = _parse_frame(body)
if frame:
# Send the inputs to MATLAB
self.in_srv.send(self._only_inputs(frame))
method, _ , body = ch.basic_get(
method, _, body = ch.basic_get(
queue=qname, auto_ack=True)

# Receive Responses from MATLAB
Expand All @@ -263,6 +270,8 @@ def run(self, pm: PerformanceMonitor, msg_dict: Dict[str, Any]) -> None:
self._relay(resp)
except KeyboardInterrupt: # pragma: no cover - manual interruption
logger.info("[INTERACTIVE] Interrupted by user")
except StopRequested:
logger.info("[INTERACTIVE] Stop requested - terminating")
finally:
pm.record_simulation_complete()

Expand All @@ -276,7 +285,7 @@ def metadata(self) -> Dict[str, Any]:
if self.start_time:
meta["execution_time"] = time.time() - self.start_time
meta["memory_usage"] = psutil.Process(
).memory_info().rss // BYTES_IN_MB
).memory_info().rss // BYTES_IN_MB
return meta


Expand Down Expand Up @@ -339,3 +348,30 @@ def handle_interactive_simulation(
finally:
pm.complete_operation()
controller.close()


class InteractiveSimulator(MatlabSimulator):
"""Adapter implementing the ``MatlabSimulator`` interface for interactive mode."""

def __init__(self, *args, **kwargs) -> None:
self._controller = MatlabInteractiveController(*args, **kwargs)

def start(self) -> None:
pm = PerformanceMonitor()
self._pm = pm
pm.start_operation(self._controller.request_id)
pm.record_matlab_start()
self._controller.start(pm)

def run(self, msg_dict: Dict[str, Any], outputs: list[str] | None = None
) -> Dict[str, Any]:
self._controller.run(self._pm, msg_dict)
self._pm.record_matlab_stop()
return {"status": "completed"}

def close(self) -> None:
self._pm.complete_operation()
self._controller.close()

def get_metadata(self) -> Dict[str, Any]:
return self._controller.metadata()
28 changes: 23 additions & 5 deletions agents/matlab/matlab_agent/src/core/matlab_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import os
import time
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Dict, Union, List, Optional, Any, Tuple

Expand All @@ -27,11 +28,28 @@ class MatlabSimulationError(Exception):
"""Custom exception for MATLAB simulation errors."""


class MatlabSimulator:
"""
Manages the lifecycle of a MATLAB simulation with proper resource management,
error handling and type conversions.
"""
class MatlabSimulator(ABC):
"""Interface for MATLAB-based simulators."""

@abstractmethod
def start(self) -> None:
"""Start the simulation."""

@abstractmethod
def run(self, inputs: Dict[str, Any], outputs: List[str]) -> Dict[str, Any]:
"""Run the simulation and return the results."""

@abstractmethod
def close(self) -> None:
"""Stop the simulation and release resources."""

def get_metadata(self) -> Dict[str, Any]: # pragma: no cover - optional
"""Return optional simulation metadata."""
return {}


class BatchSimulator(MatlabSimulator):
"""Concrete simulator for batch MATLAB executions."""

def __init__(
self,
Expand Down
Loading
Loading