Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 18 additions & 18 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
services:
tiled:
# see the file ./tiled/deploy/config.yml for detailed configuration of tiled
image: ghcr.io/bluesky/tiled:main
ports:
- "8000:8000"
environment:
- TILED_SINGLE_USER_API_KEY=${TILED_SINGLE_USER_API_KEY}
volumes:
- ./services/tiled/deploy:/deploy:Z
- ./data:/data:Z
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://tiled:8000/healthz"]
interval: 10s
timeout: 5s
retries: 3
networks:
aps_net:
# see the file ./tiled/deploy/config.yml for detailed configuration of tiled
image: ghcr.io/bluesky/tiled:main
ports:
- "8000:8000"
environment:
- TILED_SINGLE_USER_API_KEY=${TILED_SINGLE_USER_API_KEY}
volumes:
- ./services/tiled/deploy:/deploy:Z
- ./data:/data:Z
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://tiled:8000/healthz"]
interval: 10s
timeout: 5s
retries: 3
networks:
aps_net:

processor:
command: python -m tr_ap_xps.apps.processor_cli
command: python -m tr_ap_xps.apps.processor_cli_tpx
build:
context: .
dockerfile: Dockerfile_processor
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ authors = [
]
description = "a package to perform computations and suggestion during a time-resolved AP-XPS experiment"
readme = "README.md"
requires-python = ">=3.10"
requires-python = ">=3.11,<3.13"
classifiers = [
"Programming Language :: Python :: 3",
"Operating System :: OS Independent",
Expand Down
3 changes: 3 additions & 0 deletions settings.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ xps_operator:
lv_zmq_listener:
zmq_pub_address: "tcp://localhost"
zmq_pub_port: 5555
tpx_zmq_listener:
zmq_pub_address: "tcp://localhost"
zmq_pub_port: 5657
websockets_publisher:
host: "0.0.0.0"
port: 8001
78 changes: 78 additions & 0 deletions src/tr_ap_xps/apps/processor_cli_tpx.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import asyncio
import logging
import signal

import typer
from tiled.client import from_uri
from tiled.client.node import Container

from ..config import settings
# from ..labview import XPSLabviewZMQListener, setup_zmq
from ..timepix import XPSTimepixZMQListener, setup_zmq
from ..log_utils import setup_logger
from ..pipeline.xps_operator import XPSOperator
from ..tiled import TiledPublisher
from ..websockets import XPSWSResultPublisher

app = typer.Typer()
logger = logging.getLogger("tr_ap_xps")
setup_logger(logger)

app_settings = settings.xps_operator

def tiled_runs_container() -> Container:
try:
client = from_uri(app_settings.tiled_uri, api_key=app_settings.tiled_api_key)
if client.get("runs") is None: # TODO test case
client.create_container("runs")
return client["runs"]
except Exception as e:
logger.error(f"Error connecting to Tiled: {e}")


@app.command()
async def listen() -> None:
try:
logger.setLevel(app_settings.log_level.upper())
logger.debug("DEBUG LOGGING SET")
logger.info(
f"tpx_zmq_socket_address: {app_settings.tpx_zmq_listener.zmq_pub_address}"
)
logger.info(f"tpx_zmq_socket_port: {app_settings.tpx_zmq_listener.zmq_pub_port}")
logger.info(f"tiled_uri: {app_settings.tiled_uri}")
logger.info(
f"tiled_api_key: {'****' if app_settings.tiled_api_key else 'NOT SET!!!'}"
)

received_sigterm = {"received": False} # Define the variable received_sigterm

# setup websocket server
operator = XPSOperator()
ws_publisher = XPSWSResultPublisher(app_settings.websocket_url)
# tiled_pub = TiledPublisher(tiled_runs_container())

operator.add_publisher(ws_publisher)
# operator.add_publisher(tiled_pub)
# connect to labview zmq

tpx_zmq_socket = setup_zmq()
listener = XPSTimepixZMQListener(operator=operator, zmq_socket=tpx_zmq_socket)

# Wait for both tasks to complete
await asyncio.gather(listener.start(), ws_publisher.start())

def handle_sigterm(signum, frame):
logger.info("SIGTERM received, stopping...")
received_sigterm["received"] = True
asyncio.create_task(listener.stop())
asyncio.create_task(ws_publisher.stop())

# Register the handler for SIGTERM
signal.signal(signal.SIGTERM, handle_sigterm)
except Exception as e:
logger.error(f"Error setting up XPS processor {e}")
raise e


if __name__ == "__main__":
asyncio.run(listen())
48 changes: 32 additions & 16 deletions src/tr_ap_xps/pipeline/xps_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from arroyopy.operator import Operator
from arroyopy.schemas import Message

from ..schemas import DataFrameModel, XPSRawEvent, XPSResultStop, XPSStart, XPSStop
from ..schemas import XPSResult, XPSRawEvent, XPSResultStop, XPSStart, XPSStop
from ..timing import timer
from .xps_processor import XPSProcessor

Expand All @@ -17,8 +17,9 @@ class XPSOperator(Operator):

"""

def __init__(self) -> None:
def __init__(self, build_heatmaps: bool = False) -> None:
self.xps_processor = None
self.build_heatmaps = build_heatmaps

async def process(self, message: Message) -> None:
"""
Expand All @@ -40,19 +41,34 @@ async def process(self, message: Message) -> None:
await self.publish(message)

elif isinstance(message, XPSRawEvent):
if not self.xps_processor:
logger.error(
"Received XPSRawEvent without an active XPSProcessor. Started after labview started?"

if self.build_heatmaps:
if not self.xps_processor:
logger.error(
"Received XPSRawEvent without an active XPSProcessor. Started after labview started?"
)
return
result: XPSResult = await asyncio.to_thread(
self.xps_processor.process_frame, message
)
else:
result = XPSResult(
shot_num=message.image_info.frame_number,
integrated_frames=message.image,
rolling_mean=None,
rolling_std=None,
frame_number=message.image_info.frame_number,
detected_peaks=None,
vfft=None,
ifft=None,
shot_recent=None,
shot_mean=None,
shot_std=None,
)
return
result: XPSRawEvent = await asyncio.to_thread(
self.xps_processor.process_frame, message
)
if result:
await self.publish(result)

elif isinstance(message, XPSStop):
data_frame_model = DataFrameModel(df=timer.timing_dataframe)
new_msg = XPSResultStop(function_timings=data_frame_model)
await self.publish(new_msg)
self.xps_processor = None
await self.publish(result)
# elif isinstance(message, XPSStop):
# data_frame_model = DataFrameModel(df=timer.timing_dataframe)
# new_msg = XPSResultStop(function_timings=data_frame_model)
# await self.publish(new_msg)
# self.xps_processor = None
18 changes: 9 additions & 9 deletions src/tr_ap_xps/schemas.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Literal
from typing import Literal, Optional

from pydantic import BaseModel, Field

Expand Down Expand Up @@ -135,15 +135,15 @@ class XPSResult(Event, XPSMessage):
calculations are made.
"""

frame_number: int
frame_number: Optional[int] = None
integrated_frames: NumpyArrayModel
detected_peaks: DataFrameModel
vfft: NumpyArrayModel
ifft: NumpyArrayModel
shot_num: int
shot_recent: NumpyArrayModel
shot_mean: NumpyArrayModel
shot_std: NumpyArrayModel
detected_peaks: Optional[DataFrameModel] = None
vfft: Optional[NumpyArrayModel] = None
ifft: Optional[NumpyArrayModel] = None
shot_num: Optional[int] = None
shot_recent: Optional[NumpyArrayModel] = None
shot_mean: Optional[NumpyArrayModel] = None
shot_std: Optional[NumpyArrayModel] = None


class XPSResultStop(Stop, XPSMessage):
Expand Down
105 changes: 105 additions & 0 deletions src/tr_ap_xps/timepix.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import json
import logging
import uuid

import numpy as np
import msgpack
import zmq.asyncio

from arroyopy.zmq import ZMQListener
from arroyosas.schemas import RawFrameEvent, ImageInfo

from .config import settings
from .schemas import NumpyArrayModel, XPSImageInfo, XPSRawEvent, XPSStart, XPSStop



app_settings = settings.xps_operator

logger = logging.getLogger(__name__)


def setup_zmq():
ctx = zmq.asyncio.Context()
lv_zmq_socket = ctx.socket(zmq.SUB)
lv_zmq_socket.setsockopt(zmq.RCVHWM, 100000)
logger.info(
f"binding to: {app_settings.tpx_zmq_listener.zmq_pub_address}:{app_settings.tpx_zmq_listener.zmq_pub_port}"
)
lv_zmq_socket.connect(
f"{app_settings.tpx_zmq_listener.zmq_pub_address}:{app_settings.tpx_zmq_listener.zmq_pub_port}"
)
lv_zmq_socket.setsockopt(zmq.SUBSCRIBE, b"")
return lv_zmq_socket


class XPSTimepixZMQListener(ZMQListener):
stop_signal = False

async def start(self):
logger.info("Listener started")
current_image_info: XPSImageInfo = None
while True:
try:
if self.stop_signal:
logger.info("Stopping listener.")
break
metadata_msg_packed = await self.zmq_socket.recv()
raw_message = await self.zmq_socket.recv()
# print(raw_message[0:300])
try:
metadata = msgpack.unpackb(metadata_msg_packed)
except Exception as e:
logger.error(f"Error unpacking message: {e}")
continue


# Must be an event with an image
if logger.getEffectiveLevel() == logging.DEBUG:
logger.debug(f"event: {metadata.keys()}")

await self.operator.process(
self._build_event(raw_message, metadata)
)
logger.debug("event processed")
except Exception as e:
logger.error(e)

@staticmethod
def _build_event(
image: bytes,
metadata: dict,

) -> XPSRawEvent:
shape = tuple(metadata["shape"])
dtype = metadata["dtype"]

image_info = XPSImageInfo(
frame_number=0,
width=shape[0],
height=shape[1],
data_type=dtype
)

array_received = np.frombuffer(image, dtype=dtype).reshape(shape)
image_info.frame_number = metadata.get("flush_number")
return XPSRawEvent(
image=NumpyArrayModel(array=array_received), image_info=image_info
)


if __name__ == "__main__":
from .log_utils import setup_logger # noqa: F401

class DummyOperator:
async def process(self, event: XPSRawEvent):
logger.info(
f"Dummy operator received event with image shape: {event.image.array.shape}"
)


setup_logger(logger)
zmq_socket = setup_zmq()
listener = XPSTimepixZMQListener(zmq_socket=zmq_socket, operator=DummyOperator())
import asyncio
asyncio.run(listener.start())
6 changes: 1 addition & 5 deletions src/tr_ap_xps/websockets.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,15 +139,11 @@ def pack_images(message: XPSResult) -> bytes:
"""
return msgpack.packb(
{
# "raw": convert_to_uint8(message.integrated_frames.array),
# "vfft": convert_to_uint8(message.vfft.array),
# "ifft": convert_to_uint8(message.ifft.array),
"raw": convert_to_uint8(message.integrated_frames.array),
"width": message.shot_mean.array.shape[0],
"height": message.shot_mean.array.shape[1],
"fitted": json.dumps(peaks_output(message.detected_peaks.df)),
"shot_num": message.shot_num,
"shot_recent": convert_to_uint8(message.shot_recent.array),
"shot_mean": convert_to_uint8(message.shot_mean.array),
"shot_std": convert_to_uint8(message.shot_std.array),
}
)
Loading