Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 25 additions & 20 deletions src/fastcs/transport/epics/pva/_pv_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@ async def put(self, pv: SharedPV, op: ServerOperation):
cast_value = cast_from_p4p_value(self._attr_w, raw_value)

await self._attr_w.process_without_display_update(cast_value)
if type(self._attr_w) is AttrW:
# For AttrRW a post is done from the `_process_callback`.
pv.post(cast_to_p4p_value(self._attr_w, cast_value))
op.done()


Expand Down Expand Up @@ -100,35 +97,43 @@ async def put(self, pv: SharedPV, op: ServerOperation):
raise RuntimeError("Commands should only take the value `True`.")


def make_shared_pv(attribute: Attribute) -> SharedPV:
initial_value = (
attribute.get()
if isinstance(attribute, AttrR)
else attribute.datatype.initial_value
)

def _make_shared_pv_arguments(attribute: Attribute) -> dict[str, object]:
type_ = make_p4p_type(attribute)
kwargs = {"initial": cast_to_p4p_value(attribute, initial_value)}
if isinstance(type_, (NTEnum | NTNDArray | NTTable)):
kwargs["nt"] = type_
return {"nt": type_}
else:

def _wrap(value: dict):
return Value(type_, value)

kwargs["wrap"] = _wrap
return {"wrap": _wrap}


def make_shared_read_pv(attribute: AttrR) -> SharedPV:
shared_pv = SharedPV(
initial=cast_to_p4p_value(attribute, attribute.get()),
**_make_shared_pv_arguments(attribute),
)

if isinstance(attribute, AttrW):
kwargs["handler"] = WritePvHandler(attribute)
async def on_update(value):
shared_pv.post(cast_to_p4p_value(attribute, value))

shared_pv = SharedPV(**kwargs)
attribute.add_update_callback(on_update)

if isinstance(attribute, AttrR):
return shared_pv


def make_shared_write_pv(attribute: AttrW) -> SharedPV:
shared_pv = SharedPV(
handler=WritePvHandler(attribute),
initial=cast_to_p4p_value(attribute, attribute.datatype.initial_value),
**_make_shared_pv_arguments(attribute),
)

async def on_update(value):
shared_pv.post(cast_to_p4p_value(attribute, value))
async def async_write_display(value):
shared_pv.post(cast_to_p4p_value(attribute, value))

attribute.add_update_callback(on_update)
attribute.add_write_display_callback(async_write_display)

return shared_pv

Expand Down
24 changes: 20 additions & 4 deletions src/fastcs/transport/epics/pva/ioc.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@
from fastcs.controller_api import ControllerAPI
from fastcs.util import snake_to_pascal

from ._pv_handlers import make_command_pv, make_shared_pv
from ._pv_handlers import (
make_command_pv,
make_shared_read_pv,
make_shared_write_pv,
)
from .pvi_tree import AccessModeType, PviTree


Expand Down Expand Up @@ -42,9 +46,21 @@ async def parse_attributes(

for attr_name, attribute in controller_api.attributes.items():
pv_name = get_pv_name(pv_prefix, attr_name)
attribute_pv = make_shared_pv(attribute)
provider.add(pv_name, attribute_pv)
pvi_tree.add_signal(pv_name, _attribute_to_access(attribute))
match attribute:
case AttrRW():
attribute_pv = make_shared_write_pv(attribute)
attribute_pv_rbv = make_shared_read_pv(attribute)
provider.add(pv_name, attribute_pv)
provider.add(f"{pv_name}_RBV", attribute_pv_rbv)
pvi_tree.add_signal(pv_name, "rw")
case AttrR():
attribute_pv = make_shared_read_pv(attribute)
provider.add(pv_name, attribute_pv)
pvi_tree.add_signal(pv_name, "r")
case AttrW():
attribute_pv = make_shared_write_pv(attribute)
provider.add(pv_name, attribute_pv)
pvi_tree.add_signal(pv_name, "w")

for attr_name, method in controller_api.command_methods.items():
pv_name = get_pv_name(pv_prefix, attr_name)
Expand Down
17 changes: 13 additions & 4 deletions tests/transport/epics/pva/test_p4p.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ async def _wait_and_set_attr_r():
await controller.b.set(-0.9111111)

a_values, b_values = [], []
a_monitor = ctxt.monitor(f"{pv_prefix}:A", a_values.append)
a_monitor = ctxt.monitor(f"{pv_prefix}:A_RBV", a_values.append)
b_monitor = ctxt.monitor(f"{pv_prefix}:B", b_values.append)
serve = asyncio.ensure_future(fastcs.serve())
wait_and_set_attr_r = asyncio.ensure_future(_wait_and_set_attr_r())
Expand Down Expand Up @@ -448,6 +448,8 @@ class SomeController(Controller):

async def _wait_and_set_attrs():
await asyncio.sleep(0.1)
# This demonstrates an update from hardware,
# resulting in only a change in the read back.
await asyncio.gather(
controller.some_waveform.set(server_set_waveform_value),
controller.some_table.set(server_set_table_value),
Expand All @@ -457,19 +459,26 @@ async def _wait_and_set_attrs():
async def _wait_and_put_pvs():
await asyncio.sleep(0.3)
ctxt = Context("pva")
# This demonstrates a client put,
# resulting in a change in the demand and read back.
await asyncio.gather(
ctxt.put(f"{pv_prefix}:SomeWaveform", client_put_waveform_value),
ctxt.put(f"{pv_prefix}:SomeTable", client_put_table_value),
ctxt.put(f"{pv_prefix}:SomeEnum", client_put_enum_value),
)

waveform_values, table_values, enum_values = [], [], []
waveform_monitor = ctxt.monitor(f"{pv_prefix}:SomeWaveform", waveform_values.append)
table_monitor = ctxt.monitor(f"{pv_prefix}:SomeTable", table_values.append)

# Monitoring read backs to capture both client and server sets.
waveform_monitor = ctxt.monitor(
f"{pv_prefix}:SomeWaveform_RBV", waveform_values.append
)
table_monitor = ctxt.monitor(f"{pv_prefix}:SomeTable_RBV", table_values.append)
enum_monitor = ctxt.monitor(
f"{pv_prefix}:SomeEnum",
f"{pv_prefix}:SomeEnum_RBV",
enum_values.append,
)

serve = asyncio.ensure_future(fastcs.serve())
wait_and_set_attrs = asyncio.ensure_future(_wait_and_set_attrs())
wait_and_put_pvs = asyncio.ensure_future(_wait_and_put_pvs())
Expand Down
Loading