diff --git a/CHANGELOG.md b/CHANGELOG.md index c74758f..dd41a06 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,12 @@ ### ✨ Improved * Update `sdss-clu` to 2.5.3. +* Remove retries when fetching the buffer is the shape does not match. + +### 🔧 Fixed + +* Fix pixel shifts in fetched buffers. The shifts were caused by a search-and-replace of the header values in the binary reply, which could remove additional data values when the read pixels matched the header values. The fix removes the header as each binary reply is processed. +* `ExposureDelegate.reset()` not being called with `await` in `reset` command. ## 0.15.4 - August 10, 2025 diff --git a/archon/actor/commands/reset.py b/archon/actor/commands/reset.py index cf96f36..9374dcf 100644 --- a/archon/actor/commands/reset.py +++ b/archon/actor/commands/reset.py @@ -30,6 +30,6 @@ async def reset(command: Command, controller: ArchonController): except (ArchonControllerError, ArchonError) as err: return error_controller(command, controller, f"Failed resetting: {err}") finally: - command.actor.exposure_delegate.reset() + await command.actor.exposure_delegate.reset() return True diff --git a/archon/controller/command.py b/archon/controller/command.py index 4bd2bb6..96ecfeb 100644 --- a/archon/controller/command.py +++ b/archon/controller/command.py @@ -219,15 +219,17 @@ def __init__(self, raw_reply: bytes, command: ArchonCommand): rtype, rcid, rbin, rmessage = parsed.groups() self.type: str = rtype.decode() self.command_id: int = int(rcid, 16) - self.is_binary: bool = rbin.decode() == ":" + self.is_binary: bool = rbin.decode("latin-1") == ":" self.reply: str | bytes if self.is_binary: - # If the reply is binary, remove the prefixes and save the full - # content as the reply. - self.reply = raw_reply.replace(b"<" + rcid + b":", b"") + # If the reply is binary we have already removed all the headers except + # the one for the first block. + self.reply = raw_reply[4:] else: - self.reply = rmessage.decode().strip() + if rmessage.endswith(b"\n"): + rmessage = rmessage[:-1] + self.reply = rmessage.decode("latin-1") def __str__(self) -> str: if isinstance(self.reply, bytes): diff --git a/archon/controller/controller.py b/archon/controller/controller.py index e4b90c6..8eeca0b 100644 --- a/archon/controller/controller.py +++ b/archon/controller/controller.py @@ -322,26 +322,26 @@ async def send_and_wait( else: warnings.warn(f"Failed running {command_string}.", ArchonUserWarning) - async def process_message(self, line: bytes) -> None: # type: ignore + async def process_message(self, reply: bytes) -> None: # type: ignore """Processes a message from the Archon and associates it with its command.""" - match = re.match(b"^[<|?]([0-9A-F]{2})", line) - if match is None: + header_match = re.match(b"^[<|?]([0-9A-F]{2})", reply) + if header_match is None: warnings.warn( - f"Received invalid reply {line.decode()}", + f"Received invalid reply {reply.decode()}", ArchonControllerWarning, ) return - command_id = int(match[1], 16) + command_id = int(header_match[1], 16) if command_id not in self.__running_commands: warnings.warn( - f"Cannot find running command for {line}", + f"Cannot find running command for {reply}", ArchonControllerWarning, ) return - self.__running_commands[command_id].process_reply(line) + self.__running_commands[command_id].process_reply(reply) async def stop(self): """Stops the client and cancels the command tracker.""" @@ -1176,7 +1176,6 @@ async def fetch( self, buffer_no: int = -1, notifier: Optional[Callable[[str], None]] = None, - is_retry: bool = False, *, return_buffer: Literal[False], ) -> numpy.ndarray: ... @@ -1186,7 +1185,6 @@ async def fetch( self, buffer_no: int = -1, notifier: Optional[Callable[[str], None]] = None, - is_retry: bool = False, *, return_buffer: Literal[True], ) -> tuple[numpy.ndarray, int]: ... @@ -1196,7 +1194,7 @@ async def fetch( self, buffer_no: int = -1, notifier: Optional[Callable[[str], None]] = None, - is_retry: bool = False, + *, return_buffer: bool = False, ) -> numpy.ndarray: ... @@ -1204,7 +1202,6 @@ async def fetch( self, buffer_no: int = -1, notifier: Optional[Callable[[str], None]] = None, - is_retry: bool = False, return_buffer: bool = False, ): """Fetches a frame buffer and returns a Numpy array. @@ -1219,11 +1216,6 @@ async def fetch( `.fetch` is called by the actor to report progress to the users. return_buffer If `True`, returns the buffer number returned. - is_retry - Internal keyword to handle retries. If the buffer fetch does not match - the expected size the code will automatically retry fetching the buffer - once. If that also fails it will pad the buffer with zeros to the expected - size. Returns ------- @@ -1276,9 +1268,9 @@ async def fetch( notifier("Reading frame buffer ...") # Set the expected length of binary buffer to read, including the prefixes. - self.set_binary_reply_size((1024 + 4) * n_blocks) + self.set_binary_reply_size(1024 * n_blocks) - cmd: ArchonCommand = await self.send_command( + cmd = await self.send_command( f"FETCH{start_address:08X}{n_blocks:08X}", timeout=None, ) @@ -1299,29 +1291,10 @@ async def fetch( # now if the buffer size does not match what we expect, just pad with zeros. expected_size = height * width if expected_size != arr.size: - if is_retry is False: - notifier("Buffer data size does not match expected size. Retrying.") - return await self.fetch( - buffer_no=buffer_no, - notifier=notifier, - return_buffer=return_buffer, - is_retry=True, - ) - - message = ( + raise ArchonControllerError( "Buffer data size does not match expected size. " - f"Buffer size is {arr.size}; expected size is {expected_size}. " - "Padding with zeros." + f"Buffer size is {arr.size}; expected size is {expected_size}." ) - notifier(message) - warnings.warn(message, ArchonUserWarning) - - arr0 = arr.copy() - arr = numpy.zeros(expected_size, dtype=arr.dtype) - arr[: arr0.size] = arr0 - - else: - notifier(f"Buffer size is {arr.size}; expected size is {expected_size}.") arr = arr.reshape(height, width) @@ -1333,10 +1306,10 @@ async def fetch( return arr - def set_binary_reply_size(self, size: int): + def set_binary_reply_size(self, size: int | None): """Sets the size of the binary buffers.""" - self._binary_reply = bytearray(size) + self._binary_reply = bytearray(size) if size else None async def _listen(self): """Listens to the reader stream and callbacks on message received.""" @@ -1346,7 +1319,9 @@ async def _listen(self): assert self._client and self._client.reader + # Number of binary bytes received. n_binary = 0 + while True: # Max length of a reply is 1024 bytes for the message preceded by int: """Returns an identifier from the pool.""" if len(self._id_pool) == 0: - raise ArchonControllerError("No ids reamining in the pool!") + raise ArchonControllerError("No ids remaining in the pool!") return self._id_pool.pop() diff --git a/tests/conftest.py b/tests/conftest.py index 1180def..0c1fa81 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -77,7 +77,8 @@ async def handle_connection( reply = (reply.format(cid=cid) + "\n").encode() else: reply = reply.replace(b"{cid}", cid.encode()).ljust( - 1028, b" " + 1028, + b" ", ) writer.write(reply) await writer.drain() diff --git a/tests/controller/test_controller.py b/tests/controller/test_controller.py index 157077d..ce09ff6 100644 --- a/tests/controller/test_controller.py +++ b/tests/controller/test_controller.py @@ -31,12 +31,16 @@ async def test_controller(controller: ArchonController): @pytest.mark.commands([["PING", [b"<{cid}:12345"]]]) async def test_controller_binary_reply(controller: ArchonController): + controller.set_binary_reply_size(1024) + command = controller.send_command("ping") await command + assert command.status == command.status.DONE assert len(command.replies) == 1 assert len(command.replies[0].reply) == 1024 assert command.replies[0].reply.strip() == b"12345" + with pytest.raises(ArchonError): str(command.replies[0])