From de9c183ca3b78f4e8b2390db3e97dfff042c5b22 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20S=C3=A1nchez-Gallego?= Date: Thu, 28 Aug 2025 18:11:53 -0700 Subject: [PATCH 1/5] Modify handling of binary replies to prevent pixel shifts --- CHANGELOG.md | 4 ++ archon/controller/command.py | 12 +++--- archon/controller/controller.py | 67 +++++++++++++++-------------- tests/conftest.py | 3 +- tests/controller/test_controller.py | 4 ++ 5 files changed, 52 insertions(+), 38 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c74758f..168173e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,10 @@ * Update `sdss-clu` to 2.5.3. +### 🔧 Fixed + +* Fix pixel shifts in fetched buffers. The shifts were caused by a search-and-replace of the header values in the binary reply, which could remove additional data values when the read pixels matched the header values. The fix removes the header as each binary reply is processed. + ## 0.15.4 - August 10, 2025 diff --git a/archon/controller/command.py b/archon/controller/command.py index 4bd2bb6..1a59a3a 100644 --- a/archon/controller/command.py +++ b/archon/controller/command.py @@ -219,15 +219,17 @@ def __init__(self, raw_reply: bytes, command: ArchonCommand): rtype, rcid, rbin, rmessage = parsed.groups() self.type: str = rtype.decode() self.command_id: int = int(rcid, 16) - self.is_binary: bool = rbin.decode() == ":" + self.is_binary: bool = rbin.decode("latin-1") == ":" self.reply: str | bytes if self.is_binary: - # If the reply is binary, remove the prefixes and save the full - # content as the reply. - self.reply = raw_reply.replace(b"<" + rcid + b":", b"") + # If the reply is binary we have already removed all the headers except + # the one for the first block. + self.reply = rmessage else: - self.reply = rmessage.decode().strip() + if rmessage.endswith(b"\n"): + rmessage = rmessage[:-1] + self.reply = rmessage.decode("latin-1") def __str__(self) -> str: if isinstance(self.reply, bytes): diff --git a/archon/controller/controller.py b/archon/controller/controller.py index e4b90c6..5618649 100644 --- a/archon/controller/controller.py +++ b/archon/controller/controller.py @@ -322,26 +322,26 @@ async def send_and_wait( else: warnings.warn(f"Failed running {command_string}.", ArchonUserWarning) - async def process_message(self, line: bytes) -> None: # type: ignore + async def process_message(self, reply: bytes) -> None: # type: ignore """Processes a message from the Archon and associates it with its command.""" - match = re.match(b"^[<|?]([0-9A-F]{2})", line) - if match is None: + header_match = re.match(b"^[<|?]([0-9A-F]{2})", reply) + if header_match is None: warnings.warn( - f"Received invalid reply {line.decode()}", + f"Received invalid reply {reply.decode()}", ArchonControllerWarning, ) return - command_id = int(match[1], 16) + command_id = int(header_match[1], 16) if command_id not in self.__running_commands: warnings.warn( - f"Cannot find running command for {line}", + f"Cannot find running command for {reply}", ArchonControllerWarning, ) return - self.__running_commands[command_id].process_reply(line) + self.__running_commands[command_id].process_reply(reply) async def stop(self): """Stops the client and cancels the command tracker.""" @@ -1276,9 +1276,9 @@ async def fetch( notifier("Reading frame buffer ...") # Set the expected length of binary buffer to read, including the prefixes. - self.set_binary_reply_size((1024 + 4) * n_blocks) + self.set_binary_reply_size(1024 * n_blocks) - cmd: ArchonCommand = await self.send_command( + cmd = await self.send_command( f"FETCH{start_address:08X}{n_blocks:08X}", timeout=None, ) @@ -1333,10 +1333,10 @@ async def fetch( return arr - def set_binary_reply_size(self, size: int): + def set_binary_reply_size(self, size: int | None): """Sets the size of the binary buffers.""" - self._binary_reply = bytearray(size) + self._binary_reply = bytearray(size) if size else None async def _listen(self): """Listens to the reader stream and callbacks on message received.""" @@ -1346,7 +1346,9 @@ async def _listen(self): assert self._client and self._client.reader + # Number of binary bytes received. n_binary = 0 + while True: # Max length of a reply is 1024 bytes for the message preceded by int: """Returns an identifier from the pool.""" if len(self._id_pool) == 0: - raise ArchonControllerError("No ids reamining in the pool!") + raise ArchonControllerError("No ids remaining in the pool!") return self._id_pool.pop() diff --git a/tests/conftest.py b/tests/conftest.py index 1180def..0c1fa81 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -77,7 +77,8 @@ async def handle_connection( reply = (reply.format(cid=cid) + "\n").encode() else: reply = reply.replace(b"{cid}", cid.encode()).ljust( - 1028, b" " + 1028, + b" ", ) writer.write(reply) await writer.drain() diff --git a/tests/controller/test_controller.py b/tests/controller/test_controller.py index 157077d..ce09ff6 100644 --- a/tests/controller/test_controller.py +++ b/tests/controller/test_controller.py @@ -31,12 +31,16 @@ async def test_controller(controller: ArchonController): @pytest.mark.commands([["PING", [b"<{cid}:12345"]]]) async def test_controller_binary_reply(controller: ArchonController): + controller.set_binary_reply_size(1024) + command = controller.send_command("ping") await command + assert command.status == command.status.DONE assert len(command.replies) == 1 assert len(command.replies[0].reply) == 1024 assert command.replies[0].reply.strip() == b"12345" + with pytest.raises(ArchonError): str(command.replies[0]) From b7c86dd4d17157fc3d0adae0e1819f5b32205d42 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20S=C3=A1nchez-Gallego?= Date: Fri, 29 Aug 2025 23:25:22 +0000 Subject: [PATCH 2/5] Fix setting ArchonCommandReply.reply when binary --- archon/controller/command.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/archon/controller/command.py b/archon/controller/command.py index 1a59a3a..96ecfeb 100644 --- a/archon/controller/command.py +++ b/archon/controller/command.py @@ -225,7 +225,7 @@ def __init__(self, raw_reply: bytes, command: ArchonCommand): if self.is_binary: # If the reply is binary we have already removed all the headers except # the one for the first block. - self.reply = rmessage + self.reply = raw_reply[4:] else: if rmessage.endswith(b"\n"): rmessage = rmessage[:-1] From 84fdb17183ffb385d05e14a42dfe4dcb01028fab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20S=C3=A1nchez-Gallego?= Date: Fri, 29 Aug 2025 23:25:46 +0000 Subject: [PATCH 3/5] Remove retry in case of mismatching buffer size --- archon/controller/controller.py | 35 ++++----------------------------- 1 file changed, 4 insertions(+), 31 deletions(-) diff --git a/archon/controller/controller.py b/archon/controller/controller.py index 5618649..8eeca0b 100644 --- a/archon/controller/controller.py +++ b/archon/controller/controller.py @@ -1176,7 +1176,6 @@ async def fetch( self, buffer_no: int = -1, notifier: Optional[Callable[[str], None]] = None, - is_retry: bool = False, *, return_buffer: Literal[False], ) -> numpy.ndarray: ... @@ -1186,7 +1185,6 @@ async def fetch( self, buffer_no: int = -1, notifier: Optional[Callable[[str], None]] = None, - is_retry: bool = False, *, return_buffer: Literal[True], ) -> tuple[numpy.ndarray, int]: ... @@ -1196,7 +1194,7 @@ async def fetch( self, buffer_no: int = -1, notifier: Optional[Callable[[str], None]] = None, - is_retry: bool = False, + *, return_buffer: bool = False, ) -> numpy.ndarray: ... @@ -1204,7 +1202,6 @@ async def fetch( self, buffer_no: int = -1, notifier: Optional[Callable[[str], None]] = None, - is_retry: bool = False, return_buffer: bool = False, ): """Fetches a frame buffer and returns a Numpy array. @@ -1219,11 +1216,6 @@ async def fetch( `.fetch` is called by the actor to report progress to the users. return_buffer If `True`, returns the buffer number returned. - is_retry - Internal keyword to handle retries. If the buffer fetch does not match - the expected size the code will automatically retry fetching the buffer - once. If that also fails it will pad the buffer with zeros to the expected - size. Returns ------- @@ -1299,29 +1291,10 @@ async def fetch( # now if the buffer size does not match what we expect, just pad with zeros. expected_size = height * width if expected_size != arr.size: - if is_retry is False: - notifier("Buffer data size does not match expected size. Retrying.") - return await self.fetch( - buffer_no=buffer_no, - notifier=notifier, - return_buffer=return_buffer, - is_retry=True, - ) - - message = ( + raise ArchonControllerError( "Buffer data size does not match expected size. " - f"Buffer size is {arr.size}; expected size is {expected_size}. " - "Padding with zeros." + f"Buffer size is {arr.size}; expected size is {expected_size}." ) - notifier(message) - warnings.warn(message, ArchonUserWarning) - - arr0 = arr.copy() - arr = numpy.zeros(expected_size, dtype=arr.dtype) - arr[: arr0.size] = arr0 - - else: - notifier(f"Buffer size is {arr.size}; expected size is {expected_size}.") arr = arr.reshape(height, width) @@ -1381,7 +1354,7 @@ async def _listen(self): # reply is going to arrive in the middle of it. I think that's unlikely, # and probably prevented by the controller, but it's worth keeping in # mind. - # + self._binary_reply[n_binary : n_binary + 1024] = data n_binary += 1024 # How many bytes of the binary reply have we read. if n_binary == len(self._binary_reply): From 44ac445a6a382836390602da20a70dd451bda1db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20S=C3=A1nchez-Gallego?= Date: Fri, 29 Aug 2025 23:27:41 +0000 Subject: [PATCH 4/5] Await call to exposure delegate reset coroutine --- archon/actor/commands/reset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/archon/actor/commands/reset.py b/archon/actor/commands/reset.py index cf96f36..9374dcf 100644 --- a/archon/actor/commands/reset.py +++ b/archon/actor/commands/reset.py @@ -30,6 +30,6 @@ async def reset(command: Command, controller: ArchonController): except (ArchonControllerError, ArchonError) as err: return error_controller(command, controller, f"Failed resetting: {err}") finally: - command.actor.exposure_delegate.reset() + await command.actor.exposure_delegate.reset() return True From 6d347a0b562b562e9cc8c2dd7721c0a2e12dbd7a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20S=C3=A1nchez-Gallego?= Date: Fri, 29 Aug 2025 21:42:09 -0700 Subject: [PATCH 5/5] Update changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 168173e..dd41a06 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,10 +5,12 @@ ### ✨ Improved * Update `sdss-clu` to 2.5.3. +* Remove retries when fetching the buffer is the shape does not match. ### 🔧 Fixed * Fix pixel shifts in fetched buffers. The shifts were caused by a search-and-replace of the header values in the binary reply, which could remove additional data values when the read pixels matched the header values. The fix removes the header as each binary reply is processed. +* `ExposureDelegate.reset()` not being called with `await` in `reset` command. ## 0.15.4 - August 10, 2025