diff --git a/.gitignore b/.gitignore index aa083e1c7..727ab9043 100644 --- a/.gitignore +++ b/.gitignore @@ -5,4 +5,7 @@ .migration_complete # PlatformIO -.pio/ \ No newline at end of file +.pio/ + +# Worktrees +worktrees/ diff --git a/software/control/_def.py b/software/control/_def.py index aa8f34c51..57c2950f8 100644 --- a/software/control/_def.py +++ b/software/control/_def.py @@ -1105,6 +1105,12 @@ class SOFTWARE_POS_LIMIT: }, } +# Multi-camera support +# All cameras must be the same type (specified by CAMERA_TYPE above) +USE_MULTI_CAMERA = False +MULTI_CAMERA_IDS = [1] # List of camera IDs to instantiate +MULTI_CAMERA_SNS = {} # Camera ID -> serial number, e.g., {"1": "ABC123", "2": "DEF456"} + # Stage USE_PRIOR_STAGE = False PRIOR_STAGE_SN = "" diff --git a/software/control/camera_toupcam.py b/software/control/camera_toupcam.py index 3d270413f..3d2036361 100644 --- a/software/control/camera_toupcam.py +++ b/software/control/camera_toupcam.py @@ -178,6 +178,9 @@ def _open(index=None, sn=None) -> Tuple[toupcam.Toupcam, ToupCamCapabilities]: if not len(sn_matches): all_sn = [d.id for d in devices] raise ValueError(f"Could not find camera with SN={sn}, options are: {','.join(all_sn)}") + # Use the matched index + index = sn_matches[0] + log.info(f"Found camera with SN={sn} at index={index}") for idx, device in enumerate(devices): log.info( @@ -237,7 +240,11 @@ def __init__(self, config: CameraConfig, hw_trigger_fn, hw_set_strobe_delay_ms_f # is what the camera driver calls when a new frame is available. self._raw_camera_stream_started = False self._raw_frame_callback_lock = threading.Lock() - (self._camera, self._capabilities) = ToupcamCamera._open(index=0) + # Use serial number from config if available, otherwise fall back to index=0 + if self._config.serial_number: + (self._camera, self._capabilities) = ToupcamCamera._open(sn=self._config.serial_number) + else: + (self._camera, self._capabilities) = ToupcamCamera._open(index=0) self._pixel_format = self._config.default_pixel_format self._binning = self._config.default_binning diff --git a/software/control/core/auto_focus_worker.py b/software/control/core/auto_focus_worker.py index 77a4770bb..ac2f5f99d 100644 --- a/software/control/core/auto_focus_worker.py +++ b/software/control/core/auto_focus_worker.py @@ -87,8 +87,13 @@ def run_autofocus(self): # "reset_image_ready_flag" arg, so this is broken for all other cameras. image = self.camera.read_frame() else: + # Get trigger channel for active camera + active_camera_id = self.liveController._active_camera_id + trigger_channel = self.liveController.microscope.get_trigger_channel_for_camera(active_camera_id) self.microcontroller.send_hardware_trigger( - control_illumination=True, illumination_on_time_us=self.camera.get_exposure_time() * 1000 + control_illumination=True, + illumination_on_time_us=self.camera.get_exposure_time() * 1000, + trigger_output_ch=trigger_channel, ) image = self.camera.read_frame() if image is None: diff --git a/software/control/core/live_controller.py b/software/control/core/live_controller.py index d1b6a9239..b09d508f0 100644 --- a/software/control/core/live_controller.py +++ b/software/control/core/live_controller.py @@ -2,7 +2,7 @@ import time import threading -from typing import List, Optional, TYPE_CHECKING +from typing import Callable, List, Optional, TYPE_CHECKING import squid.logging from control.microcontroller import Microcontroller @@ -12,9 +12,21 @@ from control.core.config.utils import apply_confocal_override from control.models import merge_channel_configs +# Mapping from camera acquisition mode to LiveController trigger mode +_CAM_MODE_TO_TRIGGER = { + CameraAcquisitionMode.SOFTWARE_TRIGGER: TriggerMode.SOFTWARE, + CameraAcquisitionMode.HARDWARE_TRIGGER: TriggerMode.HARDWARE, + CameraAcquisitionMode.CONTINUOUS: TriggerMode.CONTINUOUS, +} + if TYPE_CHECKING: from control.models import AcquisitionChannel, IlluminationChannelConfig +# Type aliases for callbacks (multi-camera support) +# Note: TriggerMode values are strings (e.g., "Software Trigger"), not class instances +OnCameraSwitchedCallback = Callable[[AbstractCamera, AbstractCamera], None] +OnTriggerModeChangedCallback = Callable[[str], None] + class LiveController: def __init__( @@ -25,10 +37,16 @@ def __init__( control_illumination: bool = True, use_internal_timer_for_hardware_trigger: bool = True, for_displacement_measurement: bool = False, + initial_camera_id: Optional[int] = None, ): self._log = squid.logging.get_logger(self.__class__.__name__) self.microscope = microscope self.camera: AbstractCamera = camera + # Track which camera is currently active (for multi-camera support) + # Use explicit ID if provided, otherwise fall back to microscope's primary + self._active_camera_id: int = ( + initial_camera_id if initial_camera_id is not None else microscope._primary_camera_id + ) self.currentConfiguration: Optional[AcquisitionChannel] = None self.trigger_mode: Optional[TriggerMode] = TriggerMode.SOFTWARE # @@@ change to None self.is_live = False @@ -57,6 +75,12 @@ def __init__( # Confocal mode state - when True, use confocal_override from acquisition configs self._confocal_mode: bool = False + # Callback for camera switch notifications (for multi-camera support) + self.on_camera_switched: Optional[OnCameraSwitchedCallback] = None + + # Callback for trigger mode change notifications (for multi-camera support) + self.on_trigger_mode_changed: Optional[OnTriggerModeChangedCallback] = None + # ───────────────────────────────────────────────────────────────────────────── # Illumination config helpers # ───────────────────────────────────────────────────────────────────────────── @@ -114,6 +138,76 @@ def sync_confocal_mode_from_hardware(self, confocal: bool) -> None: """ self.toggle_confocal_widefield(confocal) + # ───────────────────────────────────────────────────────────────────────────── + # Multi-camera support + # ───────────────────────────────────────────────────────────────────────────── + + def _get_target_camera_id(self, configuration: "AcquisitionChannel") -> int: + """Get the camera ID to use for the given channel configuration. + + Args: + configuration: The acquisition channel configuration + + Returns: + Camera ID to use. If channel.camera is None, returns the primary camera ID. + """ + if configuration.camera is not None: + return configuration.camera + return self.microscope._primary_camera_id + + def _switch_camera(self, camera_id: int) -> None: + """Switch to a different camera for live preview. + + This method updates the active camera used by the LiveController. + The caller is responsible for stopping live view before calling + this method if necessary. + + Args: + camera_id: The camera ID to switch to + + Raises: + ValueError: If the camera ID is not found in the microscope + """ + if camera_id == self._active_camera_id: + return # Already using this camera + + new_camera = self.microscope.get_camera(camera_id) + old_camera = self.camera + old_camera_id = self._active_camera_id + + self._log.info(f"Switching live camera: {old_camera_id} -> {camera_id}") + self.camera = new_camera + self._active_camera_id = camera_id + + # Read the new camera's acquisition mode and update trigger_mode to match + cam_mode = new_camera.get_acquisition_mode() + new_trigger_mode = _CAM_MODE_TO_TRIGGER.get(cam_mode, TriggerMode.SOFTWARE) + + if new_trigger_mode != self.trigger_mode: + self._log.info(f"Trigger mode changed: {self.trigger_mode} -> {new_trigger_mode}") + self.trigger_mode = new_trigger_mode + # Notify UI to update trigger mode display + if self.on_trigger_mode_changed: + try: + self.on_trigger_mode_changed(new_trigger_mode) + except Exception as e: + self._log.error(f"Error in on_trigger_mode_changed callback: {e}") + + # Notify listeners of camera switch (e.g., to move frame callbacks) + if self.on_camera_switched: + try: + self.on_camera_switched(old_camera, new_camera) + except Exception as e: + self._log.error(f"Error in on_camera_switched callback: {e}") + + def get_active_camera_id(self) -> int: + """Get the ID of the currently active camera. + + Returns: + The camera ID currently being used for live preview. + """ + return self._active_camera_id + # ───────────────────────────────────────────────────────────────────────────── # Channel configuration access # ───────────────────────────────────────────────────────────────────────────── @@ -160,6 +254,18 @@ def get_channels(self, objective: str) -> List["AcquisitionChannel"]: # Filter to only enabled channels channels = [ch for ch in channels if ch.enabled] + # In multi-camera systems, filter out enabled channels without camera assignment + # (UI should prevent this, but enforce here as safety check) + if len(self.microscope._cameras) > 1: + unassigned = [ch for ch in channels if ch.camera is None] + if unassigned: + self._log.warning( + f"Skipping {len(unassigned)} enabled channel(s) without camera assignment " + f"in multi-camera system: {[ch.name for ch in unassigned]}. " + f"Assign cameras in Settings > Acquisition Channel Configuration." + ) + channels = [ch for ch in channels if ch.camera is not None] + # Apply confocal mode if active return apply_confocal_override(channels, self._confocal_mode) @@ -446,14 +552,41 @@ def set_trigger_fps(self, fps): self._set_trigger_fps(fps) # set microscope mode - def set_microscope_mode(self, configuration: "AcquisitionChannel"): + def set_microscope_mode(self, configuration: "AcquisitionChannel") -> bool: + """Set the microscope mode to the given channel configuration. + + Args: + configuration: The acquisition channel configuration to apply. + + Returns: + True if mode was successfully set, False if there was an error + (e.g., None configuration, invalid camera ID). + """ if configuration is None: self._log.error("set_microscope_mode() called with None configuration - this is a bug in the caller") - return + return False self._log.info("setting microscope mode to " + configuration.name) - # temporarily stop live while changing mode - if self.is_live is True: + # Check if we need to switch cameras (multi-camera support) + target_camera_id = self._get_target_camera_id(configuration) + + # Validate target camera exists BEFORE any state changes + if target_camera_id not in self.microscope._cameras: + available = sorted(self.microscope._cameras.keys()) + self._log.error( + f"Channel '{configuration.name}' requires camera {target_camera_id}, " + f"but only cameras {available} exist. Mode not changed." + ) + return False # Exit cleanly, no state changed + + needs_camera_switch = target_camera_id != self._active_camera_id + + # Save live state and temporarily disable to prevent race conditions + # (frame callbacks accessing camera during switch) + was_live = self.is_live + + if was_live: + self.is_live = False # Prevent concurrent access during mode change self._stop_existing_timer() if self.control_illumination: # Turn off illumination BEFORE switching self.currentConfiguration. @@ -462,6 +595,17 @@ def set_microscope_mode(self, configuration: "AcquisitionChannel"): # channel's laser instead of the OLD channel's laser (which is still on). self.turn_off_illumination() + # Stop streaming on old camera before switching + if needs_camera_switch: + self.camera.stop_streaming() + + # Switch camera if needed + if needs_camera_switch: + self._switch_camera(target_camera_id) + # Start streaming on new camera if we were live + if was_live: + self.camera.start_streaming() + self.currentConfiguration = configuration # set camera exposure time and analog gain @@ -476,11 +620,13 @@ def set_microscope_mode(self, configuration: "AcquisitionChannel"): self.update_illumination() # restart live - if self.is_live is True: + if was_live: + self.is_live = True # Restore live state if self.control_illumination: self.turn_on_illumination() self._start_new_timer() self._log.info("Done setting microscope mode.") + return True def get_trigger_mode(self): return self.trigger_mode diff --git a/software/control/gui_hcs.py b/software/control/gui_hcs.py index 0aae43e03..97d1f939a 100644 --- a/software/control/gui_hcs.py +++ b/software/control/gui_hcs.py @@ -743,6 +743,19 @@ def __init__( filter_wheel_config_action.triggered.connect(self.openFilterWheelConfigEditor) advanced_menu.addAction(filter_wheel_config_action) + # Camera Configuration (only shown if multi-camera is enabled) + if USE_MULTI_CAMERA: + camera_config_action = QAction("Camera Configuration", self) + camera_config_action.triggered.connect(self.openCameraConfigEditor) + advanced_menu.addAction(camera_config_action) + + # Channel Group Configuration (only shown if multi-camera system) + camera_count = len(self.microscope.config_repo.get_camera_names()) + if camera_count > 1: + channel_group_config_action = QAction("Channel Group Configuration", self) + channel_group_config_action.triggered.connect(self.openChannelGroupConfigEditor) + advanced_menu.addAction(channel_group_config_action) + if USE_JUPYTER_CONSOLE: # Create namespace to expose to Jupyter self.namespace = { @@ -843,9 +856,15 @@ def setup_hardware(self, skip_init: bool = False): self.camera.set_acquisition_mode(squid.abc.CameraAcquisitionMode.HARDWARE_TRIGGER) else: self.camera.set_acquisition_mode(squid.abc.CameraAcquisitionMode.SOFTWARE_TRIGGER) - self.camera.add_frame_callback(self.streamHandler.get_frame_callback()) + self._frame_callback = self.streamHandler.get_frame_callback() + self._frame_callback_id = self.camera.add_frame_callback(self._frame_callback) self.camera.enable_callbacks(enabled=True) + # Register callback to move frame callback when live camera switches (multi-camera support) + self.liveController.on_camera_switched = self._on_live_camera_switched + # Register callback to update UI when trigger mode changes (multi-camera support) + self.liveController.on_trigger_mode_changed = self._on_trigger_mode_changed + if self.camera_focus: self.camera_focus.set_acquisition_mode( squid.abc.CameraAcquisitionMode.SOFTWARE_TRIGGER @@ -862,6 +881,49 @@ def setup_hardware(self, skip_init: bool = False): elif DEFAULT_OBJECTIVE in XERYON_OBJECTIVE_SWITCHER_POS_2: self.objective_changer.moveToPosition2(move_z=False) + def _on_live_camera_switched(self, old_camera, new_camera): + """Handle camera switch for live preview (multi-camera support). + + Moves the frame callback from the old camera to the new camera so that + frames from the new camera are properly displayed. + + Note: At this point, old_camera has already stopped streaming. + Uses QTimer.singleShot for thread safety since this may be called from non-GUI thread. + """ + from qtpy.QtCore import QTimer + + def switch_callback(): + try: + self.log.info(f"Moving frame callback from old camera to new camera") + old_callback_id = self._frame_callback_id + # Add to new camera first to minimize window with no callbacks + self._frame_callback_id = new_camera.add_frame_callback(self._frame_callback) + # Then remove from old camera (already stopped, just cleanup) + old_camera.remove_frame_callback(old_callback_id) + except Exception as e: + self.log.error(f"Failed to switch camera frame callback: {e}") + + QTimer.singleShot(0, switch_callback) + + def _on_trigger_mode_changed(self, new_mode): + """Handle trigger mode change when switching cameras (multi-camera support). + + Updates the UI to reflect the new camera's trigger mode. + """ + self.log.info(f"Updating UI trigger mode to: {new_mode}") + if hasattr(self, "liveControlWidget") and self.liveControlWidget: + # Use QTimer to safely update UI from potentially non-GUI thread + from qtpy.QtCore import QTimer + + def update_display(): + try: + if self.liveControlWidget is not None: + self.liveControlWidget.update_trigger_mode_display(new_mode) + except Exception as e: + self.log.error(f"Failed to update trigger mode display: {e}") + + QTimer.singleShot(0, update_display) + def waitForMicrocontroller(self, timeout=5.0, error_message=None): try: self.microcontroller.wait_till_operation_is_completed(timeout) @@ -2137,6 +2199,18 @@ def openFilterWheelConfigEditor(self): dialog.signal_config_updated.connect(self._refresh_channel_lists) dialog.exec_() + def openCameraConfigEditor(self): + """Open the camera configuration dialog""" + dialog = widgets.CameraConfiguratorDialog(self.microscope.config_repo, self) + dialog.signal_config_updated.connect(self._refresh_channel_lists) + dialog.exec_() + + def openChannelGroupConfigEditor(self): + """Open the channel group configuration dialog""" + dialog = widgets.ChannelGroupEditorDialog(self.microscope.config_repo, self) + dialog.signal_config_updated.connect(self._refresh_channel_lists) + dialog.exec_() + def _refresh_channel_lists(self): """Refresh channel lists in all widgets after channel configuration changes""" if self.liveControlWidget: diff --git a/software/control/microscope.py b/software/control/microscope.py index cf43f97df..bff2c4a0e 100644 --- a/software/control/microscope.py +++ b/software/control/microscope.py @@ -1,5 +1,5 @@ from pathlib import Path -from typing import Optional +from typing import Dict, List, Optional, Union import numpy as np @@ -27,6 +27,11 @@ import squid.logging import squid.stage.cephla import squid.stage.utils +from squid.camera.config_factory import ( + DEFAULT_SINGLE_CAMERA_ID, + create_camera_configs, + get_primary_camera_id, +) if control._def.USE_XERYON: from control.objective_changer_2_pos_controller import ( @@ -268,35 +273,79 @@ def build_from_global_config(simulated: bool = False, skip_init: bool = False) - cam_trigger_log = squid.logging.get_logger("camera hw functions") - def acquisition_camera_hw_trigger_fn(illumination_time: Optional[float]) -> bool: - # NOTE(imo): If this succeeds, it means we sent the request, - # but we didn't necessarily get confirmation of success. - if addons.nl5 and control._def.NL5_USE_DOUT: - addons.nl5.start_acquisition() - else: - illumination_time_us = 1000.0 * illumination_time if illumination_time else 0 + # Get camera registry for multi-camera support + from control.core.config import ConfigRepository + + config_repo = ConfigRepository() + camera_registry = config_repo.get_camera_registry() + + # Generate per-camera configs from registry + base template + base_camera_config = squid.config.get_camera_config() + camera_configs = create_camera_configs(camera_registry, base_camera_config) + + def make_hw_trigger_fn(trigger_channel: int): + """Create a hardware trigger function for a specific camera channel.""" + + def acquisition_camera_hw_trigger_fn(illumination_time: Optional[float]) -> bool: + # NOTE(imo): If this succeeds, it means we sent the request, + # but we didn't necessarily get confirmation of success. + if addons.nl5 and control._def.NL5_USE_DOUT: + addons.nl5.start_acquisition() + else: + illumination_time_us = 1000.0 * illumination_time if illumination_time else 0 + cam_trigger_log.debug( + f"Sending hw trigger ch={trigger_channel} with illumination_time=" + f"{illumination_time_us if illumination_time else None} [us]" + ) + low_level_devices.microcontroller.send_hardware_trigger( + illumination_time is not None, illumination_time_us, trigger_channel + ) + return True + + return acquisition_camera_hw_trigger_fn + + def make_hw_strobe_delay_fn(trigger_channel: int): + """Create a strobe delay function for a specific camera channel.""" + + def acquisition_camera_hw_strobe_delay_fn(strobe_delay_ms: float) -> bool: + strobe_delay_us = int(1000 * strobe_delay_ms) cam_trigger_log.debug( - f"Sending hw trigger with illumination_time={illumination_time_us if illumination_time else None} [us]" - ) - low_level_devices.microcontroller.send_hardware_trigger( - illumination_time is not None, illumination_time_us + f"Setting microcontroller strobe delay ch={trigger_channel} to {strobe_delay_us} [us]" ) - return True + low_level_devices.microcontroller.set_strobe_delay_us(strobe_delay_us, trigger_channel) + low_level_devices.microcontroller.wait_till_operation_is_completed() - def acquisition_camera_hw_strobe_delay_fn(strobe_delay_ms: float) -> bool: - strobe_delay_us = int(1000 * strobe_delay_ms) - cam_trigger_log.debug(f"Setting microcontroller strobe delay to {strobe_delay_us} [us]") - low_level_devices.microcontroller.set_strobe_delay_us(strobe_delay_us) - low_level_devices.microcontroller.wait_till_operation_is_completed() + return True - return True + return acquisition_camera_hw_strobe_delay_fn - camera = squid.camera.utils.get_camera( - config=squid.config.get_camera_config(), - simulated=camera_simulated, - hw_trigger_fn=acquisition_camera_hw_trigger_fn, - hw_set_strobe_delay_ms_fn=acquisition_camera_hw_strobe_delay_fn, - ) + # Instantiate all cameras + cameras: Dict[int, AbstractCamera] = {} + for camera_id, cam_config in camera_configs.items(): + # Get trigger channel from registry (defaults to camera_id - 1 if not configured) + trigger_channel = camera_registry.get_trigger_channel(camera_id) if camera_registry else (camera_id - 1) + cam_trigger_log.info( + f"Initializing camera {camera_id} (SN: {cam_config.serial_number}, trigger_ch={trigger_channel})" + ) + try: + camera = squid.camera.utils.get_camera( + config=cam_config, + simulated=camera_simulated, + hw_trigger_fn=make_hw_trigger_fn(trigger_channel), + hw_set_strobe_delay_ms_fn=make_hw_strobe_delay_fn(trigger_channel), + ) + cameras[camera_id] = camera + except Exception as e: + cam_trigger_log.error(f"Failed to initialize camera {camera_id}: {e}") + # Close any cameras we've already opened + for cleanup_cam_id, opened_camera in cameras.items(): + try: + opened_camera.close() + except Exception as cleanup_exc: + cam_trigger_log.error(f"Cleanup failed for camera {cleanup_cam_id}: {cleanup_exc}") + raise RuntimeError(f"Failed to initialize camera {camera_id}: {e}") from e + + cam_trigger_log.info(f"Initialized {len(cameras)} camera(s): IDs {sorted(cameras.keys())}") if control._def.USE_LDI_SERIAL_CONTROL and not simulated: ldi = serial_peripherals.LDI() @@ -329,10 +378,11 @@ def acquisition_camera_hw_strobe_delay_fn(strobe_delay_ms: float) -> bool: return Microscope( stage=stage, - camera=camera, + cameras=cameras, illumination_controller=illumination_controller, addons=addons, low_level_drivers=low_level_devices, + config_repo=config_repo, simulated=simulated, skip_init=skip_init, ) @@ -340,11 +390,12 @@ def acquisition_camera_hw_strobe_delay_fn(strobe_delay_ms: float) -> bool: def __init__( self, stage: AbstractStage, - camera: AbstractCamera, + cameras: Union[AbstractCamera, Dict[int, AbstractCamera]], illumination_controller: IlluminationController, addons: MicroscopeAddons, low_level_drivers: LowLevelDrivers, stream_handler_callbacks: Optional[StreamHandlerFunctions] = NoOpStreamHandlerFunctions, + config_repo: Optional["ConfigRepository"] = None, simulated: bool = False, skip_prepare_for_use: bool = False, skip_init: bool = False, @@ -352,7 +403,20 @@ def __init__( self._log = squid.logging.get_logger(self.__class__.__name__) self.stage: AbstractStage = stage - self.camera: AbstractCamera = camera + + # Multi-camera support: accept either a single camera or a dict of cameras + # For backward compatibility, a single camera is wrapped in a dict with default ID + if isinstance(cameras, dict): + self._cameras: Dict[int, AbstractCamera] = cameras + else: + # Backward compatibility: wrap single camera in dict + self._cameras = {DEFAULT_SINGLE_CAMERA_ID: cameras} + + self._primary_camera_id: int = get_primary_camera_id(list(self._cameras.keys())) + self._log.info( + f"Initialized with {len(self._cameras)} camera(s), " f"primary camera ID: {self._primary_camera_id}" + ) + self.illumination_controller: IlluminationController = illumination_controller self.addons = addons @@ -362,8 +426,8 @@ def __init__( self.objective_store: ObjectiveStore = ObjectiveStore() - # Centralized config management - self.config_repo: ConfigRepository = ConfigRepository() + # Centralized config management - reuse passed repo or create new one + self.config_repo: ConfigRepository = config_repo if config_repo is not None else ConfigRepository() # Note: Migration from acquisition_configurations to user_profiles is handled # by run_auto_migration() in main_hcs.py before Microscope is created @@ -393,7 +457,11 @@ def __init__( for_displacement_measurement=True, ) - self.live_controller: LiveController = LiveController(microscope=self, camera=self.camera) + self.live_controller: LiveController = LiveController( + microscope=self, + camera=self._cameras[self._primary_camera_id], + initial_camera_id=self._primary_camera_id, + ) # Sync confocal mode from hardware (must be after LiveController creation) if control._def.ENABLE_SPINNING_DISK_CONFOCAL: @@ -402,6 +470,56 @@ def __init__( if not skip_prepare_for_use: self._prepare_for_use(skip_init=skip_init) + # ═══════════════════════════════════════════════════════════════════════════ + # MULTI-CAMERA API + # ═══════════════════════════════════════════════════════════════════════════ + + @property + def camera(self) -> AbstractCamera: + """Get the primary camera (backward compatible). + + Returns: + The primary camera instance (camera with lowest ID). + """ + return self._cameras[self._primary_camera_id] + + def get_camera(self, camera_id: int) -> AbstractCamera: + """Get a camera by its ID. + + Args: + camera_id: The camera ID (from cameras.yaml). + + Returns: + The camera instance. + + Raises: + ValueError: If camera_id is not found. + """ + if camera_id not in self._cameras: + available = sorted(self._cameras.keys()) + raise ValueError(f"Camera ID {camera_id} not found. Available IDs: {available}") + return self._cameras[camera_id] + + def get_camera_ids(self) -> List[int]: + """Get sorted list of all camera IDs. + + Returns: + List of camera IDs in ascending order. + """ + return sorted(self._cameras.keys()) + + def get_camera_count(self) -> int: + """Get the number of cameras. + + Returns: + Number of cameras in the system. + """ + return len(self._cameras) + + # ═══════════════════════════════════════════════════════════════════════════ + # INITIALIZATION AND LIFECYCLE + # ═══════════════════════════════════════════════════════════════════════════ + def _prepare_for_use(self, skip_init: bool = False): self.low_level_drivers.prepare_for_use(skip_init=skip_init) self.addons.prepare_for_use(skip_init=skip_init) @@ -420,10 +538,14 @@ def _prepare_for_use(self, skip_init: bool = False): "does not support this feature (requires v1.1+)" ) - self.camera.set_pixel_format( - squid.config.CameraPixelFormat.from_string(control._def.CAMERA_CONFIG.PIXEL_FORMAT_DEFAULT) + # Initialize all cameras with default settings + default_pixel_format = squid.config.CameraPixelFormat.from_string( + control._def.CAMERA_CONFIG.PIXEL_FORMAT_DEFAULT ) - self.camera.set_acquisition_mode(CameraAcquisitionMode.SOFTWARE_TRIGGER) + for camera_id, camera in self._cameras.items(): + self._log.debug(f"Initializing camera {camera_id}") + camera.set_pixel_format(default_pixel_format) + camera.set_acquisition_mode(CameraAcquisitionMode.SOFTWARE_TRIGGER) if self.addons.camera_focus: self.addons.camera_focus.set_pixel_format(squid.config.CameraPixelFormat.from_string("MONO8")) @@ -537,6 +659,20 @@ def setup_hardware(self) -> None: self.addons.camera_focus.enable_callbacks(True) self.addons.camera_focus.start_streaming() + def get_trigger_channel_for_camera(self, camera_id: int) -> int: + """Get the trigger channel for a camera. + + Args: + camera_id: The camera ID to look up. + + Returns: + The trigger channel from camera registry, or camera_id - 1 as fallback. + """ + camera_registry = self.config_repo.get_camera_registry() + if camera_registry: + return camera_registry.get_trigger_channel(camera_id) + return camera_id - 1 + def acquire_image(self) -> np.ndarray: """Acquire a single image from the camera. @@ -558,8 +694,13 @@ def acquire_image(self) -> np.ndarray: self._wait_for_microcontroller() self.camera.send_trigger() elif self.live_controller.trigger_mode == control._def.TriggerMode.HARDWARE: + # Get trigger channel for active camera + active_camera_id = self.live_controller._active_camera_id + trigger_channel = self.get_trigger_channel_for_camera(active_camera_id) self.low_level_drivers.microcontroller.send_hardware_trigger( - control_illumination=True, illumination_on_time_us=self.camera.get_exposure_time() * 1000 + control_illumination=True, + illumination_on_time_us=self.camera.get_exposure_time() * 1000, + trigger_output_ch=trigger_channel, ) try: @@ -725,10 +866,17 @@ def close(self) -> None: except Exception as e: self._log.warning(f"Error closing focus camera: {e}") - try: - self.camera.close() - except Exception as e: - self._log.warning(f"Error closing camera: {e}") + # Close all cameras + for camera_id, camera in self._cameras.items(): + try: + # Stop streaming first (some cameras require this before close) + try: + camera.stop_streaming() + except Exception as stream_e: + self._log.debug(f"stop_streaming for camera {camera_id}: {stream_e}") + camera.close() + except Exception as e: + self._log.warning(f"Error closing camera {camera_id}: {e}") def move_to_position(self, x: float, y: float, z: float) -> None: """Move the stage to an absolute XYZ position. diff --git a/software/control/models/acquisition_config.py b/software/control/models/acquisition_config.py index 6ca7549c2..f93560cab 100644 --- a/software/control/models/acquisition_config.py +++ b/software/control/models/acquisition_config.py @@ -535,6 +535,17 @@ def validate_channel_group( """ errors = [] + # Check for empty channel names + empty_entries = [i + 1 for i, entry in enumerate(group.channels) if not entry.name] + if empty_entries: + errors.append(f"Group '{group.name}' has unselected channel(s) at row(s): {empty_entries}") + + # Check for duplicate channel names in the group (excluding empty names) + channel_names = [entry.name for entry in group.channels if entry.name] + if len(channel_names) != len(set(channel_names)): + duplicates = [name for name in set(channel_names) if channel_names.count(name) > 1] + errors.append(f"Group '{group.name}' has duplicate channels: {duplicates}") + # Track cameras used (v1.0: camera field is int ID) cameras_used: List[Optional[int]] = [] for entry in group.channels: diff --git a/software/control/models/camera_registry.py b/software/control/models/camera_registry.py index 00695ac1c..0765a2aa7 100644 --- a/software/control/models/camera_registry.py +++ b/software/control/models/camera_registry.py @@ -25,6 +25,11 @@ class CameraDefinition(BaseModel): id: Optional[int] = Field(None, ge=1, description="Camera ID for hardware bindings") serial_number: str = Field(..., min_length=1, description="Hardware serial number") model: Optional[str] = Field(None, description="Camera model for display") + trigger_channel: Optional[int] = Field( + None, + ge=0, + description="MCU trigger output channel for hardware triggering (0-based)", + ) model_config = {"extra": "forbid"} @@ -160,3 +165,20 @@ def get_serial_number(self, camera_name: str) -> Optional[str]: """Get serial number for a camera name.""" camera = self.get_camera_by_name(camera_name) return camera.serial_number if camera else None + + def get_trigger_channel(self, camera_id: int) -> int: + """Get trigger channel for a camera ID. + + Returns the configured trigger_channel, or defaults to (camera_id - 1) + if not configured (camera ID 1 → channel 0, camera ID 2 → channel 1, etc.) + """ + camera = self.get_camera_by_id(camera_id) + if camera and camera.trigger_channel is not None: + return camera.trigger_channel + # Default: camera ID 1 → channel 0, camera ID 2 → channel 1, etc. + default_channel = camera_id - 1 if camera_id >= 1 else 0 + if camera is None: + logger.warning( + f"Camera ID {camera_id} not found in registry, " f"using default trigger channel {default_channel}" + ) + return default_channel diff --git a/software/control/widgets.py b/software/control/widgets.py index 2fa4fa30f..7a4f49849 100644 --- a/software/control/widgets.py +++ b/software/control/widgets.py @@ -18,6 +18,16 @@ import squid.logging from control.core.config import ConfigRepository from control.core.core import TrackingController, LiveController +from control.models import ( + AcquisitionChannel, + CameraSettings, + ChannelGroup, + ChannelGroupEntry, + GeneralChannelConfig, + IlluminationSettings, + SynchronizationMode, + validate_channel_group, +) from control.core.multi_point_controller import MultiPointController from control.core.downsampled_views import format_well_id from control.core.geometry_utils import get_effective_well_size, calculate_well_coverage @@ -147,8 +157,12 @@ def save_last_used_saving_path(path: str) -> None: os.makedirs("cache", exist_ok=True) with open(cache_file, "w") as f: f.write(path) - except OSError: - pass # Silently fail - caching is a convenience feature + except OSError as e: + # Log at debug level - caching is a convenience feature but + # repeated failures could indicate disk issues + import logging + + logging.getLogger(__name__).debug(f"Failed to cache saving path: {e}") class WrapperWindow(QMainWindow): @@ -4188,36 +4202,49 @@ def update_trigger_mode(self): self.liveController.set_trigger_mode(self.dropdown_triggerManu.currentText()) def update_config_exposure_time(self, new_value): - if self.is_switching_mode == False: - self.currentConfiguration.exposure_time = new_value - self.liveController.microscope.config_repo.update_channel_setting( - self.objectiveStore.current_objective, self.currentConfiguration.name, "ExposureTime", new_value - ) - self.signal_newExposureTime.emit(new_value) + if self.is_switching_mode: + return + self.currentConfiguration.exposure_time = new_value + self.liveController.microscope.config_repo.update_channel_setting( + self.objectiveStore.current_objective, self.currentConfiguration.name, "ExposureTime", new_value + ) + self.signal_newExposureTime.emit(new_value) def update_config_analog_gain(self, new_value): - if self.is_switching_mode == False: - self.currentConfiguration.analog_gain = new_value - self.liveController.microscope.config_repo.update_channel_setting( - self.objectiveStore.current_objective, self.currentConfiguration.name, "AnalogGain", new_value - ) - self.signal_newAnalogGain.emit(new_value) + if self.is_switching_mode: + return + self.currentConfiguration.analog_gain = new_value + self.liveController.microscope.config_repo.update_channel_setting( + self.objectiveStore.current_objective, self.currentConfiguration.name, "AnalogGain", new_value + ) + self.signal_newAnalogGain.emit(new_value) def update_config_illumination_intensity(self, new_value): - if self.is_switching_mode == False: - self.currentConfiguration.illumination_intensity = new_value - self.liveController.microscope.config_repo.update_channel_setting( - self.objectiveStore.current_objective, - self.currentConfiguration.name, - "IlluminationIntensity", - new_value, - ) - self.liveController.update_illumination() + if self.is_switching_mode: + return + self.currentConfiguration.illumination_intensity = new_value + self.liveController.microscope.config_repo.update_channel_setting( + self.objectiveStore.current_objective, + self.currentConfiguration.name, + "IlluminationIntensity", + new_value, + ) + self.liveController.update_illumination() def set_trigger_mode(self, trigger_mode): self.dropdown_triggerManu.setCurrentText(trigger_mode) self.liveController.set_trigger_mode(self.dropdown_triggerManu.currentText()) + def update_trigger_mode_display(self, trigger_mode): + """Update the trigger mode dropdown without calling back to controller. + + Used when the trigger mode changes externally (e.g., camera switch) and + we just need to update the UI to reflect the current state. + """ + self.dropdown_triggerManu.blockSignals(True) + self.dropdown_triggerManu.setCurrentText(trigger_mode) + self.dropdown_triggerManu.blockSignals(False) + class PiezoWidget(QFrame): def __init__(self, piezo: PiezoStage, *args, **kwargs): @@ -15796,6 +15823,42 @@ def _populate_filter_positions_for_combo( combo.setCurrentIndex(0) +def _populate_camera_combo( + combo: QComboBox, + config_repo, + current_camera_id: Optional[int] = None, + include_none: bool = True, +) -> None: + """Populate a camera combo box from the camera registry. + + Args: + combo: The QComboBox to populate + config_repo: ConfigRepository instance + current_camera_id: Camera ID to select (None for "(None)" option) + include_none: Whether to include "(None)" as first option + """ + combo.clear() + + if include_none: + combo.addItem("(None)", None) + + registry = config_repo.get_camera_registry() + if registry: + for cam in registry.cameras: + combo.addItem(cam.name, cam.id) + + # Set current selection + if current_camera_id is not None: + for i in range(combo.count()): + if combo.itemData(i) == current_camera_id: + combo.setCurrentIndex(i) + return + + # Default to first item (usually "(None)") + if combo.count() > 0: + combo.setCurrentIndex(0) + + class AcquisitionChannelConfiguratorDialog(QDialog): """Dialog for editing acquisition channel configurations. @@ -15959,7 +16022,6 @@ def _load_channels(self): def _populate_row(self, row: int, channel): """Populate a table row with channel data.""" - from control.models import AcquisitionChannel # Enabled checkbox checkbox_widget = QWidget() @@ -15969,6 +16031,12 @@ def _populate_row(self, row: int, channel): checkbox = QCheckBox() enabled = channel.enabled if hasattr(channel, "enabled") else True checkbox.setChecked(enabled) + # In multi-camera systems, disable checkbox interaction if no camera assigned + # (preserves stored enabled value, just prevents user from enabling without camera) + camera_names = self.config_repo.get_camera_names() + if len(camera_names) > 1 and channel.camera is None: + checkbox.setEnabled(False) + checkbox.setToolTip("Assign a camera to enable this channel") checkbox_layout.addWidget(checkbox) self.table.setCellWidget(row, self.COL_ENABLED, checkbox_widget) @@ -15987,31 +16055,34 @@ def _populate_row(self, row: int, channel): illum_combo.setCurrentText(current_illum) self.table.setCellWidget(row, self.COL_ILLUMINATION, illum_combo) - # Camera dropdown + # Camera dropdown - stores camera ID (int) as userData, displays name camera_combo = QComboBox() - camera_combo.addItem("(None)") - camera_names = self.config_repo.get_camera_names() - camera_combo.addItems(camera_names) - if channel.camera and channel.camera in camera_names: - camera_combo.setCurrentText(channel.camera) + _populate_camera_combo(camera_combo, self.config_repo, channel.camera) + camera_combo.currentIndexChanged.connect(lambda _idx, r=row: self._on_camera_changed(r)) self.table.setCellWidget(row, self.COL_CAMERA, camera_combo) - # Filter wheel dropdown + # Filter wheel dropdown - filtered by camera's hardware binding wheel_combo = QComboBox() - wheel_combo.addItem("(None)") - wheel_names = self.config_repo.get_filter_wheel_names() - wheel_combo.addItems(wheel_names) - # Set selection if channel has explicit wheel name - if channel.filter_wheel and channel.filter_wheel in wheel_names: - wheel_combo.setCurrentText(channel.filter_wheel) + self._populate_wheel_combo_for_camera(wheel_combo, channel.camera, channel.filter_wheel) wheel_combo.currentTextChanged.connect(lambda text, r=row: self._on_wheel_changed(r, text)) self.table.setCellWidget(row, self.COL_FILTER_WHEEL, wheel_combo) - # Filter position dropdown - function auto-resolves single-wheel systems + # Filter position dropdown - use the wheel that was just selected (may differ from stored value) position_combo = QComboBox() - _populate_filter_positions_for_combo( - position_combo, channel.filter_wheel, self.config_repo, channel.filter_position - ) + if channel.camera is not None: + selected_wheel = wheel_combo.currentText() + _populate_filter_positions_for_combo( + position_combo, + selected_wheel if selected_wheel != "(None)" else None, + self.config_repo, + channel.filter_position, + ) + else: + # No camera selected - disable filter wheel and position, show empty + wheel_combo.setEnabled(False) + wheel_combo.setToolTip("Select a camera to enable filter wheel") + position_combo.setEnabled(False) + position_combo.setToolTip("Select a camera to enable filter position") self.table.setCellWidget(row, self.COL_FILTER_POSITION, position_combo) # Display color (color picker button - fills cell width) @@ -16028,6 +16099,89 @@ def _on_wheel_changed(self, row: int, wheel_name: str): if position_combo: _populate_filter_positions_for_combo(position_combo, wheel_name, self.config_repo) + def _on_camera_changed(self, row: int): + """Update filter wheel options and enabled checkbox when camera selection changes.""" + camera_combo = self.table.cellWidget(row, self.COL_CAMERA) + wheel_combo = self.table.cellWidget(row, self.COL_FILTER_WHEEL) + position_combo = self.table.cellWidget(row, self.COL_FILTER_POSITION) + if camera_combo is None or wheel_combo is None: + return + + camera_id = camera_combo.currentData() + + # Update enabled checkbox state based on camera selection (multi-camera only) + camera_names = self.config_repo.get_camera_names() + if len(camera_names) > 1: + checkbox_widget = self.table.cellWidget(row, self.COL_ENABLED) + if checkbox_widget: + checkbox = checkbox_widget.findChild(QCheckBox) + if checkbox: + if camera_id is None: + checkbox.setEnabled(False) + checkbox.setToolTip("Assign a camera to enable this channel") + else: + checkbox.setEnabled(True) + checkbox.setToolTip("") + + if camera_id is None: + # No camera selected - disable filter wheel and position, clear position + wheel_combo.setEnabled(False) + wheel_combo.setToolTip("Select a camera to enable filter wheel") + if position_combo: + position_combo.clear() + position_combo.setEnabled(False) + position_combo.setToolTip("Select a camera to enable filter position") + else: + # Camera selected - enable and populate filter wheel and position + wheel_combo.setEnabled(True) + wheel_combo.setToolTip("") + current_wheel = wheel_combo.currentText() + self._populate_wheel_combo_for_camera( + wheel_combo, camera_id, current_wheel if current_wheel != "(None)" else None + ) + + # Update filter positions for the new wheel + new_wheel = wheel_combo.currentText() + if position_combo: + _populate_filter_positions_for_combo( + position_combo, new_wheel if new_wheel != "(None)" else None, self.config_repo + ) + position_combo.setEnabled(True) + position_combo.setToolTip("") + + def _populate_wheel_combo_for_camera( + self, combo: QComboBox, camera_id: Optional[int], current_wheel: Optional[str] + ): + """Populate filter wheel combo based on camera's hardware binding. + + If hardware_bindings.yaml defines a filter wheel for the camera, only show that wheel. + Otherwise, show all available filter wheels. + """ + combo.blockSignals(True) + combo.clear() + + # Get the bound wheel for this camera + bound_wheel = None + if camera_id is not None: + bound_wheel = self.config_repo.get_effective_emission_wheel(camera_id) + self._log.debug(f"Camera {camera_id} -> bound_wheel: {bound_wheel}") + + if bound_wheel and bound_wheel.name: + # Camera has a bound wheel - only show that wheel (no (None) option) + combo.addItem(bound_wheel.name) + combo.setCurrentText(bound_wheel.name) + self._log.debug(f"Showing only bound wheel: {bound_wheel.name}") + else: + # No binding - show all wheels with (None) option (backwards compatible) + combo.addItem("(None)") + wheel_names = self.config_repo.get_filter_wheel_names() + combo.addItems(wheel_names) + if current_wheel and current_wheel in wheel_names: + combo.setCurrentText(current_wheel) + self._log.debug(f"No binding for camera {camera_id}, showing all wheels: {wheel_names}") + + combo.blockSignals(False) + def _pick_color(self, row: int): """Open color picker for a row.""" color_btn = self.table.cellWidget(row, self.COL_DISPLAY_COLOR) @@ -16143,7 +16297,6 @@ def _save_changes(self): def _export_config(self): """Export current channel configuration to a YAML file.""" - from control.models import GeneralChannelConfig import yaml # Get save file path @@ -16179,7 +16332,6 @@ def _export_config(self): def _import_config(self): """Import channel configuration from a YAML file.""" from pydantic import ValidationError - from control.models import GeneralChannelConfig import yaml # Get file path @@ -16250,11 +16402,10 @@ def _sync_table_to_config(self): if illum_combo and isinstance(illum_combo, QComboBox): channel.illumination_settings.illumination_channel = illum_combo.currentText() - # Camera + # Camera - extract camera ID from userData (int or None) camera_combo = self.table.cellWidget(row, self.COL_CAMERA) if camera_combo and isinstance(camera_combo, QComboBox): - camera_text = camera_combo.currentText() - channel.camera = camera_text if camera_text != "(None)" else None + channel.camera = camera_combo.currentData() # Returns int or None # Filter wheel: None = no selection, else explicit wheel name wheel_combo = self.table.cellWidget(row, self.COL_FILTER_WHEEL) @@ -16298,11 +16449,12 @@ def _setup_ui(self): layout.addRow("Illumination:", self.illumination_combo) # Camera dropdown (hidden if single camera - 0 or 1 cameras) - camera_names = self.config_repo.get_camera_names() - if len(camera_names) > 1: + # Stores camera ID (int) as userData, displays name + registry = self.config_repo.get_camera_registry() + camera_count = len(registry.cameras) if registry else 0 + if camera_count > 1: self.camera_combo = QComboBox() - self.camera_combo.addItem("(None)") - self.camera_combo.addItems(camera_names) + _populate_camera_combo(self.camera_combo, self.config_repo) layout.addRow("Camera:", self.camera_combo) else: self.camera_combo = None @@ -16378,20 +16530,13 @@ def _validate_and_accept(self): def get_channel(self): """Build AcquisitionChannel from dialog inputs.""" - from control.models import ( - AcquisitionChannel, - CameraSettings, - IlluminationSettings, - ) - name = self.name_edit.text().strip() illum_name = self.illumination_combo.currentText() - # Camera + # Camera - extract camera ID from userData (int or None) camera = None if self.camera_combo: - camera_text = self.camera_combo.currentText() - camera = camera_text if camera_text != "(None)" else None + camera = self.camera_combo.currentData() # Returns int or None # Filter wheel and position filter_wheel = None @@ -16620,6 +16765,933 @@ def _save_config(self): QMessageBox.critical(self, "Error", f"Failed to save configuration:\n{e}") +# ═══════════════════════════════════════════════════════════════════════════════ +# CAMERA CONFIGURATION UI +# ═══════════════════════════════════════════════════════════════════════════════ + + +class CameraConfiguratorDialog(QDialog): + """Dialog for configuring camera names. + + Serial numbers are configured in the INI file (MULTI_CAMERA_SNS). + This dialog edits machine_configs/cameras.yaml for friendly names only. + + All cameras must be the same type (CAMERA_TYPE in INI). + """ + + signal_config_updated = Signal() + + def __init__(self, config_repo, parent=None): + super().__init__(parent) + self._log = squid.logging.get_logger(self.__class__.__name__) + self.config_repo = config_repo + self.registry = None + self.setWindowTitle("Camera Configuration") + self.setMinimumSize(550, 350) + self._setup_ui() + self._load_config() + + def _setup_ui(self): + layout = QVBoxLayout(self) + + # Instructions + instructions = QLabel( + "Configure camera names and trigger channels.\n" + "Serial numbers are configured in the .ini file (read-only here)." + ) + instructions.setWordWrap(True) + layout.addWidget(instructions) + + # Camera table + self.table = QTableWidget() + self.table.setColumnCount(4) + self.table.setHorizontalHeaderLabels(["ID", "Name", "Serial Number (from INI)", "Trigger Ch"]) + self.table.horizontalHeader().setSectionResizeMode(0, QHeaderView.ResizeToContents) + self.table.horizontalHeader().setSectionResizeMode(1, QHeaderView.Stretch) + self.table.horizontalHeader().setSectionResizeMode(2, QHeaderView.Stretch) + self.table.horizontalHeader().setSectionResizeMode(3, QHeaderView.ResizeToContents) + self.table.setSelectionBehavior(QTableWidget.SelectRows) + layout.addWidget(self.table) + + # Note about INI + note = QLabel( + "INI settings required:\n" + " use_multi_camera = True\n" + " multi_camera_ids = [1, 2]\n" + ' multi_camera_sns = {"1": "SN001", "2": "SN002"}\n' + "All cameras must be the same type (camera_type setting)." + ) + note.setWordWrap(True) + note.setStyleSheet("color: gray; font-family: monospace; font-size: 11px;") + layout.addWidget(note) + + # Buttons + button_layout = QHBoxLayout() + button_layout.addStretch() + self.btn_save = QPushButton("Save") + self.btn_save.clicked.connect(self._save_config) + button_layout.addWidget(self.btn_save) + self.btn_cancel = QPushButton("Cancel") + self.btn_cancel.clicked.connect(self.reject) + button_layout.addWidget(self.btn_cancel) + layout.addLayout(button_layout) + + def _load_config(self): + """Load camera config from INI (serial numbers) and YAML (names).""" + from control.models.camera_registry import CameraRegistryConfig, CameraDefinition + + # Get INI settings + configured_ids = list(getattr(control._def, "MULTI_CAMERA_IDS", [1])) + camera_sns = getattr(control._def, "MULTI_CAMERA_SNS", {}) + # Convert string keys to int (INI parser may give us strings) + camera_sns = {int(k): v for k, v in camera_sns.items()} + + # Get YAML registry for names (optional) + self.registry = self.config_repo.get_camera_registry() + if self.registry is None: + self.registry = CameraRegistryConfig(cameras=[]) + + # Build lookups from existing registry + name_lookup = {} + trigger_channel_lookup = {} + for cam in self.registry.cameras: + if cam.id is not None: + name_lookup[cam.id] = cam.name + trigger_channel_lookup[cam.id] = cam.trigger_channel + + # Rebuild registry from INI IDs + serial numbers + cameras = [] + for cam_id in configured_ids: + serial_number = camera_sns.get(cam_id, "") + name = name_lookup.get(cam_id, f"Camera {cam_id}") + # Default trigger channel: camera ID 1 → channel 0, etc. + trigger_channel = trigger_channel_lookup.get(cam_id, cam_id - 1) + cameras.append( + CameraDefinition( + id=cam_id, + name=name, + serial_number=serial_number if serial_number else "NOT_CONFIGURED", + trigger_channel=trigger_channel, + ) + ) + self.registry = CameraRegistryConfig(cameras=cameras) + + # Populate table + self.table.setRowCount(0) + for camera in sorted(self.registry.cameras, key=lambda c: c.id or 0): + row = self.table.rowCount() + self.table.insertRow(row) + + # ID (read-only) + id_item = QTableWidgetItem(str(camera.id)) + id_item.setFlags(id_item.flags() & ~Qt.ItemIsEditable) + self.table.setItem(row, 0, id_item) + + # Name (editable) + self.table.setItem(row, 1, QTableWidgetItem(camera.name or "")) + + # Serial Number (read-only, from INI) + sn_item = QTableWidgetItem(camera.serial_number) + sn_item.setFlags(sn_item.flags() & ~Qt.ItemIsEditable) + if not camera.serial_number or camera.serial_number == "NOT_CONFIGURED": + sn_item.setForeground(Qt.red) + sn_item.setText("NOT CONFIGURED - add to INI") + self.table.setItem(row, 2, sn_item) + + # Trigger Channel (editable) + trigger_ch = camera.trigger_channel if camera.trigger_channel is not None else (camera.id - 1) + self.table.setItem(row, 3, QTableWidgetItem(str(trigger_ch))) + + def _save_config(self): + """Save camera names to YAML file.""" + from control.models.camera_registry import CameraDefinition + + # Check for empty names and warn user + empty_name_cameras = [] + for row in range(self.table.rowCount()): + cam_id = int(self.table.item(row, 0).text()) + name = self.table.item(row, 1).text().strip() + if not name: + empty_name_cameras.append(cam_id) + + if empty_name_cameras: + default_names = ", ".join(f'"Camera {cid}"' for cid in empty_name_cameras) + reply = QMessageBox.question( + self, + "Empty Camera Names", + f"Camera(s) {empty_name_cameras} have empty names.\n\n" f"Use default names ({default_names})?", + QMessageBox.Yes | QMessageBox.No, + ) + if reply == QMessageBox.No: + return + + # Sync names and trigger channels from table to registry + for row in range(self.table.rowCount()): + cam_id = int(self.table.item(row, 0).text()) + name = self.table.item(row, 1).text().strip() or f"Camera {cam_id}" + trigger_ch_text = self.table.item(row, 3).text().strip() + try: + trigger_channel = int(trigger_ch_text) if trigger_ch_text else (cam_id - 1) + except ValueError: + QMessageBox.warning( + self, + "Invalid Trigger Channel", + f"Camera {cam_id}: '{trigger_ch_text}' is not a valid trigger channel.\n" + "Please enter a number (0-based channel index).", + ) + return + + # Update name and trigger channel in registry + for camera in self.registry.cameras: + if camera.id == cam_id: + camera.name = name + camera.trigger_channel = trigger_channel + break + + try: + self.config_repo.save_camera_registry(self.registry) + self.signal_config_updated.emit() + QMessageBox.information(self, "Saved", "Camera configuration saved.") + self.accept() + except (PermissionError, OSError) as e: + self._log.error(f"Failed to save camera config: {e}") + QMessageBox.critical(self, "Error", f"Cannot write configuration file:\n{e}") + except Exception as e: + self._log.exception(f"Unexpected error saving camera config: {e}") + QMessageBox.critical(self, "Error", f"Failed to save configuration:\n{e}") + + +# ═══════════════════════════════════════════════════════════════════════════════ +# CHANNEL GROUP CONFIGURATION UI +# ═══════════════════════════════════════════════════════════════════════════════ + + +class ChannelPickerDialog(QDialog): + """Dialog for selecting channels to add to a group.""" + + def __init__(self, available_channels: list, existing_names: list, parent=None): + """ + Args: + available_channels: List of channel names that can be selected. + existing_names: List of channel names already in the group (shown but disabled). + parent: Parent widget. + """ + super().__init__(parent) + self.setWindowTitle("Select Channels") + self.setMinimumWidth(300) + + self._available_channels = available_channels + self._existing_names = set(existing_names) + self._selected_names = [] + + self._setup_ui() + + def _setup_ui(self): + layout = QVBoxLayout(self) + + instructions = QLabel("Select channels to add to the group:") + layout.addWidget(instructions) + + # Checkboxes for each channel + self._checkboxes = [] + for name in self._available_channels: + cb = QCheckBox(name) + if name in self._existing_names: + cb.setChecked(True) + cb.setEnabled(False) + cb.setToolTip("Already in group") + self._checkboxes.append(cb) + layout.addWidget(cb) + + # Buttons + button_layout = QHBoxLayout() + button_layout.addStretch() + + btn_ok = QPushButton("Add Selected") + btn_ok.clicked.connect(self._on_accept) + button_layout.addWidget(btn_ok) + + btn_cancel = QPushButton("Cancel") + btn_cancel.clicked.connect(self.reject) + button_layout.addWidget(btn_cancel) + + layout.addLayout(button_layout) + + def _on_accept(self): + """Collect selected channels and accept.""" + self._selected_names = [] + for cb in self._checkboxes: + if cb.isChecked() and cb.isEnabled(): + self._selected_names.append(cb.text()) + self.accept() + + def get_selected_channels(self) -> list: + """Get list of newly selected channel names (excludes existing).""" + return self._selected_names + + +class AddChannelGroupDialog(QDialog): + """Dialog for creating a new channel group.""" + + def __init__(self, existing_names: list, parent=None): + """ + Args: + existing_names: List of existing group names (for validation). + parent: Parent widget. + """ + super().__init__(parent) + self.setWindowTitle("Add Channel Group") + self.setMinimumWidth(350) + + self._existing_names = set(existing_names) + self._setup_ui() + + def _setup_ui(self): + layout = QFormLayout(self) + + # Group name + self.name_edit = QLineEdit() + self.name_edit.setPlaceholderText("e.g., BF + GFP Simultaneous") + layout.addRow("Group Name:", self.name_edit) + + # Synchronization mode + self.sync_combo = QComboBox() + self.sync_combo.addItem("Sequential (one channel at a time)", "sequential") + self.sync_combo.addItem("Simultaneous (multi-camera)", "simultaneous") + layout.addRow("Mode:", self.sync_combo) + + # Buttons + button_layout = QHBoxLayout() + button_layout.addStretch() + + btn_ok = QPushButton("Create") + btn_ok.clicked.connect(self._validate_and_accept) + button_layout.addWidget(btn_ok) + + btn_cancel = QPushButton("Cancel") + btn_cancel.clicked.connect(self.reject) + button_layout.addWidget(btn_cancel) + + layout.addRow(button_layout) + + def _validate_and_accept(self): + """Validate input before accepting.""" + name = self.name_edit.text().strip() + if not name: + QMessageBox.warning(self, "Validation Error", "Group name cannot be empty.") + return + + if name in self._existing_names: + QMessageBox.warning(self, "Validation Error", f"Group '{name}' already exists.") + return + + self.accept() + + def get_group_name(self) -> str: + """Get the entered group name.""" + return self.name_edit.text().strip() + + def get_sync_mode(self) -> str: + """Get the selected synchronization mode ('sequential' or 'simultaneous').""" + return self.sync_combo.currentData() + + +class ChannelGroupDetailWidget(QWidget): + """Widget for editing a single channel group's details.""" + + signal_modified = Signal() # Emitted when group is modified + + def __init__(self, config_repo, parent=None): + super().__init__(parent) + self._log = squid.logging.get_logger(self.__class__.__name__) + self.config_repo = config_repo + self._current_group = None + self._channels = [] # Available AcquisitionChannel objects + self._setup_ui() + + def _setup_ui(self): + layout = QVBoxLayout(self) + layout.setContentsMargins(0, 0, 0, 0) + + # Empty state label (shown when no group selected) + self.empty_label = QLabel("Select a group from the list\nor create a new one.") + self.empty_label.setAlignment(Qt.AlignCenter) + self.empty_label.setStyleSheet("color: #888; font-style: italic;") + layout.addWidget(self.empty_label) + + # Group detail container (hidden when no group selected) + self.detail_container = QWidget() + detail_layout = QVBoxLayout(self.detail_container) + detail_layout.setContentsMargins(0, 0, 0, 0) + + # Group name (read-only for now) + name_layout = QHBoxLayout() + name_layout.addWidget(QLabel("Group:")) + self.name_label = QLabel() + self.name_label.setStyleSheet("font-weight: bold;") + name_layout.addWidget(self.name_label) + name_layout.addStretch() + detail_layout.addLayout(name_layout) + + # Synchronization mode + sync_layout = QHBoxLayout() + sync_layout.addWidget(QLabel("Mode:")) + self.sync_combo = QComboBox() + self.sync_combo.addItem("Sequential", "sequential") + self.sync_combo.addItem("Simultaneous", "simultaneous") + self.sync_combo.currentIndexChanged.connect(self._on_sync_changed) + sync_layout.addWidget(self.sync_combo) + sync_layout.addStretch() + detail_layout.addLayout(sync_layout) + + # Channels table + detail_layout.addWidget(QLabel("Channels:")) + self.table = QTableWidget() + self.table.setColumnCount(3) + self.table.setHorizontalHeaderLabels(["Channel", "Camera", "Offset (μs)"]) + self.table.verticalHeader().setVisible(False) # Hide row numbers + self.table.horizontalHeader().setSectionResizeMode(0, QHeaderView.Stretch) + self.table.horizontalHeader().setSectionResizeMode(1, QHeaderView.ResizeToContents) + self.table.horizontalHeader().setSectionResizeMode(2, QHeaderView.ResizeToContents) + self.table.setSelectionBehavior(QTableWidget.SelectRows) + self.table.setSelectionMode(QTableWidget.SingleSelection) + self.table.setMaximumHeight(200) # Don't let table grow too tall + detail_layout.addWidget(self.table) + + # Channel buttons - inline with table + chan_btn_layout = QHBoxLayout() + self.btn_add_channel = QPushButton("Add") + self.btn_add_channel.clicked.connect(self._add_channel) + chan_btn_layout.addWidget(self.btn_add_channel) + + self.btn_remove_channel = QPushButton("Remove") + self.btn_remove_channel.clicked.connect(self._remove_selected_channel) + chan_btn_layout.addWidget(self.btn_remove_channel) + + self.btn_move_up = QPushButton("▲") + self.btn_move_up.setFixedWidth(30) + self.btn_move_up.setToolTip("Move channel up") + self.btn_move_up.clicked.connect(self._move_channel_up) + chan_btn_layout.addWidget(self.btn_move_up) + + self.btn_move_down = QPushButton("▼") + self.btn_move_down.setFixedWidth(30) + self.btn_move_down.setToolTip("Move channel down") + self.btn_move_down.clicked.connect(self._move_channel_down) + chan_btn_layout.addWidget(self.btn_move_down) + + chan_btn_layout.addStretch() + detail_layout.addLayout(chan_btn_layout) + + # Validation warnings + self.warning_label = QLabel() + self.warning_label.setStyleSheet("color: #c00; font-size: 11px;") + self.warning_label.setWordWrap(True) + self.warning_label.setVisible(False) + detail_layout.addWidget(self.warning_label) + + # Add stretch to push everything up + detail_layout.addStretch() + + layout.addWidget(self.detail_container) + self.detail_container.setVisible(False) + + def set_channels(self, channels: list): + """Set available channels for selection.""" + self._channels = channels + + def load_group(self, group): + """Load a channel group for editing.""" + self._current_group = group + + if group is None: + self.empty_label.setVisible(True) + self.detail_container.setVisible(False) + return + + self.empty_label.setVisible(False) + self.detail_container.setVisible(True) + + # Set name + self.name_label.setText(group.name) + + # Set sync mode + index = 0 if group.synchronization.value == "sequential" else 1 + self.sync_combo.blockSignals(True) + self.sync_combo.setCurrentIndex(index) + self.sync_combo.blockSignals(False) + + # Load channels into table + self._populate_table() + self._validate() + + def _populate_table(self): + """Populate the channels table.""" + # Disconnect signals from existing cell widgets before clearing + # This prevents stale lambda callbacks with invalid row indices + for row in range(self.table.rowCount()): + offset_widget = self.table.cellWidget(row, 2) + if offset_widget is not None: + try: + offset_widget.valueChanged.disconnect() + except (TypeError, RuntimeError): + # TypeError: No connections to disconnect + # RuntimeError: Widget was already deleted + pass + + self.table.setRowCount(0) + + if self._current_group is None: + return + + for entry in self._current_group.channels: + self._add_channel_row(entry.name, entry.offset_us) + + # Update offset column visibility based on mode + is_simultaneous = self.sync_combo.currentData() == "simultaneous" + self.table.setColumnHidden(2, not is_simultaneous) + + def _on_sync_changed(self, index): + """Handle synchronization mode change.""" + if self._current_group is None: + return + + mode = SynchronizationMode.SEQUENTIAL if index == 0 else SynchronizationMode.SIMULTANEOUS + self._current_group.synchronization = mode + + # Show/hide offset column + self.table.setColumnHidden(2, index == 0) + + self._validate() + self.signal_modified.emit() + + def _on_offset_changed(self, row: int, value: float): + """Handle offset value change.""" + if self._current_group is None or row >= len(self._current_group.channels): + return + + self._current_group.channels[row].offset_us = value + self.signal_modified.emit() + + def _add_channel_row(self, channel_name: str, offset_us: float = 0.0): + """Add a row to the channels table.""" + row = self.table.rowCount() + self.table.insertRow(row) + + # Channel name (read-only) - show with illumination info + display_text = channel_name + for ch in self._channels: + if ch.name == channel_name: + illum = ch.illumination_settings.illumination_channel if ch.illumination_settings else "" + if illum and ch.name != illum: + display_text = f"{ch.name} ({illum})" + break + channel_item = QTableWidgetItem(display_text) + channel_item.setFlags(channel_item.flags() & ~Qt.ItemIsEditable) + channel_item.setData(Qt.UserRole, channel_name) # Store actual name + self.table.setItem(row, 0, channel_item) + + # Camera (look up from channels list) - read-only display + camera_text = self._get_camera_for_channel(channel_name) + camera_item = QTableWidgetItem(camera_text) + camera_item.setFlags(camera_item.flags() & ~Qt.ItemIsEditable) + self.table.setItem(row, 1, camera_item) + + # Offset (editable spinbox) + offset_spin = QDoubleSpinBox() + offset_spin.setRange(0, 1000000) # Up to 1 second + offset_spin.setDecimals(1) + offset_spin.setSuffix(" μs") + offset_spin.setValue(offset_us) + offset_spin.valueChanged.connect(lambda v, r=row: self._on_offset_changed(r, v)) + self.table.setCellWidget(row, 2, offset_spin) + + def _get_camera_for_channel(self, channel_name: str) -> str: + """Get camera display name for a channel.""" + if not channel_name: + return "(None)" + for ch in self._channels: + if ch.name == channel_name: + if ch.camera is not None: + registry = self.config_repo.get_camera_registry() + if registry: + cam_def = registry.get_camera_by_id(ch.camera) + if cam_def: + return cam_def.name + break + return "(None)" + + def _add_channel(self): + """Add a new channel to the group via picker dialog.""" + if self._current_group is None: + return + + # Get channels already in the group + existing_names = {entry.name for entry in self._current_group.channels} + + # Filter to channels not already in the group + available_channels = [ch for ch in self._channels if ch.name not in existing_names] + + if not available_channels: + QMessageBox.information(self, "No Channels", "All channels are already in this group.") + return + + # Show picker dialog - pass channel names (not objects) and existing names + dialog = ChannelPickerDialog( + [ch.name for ch in available_channels], + list(existing_names), + self, + ) + if dialog.exec_() == QDialog.Accepted: + selected = dialog.get_selected_channels() + for channel_name in selected: + # Add to model + self._current_group.channels.append(ChannelGroupEntry(name=channel_name, offset_us=0.0)) + # Add row to table + self._add_channel_row(channel_name, 0.0) + + # Update offset column visibility based on mode + is_simultaneous = self.sync_combo.currentData() == "simultaneous" + self.table.setColumnHidden(2, not is_simultaneous) + + self._validate() + self.signal_modified.emit() + + def _remove_selected_channel(self): + """Remove the selected channel from the group.""" + if self._current_group is None: + return + + row = self.table.currentRow() + if row < 0: + return + + # Don't allow removing the last channel + if len(self._current_group.channels) <= 1: + QMessageBox.warning(self, "Cannot Remove", "A group must have at least one channel.") + return + + # Remove from model + if row < len(self._current_group.channels): + del self._current_group.channels[row] + + # Repopulate table to fix row indices + self._populate_table() + self._validate() + self.signal_modified.emit() + + def _move_channel_up(self): + """Move selected channel up in the list.""" + row = self.table.currentRow() + if row <= 0 or self._current_group is None: + return + + channels = self._current_group.channels + if row >= len(channels): + return + + channels[row], channels[row - 1] = channels[row - 1], channels[row] + self._populate_table() + self.table.selectRow(row - 1) + self.signal_modified.emit() + + def _move_channel_down(self): + """Move selected channel down in the list.""" + row = self.table.currentRow() + if self._current_group is None or row < 0: + return + + channels = self._current_group.channels + if row >= len(channels) - 1: + return + + channels[row], channels[row + 1] = channels[row + 1], channels[row] + self._populate_table() + self.table.selectRow(row + 1) + self.signal_modified.emit() + + def _validate(self): + """Validate the current group and show warnings.""" + if self._current_group is None: + self.warning_label.setVisible(False) + return + + # Run model validation + errors = validate_channel_group(self._current_group, self._channels) + + if errors: + self.warning_label.setText("\n".join(errors)) + self.warning_label.setVisible(True) + else: + self.warning_label.setVisible(False) + + def get_validation_errors(self) -> list: + """Get validation errors for the current group.""" + if self._current_group is None: + return [] + + return validate_channel_group(self._current_group, self._channels) + + +class ChannelGroupEditorDialog(QDialog): + """Master-detail dialog for editing channel groups. + + Edits user_profiles/{profile}/channel_configs/general.yaml (channel_groups field). + """ + + signal_config_updated = Signal() + + def __init__(self, config_repo, parent=None): + super().__init__(parent) + self._log = squid.logging.get_logger(self.__class__.__name__) + self.config_repo = config_repo + self.general_config = None + self._is_dirty = False # Track unsaved changes + + self.setWindowTitle("Channel Group Configuration") + self.setMinimumSize(700, 500) + + self._setup_ui() + self._load_config() + + def _setup_ui(self): + main_layout = QVBoxLayout(self) + main_layout.setContentsMargins(10, 10, 10, 10) + main_layout.setSpacing(10) + + # Content area (left list + right detail) + content_layout = QHBoxLayout() + content_layout.setSpacing(15) + + # Left panel: group list + left_panel = QWidget() + left_panel.setFixedWidth(200) + left_layout = QVBoxLayout(left_panel) + left_layout.setContentsMargins(0, 0, 0, 0) + + left_layout.addWidget(QLabel("Channel Groups:")) + + self.group_list = QListWidget() + self.group_list.currentRowChanged.connect(self._on_group_selected) + left_layout.addWidget(self.group_list) + + # Group list buttons + list_btn_layout = QHBoxLayout() + list_btn_layout.setSpacing(5) + self.btn_add_group = QPushButton("Add") + self.btn_add_group.clicked.connect(self._add_group) + list_btn_layout.addWidget(self.btn_add_group) + + self.btn_remove_group = QPushButton("Remove") + self.btn_remove_group.clicked.connect(self._remove_group) + list_btn_layout.addWidget(self.btn_remove_group) + + left_layout.addLayout(list_btn_layout) + content_layout.addWidget(left_panel) + + # Right panel: group detail + self.detail_widget = ChannelGroupDetailWidget(self.config_repo, self) + self.detail_widget.signal_modified.connect(self._on_modified) + content_layout.addWidget(self.detail_widget, 1) + + main_layout.addLayout(content_layout, 1) + + # Separator line + line = QFrame() + line.setFrameShape(QFrame.HLine) + line.setFrameShadow(QFrame.Sunken) + main_layout.addWidget(line) + + # Bottom buttons + button_layout = QHBoxLayout() + button_layout.addStretch() + + self.btn_save = QPushButton("Save") + self.btn_save.setMinimumWidth(80) + self.btn_save.clicked.connect(self._save_config) + button_layout.addWidget(self.btn_save) + + self.btn_cancel = QPushButton("Cancel") + self.btn_cancel.setMinimumWidth(80) + self.btn_cancel.clicked.connect(self.reject) + button_layout.addWidget(self.btn_cancel) + + main_layout.addLayout(button_layout) + + def _load_config(self): + """Load channel configuration.""" + original_config = self.config_repo.get_general_config() + + if original_config is None: + self.general_config = None + else: + # Work on a deep copy to avoid mutating the original until save + self.general_config = original_config.model_copy(deep=True) + + self._is_dirty = False + + if self.general_config is None: + QMessageBox.warning( + self, + "No Configuration", + "No channel configuration found. Please ensure a profile is loaded.", + ) + return + + # Pass available channels to detail widget + self.detail_widget.set_channels(self.general_config.channels) + + # Populate group list + self.group_list.clear() + for group in self.general_config.channel_groups: + self.group_list.addItem(group.name) + + # Select first group if any + if self.group_list.count() > 0: + self.group_list.setCurrentRow(0) + + def _on_group_selected(self, row: int): + """Handle group selection change.""" + if self.general_config is None or row < 0: + self.detail_widget.load_group(None) + return + + if row < len(self.general_config.channel_groups): + self.detail_widget.load_group(self.general_config.channel_groups[row]) + else: + self.detail_widget.load_group(None) + + def _on_modified(self): + """Handle modification in detail widget.""" + self._is_dirty = True + + # Update list item text if name changed (future: allow name editing) + row = self.group_list.currentRow() + if row >= 0 and self.general_config and row < len(self.general_config.channel_groups): + group = self.general_config.channel_groups[row] + item = self.group_list.item(row) + if item and item.text() != group.name: + item.setText(group.name) + + def _add_group(self): + """Add a new channel group.""" + if self.general_config is None: + return + + # Check that channels exist before allowing group creation + if not self.general_config.channels: + QMessageBox.warning( + self, + "No Channels", + "Cannot create channel group: no acquisition channels defined.\n" + "Please configure acquisition channels first.", + ) + return + + existing_names = [g.name for g in self.general_config.channel_groups] + dialog = AddChannelGroupDialog(existing_names, self) + + if dialog.exec_() == QDialog.Accepted: + name = dialog.get_group_name() + mode_str = dialog.get_sync_mode() + mode = SynchronizationMode.SEQUENTIAL if mode_str == "sequential" else SynchronizationMode.SIMULTANEOUS + + # Create group with first available channel as default + default_channel = self.general_config.channels[0].name + new_group = ChannelGroup( + name=name, + synchronization=mode, + channels=[ChannelGroupEntry(name=default_channel)], + ) + + self.general_config.channel_groups.append(new_group) + self.group_list.addItem(name) + self.group_list.setCurrentRow(self.group_list.count() - 1) + self._is_dirty = True + + def _remove_group(self): + """Remove selected channel group.""" + row = self.group_list.currentRow() + if row < 0 or self.general_config is None: + return + + group_name = self.group_list.item(row).text() + reply = QMessageBox.question( + self, + "Confirm Removal", + f"Remove channel group '{group_name}'?", + QMessageBox.Yes | QMessageBox.No, + ) + + if reply == QMessageBox.Yes: + del self.general_config.channel_groups[row] + self.group_list.takeItem(row) + self._is_dirty = True + + # Select another group if available + if self.group_list.count() > 0: + self.group_list.setCurrentRow(min(row, self.group_list.count() - 1)) + else: + self.detail_widget.load_group(None) + + def closeEvent(self, event): + """Handle dialog close - warn about unsaved changes.""" + if self._is_dirty: + reply = QMessageBox.question( + self, + "Unsaved Changes", + "You have unsaved changes. Discard them?", + QMessageBox.Yes | QMessageBox.No, + QMessageBox.No, + ) + if reply == QMessageBox.No: + event.ignore() + return + super().closeEvent(event) + + def _save_config(self): + """Save channel group configuration.""" + if self.general_config is None: + return + + # Validate all groups before saving + all_errors = [] + for group in self.general_config.channel_groups: + errors = validate_channel_group(group, self.general_config.channels) + if errors: + all_errors.extend([f"Group '{group.name}': {e}" for e in errors]) + + if all_errors: + reply = QMessageBox.warning( + self, + "Validation Warnings", + "The following issues were found:\n\n" + "\n".join(all_errors) + "\n\nSave anyway?", + QMessageBox.Yes | QMessageBox.No, + ) + if reply != QMessageBox.Yes: + return + + try: + profile = self.config_repo.current_profile + if not profile: + QMessageBox.critical(self, "Error", "No profile loaded.") + return + self.config_repo.save_general_config(profile, self.general_config) + self._is_dirty = False # Reset dirty flag before close + self.signal_config_updated.emit() + QMessageBox.information(self, "Saved", "Channel group configuration saved.") + self.accept() + except (PermissionError, OSError) as e: + self._log.error(f"Failed to save channel group config: {e}") + QMessageBox.critical(self, "Error", f"Cannot write configuration file:\n{e}") + except yaml.YAMLError as e: + self._log.error(f"Failed to serialize channel group config: {e}") + QMessageBox.critical(self, "Error", f"Configuration data could not be serialized:\n{e}") + except Exception as e: + self._log.exception(f"Unexpected error saving channel group config: {e}") + QMessageBox.critical(self, "Error", f"Failed to save configuration:\n{e}") + + class _QtLogSignalHolder(QObject): """QObject that holds the signal for QtLoggingHandler. diff --git a/software/docs/configuration-system.md b/software/docs/configuration-system.md index d496b3645..6cb212a53 100644 --- a/software/docs/configuration-system.md +++ b/software/docs/configuration-system.md @@ -115,40 +115,81 @@ channels: ### cameras.yaml (Optional) -Maps camera IDs to hardware serial numbers. **Optional for single-camera systems.** +Provides user-friendly names for cameras. **Serial numbers are configured in the INI file.** + +**All cameras must be the same type** (specified by `camera_type` in INI). + +**INI Settings (required for multi-camera):** + +```ini +[GENERAL] +# Camera type - all cameras must be the same type +camera_type = Toupcam + +# Enable multi-camera support (default: False) +use_multi_camera = True + +# List of camera IDs to instantiate (default: [1]) +multi_camera_ids = [1, 2] + +# Camera serial numbers (required for multi-camera) +# Maps camera ID to hardware serial number +multi_camera_sns = {"1": "ABC12345", "2": "DEF67890"} +``` + +| INI Setting | Default | Description | +|-------------|---------|-------------| +| `camera_type` | `Default` | Camera type (all cameras must be same type): `Toupcam`, `Hamamatsu`, `FLIR`, etc. | +| `use_multi_camera` | `False` | Enable multi-camera mode. When `False`, single camera used. | +| `multi_camera_ids` | `[1]` | List of camera IDs to instantiate. | +| `multi_camera_sns` | `{}` | **Required for multi-camera.** Maps camera ID to serial number. | + +**YAML Configuration (optional, for friendly names and trigger channels):** ```yaml version: 1.0 cameras: # Primary imaging camera - - id: 1 # Camera ID (used in channel configs and hardware_bindings) + - id: 1 # Camera ID (matches INI) name: "Main Camera" # User-friendly name for UI - serial_number: "ABC12345" # Camera serial number (from manufacturer) - model: "Hamamatsu C15440" # Optional: displayed in UI for reference + serial_number: "ABC12345" # Informational only (actual SN from INI) + trigger_channel: 0 # MCU trigger output channel (0-based) - # Secondary camera for simultaneous imaging + # Secondary camera - id: 2 name: "Side Camera" serial_number: "DEF67890" - model: "Basler acA2040" + trigger_channel: 1 # Optional: defaults to (id - 1) if not set ``` +| Field | Required | Description | +|-------|----------|-------------| +| `id` | Yes (multi-camera) | Camera ID matching INI `multi_camera_ids` | +| `name` | No | User-friendly name shown in UI (defaults to "Camera N") | +| `serial_number` | No | Informational; actual SN comes from INI | +| `trigger_channel` | No | MCU trigger output channel (0-based). Defaults to `id - 1` | + **Fields:** | Field | Description | |-------|-------------| | `version` | Schema version (`1.0`) | -| `cameras[].id` | Camera ID (must be unique, used in channel configs) | -| `cameras[].name` | User-friendly name for UI (must be unique) | -| `cameras[].serial_number` | Hardware serial number (must be unique) | -| `cameras[].model` | Optional: camera model for reference | +| `cameras[].id` | Camera ID (must match ID in `multi_camera_ids`) | +| `cameras[].name` | User-friendly name for UI dropdowns | +| `cameras[].serial_number` | Informational only (actual serial number comes from INI) | **Usage:** -- If `cameras.yaml` doesn't exist, the system assumes single-camera mode -- Single camera: `id` and `name` are optional (defaults applied) -- Multi-camera: `id` and `name` are required for all cameras -- Channel configs use the `id` field to reference cameras (e.g., `camera: 1`) +- Serial numbers **must** be in INI (`multi_camera_sns`), not YAML +- YAML (`cameras.yaml`) is optional and provides friendly names only +- If `cameras.yaml` doesn't exist, cameras are named "Camera 1", "Camera 2", etc. +- All cameras use the same `camera_type` setting + +**UI Configuration:** + +When `use_multi_camera = True`, a "Camera Configuration" menu item appears under **Settings > Advanced**. This dialog shows: +- Camera IDs and serial numbers (from INI, read-only) +- Camera names (editable, saved to `cameras.yaml`) ### filter_wheels.yaml (Optional) diff --git a/software/docs/pending/multi-camera-support.md b/software/docs/pending/multi-camera-support.md index d291fc886..4b3517685 100644 --- a/software/docs/pending/multi-camera-support.md +++ b/software/docs/pending/multi-camera-support.md @@ -43,16 +43,22 @@ Defines available cameras with user-friendly names mapped to hardware identifier version: 1.0 cameras: - name: "Main Camera" # User-friendly name (shown in UI) - serial_number: "ABC12345" # Hardware identifier + id: 1 # Camera ID (matches INI MULTI_CAMERA_IDS) + serial_number: "ABC12345" # Hardware identifier (from INI, read-only in UI) model: "Hamamatsu C15440" # Optional: for display + trigger_channel: 0 # Optional: MCU trigger output channel (0-based) - name: "Side Camera" + id: 2 serial_number: "DEF67890" model: "Basler acA2040" + trigger_channel: 1 # Optional: defaults to (id - 1) if not set ``` **Location**: `machine_configs/cameras.yaml` +**Note**: Serial numbers are configured in the INI file (`multi_camera_sns = {"1": "ABC12345", "2": "DEF67890"}`). The `cameras.yaml` file provides friendly names and trigger channel mappings. + ### New Machine Config: `filter_wheels.yaml` Defines available filter wheels with positions and filter names. @@ -583,6 +589,51 @@ the migration script. - ✅ Legacy pre-YAML configs migrated via `tools/migrate_acquisition_configs.py` - ✅ Write v1.0 format on save +### Phase 1.5: INI Settings and Camera Configuration UI ✅ COMPLETE + +**Scope**: Add INI configuration settings and Camera Configuration UI dialog (following the filter wheel pattern). + +**Design**: Hybrid INI + YAML approach: +- **INI**: Serial numbers (required) - like filter wheel indices +- **YAML**: Friendly names (optional) - like filter wheel position names +- **All cameras must be the same type** (specified by `camera_type` in INI) + +**INI Settings** (in `control/_def.py`): +```python +# Multi-camera support (all cameras must be same type as CAMERA_TYPE) +USE_MULTI_CAMERA = False # Enable multi-camera mode +MULTI_CAMERA_IDS = [1] # List of camera IDs to instantiate +MULTI_CAMERA_SNS = {} # Camera ID -> serial number mapping +``` + +**Example INI configuration:** +```ini +[GENERAL] +camera_type = Toupcam +use_multi_camera = True +multi_camera_ids = [1, 2] +multi_camera_sns = {"1": "ABC123", "2": "DEF456"} +``` + +**Behavior:** +- `USE_MULTI_CAMERA=False` (default): Single-camera mode +- `USE_MULTI_CAMERA=True`: Multi-camera mode using `MULTI_CAMERA_IDS` and `MULTI_CAMERA_SNS` +- Serial numbers **must** be in INI (validation error if missing) +- `cameras.yaml` is optional and provides friendly names only + +**Files modified**: +- ✅ `control/_def.py` - Added `USE_MULTI_CAMERA`, `MULTI_CAMERA_IDS`, `MULTI_CAMERA_SNS` settings +- ✅ `control/widgets.py` - Added `CameraConfiguratorDialog` class +- ✅ `control/gui_hcs.py` - Added "Camera Configuration" menu item (Settings > Advanced) +- ✅ `squid/camera/config_factory.py` - INI settings integration in `create_camera_configs()` +- ✅ `tests/control/test_microscope_cameras.py` - Updated tests with INI mocking + +**UI:** +- **Camera Configuration dialog** (Settings > Advanced > Camera Configuration, when `USE_MULTI_CAMERA=True`) + - View camera IDs and serial numbers (from INI, read-only) + - Edit camera names (saved to `cameras.yaml`) + - Shows warning if serial number not configured in INI + ### Phase 2: UI Integration (In Progress) **Scope**: Update UI to use camera/filter wheel names and support channel groups. @@ -597,31 +648,44 @@ the migration script. - ✅ **Filter Wheel Configuration dialog** (Settings > Advanced > Filter Wheel Configuration) - Configure filter position names (e.g., "DAPI emission" instead of "Position 1") - Saves to `machine_configs/filter_wheels.yaml` +- ✅ **Channel Group Configuration dialog** (Settings > Advanced > Channel Group Configuration) + - Create/edit channel groups for multi-camera acquisition + - Configure synchronization mode (simultaneous vs sequential) + - Saves to `general.yaml` in profile - ✅ Filter wheel/position selector in Add Channel dialog - ✅ Disabled channels filtered from live controller dropdown **Remaining**: -- [ ] Camera selector dropdown in channel configuration (for multi-camera) -- [ ] Channel group editor widget -- [ ] Acquisition setup to select channel groups +- [ ] Camera selector dropdown in CameraSettingsWidget (to switch which camera's settings are shown) +- [ ] Acquisition setup to select channel groups for acquisition -### Phase 3: Acquisition Engine +### Phase 3: Acquisition Engine (Partially Complete) **Scope**: Implement multi-camera acquisition with channel groups. -**Changes**: -- [ ] `LiveController` - Support multiple camera instances +**Completed**: +- ✅ `Microscope` - Multi-camera instantiation via `build_from_global_config()` +- ✅ `Microscope` - Camera dict API (`get_camera(id)`, `get_camera_ids()`, `camera_count`) +- ✅ `LiveController` - Support multiple camera instances with `switch_camera()` +- ✅ `LiveController` - Camera switching on channel change (`set_microscope_mode`) + +**Remaining**: - [ ] `MultiPointWorker` - Process channel groups with synchronization -- [ ] Hardware triggering with timing offsets +- [ ] Hardware triggering with timing offsets for simultaneous acquisition +- [ ] Per-camera triggering for multi-camera simultaneous capture -### Phase 4: Testing and Documentation +### Phase 4: Testing and Documentation (In Progress) **Scope**: Comprehensive testing and user documentation. **Deliverables**: - ✅ Unit tests for new models (v1.0 schema validation) +- ✅ Unit tests for camera config factory (INI settings integration) +- ✅ Unit tests for Microscope multi-camera API +- ✅ Unit tests for LiveController camera switching +- ✅ Documentation: configuration-system.md (INI settings + cameras.yaml) - [ ] Integration tests for multi-camera acquisition -- [ ] User documentation updates +- [ ] End-to-end multi-camera acquisition testing --- @@ -631,7 +695,7 @@ the migration script. | File | Description | |------|-------------| -| `machine_configs/cameras.yaml` | Camera registry | +| `machine_configs/cameras.yaml` | Camera registry (names, serial numbers) | | `machine_configs/filter_wheels.yaml` | Filter wheel registry | | `control/models/camera_registry.py` | CameraRegistryConfig model | | `control/models/filter_wheel_config.py` | FilterWheelRegistryConfig model | @@ -640,10 +704,16 @@ the migration script. | File | Changes | |------|---------| +| `control/_def.py` | Added `USE_MULTI_CAMERA`, `MULTI_CAMERA_IDS` settings | +| `control/widgets.py` | Added `CameraConfiguratorDialog` class | +| `control/gui_hcs.py` | Added "Camera Configuration" menu item | +| `squid/camera/config_factory.py` | INI settings integration in `create_camera_configs()` | +| `control/microscope.py` | Multi-camera instantiation and API | +| `control/core/live_controller.py` | Camera switching support | | `control/models/acquisition_config.py` | Restructure for v1.0 schema | | `control/models/__init__.py` | Export new models | | `control/core/config/repository.py` | Load new config files | -| `control/core/config/utils.py` | Validation utilities | +| `tests/control/test_microscope_cameras.py` | Multi-camera tests with INI mocking | | `user_profiles/*/channel_configs/*.yaml` | New v1.0 schema format | --- diff --git a/software/squid/abc.py b/software/squid/abc.py index 279968d96..7a9429a2a 100644 --- a/software/squid/abc.py +++ b/software/squid/abc.py @@ -4,6 +4,7 @@ from typing import Callable, Optional, Tuple, Sequence, List, Dict import abc import enum +import threading import time import pydantic @@ -417,7 +418,9 @@ def __init__( # Frame callbacks is a list of (id, callback) managed by add_frame_callback and remove_frame_callback. # Your frame receiving functions should call self._send_frame_to_callbacks(frame), and doesn't need # to do more than that. + # Protected by _frame_callbacks_lock for thread safety (camera threads vs GUI/TCP threads). self._frame_callbacks: List[Tuple[int, Callable[[CameraFrame], None]]] = [] + self._frame_callbacks_lock = threading.Lock() self._frame_callbacks_enabled = True # Software crop is applied after hardware crop (setting ROI). The ratio is based on size after hardware crop. @@ -459,22 +462,24 @@ def add_frame_callback(self, frame_callback: Callable[[CameraFrame], None]) -> i Returns the callback ID that can be used to remove the callback later if needed. """ - try: - next_id = max(t[0] for t in self._frame_callbacks) + 1 - except ValueError: - next_id = 1 + with self._frame_callbacks_lock: + try: + next_id = max(t[0] for t in self._frame_callbacks) + 1 + except ValueError: + next_id = 1 - self._frame_callbacks.append((next_id, frame_callback)) + self._frame_callbacks.append((next_id, frame_callback)) return next_id def remove_frame_callback(self, callback_id): - try: - idx_to_remove = [t[0] for t in self._frame_callbacks].index(callback_id) - self._log.debug(f"Removing callback with id={callback_id} at idx={idx_to_remove}.") - del self._frame_callbacks[idx_to_remove] - except ValueError: - self._log.warning(f"No callback with id={callback_id}, cannot remove it.") + with self._frame_callbacks_lock: + try: + idx_to_remove = [t[0] for t in self._frame_callbacks].index(callback_id) + self._log.debug(f"Removing callback with id={callback_id} at idx={idx_to_remove}.") + del self._frame_callbacks[idx_to_remove] + except ValueError: + self._log.warning(f"No callback with id={callback_id}, cannot remove it.") def _propogate_frame(self, camera_frame: CameraFrame): """ @@ -485,7 +490,11 @@ def _propogate_frame(self, camera_frame: CameraFrame): """ if not self._frame_callbacks_enabled: return - for _, cb in self._frame_callbacks: + # Copy the callback list under lock to avoid iteration issues if list is modified + with self._frame_callbacks_lock: + callbacks = list(self._frame_callbacks) + # Call callbacks outside the lock to avoid deadlocks + for _, cb in callbacks: cb(camera_frame) @abc.abstractmethod diff --git a/software/squid/camera/config_factory.py b/software/squid/camera/config_factory.py new file mode 100644 index 000000000..ee3d6af01 --- /dev/null +++ b/software/squid/camera/config_factory.py @@ -0,0 +1,132 @@ +""" +Camera configuration factory for multi-camera support. + +This module generates per-camera CameraConfig instances from the camera registry, +allowing each camera to have its own configuration while sharing common settings. +""" + +import logging +from typing import Dict, Optional + +from control.models import CameraRegistryConfig +from squid.config import CameraConfig + +logger = logging.getLogger(__name__) + +# Default camera ID for single-camera systems or fallback scenarios. +# This is used when no camera registry is configured, maintaining backward +# compatibility with single-camera workflows. +DEFAULT_SINGLE_CAMERA_ID = 1 + + +def create_camera_configs( + camera_registry: Optional[CameraRegistryConfig], + base_config: CameraConfig, +) -> Dict[int, CameraConfig]: + """Generate per-camera configs from INI settings + optional registry. + + Serial numbers come from INI (MULTI_CAMERA_SNS). The camera registry + (cameras.yaml) is optional and provides friendly names for the UI. + + All cameras use the same type (CAMERA_TYPE in INI). + + Behavior is controlled by INI settings: + - USE_MULTI_CAMERA=False: Returns single camera config (ID 1) + - USE_MULTI_CAMERA=True: Creates configs for each camera in MULTI_CAMERA_IDS + using serial numbers from MULTI_CAMERA_SNS + + Args: + camera_registry: Optional camera registry (from cameras.yaml) for names. + Not required - serial numbers come from INI. + base_config: Base camera configuration template (from _def.py). + + Returns: + Dict mapping camera ID to CameraConfig. + For single camera systems: {1: config} + For multi-camera systems: {cam_id: config for each camera in MULTI_CAMERA_IDS} + """ + import control._def + + # Check if multi-camera mode is enabled via INI settings + use_multi_camera = getattr(control._def, "USE_MULTI_CAMERA", False) + + if not use_multi_camera: + # Single-camera mode: return base config with default ID + logger.debug(f"USE_MULTI_CAMERA=False, using single camera with ID {DEFAULT_SINGLE_CAMERA_ID}") + return {DEFAULT_SINGLE_CAMERA_ID: base_config} + + # Get camera IDs and serial numbers from INI + configured_ids = list(getattr(control._def, "MULTI_CAMERA_IDS", [1])) + camera_sns = getattr(control._def, "MULTI_CAMERA_SNS", {}) + + # Convert string keys to int (INI parser may give us strings) + camera_sns = {int(k): v for k, v in camera_sns.items()} + + logger.debug(f"Multi-camera mode enabled, IDs: {configured_ids}, SNs: {camera_sns}") + + # Validate: MULTI_CAMERA_SNS must not be empty + if not camera_sns: + example_sns = ", ".join(f'"{cid}": "YOUR_SN_{cid}"' for cid in configured_ids[:2]) + logger.error( + f"MULTI_CAMERA_SNS is empty but USE_MULTI_CAMERA=True. " + f"Either add serial numbers: multi_camera_sns = {{{example_sns}}}, " + f"or disable multi-camera: use_multi_camera = False" + ) + raise ValueError( + f"MULTI_CAMERA_SNS is empty. When USE_MULTI_CAMERA=True, you must provide " + f"serial numbers for each camera ID. Example: multi_camera_sns = {{{example_sns}}}" + ) + + # Validate: each camera ID must have a serial number in INI + missing_sns = [cid for cid in configured_ids if cid not in camera_sns] + if missing_sns: + logger.error( + f"MULTI_CAMERA_SNS missing serial numbers for camera IDs: {missing_sns}. " + f'Add them to the INI file, e.g., multi_camera_sns = {{"{missing_sns[0]}": "YOUR_SN"}}' + ) + raise ValueError( + f"Missing serial numbers in MULTI_CAMERA_SNS for camera IDs: {missing_sns}. " + f"All cameras in MULTI_CAMERA_IDS must have a serial number in MULTI_CAMERA_SNS." + ) + + configs: Dict[int, CameraConfig] = {} + + for camera_id in configured_ids: + serial_number = camera_sns[camera_id] + + # Create a copy of the base config for this camera + cam_config = base_config.model_copy(deep=True) + cam_config.serial_number = serial_number + + # Get friendly name from registry if available + camera_name = f"Camera {camera_id}" + if camera_registry: + camera_def = camera_registry.get_camera_by_id(camera_id) + if camera_def and camera_def.name: + camera_name = camera_def.name + + configs[camera_id] = cam_config + logger.debug(f"Created config for camera {camera_id} ('{camera_name}', SN: {serial_number})") + + logger.info(f"Created {len(configs)} camera configurations: IDs {sorted(configs.keys())}") + return configs + + +def get_primary_camera_id(camera_ids: list[int]) -> int: + """Get the primary camera ID from a list of camera IDs. + + The primary camera is the one with the lowest ID, which is used for + backward compatibility with single-camera code paths. + + Args: + camera_ids: List of camera IDs. + + Returns: + The lowest camera ID. + + Raises: + ValueError: If camera_ids is empty. + """ + if not camera_ids: + raise ValueError("No camera IDs provided") + return min(camera_ids) diff --git a/software/tests/control/test_microscope_cameras.py b/software/tests/control/test_microscope_cameras.py new file mode 100644 index 000000000..9d00dcb4b --- /dev/null +++ b/software/tests/control/test_microscope_cameras.py @@ -0,0 +1,564 @@ +"""Tests for multi-camera support in Microscope class.""" + +import pytest +from unittest.mock import MagicMock + +from control.models import CameraRegistryConfig, CameraDefinition +from squid.camera.config_factory import create_camera_configs, get_primary_camera_id +from squid.config import CameraConfig, CameraVariant, CameraPixelFormat + + +@pytest.fixture +def base_camera_config(): + """Minimal camera config for testing.""" + return CameraConfig( + camera_type=CameraVariant.TOUPCAM, + default_pixel_format=CameraPixelFormat.MONO16, + ) + + +class TestCameraConfigFactory: + """Tests for create_camera_configs(). + + Note: create_camera_configs() respects INI settings: + - USE_MULTI_CAMERA=False (default): Returns single camera with ID 1 + - USE_MULTI_CAMERA=True: Uses MULTI_CAMERA_IDS and MULTI_CAMERA_SNS from INI + - Serial numbers come from MULTI_CAMERA_SNS (INI), not from cameras.yaml + - cameras.yaml (registry) is optional and provides friendly names only + """ + + def test_no_registry_returns_single_camera(self, base_camera_config, monkeypatch): + """When USE_MULTI_CAMERA=False, return single camera with ID 1.""" + import control._def + + monkeypatch.setattr(control._def, "USE_MULTI_CAMERA", False) + configs = create_camera_configs(None, base_camera_config) + assert list(configs.keys()) == [1] + assert configs[1] == base_camera_config + + def test_empty_registry_returns_single_camera(self, base_camera_config, monkeypatch): + """When USE_MULTI_CAMERA=False, return single camera with ID 1.""" + import control._def + + monkeypatch.setattr(control._def, "USE_MULTI_CAMERA", False) + registry = CameraRegistryConfig(cameras=[]) + configs = create_camera_configs(registry, base_camera_config) + assert list(configs.keys()) == [1] + + def test_multi_camera_disabled_ignores_ini_settings(self, base_camera_config, monkeypatch): + """When USE_MULTI_CAMERA=False, INI settings are ignored.""" + import control._def + + monkeypatch.setattr(control._def, "USE_MULTI_CAMERA", False) + monkeypatch.setattr(control._def, "MULTI_CAMERA_IDS", [1, 2]) + monkeypatch.setattr(control._def, "MULTI_CAMERA_SNS", {1: "SN001", 2: "SN002"}) + configs = create_camera_configs(None, base_camera_config) + # Should return single camera with base config + assert list(configs.keys()) == [1] + assert configs[1].serial_number == base_camera_config.serial_number + + def test_single_camera_from_ini(self, base_camera_config, monkeypatch): + """Single camera gets serial number from INI.""" + import control._def + + monkeypatch.setattr(control._def, "USE_MULTI_CAMERA", True) + monkeypatch.setattr(control._def, "MULTI_CAMERA_IDS", [1]) + monkeypatch.setattr(control._def, "MULTI_CAMERA_SNS", {1: "SN001"}) + configs = create_camera_configs(None, base_camera_config) + assert list(configs.keys()) == [1] + assert configs[1].serial_number == "SN001" + + def test_multi_camera_from_ini(self, base_camera_config, monkeypatch): + """Multiple cameras get serial numbers from INI.""" + import control._def + + monkeypatch.setattr(control._def, "USE_MULTI_CAMERA", True) + monkeypatch.setattr(control._def, "MULTI_CAMERA_IDS", [1, 2]) + monkeypatch.setattr(control._def, "MULTI_CAMERA_SNS", {1: "SN001", 2: "SN002"}) + configs = create_camera_configs(None, base_camera_config) + assert sorted(configs.keys()) == [1, 2] + assert configs[1].serial_number == "SN001" + assert configs[2].serial_number == "SN002" + + def test_camera_ids_not_sequential(self, base_camera_config, monkeypatch): + """Camera IDs don't have to be sequential.""" + import control._def + + monkeypatch.setattr(control._def, "USE_MULTI_CAMERA", True) + monkeypatch.setattr(control._def, "MULTI_CAMERA_IDS", [5, 10]) + monkeypatch.setattr(control._def, "MULTI_CAMERA_SNS", {5: "SN005", 10: "SN010"}) + configs = create_camera_configs(None, base_camera_config) + assert sorted(configs.keys()) == [5, 10] + + def test_base_config_is_copied(self, base_camera_config, monkeypatch): + """Each camera gets a deep copy of base config.""" + import control._def + + monkeypatch.setattr(control._def, "USE_MULTI_CAMERA", True) + monkeypatch.setattr(control._def, "MULTI_CAMERA_IDS", [1, 2]) + monkeypatch.setattr(control._def, "MULTI_CAMERA_SNS", {1: "SN001", 2: "SN002"}) + configs = create_camera_configs(None, base_camera_config) + + # Verify they're different objects + assert configs[1] is not configs[2] + assert configs[1] is not base_camera_config + + # Verify serial numbers are different + assert configs[1].serial_number == "SN001" + assert configs[2].serial_number == "SN002" + + # Verify other properties are copied from base + assert configs[1].camera_type == base_camera_config.camera_type + assert configs[2].camera_type == base_camera_config.camera_type + + def test_missing_serial_number_raises(self, base_camera_config, monkeypatch): + """Missing serial number in MULTI_CAMERA_SNS raises error.""" + import control._def + + monkeypatch.setattr(control._def, "USE_MULTI_CAMERA", True) + monkeypatch.setattr(control._def, "MULTI_CAMERA_IDS", [1, 2]) + monkeypatch.setattr(control._def, "MULTI_CAMERA_SNS", {1: "SN001"}) # Missing camera 2 + + with pytest.raises(ValueError, match="Missing serial numbers"): + create_camera_configs(None, base_camera_config) + + def test_empty_sns_dict_raises(self, base_camera_config, monkeypatch): + """Empty MULTI_CAMERA_SNS dict raises specific error.""" + import control._def + + monkeypatch.setattr(control._def, "USE_MULTI_CAMERA", True) + monkeypatch.setattr(control._def, "MULTI_CAMERA_IDS", [1, 2]) + monkeypatch.setattr(control._def, "MULTI_CAMERA_SNS", {}) # Empty dict + + with pytest.raises(ValueError, match="MULTI_CAMERA_SNS is empty"): + create_camera_configs(None, base_camera_config) + + def test_string_keys_in_sns_dict(self, base_camera_config, monkeypatch): + """String keys in MULTI_CAMERA_SNS are converted to int (INI parser behavior).""" + import control._def + + monkeypatch.setattr(control._def, "USE_MULTI_CAMERA", True) + monkeypatch.setattr(control._def, "MULTI_CAMERA_IDS", [1, 2]) + # INI parser may give us string keys + monkeypatch.setattr(control._def, "MULTI_CAMERA_SNS", {"1": "SN001", "2": "SN002"}) + configs = create_camera_configs(None, base_camera_config) + assert sorted(configs.keys()) == [1, 2] + assert configs[1].serial_number == "SN001" + assert configs[2].serial_number == "SN002" + + def test_registry_provides_names_only(self, base_camera_config, monkeypatch): + """Registry (cameras.yaml) provides names but serial numbers come from INI.""" + import control._def + + monkeypatch.setattr(control._def, "USE_MULTI_CAMERA", True) + monkeypatch.setattr(control._def, "MULTI_CAMERA_IDS", [1, 2]) + monkeypatch.setattr(control._def, "MULTI_CAMERA_SNS", {1: "INI_SN1", 2: "INI_SN2"}) + + # Registry has different serial numbers (should be ignored) + registry = CameraRegistryConfig( + cameras=[ + CameraDefinition(id=1, name="Main Camera", serial_number="YAML_SN1"), + CameraDefinition(id=2, name="Side Camera", serial_number="YAML_SN2"), + ] + ) + configs = create_camera_configs(registry, base_camera_config) + + # Serial numbers should come from INI, not YAML + assert sorted(configs.keys()) == [1, 2] + assert configs[1].serial_number == "INI_SN1" + assert configs[2].serial_number == "INI_SN2" + + +class TestGetPrimaryCameraId: + """Tests for get_primary_camera_id().""" + + def test_single_camera(self): + """Single camera returns its ID.""" + assert get_primary_camera_id([1]) == 1 + assert get_primary_camera_id([5]) == 5 + + def test_multiple_cameras_returns_lowest(self): + """Multiple cameras returns lowest ID.""" + assert get_primary_camera_id([3, 1, 2]) == 1 + assert get_primary_camera_id([10, 5, 20]) == 5 + + def test_empty_list_raises(self): + """Empty list raises ValueError.""" + with pytest.raises(ValueError, match="No camera IDs provided"): + get_primary_camera_id([]) + + +class TestMicroscopeCameraAPI: + """Tests for Microscope multi-camera API.""" + + def test_microscope_camera_property(self): + """microscope.camera returns primary camera for backward compatibility.""" + from control.microscope import Microscope + + # Create mock cameras + camera1 = MagicMock() + camera2 = MagicMock() + cameras = {1: camera1, 2: camera2} + + # Create minimal microscope (skip normal init) + microscope = object.__new__(Microscope) + microscope._cameras = cameras + microscope._primary_camera_id = 1 + + assert microscope.camera is camera1 + + def test_microscope_get_camera(self): + """microscope.get_camera() returns camera by ID.""" + from control.microscope import Microscope + + camera1 = MagicMock() + camera2 = MagicMock() + cameras = {1: camera1, 2: camera2} + + microscope = object.__new__(Microscope) + microscope._cameras = cameras + microscope._primary_camera_id = 1 + + assert microscope.get_camera(1) is camera1 + assert microscope.get_camera(2) is camera2 + + def test_microscope_get_camera_invalid_id(self): + """microscope.get_camera() raises for invalid ID.""" + from control.microscope import Microscope + + microscope = object.__new__(Microscope) + microscope._cameras = {1: MagicMock()} + microscope._primary_camera_id = 1 + + with pytest.raises(ValueError, match="Camera ID 99 not found"): + microscope.get_camera(99) + + def test_microscope_get_camera_ids(self): + """microscope.get_camera_ids() returns sorted IDs.""" + from control.microscope import Microscope + + microscope = object.__new__(Microscope) + microscope._cameras = {5: MagicMock(), 1: MagicMock(), 3: MagicMock()} + microscope._primary_camera_id = 1 + + assert microscope.get_camera_ids() == [1, 3, 5] + + def test_microscope_get_camera_count(self): + """microscope.get_camera_count() returns number of cameras.""" + from control.microscope import Microscope + + microscope = object.__new__(Microscope) + microscope._cameras = {1: MagicMock(), 2: MagicMock()} + microscope._primary_camera_id = 1 + + assert microscope.get_camera_count() == 2 + + def test_microscope_backward_compat_single_camera(self): + """Passing single camera wraps it in dict with ID 1.""" + from control.microscope import Microscope + + single_camera = MagicMock() + + microscope = object.__new__(Microscope) + microscope._log = MagicMock() + + # Simulate __init__ logic for camera handling + cameras = single_camera # Not a dict + if isinstance(cameras, dict): + microscope._cameras = cameras + else: + microscope._cameras = {1: cameras} + microscope._primary_camera_id = get_primary_camera_id(list(microscope._cameras.keys())) + + assert microscope._cameras == {1: single_camera} + assert microscope._primary_camera_id == 1 + assert microscope.camera is single_camera + + +class TestLiveControllerMultiCamera: + """Tests for LiveController multi-camera support.""" + + def _create_mock_microscope(self, cameras_dict): + """Create a mock microscope with given cameras.""" + from control.microscope import Microscope + + microscope = object.__new__(Microscope) + microscope._cameras = cameras_dict + microscope._primary_camera_id = min(cameras_dict.keys()) + microscope._log = MagicMock() + microscope.config_repo = MagicMock() + return microscope + + def _create_mock_channel(self, name="Channel", camera_id=None): + """Create a mock acquisition channel.""" + channel = MagicMock() + channel.name = name + channel.camera = camera_id + channel.exposure_time = 100 + channel.analog_gain = 1.0 + return channel + + def test_live_controller_tracks_active_camera(self): + """LiveController tracks active camera ID.""" + from control.core.live_controller import LiveController + + camera1 = MagicMock() + camera2 = MagicMock() + microscope = self._create_mock_microscope({1: camera1, 2: camera2}) + + controller = LiveController(microscope, camera1, control_illumination=False) + + assert controller._active_camera_id == 1 + assert controller.get_active_camera_id() == 1 + assert controller.camera is camera1 + + def test_get_target_camera_id_returns_channel_camera(self): + """_get_target_camera_id returns channel's camera ID when specified.""" + from control.core.live_controller import LiveController + + camera1 = MagicMock() + camera2 = MagicMock() + microscope = self._create_mock_microscope({1: camera1, 2: camera2}) + + controller = LiveController(microscope, camera1, control_illumination=False) + + channel = self._create_mock_channel(camera_id=2) + assert controller._get_target_camera_id(channel) == 2 + + def test_get_target_camera_id_returns_primary_for_none(self): + """_get_target_camera_id returns primary camera ID when channel.camera is None.""" + from control.core.live_controller import LiveController + + camera1 = MagicMock() + camera2 = MagicMock() + microscope = self._create_mock_microscope({1: camera1, 2: camera2}) + + controller = LiveController(microscope, camera1, control_illumination=False) + + channel = self._create_mock_channel(camera_id=None) + assert controller._get_target_camera_id(channel) == 1 + + def test_switch_camera_updates_camera_reference(self): + """_switch_camera updates the camera reference.""" + from control.core.live_controller import LiveController + + camera1 = MagicMock() + camera2 = MagicMock() + microscope = self._create_mock_microscope({1: camera1, 2: camera2}) + + controller = LiveController(microscope, camera1, control_illumination=False) + assert controller.camera is camera1 + + controller._switch_camera(2) + + assert controller.camera is camera2 + assert controller._active_camera_id == 2 + + def test_switch_camera_noop_when_same_camera(self): + """_switch_camera does nothing when switching to same camera.""" + from control.core.live_controller import LiveController + + camera1 = MagicMock() + microscope = self._create_mock_microscope({1: camera1}) + + controller = LiveController(microscope, camera1, control_illumination=False) + + # This should be a no-op + controller._switch_camera(1) + + assert controller.camera is camera1 + assert controller._active_camera_id == 1 + + def test_switch_camera_raises_for_invalid_id(self): + """_switch_camera raises ValueError for invalid camera ID.""" + from control.core.live_controller import LiveController + + camera1 = MagicMock() + microscope = self._create_mock_microscope({1: camera1}) + + controller = LiveController(microscope, camera1, control_illumination=False) + + with pytest.raises(ValueError, match="Camera ID 99 not found"): + controller._switch_camera(99) + + def test_set_microscope_mode_switches_camera(self): + """set_microscope_mode switches to channel's camera.""" + from control.core.live_controller import LiveController + + camera1 = MagicMock() + camera2 = MagicMock() + microscope = self._create_mock_microscope({1: camera1, 2: camera2}) + + controller = LiveController(microscope, camera1, control_illumination=False) + controller.is_live = False # Not live, so no streaming operations + + channel = self._create_mock_channel(name="Camera2 Channel", camera_id=2) + controller.set_microscope_mode(channel) + + assert controller.camera is camera2 + assert controller._active_camera_id == 2 + camera2.set_exposure_time.assert_called_once_with(100) + + def test_set_microscope_mode_stays_on_same_camera(self): + """set_microscope_mode doesn't switch when channel uses same camera.""" + from control.core.live_controller import LiveController + + camera1 = MagicMock() + camera2 = MagicMock() + microscope = self._create_mock_microscope({1: camera1, 2: camera2}) + + controller = LiveController(microscope, camera1, control_illumination=False) + controller.is_live = False + + # Channel uses camera 1 (same as current) + channel = self._create_mock_channel(name="Camera1 Channel", camera_id=1) + controller.set_microscope_mode(channel) + + assert controller.camera is camera1 + assert controller._active_camera_id == 1 + camera1.set_exposure_time.assert_called_once_with(100) + + def test_set_microscope_mode_uses_primary_for_none_camera(self): + """set_microscope_mode uses primary camera when channel.camera is None.""" + from control.core.live_controller import LiveController + + camera1 = MagicMock() + camera2 = MagicMock() + microscope = self._create_mock_microscope({1: camera1, 2: camera2}) + + controller = LiveController(microscope, camera1, control_illumination=False) + controller.is_live = False + + # Channel has no camera specified + channel = self._create_mock_channel(camera_id=None) + controller.set_microscope_mode(channel) + + assert controller.camera is camera1 + assert controller._active_camera_id == 1 + + def test_set_microscope_mode_handles_streaming_on_camera_switch(self): + """set_microscope_mode stops/starts streaming when switching cameras while live.""" + from control.core.live_controller import LiveController + + camera1 = MagicMock() + camera2 = MagicMock() + microscope = self._create_mock_microscope({1: camera1, 2: camera2}) + + controller = LiveController(microscope, camera1, control_illumination=False) + controller.is_live = True + controller.timer_trigger = None # No active timer + + channel = self._create_mock_channel(name="Camera2 Channel", camera_id=2) + controller.set_microscope_mode(channel) + + # Old camera streaming stopped + camera1.stop_streaming.assert_called_once() + # New camera streaming started + camera2.start_streaming.assert_called_once() + # New camera has exposure set + camera2.set_exposure_time.assert_called_once_with(100) + + def test_set_microscope_mode_invalid_camera_no_state_change(self): + """set_microscope_mode with invalid camera ID changes nothing.""" + from control.core.live_controller import LiveController + + camera1 = MagicMock() + microscope = self._create_mock_microscope({1: camera1}) + + controller = LiveController(microscope, camera1, control_illumination=False) + controller.is_live = False + + # Set an initial configuration + initial_channel = self._create_mock_channel(name="Initial", camera_id=1) + controller.set_microscope_mode(initial_channel) + camera1.reset_mock() + + # Try to switch to non-existent camera 99 + invalid_channel = self._create_mock_channel(name="Invalid Camera", camera_id=99) + controller.set_microscope_mode(invalid_channel) + + # State should be unchanged + assert controller.camera is camera1 + assert controller._active_camera_id == 1 + assert controller.currentConfiguration is initial_channel # Not changed + # Camera should not have been touched + camera1.set_exposure_time.assert_not_called() + camera1.stop_streaming.assert_not_called() + + +class TestChannelGroupValidation: + """Tests for validate_channel_group function.""" + + def test_duplicate_channel_names_detected(self): + """Duplicate channel names in a group are detected.""" + from control.models import ( + AcquisitionChannel, + CameraSettings, + ChannelGroup, + ChannelGroupEntry, + IlluminationSettings, + SynchronizationMode, + validate_channel_group, + ) + + # Create channels + channels = [ + AcquisitionChannel( + name="Channel A", + camera_settings=CameraSettings(exposure_time_ms=100, gain_mode=1.0), + illumination_settings=IlluminationSettings(illumination_channel="BF", intensity=50.0), + ), + ] + + # Create group with duplicate channel + group = ChannelGroup( + name="Test Group", + synchronization=SynchronizationMode.SEQUENTIAL, + channels=[ + ChannelGroupEntry(name="Channel A"), + ChannelGroupEntry(name="Channel A"), # Duplicate! + ], + ) + + errors = validate_channel_group(group, channels) + assert any("duplicate channels" in e.lower() for e in errors) + + def test_no_duplicate_channels_passes(self): + """Group without duplicate channels passes validation.""" + from control.models import ( + AcquisitionChannel, + CameraSettings, + ChannelGroup, + ChannelGroupEntry, + IlluminationSettings, + SynchronizationMode, + validate_channel_group, + ) + + # Create channels + channels = [ + AcquisitionChannel( + name="Channel A", + camera_settings=CameraSettings(exposure_time_ms=100, gain_mode=1.0), + illumination_settings=IlluminationSettings(illumination_channel="BF", intensity=50.0), + ), + AcquisitionChannel( + name="Channel B", + camera_settings=CameraSettings(exposure_time_ms=100, gain_mode=1.0), + illumination_settings=IlluminationSettings(illumination_channel="GFP", intensity=50.0), + ), + ] + + # Create group with unique channels + group = ChannelGroup( + name="Test Group", + synchronization=SynchronizationMode.SEQUENTIAL, + channels=[ + ChannelGroupEntry(name="Channel A"), + ChannelGroupEntry(name="Channel B"), + ], + ) + + errors = validate_channel_group(group, channels) + # Should not have duplicate channel error + assert not any("duplicate channels" in e.lower() for e in errors) diff --git a/software/tests/control/test_multi_camera_models.py b/software/tests/control/test_multi_camera_models.py index 723d22db2..40060bfea 100644 --- a/software/tests/control/test_multi_camera_models.py +++ b/software/tests/control/test_multi_camera_models.py @@ -94,6 +94,27 @@ def test_camera_definition_invalid_id_rejected(self): CameraDefinition(id=0, serial_number="ABC12345") assert "greater than or equal to 1" in str(exc_info.value) + def test_camera_definition_with_trigger_channel(self): + """Test camera definition with trigger_channel field.""" + camera = CameraDefinition( + name="Main Camera", + id=1, + serial_number="ABC12345", + trigger_channel=0, + ) + assert camera.trigger_channel == 0 + + def test_camera_definition_trigger_channel_none_by_default(self): + """Test that trigger_channel is None by default.""" + camera = CameraDefinition(serial_number="ABC12345") + assert camera.trigger_channel is None + + def test_camera_definition_negative_trigger_channel_rejected(self): + """Test that negative trigger_channel is rejected.""" + with pytest.raises(ValidationError) as exc_info: + CameraDefinition(serial_number="ABC12345", trigger_channel=-1) + assert "greater than or equal to 0" in str(exc_info.value) + class TestCameraRegistryConfig: """Tests for CameraRegistryConfig model.""" @@ -235,6 +256,39 @@ def test_duplicate_camera_ids_rejected(self): ) assert "Camera IDs must be unique" in str(exc_info.value) + def test_get_trigger_channel_configured(self): + """Test getting trigger channel when explicitly configured.""" + registry = CameraRegistryConfig( + cameras=[ + CameraDefinition(name="Camera 1", id=1, serial_number="ABC", trigger_channel=2), + CameraDefinition(name="Camera 2", id=2, serial_number="DEF", trigger_channel=5), + ] + ) + assert registry.get_trigger_channel(1) == 2 + assert registry.get_trigger_channel(2) == 5 + + def test_get_trigger_channel_default(self): + """Test that trigger channel defaults to camera_id - 1.""" + registry = CameraRegistryConfig( + cameras=[ + CameraDefinition(name="Camera 1", id=1, serial_number="ABC"), + CameraDefinition(name="Camera 2", id=2, serial_number="DEF"), + ] + ) + # No trigger_channel configured, should default to id - 1 + assert registry.get_trigger_channel(1) == 0 # camera 1 → channel 0 + assert registry.get_trigger_channel(2) == 1 # camera 2 → channel 1 + + def test_get_trigger_channel_not_found(self): + """Test trigger channel for non-existent camera ID.""" + registry = CameraRegistryConfig( + cameras=[ + CameraDefinition(name="Camera 1", id=1, serial_number="ABC"), + ] + ) + # Non-existent camera ID 99: should default to 99 - 1 = 98 + assert registry.get_trigger_channel(99) == 98 + class TestFilterWheelDefinition: """Tests for FilterWheelDefinition model.""" diff --git a/software/tools/test_multi_camera_hw_trigger.py b/software/tools/test_multi_camera_hw_trigger.py new file mode 100644 index 000000000..bc7bc3df3 --- /dev/null +++ b/software/tools/test_multi_camera_hw_trigger.py @@ -0,0 +1,213 @@ +#!/usr/bin/env python3 +""" +Test hardware trigger for multi-camera Toupcam systems. + +This script directly enumerates cameras and tests hardware triggers +without relying on INI or YAML configuration files. + +Usage: + cd software + python tools/test_multi_camera_hw_trigger.py +""" + +import sys +import time +import argparse +import logging +import io + +sys.path.insert(0, ".") + +# Suppress logs during imports +logging.basicConfig(level=logging.ERROR) +_stdout, _stderr = sys.stdout, sys.stderr +sys.stdout = sys.stderr = io.StringIO() + +try: + import control.toupcam as toupcam + from control.microcontroller import Microcontroller, get_microcontroller_serial_device + from control.camera_toupcam import ToupcamCamera + from squid.config import CameraConfig, CameraVariant, CameraPixelFormat, ToupcamCameraModel + from squid.abc import CameraAcquisitionMode +finally: + sys.stdout, sys.stderr = _stdout, _stderr + +for name in ["squid", "control", ""]: + lg = logging.getLogger(name) + lg.setLevel(logging.ERROR) + lg.handlers.clear() + + +def test_mapping(cameras, mcu, channel_mapping, num_frames, illumination_time_us, timeout): + """Test a specific camera-to-channel mapping.""" + results = {sn: {"captured": 0, "failed": 0, "trigger_ch": ch} for sn, ch in channel_mapping.items()} + + for i in range(num_frames): + for sn, camera in cameras.items(): + trigger_ch = channel_mapping[sn] + + mcu.send_hardware_trigger( + control_illumination=True, + illumination_on_time_us=illumination_time_us, + trigger_output_ch=trigger_ch, + ) + + start_time = time.time() + frame = None + while time.time() - start_time < timeout: + frame = camera.read_frame() + if frame is not None: + break + time.sleep(0.01) + + if frame is not None: + results[sn]["captured"] += 1 + else: + results[sn]["failed"] += 1 + print(f" TIMEOUT: {sn[:12]}... frame {i+1}") + + time.sleep(0.05) + + if (i + 1) % 10 == 0 or i == num_frames - 1: + print(f" Progress: {i+1}/{num_frames} frames") + + return results + + +def main(): + parser = argparse.ArgumentParser(description="Test hardware trigger for multi-camera Toupcam systems") + parser.add_argument("--exposure-ms", type=float, default=50.0, help="Exposure time in ms") + parser.add_argument("--num-frames", type=int, default=5, help="Number of frames per camera") + parser.add_argument("--timeout", type=float, default=0.5, help="Frame timeout in seconds") + args = parser.parse_args() + + print("Multi-Camera Hardware Trigger Test (standalone)") + print("=" * 50) + + # Enumerate cameras directly + devices = toupcam.Toupcam.EnumV2() + if len(devices) < 2: + print(f"ERROR: Need at least 2 cameras, found {len(devices)}") + return 1 + + print(f"Found {len(devices)} camera(s):") + serial_numbers = [] + for i, dev in enumerate(devices): + print(f" [{i}] {dev.id} ({dev.displayname})") + serial_numbers.append(dev.id) + + # Initialize microcontroller + print("\nInitializing microcontroller...", end=" ") + sys.stdout.flush() + try: + mcu = Microcontroller(serial_device=get_microcontroller_serial_device()) + mcu.initialize_drivers() + print("OK") + except Exception as e: + print(f"FAILED: {e}") + return 1 + + # Create trigger/strobe functions + def make_trigger_fn(ch): + def fn(illumination_time_ms): + us = int(illumination_time_ms * 1000) if illumination_time_ms else 0 + mcu.send_hardware_trigger(True, us, ch) + return True + + return fn + + def make_strobe_fn(ch): + def fn(strobe_delay_ms): + mcu.set_strobe_delay_us(int(strobe_delay_ms * 1000), ch) + return True + + return fn + + # Open cameras using ToupcamCamera + print("Opening cameras...", end=" ") + sys.stdout.flush() + cameras = {} + try: + for i, sn in enumerate(serial_numbers[:2]): # First 2 cameras + config = CameraConfig( + camera_type=CameraVariant.TOUPCAM, + camera_model=ToupcamCameraModel.ITR3CMOS26000KMA, + serial_number=sn, + default_pixel_format=CameraPixelFormat.MONO16, + default_binning=(1, 1), + default_fan_speed=1, + default_temperature=20.0, + default_black_level=3, + ) + camera = ToupcamCamera( + config=config, + hw_trigger_fn=make_trigger_fn(i), + hw_set_strobe_delay_ms_fn=make_strobe_fn(i), + ) + camera.set_exposure_time(args.exposure_ms) + camera.set_acquisition_mode(CameraAcquisitionMode.HARDWARE_TRIGGER) + camera.start_streaming() + cameras[sn] = camera + print("OK") + except Exception as e: + print(f"FAILED: {e}") + for c in cameras.values(): + c.close() + return 1 + + time.sleep(0.5) + + # Test both mappings + sn0, sn1 = serial_numbers[0], serial_numbers[1] + illumination_time_us = int(args.exposure_ms * 1000) + + print(f"\n=== Mapping 1: Cam0 -> Ch0, Cam1 -> Ch1 ===") + mapping1 = {sn0: 0, sn1: 1} + results1 = test_mapping(cameras, mcu, mapping1, args.num_frames, illumination_time_us, args.timeout) + + print(f"\n=== Mapping 2: Cam0 -> Ch1, Cam1 -> Ch0 ===") + mapping2 = {sn0: 1, sn1: 0} + results2 = test_mapping(cameras, mcu, mapping2, args.num_frames, illumination_time_us, args.timeout) + + # Summary + print("\n" + "=" * 50) + print("SUMMARY") + print("=" * 50) + + def print_results(results, label): + print(f"\n{label}:") + all_pass = True + for sn, r in results.items(): + status = "PASS" if r["failed"] == 0 else "FAIL" + if r["failed"] > 0: + all_pass = False + print(f" {sn[:16]}... (ch={r['trigger_ch']}): {r['captured']}/{args.num_frames} - {status}") + return all_pass + + all_passed_1 = print_results(results1, "Mapping 1 (Cam0->Ch0, Cam1->Ch1)") + all_passed_2 = print_results(results2, "Mapping 2 (Cam0->Ch1, Cam1->Ch0)") + + print("\n" + "-" * 50) + if all_passed_1 and not all_passed_2: + print(f"CORRECT MAPPING:") + print(f" {sn0} -> Channel 0") + print(f" {sn1} -> Channel 1") + elif all_passed_2 and not all_passed_1: + print(f"CORRECT MAPPING:") + print(f" {sn0} -> Channel 1") + print(f" {sn1} -> Channel 0") + elif all_passed_1 and all_passed_2: + print("BOTH MAPPINGS WORK") + else: + print("NEITHER MAPPING WORKS - check hardware wiring") + + # Cleanup + for camera in cameras.values(): + camera.stop_streaming() + camera.close() + + return 0 if (all_passed_1 or all_passed_2) else 1 + + +if __name__ == "__main__": + sys.exit(main())