Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 14 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@

This project uses [uv](https://docs.astral.sh/uv/) to manage its dependencies. If you're using Nix, there's also a `flake.nix` to get you started (`nix develop .#uv2nix` works to give you a dev environment).

Code formatting is done with [ruff](https://docs.astral.sh/ruff/), just use `ruff format src`.
Code formatting and checking is done with [ruff](https://docs.astral.sh/ruff/), just use `ruff format src`.

## Running mr-t – normal mode
## Running mr-t — attached to a Simplon stream

If you have uv installed (see above) running the main program should be as easy as:

```
uv run mr_t --eiger-zmq-host-and-port $host --udp-host localhost --udp-port 9000
uv run mr_t_server --eiger-zmq-host-and-port $host --udp-host localhost --udp-port 9000
```

Which will receive images from the Dectris detector `$host:9999` and also listen for UDP messages on `localhost:9000`.
Which will receive images from the Dectris detector `$host:9999` and also listen for UDP messages on `localhost:9000`. Instead of using an actual Detector, you can also use one of the [Simplon](https://github.com/pmiddend/simplon-stub) API [mocks](https://github.com/AustralianSynchrotron/ansto-simplon-api).

You can also just use plain Python, of course:

Expand All @@ -29,12 +29,12 @@ Note that you have to install the dependencies mentioned in `pyproject.toml` bef

There is a configurable `--frame-cache-limit` which, if you set it, will limit the number of frames held in memory to be no higher than this number. Meaning, the ZeroMQ messages will be held until the receiver picks them up.

## Running mr-t — simulation
## Running mr-t — feed from an HDF5 file

If you already have a finished image series stored in an HDF5 file, you can tell mr-t to read images from this file, instead of waiting for images via ZMQ. A sample command line looks like this:

```
uv run mr_t --input-h5-file ~/178_data-00000.nx5 --frame-cache-limit 5 --udp-host localhost --udp-port 9000
uv run mr_t_server --input-h5-file $myhdf5file --frame-cache-limit 5 --udp-host localhost --udp-port 9000
```

Note that in addition to `--input-h5-file` we are passing `--frame-cache-limit 5`. This will read at most 5 frames from the HDF5 file and wait until the other side (the FPGA) has actually pulled images from this cache. If you don't do this, and the receiving end is too slow, you will eat up a lot of RAM with all the cached images.
Expand Down Expand Up @@ -73,9 +73,11 @@ Given a *UDP packet request* for a frame `frameno`, there are a few consideratio

### ZeroMQ messages

A *ZmqHeader* message has to contain a `config` dictionary, which should be there if the `header_detail` for the stream subsystem of the Dectris detector is set to `all` or `basic`. We need the config because it gives us the `nimages` and `ntrigger` values, determining how many frames we have in each series. We then will the `CurrentSeries` structure with a new series ID (monotonically increasing the last one, starting at 0) and mostly zero values. We also store the new series ID value in `last_series_id`, so that the counter can increase next time.
A *ZmqHeader* message has to contain a `config` dictionary, which should be there if the `header_detail` for the stream subsystem of the Dectris detector is set to `all` or `basic` (the default). We need the config because it gives us the `nimages` and `ntrigger` values, determining how many frames we have in each series. We then fill the `CurrentSeries` structure with a new series ID (monotonically increasing the last one, starting at 0) and mostly zero values. We also store the new series ID value in `last_series_id`, so that the counter can increase next time.

A *ZmqImage* message only contains a `memoryview` with the whole frame's data (there is a per-image `config` that you can set, too, but we don't use it). The frame's ID we generate ourselves by taking the last frame's number in the `saved_frames` dictionary and increasing by 1 (or taking 0 if we don't have any frames yet). We also remember the ID as the last complete frame (which is used for premature end of series).
If the *ZmqHeader* contains a "header appendix" — which you can set via a Simplon API call — then this will be taken, verbatim, as the "series name". This is so the experimenter can name the current dataset with a human-readable name and not just a numeric ID. If the appendix is not given, this series name will be `series$id` with the detector-provided series ID.

A *ZmqImage* message contains a `memoryview` with the whole frame's data (there is a per-image `config` that you can set, too, but we don't use it). The frame's ID we generate ourselves by taking the last frame's number in the `saved_frames` dictionary and increasing by 1 (or taking 0 if we don't have any frames yet). We also remember the ID as the last complete frame (which is used for premature end of series). The message also contains the image's bit depth, size and encoding (for example, if it was compressed). We want to transport that information to the receiver, so we wait for the first frame and store the frame's information for later sending.

A *ZmqSeriesEnd* simply sets the current series' `ended` boolean to `True`.

Expand All @@ -90,7 +92,10 @@ There are _four_ types of UDP messages that are sent back and forth between the
- **Ping** (message type 0): has no content (so it's just 1 byte long), is sent from the client to the server. Will be answered by a Pong (see below)
- **Pong** (message type 1)
1. _series ID_ (32 bit unsigned integer) of the image series currently going on, or 0 if there is no image series
2. _frame count_ (32 bit unsigned integer) of the current series (or 0 if there is no image series)
2. _bit depth_ (8 bit unsigned integer) of the images in the series
3. _frame count_ (32 bit unsigned integer) of the current series (or 0 if there is no image series)
4. _length of series name_ (16 bit unsigned integer)
5. _series name_ (raw bytes, latin1 encoded, not zero terminated)
- **Packet request** (message type 2)
1. _frame number_ (32 bit unsigned integer, starting at zero) the frame number to get bytes from
2. _start byte_ (32 bit unsigned integer, starting at zero) the start byte inside the requested frame
Expand Down
20 changes: 20 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -102,5 +102,25 @@ pythonVersion = "3.13"
[dependency-groups]
dev = [
"ruff>=0.14.8",
"ty>=0.0.2",
]

[tool.ruff]
target-version = "py313"

[tool.ruff.lint.isort]
force-single-line = true

[tool.ruff.lint]
# to turn this:
#
# from X import y,z
#
# into
#
# from X import y
# from X import z
select = ["I001", "F", "E", "W", "I", "N", "UP", "YTT", "ANN", "ASYNC", "S", "FBT", "B", "A", "C4", "T10", "ICN", "LOG", "G", "PIE", "T20", "PYI", "Q", "RET", "SLF", "SLOT", "SIM", "TID", "TC", "INT", "ARG", "PTH", "TD", "FIX", "PD", "PGH", "PL", "R", "W", "FLY", "NPY", "FAST", "AIR", "FURB", "RUF"]
ignore = ["E501", "E722", "UP035", "ANN401", "S101", "S310", "FBT001", "B017", "B008", "B904", "C405", "C401", "C400", "G004", "G003", "PYI041", "SIM105", "TC001", "PGH003", "PGH004", "PLR2004", "PLR0913", "PLR0915", "PLR0912", "PLR0911", "RUF001", "LOG015"]


14 changes: 12 additions & 2 deletions rusty-t/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::fs::File;
use std::io::stdout;
use std::io::Cursor;
use std::io::Error;
use std::io::ErrorKind;
Expand Down Expand Up @@ -41,6 +40,8 @@ enum LoopState {
struct PongPayload {
series_id: u32,
frame_count: u32,
bits_per_pixel: u8,
series_name: String
}

enum UdpRequest {
Expand Down Expand Up @@ -96,12 +97,21 @@ fn decode_response(bytes: Vec<u8>) -> Result<UdpResponse, std::io::Error> {
});
}

let bits_per_pixel = reader.read_u8()?;

let frame_count = reader.read_u32::<BigEndian>()?;

let _name_length = reader.read_u16::<BigEndian>()?;

let mut series_name: Vec<u8> = vec![];
reader.read_to_end(&mut series_name)?;

return Ok(UdpResponse::UdpPong {
pong_payload: Option::Some(PongPayload {
series_id,
frame_count,
bits_per_pixel,
series_name: String::from_utf8_lossy(&series_name[..]).to_string(),
}),
});
}
Expand Down Expand Up @@ -197,7 +207,7 @@ fn main() {
info!("no series: same as last series, waiting");
sleep(Duration::from_secs(2))
} else {
info!("no series: new series, switching state, waiting");
info!("no series: new series (bit depth {0}, name {1}), switching state, waiting", payload.bits_per_pixel, payload.series_name);
state = LoopState::InSeries {
series_id: payload.series_id,
frame_count: payload.frame_count,
Expand Down
23 changes: 14 additions & 9 deletions src/mr_t/eiger_stream1.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import asyncio
import json
from dataclasses import dataclass
from typing import Any, AsyncIterator, Callable, TypeAlias
from typing import Any
from typing import AsyncIterator
from typing import Callable

import structlog
import zmq
Expand All @@ -12,7 +14,7 @@ def get_zmq_header(msg: list[zmq.Frame]) -> dict[str, Any]:
return json.loads(msg[0].bytes.decode())


ZmqAppendix: TypeAlias = str | dict[str, Any]
type ZmqAppendix = str | dict[str, Any]


@dataclass(frozen=True)
Expand All @@ -30,9 +32,11 @@ class ZmqSeriesEnd:
@dataclass(frozen=True)
class ZmqImage:
data: memoryview
data_type: str
compression: str


ZmqMessage: TypeAlias = ZmqHeader | ZmqSeriesEnd | ZmqImage
type ZmqMessage = ZmqHeader | ZmqSeriesEnd | ZmqImage


def decode_zmq_appendix(appendix: bytes) -> ZmqAppendix:
Expand All @@ -53,7 +57,7 @@ def decode_zmq_message(parts: list[zmq.Frame]) -> ZmqMessage:
htype = header["htype"]

if htype == "dimage-1.0":
# meta = json.loads(parts[1].bytes.decode())
meta = json.loads(parts[1].bytes.decode())
# shape = tuple(meta["shape"][::-1]) # Eiger shape order is reversed
# dtype = meta["type"]
# size = meta["size"]
Expand All @@ -71,7 +75,7 @@ def decode_zmq_message(parts: list[zmq.Frame]) -> ZmqMessage:
"Unexpected number of parts in image message: %s", len(parts)
)

return ZmqImage(data)
return ZmqImage(data, meta["type"], meta["encoding"])

if htype == "dheader-1.0":
detail = header["header_detail"]
Expand All @@ -84,7 +88,7 @@ def decode_zmq_message(parts: list[zmq.Frame]) -> ZmqMessage:
has_appendix = True
else:
raise ValueError(
'Unexpected number of parts for "none" detail: {}'.format(n_parts)
f'Unexpected number of parts for "none" detail: {n_parts}'
)
elif detail == "basic":
if n_parts == 2:
Expand All @@ -93,7 +97,7 @@ def decode_zmq_message(parts: list[zmq.Frame]) -> ZmqMessage:
has_appendix = True
else:
raise ValueError(
'Unexpected number of parts for "basic" detail: {}'.format(n_parts)
f'Unexpected number of parts for "basic" detail: {n_parts}'
)
elif detail == "all":
if n_parts == 8:
Expand All @@ -102,7 +106,7 @@ def decode_zmq_message(parts: list[zmq.Frame]) -> ZmqMessage:
has_appendix = True
else:
raise ValueError(
'Unexpected number of parts for "all" detail: {}'.format(n_parts)
f'Unexpected number of parts for "all" detail: {n_parts}'
)

config = (
Expand All @@ -115,12 +119,13 @@ def decode_zmq_message(parts: list[zmq.Frame]) -> ZmqMessage:

if htype == "dseries_end-1.0":
return ZmqSeriesEnd()
raise ValueError("Unsupported htype: '{}'".format(htype))
raise ValueError(f"Unsupported htype: '{htype}'")


async def receive_zmq_messages(
zmq_target: str, log: structlog.BoundLogger, cache_full: Callable[[], bool]
) -> AsyncIterator[ZmqMessage]:
# Somehow doesn't type-check, but it's a library issue
zmq_context = zmq.asyncio.Context() # type: ignore

zmq_socket = zmq_context.socket(zmq.PULL)
Expand Down
45 changes: 41 additions & 4 deletions src/mr_t/h5.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,46 @@
import asyncio
from pathlib import Path
from typing import AsyncIterator
from typing import Callable

import h5py
from typing import AsyncIterator, Callable
import structlog

from mr_t.eiger_stream1 import ZmqHeader, ZmqImage, ZmqMessage, ZmqSeriesEnd
from mr_t.eiger_stream1 import ZmqHeader
from mr_t.eiger_stream1 import ZmqImage
from mr_t.eiger_stream1 import ZmqMessage
from mr_t.eiger_stream1 import ZmqSeriesEnd

parent_log = structlog.get_logger()


def _dataset_to_compression(ds: h5py.Dataset) -> str:
pl = ds.id.get_create_plist()
n_filters = pl.get_nfilters()
assert isinstance(n_filters, int)
if n_filters == 0:
return ""
has_lz4 = False
has_bs = False
for i in range(n_filters):
filter_id, _flags, _cd_vals, _name = pl.get_filter(i)

# https://github.com/DiamondLightSource/hdf5filters/blob/master/h5lzfilter/H5Zlz4.c#L22
if filter_id == 32004:
has_lz4 = True
elif filter_id == 32008:
has_bs = True
return (
"bs8-lz4<"
if has_bs and has_lz4
else "bs8<"
if has_bs
else "lz8<"
if has_lz4
else ""
)


async def receive_h5_messages(
input_file: Path, log: structlog.BoundLogger, cache_full: Callable[[], bool]
) -> AsyncIterator[ZmqMessage]:
Expand All @@ -21,9 +53,14 @@ async def receive_h5_messages(
yield ZmqHeader(
series_id="1", config={"nimages": frame_count, "ntrigger": 1}, appendix=None
)
for frame_number in range(0, frame_count):
for frame_number in range(frame_count):
if cache_full():
await asyncio.sleep(0.5)
continue
yield ZmqImage(dataset[frame_number][:].tobytes()) # type: ignore
yield ZmqImage(
dataset[frame_number][:].tobytes(), # type: ignore
# Should be something like "uint32"
data_type=dataset.dtype.name, # type: ignore
compression=_dataset_to_compression(dataset),
)
yield ZmqSeriesEnd()
Loading