Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ This project uses [uv](https://docs.astral.sh/uv/) to manage its dependencies. I

Code formatting is done with [ruff](https://docs.astral.sh/ruff/), just use `ruff format src`.

## Running mr-t
## Running mr-t – normal mode

If you have uv installed (see above) running the main program should be as easy as:

Expand All @@ -29,6 +29,18 @@ Note that you have to install the dependencies mentioned in `pyproject.toml` bef

There is a configurable `--frame-cache-limit` which, if you set it, will limit the number of frames held in memory to be no higher than this number. Meaning, the ZeroMQ messages will be held until the receiver picks them up.

## Running mr-t — simulation

If you already have a finished image series stored in an HDF5 file, you can tell mr-t to read images from this file, instead of waiting for images via ZMQ. A sample command line looks like this:

```
uv run mr_t --input-h5-file ~/178_data-00000.nx5 --frame-cache-limit 5 --udp-host localhost --udp-port 9000
```

Note that in addition to `--input-h5-file` we are passing `--frame-cache-limit 5`. This will read at most 5 frames from the HDF5 file and wait until the other side (the FPGA) has actually pulled images from this cache. If you don't do this, and the receiving end is too slow, you will eat up a lot of RAM with all the cached images.

Currently only one HDF5 file is supported to be read, since the Dectris detectors write a `master` file which you can specify if you want to feed a whole image series.

## How it works

### Main loop
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ requires-python = ">=3.12"
dependencies = [
"asyncudp==0.11.0",
"culsans==0.7.1",
"h5py==3.14.0",
"pyzmq==26.2.0",
"structlog==24.4.0",
"typed-argument-parser==1.10.1",
Expand Down
37 changes: 22 additions & 15 deletions src/mr_t/h5.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,29 @@
import culsans
import asyncio
from pathlib import Path
import h5py
from typing import AsyncIterator, Callable
import structlog

from mr_t.eiger_stream1 import ZmqMessage
from mr_t.eiger_stream1 import ZmqHeader, ZmqImage, ZmqMessage, ZmqSeriesEnd

parent_log = structlog.get_logger()


def h5_writer_process(queue: culsans.SyncQueue[int]) -> None:
parent_log.info("in writer process, starting main loop")
while True:
element = queue.get() # noqa: F841
parent_log.info(f"got new element, queue size now: {queue.qsize()}")
parent_log.info("sleeping a bit")
# time.sleep(5)
queue.join()
async def receive_h5_messages(
input_file: Path, log: structlog.BoundLogger, cache_full: Callable[[], bool]
) -> AsyncIterator[ZmqMessage]:
log.info(f"begin reading frames from file {input_file}")


async def write_to_h5(msg: ZmqMessage, q: culsans.AsyncQueue[ZmqMessage]) -> None:
parent_log.info("writing to h5")
await q.put(msg)
parent_log.info("writing to h5 DONE")
with h5py.File(input_file, "r") as f:
dataset = f["entry/data/data"]
assert isinstance(dataset, h5py.Dataset)
frame_count = len(dataset)
yield ZmqHeader(
series_id="1", config={"nimages": frame_count, "ntrigger": 1}, appendix=None
)
for frame_number in range(0, frame_count):
if cache_full():
await asyncio.sleep(0.5)
continue
yield ZmqImage(dataset[frame_number][:].tobytes())
yield ZmqSeriesEnd()
51 changes: 44 additions & 7 deletions src/mr_t/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,16 @@
import struct
import sys
import logging
from typing import Any, AsyncIterable, AsyncIterator, Optional, TypeAlias, TypeVar
from typing import (
Any,
AsyncIterable,
AsyncIterator,
Callable,
Optional,
TypeAlias,
TypeVar,
)
from pathlib import Path

import asyncudp
import structlog
Expand All @@ -15,14 +24,20 @@
ZmqSeriesEnd,
receive_zmq_messages,
)
from mr_t.h5 import receive_h5_messages

parent_log = structlog.get_logger()


class Arguments(Tap):
udp_port: int
udp_host: str
eiger_zmq_host_and_port: str
eiger_zmq_host_and_port: Optional[str] = ( # host:port of the Eiger ZMQ interface
None
)
input_h5_file: Optional[ # hdf5 file to feed into Mr. T to mock the detector
Path
] = None
frame_cache_limit: Optional[ # Limit the number of incoming ZeroMQ images to this number (can prevent memory overruns)
int
] = None
Expand Down Expand Up @@ -191,13 +206,35 @@ class CurrentSeries:
async def main_async() -> None:
args = Arguments(underscores_to_dashes=True).parse_args()

if args.eiger_zmq_host_and_port is None and args.input_h5_file is None:
sys.stderr.write(
"invalid arguments: specify either an Eiger ZMQ host and port, or an H5 file to use"
)
sys.exit(1)
if args.eiger_zmq_host_and_port is not None and args.input_h5_file is not None:
sys.stderr.write(
"invalid arguments: specify either an Eiger ZMQ host and port, or an H5 file to use, but not both!"
)
sys.exit(2)

current_series: CurrentSeries | None = None
sender = receive_zmq_messages(
zmq_target=args.eiger_zmq_host_and_port,
log=parent_log.bind(system="eiger"),
cache_full=lambda: len(current_series.saved_frames) > args.frame_cache_limit
cache_full: Callable[[], bool] = (
lambda: len(current_series.saved_frames) > args.frame_cache_limit
if current_series is not None and args.frame_cache_limit is not None
else False,
else False
)
sender = (
receive_zmq_messages(
zmq_target=args.eiger_zmq_host_and_port,
log=parent_log.bind(system="eiger"),
cache_full=cache_full,
)
if args.eiger_zmq_host_and_port is not None
else receive_h5_messages(
args.input_h5_file, # type: ignore
log=parent_log.bind(system="5"),
cache_full=cache_full,
)
)
sock = await asyncudp.create_socket(local_addr=(args.udp_host, args.udp_port))
receiver = udp_receiver(log=parent_log.bind(system="udp"), sock=sock)
Expand Down
Loading