Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 58 additions & 37 deletions docs/snippets/dynamic.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
from __future__ import annotations

import json
from dataclasses import dataclass
from typing import Any, Literal
from dataclasses import KW_ONLY, dataclass
from typing import Any, Literal, TypeVar

from pydantic import BaseModel, ConfigDict, ValidationError

from fastcs.attributes import AttrHandlerRW, Attribute, AttrR, AttrRW, AttrW
from fastcs.attribute_io import AttributeIO
from fastcs.attribute_io_ref import AttributeIORef
from fastcs.attributes import Attribute, AttrR, AttrRW, AttrW
from fastcs.connections import IPConnection, IPConnectionSettings
from fastcs.controller import Controller
from fastcs.datatypes import Bool, DataType, Float, Int, String
from fastcs.launch import FastCS
from fastcs.transport.epics.ca.options import EpicsCAOptions
from fastcs.transport.epics.ca.transport import EpicsCATransport
from fastcs.transport.epics.options import EpicsIOCOptions


Expand Down Expand Up @@ -46,74 +46,95 @@ def create_attributes(parameters: dict[str, Any]) -> dict[str, Attribute]:
print(f"Failed to validate parameter '{parameter}'\n{e}")
continue

handler = TemperatureControllerHandler(parameter.command)
io_ref = TemperatureControllerAttributeIORef(parameter.command)
match parameter.access_mode:
case "r":
attributes[name] = AttrR(parameter.fastcs_datatype, handler=handler)
attributes[name] = AttrR(parameter.fastcs_datatype, io_ref=io_ref)
case "rw":
attributes[name] = AttrRW(parameter.fastcs_datatype, handler=handler)
attributes[name] = AttrRW(parameter.fastcs_datatype, io_ref=io_ref)

return attributes


NumberT = TypeVar("NumberT", int, float)


@dataclass
class TemperatureControllerHandler(AttrHandlerRW):
command_name: str
class TemperatureControllerAttributeIORef(AttributeIORef):
name: str
_: KW_ONLY
update_period: float | None = 0.2
_controller: TemperatureController | None = None

async def update(self, attr: AttrR):
response = await self.controller.connection.send_query(
f"{self.command_name}?\r\n"
)

class TemperatureControllerAttributeIO(
AttributeIO[NumberT, TemperatureControllerAttributeIORef]
):
def __init__(self, connection: IPConnection):
super().__init__()

self._connection = connection

async def update(self, attr: AttrR[NumberT, TemperatureControllerAttributeIORef]):
query = f"{attr.io_ref.name}?"
response = await self._connection.send_query(f"{query}\r\n")
value = response.strip("\r\n")

await attr.set(attr.dtype(value))

async def put(self, attr: AttrW, value: Any):
await self.controller.connection.send_command(
f"{self.command_name}={value}\r\n"
)
async def send(
self, attr: AttrW[NumberT, TemperatureControllerAttributeIORef], value: NumberT
) -> None:
command = f"{attr.io_ref.name}={attr.dtype(value)}"
await self._connection.send_command(f"{command}\r\n")


class TemperatureRampController(Controller):
def __init__(self, index: int, connection: IPConnection):
super().__init__(f"Ramp {index}")
def __init__(
self,
index: int,
parameters: dict[str, TemperatureControllerParameter],
io: TemperatureControllerAttributeIO,
):
self._parameters = parameters
super().__init__(f"Ramp{index}", ios=[io])

self.connection = connection

async def initialise(self, parameters: dict[str, Any]):
self.attributes.update(create_attributes(parameters))
async def initialise(self):
self.attributes.update(create_attributes(self._parameters))


class TemperatureController(Controller):
def __init__(self, settings: IPConnectionSettings):
super().__init__()

self._ip_settings = settings
self.connection = IPConnection()
self._connection = IPConnection()

self._io = TemperatureControllerAttributeIO(self._connection)
super().__init__(ios=[self._io])

async def connect(self):
await self.connection.connect(self._ip_settings)
await self._connection.connect(self._ip_settings)

async def initialise(self):
await self.connect()

api = json.loads((await self.connection.send_query("API?\r\n")).strip("\r\n"))
api = json.loads((await self._connection.send_query("API?\r\n")).strip("\r\n"))

ramps_api = api.pop("Ramps")
self.attributes.update(create_attributes(api))

for idx, ramp_parameters in enumerate(ramps_api):
ramp_controller = TemperatureRampController(idx + 1, self.connection)
ramp_controller = TemperatureRampController(
idx + 1, ramp_parameters, self._io
)
await ramp_controller.initialise()
self.register_sub_controller(f"Ramp{idx + 1:02d}", ramp_controller)
await ramp_controller.initialise(ramp_parameters)

await self.connection.close()
await self._connection.close()


epics_options = EpicsCAOptions(ca_ioc=EpicsIOCOptions(pv_prefix="DEMO"))
epics_ca = EpicsCATransport(ca_ioc=EpicsIOCOptions(pv_prefix="DEMO"))
connection_settings = IPConnectionSettings("localhost", 25565)
fastcs = FastCS(TemperatureController(connection_settings), [epics_options])
fastcs = FastCS(TemperatureController(connection_settings), [epics_ca])


# fastcs.run() # Commented as this will block
if __name__ == "__main__":
fastcs.run()
4 changes: 3 additions & 1 deletion docs/snippets/static02.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ class TemperatureController(Controller):


fastcs = FastCS(TemperatureController(), [])
# fastcs.run() # Commented as this will block

if __name__ == "__main__":
fastcs.run()
4 changes: 3 additions & 1 deletion docs/snippets/static03.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,6 @@ class TemperatureController(Controller):


fastcs = FastCS(TemperatureController(), [])
# fastcs.run() # Commented as this will block

if __name__ == "__main__":
fastcs.run()
9 changes: 5 additions & 4 deletions docs/snippets/static04.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@
from fastcs.controller import Controller
from fastcs.datatypes import String
from fastcs.launch import FastCS
from fastcs.transport.epics.ca.options import EpicsCAOptions
from fastcs.transport.epics.ca.transport import EpicsCATransport
from fastcs.transport.epics.options import EpicsIOCOptions


class TemperatureController(Controller):
device_id = AttrR(String())


epics_options = EpicsCAOptions(ca_ioc=EpicsIOCOptions(pv_prefix="DEMO"))
fastcs = FastCS(TemperatureController(), [epics_options])
epics_ca = EpicsCATransport(ca_ioc=EpicsIOCOptions(pv_prefix="DEMO"))
fastcs = FastCS(TemperatureController(), [epics_ca])

# fastcs.run() # Commented as this will block
if __name__ == "__main__":
fastcs.run()
13 changes: 6 additions & 7 deletions docs/snippets/static05.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
from fastcs.controller import Controller
from fastcs.datatypes import String
from fastcs.launch import FastCS
from fastcs.transport.epics.ca.options import EpicsCAOptions, EpicsGUIOptions
from fastcs.transport.epics.ca.options import EpicsGUIOptions
from fastcs.transport.epics.ca.transport import EpicsCATransport
from fastcs.transport.epics.options import EpicsIOCOptions


Expand All @@ -15,12 +16,10 @@ class TemperatureController(Controller):
gui_options = EpicsGUIOptions(
output_path=Path(".") / "demo.bob", title="Demo Temperature Controller"
)
epics_options = EpicsCAOptions(
gui=gui_options,
ca_ioc=EpicsIOCOptions(pv_prefix="DEMO"),
)
fastcs = FastCS(TemperatureController(), [epics_options])
epics_ca = EpicsCATransport(gui=gui_options, ca_ioc=EpicsIOCOptions(pv_prefix="DEMO"))
fastcs = FastCS(TemperatureController(), [epics_ca])

fastcs.create_gui()

# fastcs.run() # Commented as this will block
if __name__ == "__main__":
fastcs.run()
16 changes: 7 additions & 9 deletions docs/snippets/static06.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from fastcs.controller import Controller
from fastcs.datatypes import String
from fastcs.launch import FastCS
from fastcs.transport.epics.ca.options import EpicsCAOptions
from fastcs.transport.epics.ca.transport import EpicsCATransport
from fastcs.transport.epics.options import EpicsGUIOptions, EpicsIOCOptions


Expand All @@ -16,22 +16,20 @@ def __init__(self, settings: IPConnectionSettings):
super().__init__()

self._ip_settings = settings
self.connection = IPConnection()
self._connection = IPConnection()

async def connect(self):
await self.connection.connect(self._ip_settings)
await self._connection.connect(self._ip_settings)


gui_options = EpicsGUIOptions(
output_path=Path(".") / "demo.bob", title="Demo Temperature Controller"
)
epics_options = EpicsCAOptions(
gui=gui_options,
ca_ioc=EpicsIOCOptions(pv_prefix="DEMO"),
)
epics_ca = EpicsCATransport(gui=gui_options, ca_ioc=EpicsIOCOptions(pv_prefix="DEMO"))
connection_settings = IPConnectionSettings("localhost", 25565)
fastcs = FastCS(TemperatureController(connection_settings), [epics_options])
fastcs = FastCS(TemperatureController(connection_settings), [epics_ca])

fastcs.create_gui()

# fastcs.run() # Commented as this will block
if __name__ == "__main__":
fastcs.run()
54 changes: 25 additions & 29 deletions docs/snippets/static07.py
Original file line number Diff line number Diff line change
@@ -1,63 +1,59 @@
from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import TypeVar

from fastcs.attributes import AttrHandlerR, AttrR
from fastcs.attribute_io import AttributeIO
from fastcs.attribute_io_ref import AttributeIORef
from fastcs.attributes import AttrR
from fastcs.connections import IPConnection, IPConnectionSettings
from fastcs.controller import BaseController, Controller
from fastcs.controller import Controller
from fastcs.datatypes import String
from fastcs.launch import FastCS
from fastcs.transport.epics.ca.options import EpicsCAOptions
from fastcs.transport.epics.ca.transport import EpicsCATransport
from fastcs.transport.epics.options import EpicsGUIOptions, EpicsIOCOptions

NumberT = TypeVar("NumberT", int, float)


@dataclass
class IDUpdater(AttrHandlerR):
class IDAttributeIORef(AttributeIORef):
update_period: float | None = 0.2
_controller: TemperatureController | None = None

async def initialise(self, controller: BaseController):
assert isinstance(controller, TemperatureController)
self._controller = controller

@property
def controller(self) -> TemperatureController:
if self._controller is None:
raise RuntimeError("Handler not initialised")
class IDAttributeIO(AttributeIO[NumberT, IDAttributeIORef]):
def __init__(self, connection: IPConnection):
super().__init__()

return self._controller
self._connection = connection

async def update(self, attr: AttrR):
response = await self.controller.connection.send_query("ID?\r\n")
async def update(self, attr: AttrR[NumberT, IDAttributeIORef]):
response = await self._connection.send_query("ID?\r\n")
value = response.strip("\r\n")

await attr.set(value)
await attr.set(attr.dtype(value))


class TemperatureController(Controller):
device_id = AttrR(String(), handler=IDUpdater())
device_id = AttrR(String(), io_ref=IDAttributeIORef())

def __init__(self, settings: IPConnectionSettings):
super().__init__()

self._ip_settings = settings
self.connection = IPConnection()
self._connection = IPConnection()

super().__init__(ios=[IDAttributeIO(self._connection)])

async def connect(self):
await self.connection.connect(self._ip_settings)
await self._connection.connect(self._ip_settings)


gui_options = EpicsGUIOptions(
output_path=Path(".") / "demo.bob", title="Demo Temperature Controller"
)
epics_options = EpicsCAOptions(
gui=gui_options,
ca_ioc=EpicsIOCOptions(pv_prefix="DEMO"),
)
epics_ca = EpicsCATransport(gui=gui_options, ca_ioc=EpicsIOCOptions(pv_prefix="DEMO"))
connection_settings = IPConnectionSettings("localhost", 25565)
fastcs = FastCS(TemperatureController(connection_settings), [epics_options])
fastcs = FastCS(TemperatureController(connection_settings), [epics_ca])

fastcs.create_gui()

# fastcs.run() # Commented as this will block
if __name__ == "__main__":
fastcs.run()
Loading