Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@
("py:class", "p4p.nt.ndarray.NTNDArray"),
("py:class", "p4p.nt.NTTable"),
# Problems in FastCS itself
("py:class", "T"),
("py:class", "AttrIOUpdateCallback"),
("py:class", "fastcs.transport.epics.pva.pvi_tree._PviSignalInfo"),
("py:class", "fastcs.logging._logging.LogLevel"),
("py:class", "fastcs.logging._graylog.GraylogEndpoint"),
Expand All @@ -102,6 +104,7 @@
]
nitpick_ignore_regex = [
("py:class", "fastcs.*.T"),
("py:obj", "fastcs.*.T"),
(r"py:.*", r"fastcs\.demo.*"),
(r"py:.*", r"tickit.*"),
]
Expand Down
2 changes: 1 addition & 1 deletion docs/snippets/dynamic.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ async def update(self, attr: AttrR[NumberT, TemperatureControllerAttributeIORef]
response = await self._connection.send_query(f"{query}\r\n")
value = response.strip("\r\n")

await attr.set(attr.dtype(value))
await attr.update(attr.dtype(value))

async def send(
self, attr: AttrW[NumberT, TemperatureControllerAttributeIORef], value: NumberT
Expand Down
2 changes: 1 addition & 1 deletion docs/snippets/static07.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async def update(self, attr: AttrR[NumberT, IDAttributeIORef]):
response = await self._connection.send_query("ID?\r\n")
value = response.strip("\r\n")

await attr.set(attr.dtype(value))
await attr.update(attr.dtype(value))


class TemperatureController(Controller):
Expand Down
2 changes: 1 addition & 1 deletion docs/snippets/static08.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ async def update(self, attr: AttrR[NumberT, TemperatureControllerAttributeIORef]
response = await self._connection.send_query(f"{query}\r\n")
value = response.strip("\r\n")

await attr.set(attr.dtype(value))
await attr.update(attr.dtype(value))


class TemperatureController(Controller):
Expand Down
2 changes: 1 addition & 1 deletion docs/snippets/static09.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ async def update(self, attr: AttrR[NumberT, TemperatureControllerAttributeIORef]
response = await self._connection.send_query(f"{query}\r\n")
value = response.strip("\r\n")

await attr.set(attr.dtype(value))
await attr.update(attr.dtype(value))

async def send(
self, attr: AttrW[NumberT, TemperatureControllerAttributeIORef], value: NumberT
Expand Down
2 changes: 1 addition & 1 deletion docs/snippets/static10.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ async def update(self, attr: AttrR[NumberT, TemperatureControllerAttributeIORef]
response = await self._connection.send_query(f"{query}\r\n")
value = response.strip("\r\n")

await attr.set(attr.dtype(value))
await attr.update(attr.dtype(value))

async def send(
self, attr: AttrW[NumberT, TemperatureControllerAttributeIORef], value: NumberT
Expand Down
2 changes: 1 addition & 1 deletion docs/snippets/static11.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async def update(self, attr: AttrR[NumberT, TemperatureControllerAttributeIORef]
response = await self._connection.send_query(f"{query}\r\n")
value = response.strip("\r\n")

await attr.set(attr.dtype(value))
await attr.update(attr.dtype(value))

async def send(
self, attr: AttrW[NumberT, TemperatureControllerAttributeIORef], value: NumberT
Expand Down
4 changes: 2 additions & 2 deletions docs/snippets/static12.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ async def update(self, attr: AttrR[NumberT, TemperatureControllerAttributeIORef]
response = await self._connection.send_query(f"{query}\r\n")
value = response.strip("\r\n")

await attr.set(attr.dtype(value))
await attr.update(attr.dtype(value))

async def send(
self, attr: AttrW[NumberT, TemperatureControllerAttributeIORef], value: NumberT
Expand Down Expand Up @@ -94,7 +94,7 @@ async def update_voltages(self):
(await self._connection.send_query("V?\r\n")).strip("\r\n")
)
for index, controller in enumerate(self._ramp_controllers):
await controller.voltage.set(float(voltages[index]))
await controller.voltage.update(float(voltages[index]))


gui_options = EpicsGUIOptions(
Expand Down
6 changes: 3 additions & 3 deletions docs/snippets/static13.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ async def update(self, attr: AttrR[NumberT, TemperatureControllerAttributeIORef]
response = await self._connection.send_query(f"{query}\r\n")
value = response.strip("\r\n")

await attr.set(attr.dtype(value))
await attr.update(attr.dtype(value))

async def send(
self, attr: AttrW[NumberT, TemperatureControllerAttributeIORef], value: NumberT
Expand Down Expand Up @@ -95,12 +95,12 @@ async def update_voltages(self):
(await self._connection.send_query("V?\r\n")).strip("\r\n")
)
for index, controller in enumerate(self._ramp_controllers):
await controller.voltage.set(float(voltages[index]))
await controller.voltage.update(float(voltages[index]))

@command()
async def disable_all(self) -> None:
for rc in self._ramp_controllers:
await rc.enabled.process(OnOffEnum.Off)
await rc.enabled.put(OnOffEnum.Off, sync_setpoint=True)
# TODO: The requests all get concatenated and the sim doesn't handle it
await asyncio.sleep(0.1)

Expand Down
4 changes: 2 additions & 2 deletions src/fastcs/attribute_io.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Any, Generic, cast, get_args

from fastcs.attribute_io_ref import AttributeIORef, AttributeIORefT
from fastcs.attributes import AttrR, AttrRW
from fastcs.attributes import AttrR, AttrW
from fastcs.datatypes import T
from fastcs.tracer import Tracer

Expand Down Expand Up @@ -32,7 +32,7 @@ def __init__(self):
async def update(self, attr: AttrR[T, AttributeIORefT]) -> None:
raise NotImplementedError()

async def send(self, attr: AttrRW[T, AttributeIORefT], value: T) -> None:
async def send(self, attr: AttrW[T, AttributeIORefT], value: T) -> None:
raise NotImplementedError()


Expand Down
2 changes: 1 addition & 1 deletion src/fastcs/attribute_io_ref.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ class AttributeIORef:


AttributeIORefT = TypeVar(
"AttributeIORefT", bound=AttributeIORef, default=AttributeIORef
"AttributeIORefT", bound=AttributeIORef, default=AttributeIORef, covariant=True
)
193 changes: 139 additions & 54 deletions src/fastcs/attributes.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,19 @@
from __future__ import annotations

import asyncio
from collections.abc import Callable
from typing import Generic
from collections.abc import Awaitable, Callable
from typing import Any, Generic

from fastcs.attribute_io_ref import AttributeIORefT
from fastcs.datatypes import (
ATTRIBUTE_TYPES,
AttrSetCallback,
AttrUpdateCallback,
DataType,
T,
)
from fastcs.datatypes import ATTRIBUTE_TYPES, DataType, T
from fastcs.logging import logger as _logger
from fastcs.tracer import Tracer

ONCE = float("inf")
"""Special value to indicate that an attribute should be updated once on start up."""

logger = _logger.bind(logger_name=__name__)


class Attribute(Generic[T, AttributeIORefT], Tracer):
"""Base FastCS attribute.
Expand Down Expand Up @@ -97,6 +94,14 @@ def __repr__(self):
return f"{self.__class__.__name__}({self._name}, {self._datatype})"


AttrIOUpdateCallback = Callable[["AttrR[T, Any]"], Awaitable[None]]
"""An AttributeIO callback that takes an AttrR and updates its value"""
AttrUpdateCallback = Callable[[], Awaitable[None]]
"""A callback to be called periodically to update an attribute"""
AttrOnUpdateCallback = Callable[[T], Awaitable[None]]
"""A callback to be called when the value of the attribute is updated"""


class AttrR(Attribute[T, AttributeIORefT]):
"""A read-only ``Attribute``."""

Expand All @@ -108,42 +113,80 @@ def __init__(
initial_value: T | None = None,
description: str | None = None,
) -> None:
super().__init__(
datatype, # type: ignore
io_ref,
group,
description=description,
)
super().__init__(datatype, io_ref, group, description=description)
self._value: T = (
datatype.initial_value if initial_value is None else initial_value
)
self._on_set_callbacks: list[AttrSetCallback[T]] | None = None
self._on_update_callbacks: list[AttrUpdateCallback] | None = None
self._update_callback: AttrIOUpdateCallback[T] | None = None
"""Callback to update the value of the attribute with an IO to the source"""
self._on_update_callbacks: list[AttrOnUpdateCallback[T]] | None = None
"""Callbacks to publish changes to the value of the attribute"""

def get(self) -> T:
"""Get the cached value of the attribute."""
return self._value

async def set(self, value: T) -> None:
async def update(self, value: T) -> None:
"""Update the value of the attibute

This sets the cached value of the attribute presented in the API. It should
generally only be called from an IO or a controller that is updating the value
from some underlying source.

To request a change to the setpoint of the attribute, use the ``put`` method,
which will attempt to apply the change to the underlying source.

"""
self.log_event("Attribute set", attribute=self, value=value)

self._value = self._datatype.validate(value)

if self._on_set_callbacks is not None:
await asyncio.gather(*[cb(self._value) for cb in self._on_set_callbacks])
if self._on_update_callbacks is not None:
await asyncio.gather(*[cb(self._value) for cb in self._on_update_callbacks])

def add_on_update_callback(self, callback: AttrOnUpdateCallback[T]) -> None:
"""Add a callback to be called when the value of the attribute is updated

def add_set_callback(self, callback: AttrSetCallback[T]) -> None:
if self._on_set_callbacks is None:
self._on_set_callbacks = []
self._on_set_callbacks.append(callback)
The callback will be called with the updated value.

def add_update_callback(self, callback: AttrUpdateCallback):
"""
if self._on_update_callbacks is None:
self._on_update_callbacks = []
self._on_update_callbacks.append(callback)

async def update(self):
if self._on_update_callbacks is not None:
await asyncio.gather(*[cb() for cb in self._on_update_callbacks])
def set_update_callback(self, callback: AttrIOUpdateCallback[T]):
"""Set the callback to update the value of the attribute from the source

The callback will be converted to an async task and called periodically.

"""
if self._update_callback is not None:
raise RuntimeError("Attribute already has an IO update callback")

self._update_callback = callback

def bind_update_callback(self) -> AttrUpdateCallback:
"""Bind self into the registered IO update callback"""
if self._update_callback is None:
raise RuntimeError("Attribute has no update callback")
else:
update_callback = self._update_callback

async def update_attribute():
try:
self.log_event("Update attribute", topic=self)
await update_callback(self)
except Exception:
logger.opt(exception=True).error("Update loop failed", attribute=self)
raise

return update_attribute


AttrOnPutCallback = Callable[["AttrW[T, Any]", T], Awaitable[None]]
"""Callbacks to be called when the setpoint of an attribute is changed"""
AttrSyncSetpointCallback = Callable[[T], Awaitable[None]]
"""Callbacks to be called when the setpoint of an attribute is changed"""


class AttrW(Attribute[T, AttributeIORefT]):
Expand All @@ -162,41 +205,83 @@ def __init__(
group,
description=description,
)
self._process_callbacks: list[AttrSetCallback[T]] | None = None
self._write_display_callbacks: list[AttrSetCallback[T]] | None = None
self._on_put_callback: AttrOnPutCallback[T] | None = None
"""Callback to action a change to the setpoint of the attribute"""
self._sync_setpoint_callbacks: list[AttrSyncSetpointCallback[T]] = []
"""Callbacks to publish changes to the setpoint of the attribute"""

async def put(self, setpoint: T, sync_setpoint: bool = False) -> None:
"""Set the setpoint of the attribute

This should be called by clients to the attribute such as transports to apply a
change to the attribute. The ``_on_put_callback`` will be called with this new
setpoint, which may or may not take effect depending on the validity of the new
value. For example, if the attribute has an IO to some device, the value might
be rejected.

To directly change the value of the attribute, for example from an update loop
that has read a new value from some underlying source, call the ``set`` method.

"""
setpoint = self._datatype.validate(setpoint)
if self._on_put_callback is not None:
await self._on_put_callback(self, setpoint)

if sync_setpoint:
await self._call_sync_setpoint_callbacks(setpoint)

async def _call_sync_setpoint_callbacks(self, setpoint: T) -> None:
if self._sync_setpoint_callbacks:
await asyncio.gather(
*[cb(setpoint) for cb in self._sync_setpoint_callbacks]
)

def set_on_put_callback(self, callback: AttrOnPutCallback[T]) -> None:
"""Set the callback to call when the setpoint is changed

async def process(self, value: T) -> None:
await self.process_without_display_update(value)
await self.update_display_without_process(value)
The callback will be called with the attribute and the new setpoint.

async def process_without_display_update(self, value: T) -> None:
value = self._datatype.validate(value)
if self._process_callbacks:
await asyncio.gather(*[cb(value) for cb in self._process_callbacks])
"""
if self._on_put_callback is not None:
raise RuntimeError("Attribute already has an on put callback")

async def update_display_without_process(self, value: T) -> None:
value = self._datatype.validate(value)
if self._write_display_callbacks:
await asyncio.gather(*[cb(value) for cb in self._write_display_callbacks])
self._on_put_callback = callback

def add_process_callback(self, callback: AttrSetCallback[T]) -> None:
if self._process_callbacks is None:
self._process_callbacks = []
self._process_callbacks.append(callback)
def add_sync_setpoint_callback(self, callback: AttrSyncSetpointCallback[T]) -> None:
"""Add a callback to publish changes to the setpoint of the attribute

def has_process_callback(self) -> bool:
return bool(self._process_callbacks)
The callback will be called with the new setpoint.

def add_write_display_callback(self, callback: AttrSetCallback[T]) -> None:
if self._write_display_callbacks is None:
self._write_display_callbacks = []
self._write_display_callbacks.append(callback)
"""
self._sync_setpoint_callbacks.append(callback)


class AttrRW(AttrR[T, AttributeIORefT], AttrW[T, AttributeIORefT]):
"""A read-write ``Attribute``."""

async def process(self, value: T) -> None:
await self.set(value)
def __init__(
self,
datatype: DataType[T],
io_ref: AttributeIORefT | None = None,
group: str | None = None,
initial_value: T | None = None,
description: str | None = None,
):
super().__init__(datatype, io_ref, group, initial_value, description)

self._setpoint_initialised = False

if io_ref is None:
self.set_on_put_callback(self._internal_update)

async def _internal_update(self, attr: AttrW[T, AttributeIORefT], value: T):
"""Update value directly when Attribute has no IO"""
assert attr is self
await self.update(value)

async def update(self, value: T):
await super().update(value)

await super().process(value) # type: ignore
if not self._setpoint_initialised:
await self._call_sync_setpoint_callbacks(value)
self._setpoint_initialised = True
Loading