Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,7 @@
.migration_complete

# PlatformIO
.pio/
.pio/

# Worktrees
worktrees/
6 changes: 6 additions & 0 deletions software/control/_def.py
Original file line number Diff line number Diff line change
Expand Up @@ -1105,6 +1105,12 @@ class SOFTWARE_POS_LIMIT:
},
}

# Multi-camera support
# All cameras must be the same type (specified by CAMERA_TYPE above)
USE_MULTI_CAMERA = False
MULTI_CAMERA_IDS = [1] # List of camera IDs to instantiate
MULTI_CAMERA_SNS = {} # Camera ID -> serial number, e.g., {"1": "ABC123", "2": "DEF456"}

# Stage
USE_PRIOR_STAGE = False
PRIOR_STAGE_SN = ""
Expand Down
9 changes: 8 additions & 1 deletion software/control/camera_toupcam.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@ def _open(index=None, sn=None) -> Tuple[toupcam.Toupcam, ToupCamCapabilities]:
if not len(sn_matches):
all_sn = [d.id for d in devices]
raise ValueError(f"Could not find camera with SN={sn}, options are: {','.join(all_sn)}")
# Use the matched index
index = sn_matches[0]
log.info(f"Found camera with SN={sn} at index={index}")

for idx, device in enumerate(devices):
log.info(
Expand Down Expand Up @@ -237,7 +240,11 @@ def __init__(self, config: CameraConfig, hw_trigger_fn, hw_set_strobe_delay_ms_f
# is what the camera driver calls when a new frame is available.
self._raw_camera_stream_started = False
self._raw_frame_callback_lock = threading.Lock()
(self._camera, self._capabilities) = ToupcamCamera._open(index=0)
# Use serial number from config if available, otherwise fall back to index=0
if self._config.serial_number:
(self._camera, self._capabilities) = ToupcamCamera._open(sn=self._config.serial_number)
else:
(self._camera, self._capabilities) = ToupcamCamera._open(index=0)
self._pixel_format = self._config.default_pixel_format
self._binning = self._config.default_binning

Expand Down
7 changes: 6 additions & 1 deletion software/control/core/auto_focus_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,13 @@ def run_autofocus(self):
# "reset_image_ready_flag" arg, so this is broken for all other cameras.
image = self.camera.read_frame()
else:
# Get trigger channel for active camera
active_camera_id = self.liveController._active_camera_id
trigger_channel = self.liveController.microscope.get_trigger_channel_for_camera(active_camera_id)
self.microcontroller.send_hardware_trigger(
control_illumination=True, illumination_on_time_us=self.camera.get_exposure_time() * 1000
control_illumination=True,
illumination_on_time_us=self.camera.get_exposure_time() * 1000,
trigger_output_ch=trigger_channel,
)
image = self.camera.read_frame()
if image is None:
Expand Down
158 changes: 152 additions & 6 deletions software/control/core/live_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import time
import threading
from typing import List, Optional, TYPE_CHECKING
from typing import Callable, List, Optional, TYPE_CHECKING

import squid.logging
from control.microcontroller import Microcontroller
Expand All @@ -12,9 +12,21 @@
from control.core.config.utils import apply_confocal_override
from control.models import merge_channel_configs

# Mapping from camera acquisition mode to LiveController trigger mode
_CAM_MODE_TO_TRIGGER = {
CameraAcquisitionMode.SOFTWARE_TRIGGER: TriggerMode.SOFTWARE,
CameraAcquisitionMode.HARDWARE_TRIGGER: TriggerMode.HARDWARE,
CameraAcquisitionMode.CONTINUOUS: TriggerMode.CONTINUOUS,
}

if TYPE_CHECKING:
from control.models import AcquisitionChannel, IlluminationChannelConfig

# Type aliases for callbacks (multi-camera support)
# Note: TriggerMode values are strings (e.g., "Software Trigger"), not class instances
OnCameraSwitchedCallback = Callable[[AbstractCamera, AbstractCamera], None]
OnTriggerModeChangedCallback = Callable[[str], None]


class LiveController:
def __init__(
Expand All @@ -25,10 +37,16 @@ def __init__(
control_illumination: bool = True,
use_internal_timer_for_hardware_trigger: bool = True,
for_displacement_measurement: bool = False,
initial_camera_id: Optional[int] = None,
):
self._log = squid.logging.get_logger(self.__class__.__name__)
self.microscope = microscope
self.camera: AbstractCamera = camera
# Track which camera is currently active (for multi-camera support)
# Use explicit ID if provided, otherwise fall back to microscope's primary
self._active_camera_id: int = (
initial_camera_id if initial_camera_id is not None else microscope._primary_camera_id
)
self.currentConfiguration: Optional[AcquisitionChannel] = None
self.trigger_mode: Optional[TriggerMode] = TriggerMode.SOFTWARE # @@@ change to None
self.is_live = False
Expand Down Expand Up @@ -57,6 +75,12 @@ def __init__(
# Confocal mode state - when True, use confocal_override from acquisition configs
self._confocal_mode: bool = False

# Callback for camera switch notifications (for multi-camera support)
self.on_camera_switched: Optional[OnCameraSwitchedCallback] = None

# Callback for trigger mode change notifications (for multi-camera support)
self.on_trigger_mode_changed: Optional[OnTriggerModeChangedCallback] = None

# ─────────────────────────────────────────────────────────────────────────────
# Illumination config helpers
# ─────────────────────────────────────────────────────────────────────────────
Expand Down Expand Up @@ -114,6 +138,76 @@ def sync_confocal_mode_from_hardware(self, confocal: bool) -> None:
"""
self.toggle_confocal_widefield(confocal)

# ─────────────────────────────────────────────────────────────────────────────
# Multi-camera support
# ─────────────────────────────────────────────────────────────────────────────

def _get_target_camera_id(self, configuration: "AcquisitionChannel") -> int:
"""Get the camera ID to use for the given channel configuration.

Args:
configuration: The acquisition channel configuration

Returns:
Camera ID to use. If channel.camera is None, returns the primary camera ID.
"""
if configuration.camera is not None:
return configuration.camera
return self.microscope._primary_camera_id

def _switch_camera(self, camera_id: int) -> None:
"""Switch to a different camera for live preview.

This method updates the active camera used by the LiveController.
The caller is responsible for stopping live view before calling
this method if necessary.

Args:
camera_id: The camera ID to switch to

Raises:
ValueError: If the camera ID is not found in the microscope
"""
if camera_id == self._active_camera_id:
return # Already using this camera

new_camera = self.microscope.get_camera(camera_id)
old_camera = self.camera
old_camera_id = self._active_camera_id

self._log.info(f"Switching live camera: {old_camera_id} -> {camera_id}")
self.camera = new_camera
self._active_camera_id = camera_id

# Read the new camera's acquisition mode and update trigger_mode to match
cam_mode = new_camera.get_acquisition_mode()
new_trigger_mode = _CAM_MODE_TO_TRIGGER.get(cam_mode, TriggerMode.SOFTWARE)

if new_trigger_mode != self.trigger_mode:
self._log.info(f"Trigger mode changed: {self.trigger_mode} -> {new_trigger_mode}")
self.trigger_mode = new_trigger_mode
# Notify UI to update trigger mode display
if self.on_trigger_mode_changed:
try:
self.on_trigger_mode_changed(new_trigger_mode)
except Exception as e:
self._log.error(f"Error in on_trigger_mode_changed callback: {e}")

# Notify listeners of camera switch (e.g., to move frame callbacks)
if self.on_camera_switched:
try:
self.on_camera_switched(old_camera, new_camera)
except Exception as e:
self._log.error(f"Error in on_camera_switched callback: {e}")

def get_active_camera_id(self) -> int:
"""Get the ID of the currently active camera.

Returns:
The camera ID currently being used for live preview.
"""
return self._active_camera_id

# ─────────────────────────────────────────────────────────────────────────────
# Channel configuration access
# ─────────────────────────────────────────────────────────────────────────────
Expand Down Expand Up @@ -160,6 +254,18 @@ def get_channels(self, objective: str) -> List["AcquisitionChannel"]:
# Filter to only enabled channels
channels = [ch for ch in channels if ch.enabled]

# In multi-camera systems, filter out enabled channels without camera assignment
# (UI should prevent this, but enforce here as safety check)
if len(self.microscope._cameras) > 1:
unassigned = [ch for ch in channels if ch.camera is None]
if unassigned:
self._log.warning(
f"Skipping {len(unassigned)} enabled channel(s) without camera assignment "
f"in multi-camera system: {[ch.name for ch in unassigned]}. "
f"Assign cameras in Settings > Acquisition Channel Configuration."
)
channels = [ch for ch in channels if ch.camera is not None]

# Apply confocal mode if active
return apply_confocal_override(channels, self._confocal_mode)

Expand Down Expand Up @@ -446,14 +552,41 @@ def set_trigger_fps(self, fps):
self._set_trigger_fps(fps)

# set microscope mode
def set_microscope_mode(self, configuration: "AcquisitionChannel"):
def set_microscope_mode(self, configuration: "AcquisitionChannel") -> bool:
"""Set the microscope mode to the given channel configuration.

Args:
configuration: The acquisition channel configuration to apply.

Returns:
True if mode was successfully set, False if there was an error
(e.g., None configuration, invalid camera ID).
"""
if configuration is None:
self._log.error("set_microscope_mode() called with None configuration - this is a bug in the caller")
return
return False
self._log.info("setting microscope mode to " + configuration.name)

# temporarily stop live while changing mode
if self.is_live is True:
# Check if we need to switch cameras (multi-camera support)
target_camera_id = self._get_target_camera_id(configuration)

# Validate target camera exists BEFORE any state changes
if target_camera_id not in self.microscope._cameras:
available = sorted(self.microscope._cameras.keys())
self._log.error(
f"Channel '{configuration.name}' requires camera {target_camera_id}, "
f"but only cameras {available} exist. Mode not changed."
)
return False # Exit cleanly, no state changed

needs_camera_switch = target_camera_id != self._active_camera_id

# Save live state and temporarily disable to prevent race conditions
# (frame callbacks accessing camera during switch)
was_live = self.is_live

if was_live:
self.is_live = False # Prevent concurrent access during mode change
self._stop_existing_timer()
if self.control_illumination:
# Turn off illumination BEFORE switching self.currentConfiguration.
Expand All @@ -462,6 +595,17 @@ def set_microscope_mode(self, configuration: "AcquisitionChannel"):
# channel's laser instead of the OLD channel's laser (which is still on).
self.turn_off_illumination()

# Stop streaming on old camera before switching
if needs_camera_switch:
self.camera.stop_streaming()

# Switch camera if needed
if needs_camera_switch:
self._switch_camera(target_camera_id)
# Start streaming on new camera if we were live
if was_live:
self.camera.start_streaming()

self.currentConfiguration = configuration

# set camera exposure time and analog gain
Expand All @@ -476,11 +620,13 @@ def set_microscope_mode(self, configuration: "AcquisitionChannel"):
self.update_illumination()

# restart live
if self.is_live is True:
if was_live:
self.is_live = True # Restore live state
if self.control_illumination:
self.turn_on_illumination()
self._start_new_timer()
self._log.info("Done setting microscope mode.")
return True

def get_trigger_mode(self):
return self.trigger_mode
Expand Down
Loading
Loading