Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@
### ✨ Improved

* Update `sdss-clu` to 2.5.3.
* Remove retries when fetching the buffer is the shape does not match.

### 🔧 Fixed

* Fix pixel shifts in fetched buffers. The shifts were caused by a search-and-replace of the header values in the binary reply, which could remove additional data values when the read pixels matched the header values. The fix removes the header as each binary reply is processed.
* `ExposureDelegate.reset()` not being called with `await` in `reset` command.


## 0.15.4 - August 10, 2025
Expand Down
2 changes: 1 addition & 1 deletion archon/actor/commands/reset.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,6 @@ async def reset(command: Command, controller: ArchonController):
except (ArchonControllerError, ArchonError) as err:
return error_controller(command, controller, f"Failed resetting: {err}")
finally:
command.actor.exposure_delegate.reset()
await command.actor.exposure_delegate.reset()

return True
12 changes: 7 additions & 5 deletions archon/controller/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,15 +219,17 @@ def __init__(self, raw_reply: bytes, command: ArchonCommand):
rtype, rcid, rbin, rmessage = parsed.groups()
self.type: str = rtype.decode()
self.command_id: int = int(rcid, 16)
self.is_binary: bool = rbin.decode() == ":"
self.is_binary: bool = rbin.decode("latin-1") == ":"

self.reply: str | bytes
if self.is_binary:
# If the reply is binary, remove the prefixes and save the full
# content as the reply.
self.reply = raw_reply.replace(b"<" + rcid + b":", b"")
# If the reply is binary we have already removed all the headers except
# the one for the first block.
self.reply = raw_reply[4:]
else:
self.reply = rmessage.decode().strip()
if rmessage.endswith(b"\n"):
rmessage = rmessage[:-1]
self.reply = rmessage.decode("latin-1")

def __str__(self) -> str:
if isinstance(self.reply, bytes):
Expand Down
102 changes: 39 additions & 63 deletions archon/controller/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,26 +322,26 @@ async def send_and_wait(
else:
warnings.warn(f"Failed running {command_string}.", ArchonUserWarning)

async def process_message(self, line: bytes) -> None: # type: ignore
async def process_message(self, reply: bytes) -> None: # type: ignore
"""Processes a message from the Archon and associates it with its command."""

match = re.match(b"^[<|?]([0-9A-F]{2})", line)
if match is None:
header_match = re.match(b"^[<|?]([0-9A-F]{2})", reply)
if header_match is None:
warnings.warn(
f"Received invalid reply {line.decode()}",
f"Received invalid reply {reply.decode()}",
ArchonControllerWarning,
)
return

command_id = int(match[1], 16)
command_id = int(header_match[1], 16)
if command_id not in self.__running_commands:
warnings.warn(
f"Cannot find running command for {line}",
f"Cannot find running command for {reply}",
ArchonControllerWarning,
)
return

self.__running_commands[command_id].process_reply(line)
self.__running_commands[command_id].process_reply(reply)

async def stop(self):
"""Stops the client and cancels the command tracker."""
Expand Down Expand Up @@ -1176,7 +1176,6 @@ async def fetch(
self,
buffer_no: int = -1,
notifier: Optional[Callable[[str], None]] = None,
is_retry: bool = False,
*,
return_buffer: Literal[False],
) -> numpy.ndarray: ...
Expand All @@ -1186,7 +1185,6 @@ async def fetch(
self,
buffer_no: int = -1,
notifier: Optional[Callable[[str], None]] = None,
is_retry: bool = False,
*,
return_buffer: Literal[True],
) -> tuple[numpy.ndarray, int]: ...
Expand All @@ -1196,15 +1194,14 @@ async def fetch(
self,
buffer_no: int = -1,
notifier: Optional[Callable[[str], None]] = None,
is_retry: bool = False,
*,
return_buffer: bool = False,
) -> numpy.ndarray: ...

async def fetch(
self,
buffer_no: int = -1,
notifier: Optional[Callable[[str], None]] = None,
is_retry: bool = False,
return_buffer: bool = False,
):
"""Fetches a frame buffer and returns a Numpy array.
Expand All @@ -1219,11 +1216,6 @@ async def fetch(
`.fetch` is called by the actor to report progress to the users.
return_buffer
If `True`, returns the buffer number returned.
is_retry
Internal keyword to handle retries. If the buffer fetch does not match
the expected size the code will automatically retry fetching the buffer
once. If that also fails it will pad the buffer with zeros to the expected
size.

Returns
-------
Expand Down Expand Up @@ -1276,9 +1268,9 @@ async def fetch(
notifier("Reading frame buffer ...")

# Set the expected length of binary buffer to read, including the prefixes.
self.set_binary_reply_size((1024 + 4) * n_blocks)
self.set_binary_reply_size(1024 * n_blocks)

cmd: ArchonCommand = await self.send_command(
cmd = await self.send_command(
f"FETCH{start_address:08X}{n_blocks:08X}",
timeout=None,
)
Expand All @@ -1299,29 +1291,10 @@ async def fetch(
# now if the buffer size does not match what we expect, just pad with zeros.
expected_size = height * width
if expected_size != arr.size:
if is_retry is False:
notifier("Buffer data size does not match expected size. Retrying.")
return await self.fetch(
buffer_no=buffer_no,
notifier=notifier,
return_buffer=return_buffer,
is_retry=True,
)

message = (
raise ArchonControllerError(
"Buffer data size does not match expected size. "
f"Buffer size is {arr.size}; expected size is {expected_size}. "
"Padding with zeros."
f"Buffer size is {arr.size}; expected size is {expected_size}."
)
notifier(message)
warnings.warn(message, ArchonUserWarning)

arr0 = arr.copy()
arr = numpy.zeros(expected_size, dtype=arr.dtype)
arr[: arr0.size] = arr0

else:
notifier(f"Buffer size is {arr.size}; expected size is {expected_size}.")

arr = arr.reshape(height, width)

Expand All @@ -1333,10 +1306,10 @@ async def fetch(

return arr

def set_binary_reply_size(self, size: int):
def set_binary_reply_size(self, size: int | None):
"""Sets the size of the binary buffers."""

self._binary_reply = bytearray(size)
self._binary_reply = bytearray(size) if size else None

async def _listen(self):
"""Listens to the reader stream and callbacks on message received."""
Expand All @@ -1346,24 +1319,27 @@ async def _listen(self):

assert self._client and self._client.reader

# Number of binary bytes received.
n_binary = 0

while True:
# Max length of a reply is 1024 bytes for the message preceded by <xx:
# We read the first four characters (the maximum length of a complete
# message: ?xx\n or <xx\n). If the message ends in a newline, we are done;
# if the message ends with ":", it means what follows are 1024 binary
# characters without a newline; otherwise, read until the newline which
# marks the end of this message. In binary, if the response is < 1024
# bytes, the remaining bytes are filled with NULL (0x00).
# bytes, the remaining bytes are filled with 0xFF.
try:
line = await self._client.reader.readexactly(4)
header = await self._client.reader.readexactly(4)
except asyncio.IncompleteReadError:
return

if line[-1] == ord(b"\n"):
pass
elif line[-1] == ord(b":"):
line += await self._client.reader.readexactly(1024)
if header[-1] == ord(b"\n"):
reply = header
elif header[-1] == ord(b":") and self._binary_reply is not None:
data = await self._client.reader.readexactly(1024)

# If we know the length of the binary reply to expect, we set that
# slice of the bytearray and continue. We wait until all the buffer
# has been read before sending the notification. This is significantly
Expand All @@ -1378,29 +1354,29 @@ async def _listen(self):
# reply is going to arrive in the middle of it. I think that's unlikely,
# and probably prevented by the controller, but it's worth keeping in
# mind.
#
if self._binary_reply:
self._binary_reply[n_binary : n_binary + 1028] = line
n_binary += 1028 # How many bytes of the binary reply have we read.
if n_binary == len(self._binary_reply):
# This was the last chunk. Set line to the full reply and
# reset the binary reply and counter.
line = self._binary_reply
self._binary_reply = None
n_binary = 0
else:
# Skip notifying because the binary reply is still incomplete.
continue

self._binary_reply[n_binary : n_binary + 1024] = data
n_binary += 1024 # How many bytes of the binary reply have we read.
if n_binary == len(self._binary_reply):
# This was the last chunk. Set reply to the full reply
# (include one header) and reset the binary reply and counter.
reply = header + bytes(self._binary_reply)
self.set_binary_reply_size(None)
n_binary = 0
else:
# Skip notifying because the binary reply is still incomplete.
continue
else:
line += await self._client.reader.readuntil(b"\n")
data = await self._client.reader.readuntil(b"\n")
reply = header + data[:-1]

self.notify(line)
self.notify(reply)

def _get_id(self) -> int:
"""Returns an identifier from the pool."""

if len(self._id_pool) == 0:
raise ArchonControllerError("No ids reamining in the pool!")
raise ArchonControllerError("No ids remaining in the pool!")

return self._id_pool.pop()

Expand Down
3 changes: 2 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ async def handle_connection(
reply = (reply.format(cid=cid) + "\n").encode()
else:
reply = reply.replace(b"{cid}", cid.encode()).ljust(
1028, b" "
1028,
b" ",
)
writer.write(reply)
await writer.drain()
Expand Down
4 changes: 4 additions & 0 deletions tests/controller/test_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,16 @@ async def test_controller(controller: ArchonController):

@pytest.mark.commands([["PING", [b"<{cid}:12345"]]])
async def test_controller_binary_reply(controller: ArchonController):
controller.set_binary_reply_size(1024)

command = controller.send_command("ping")
await command

assert command.status == command.status.DONE
assert len(command.replies) == 1
assert len(command.replies[0].reply) == 1024
assert command.replies[0].reply.strip() == b"12345"

with pytest.raises(ArchonError):
str(command.replies[0])

Expand Down