Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions docs/snippets/dynamic.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ def __init__(
super().__init__(f"Ramp{index}", ios=[io])

async def initialise(self):
self.attributes.update(create_attributes(self._parameters))
for name, attribute in create_attributes(self._parameters).items():
self.add_attribute(name, attribute)


class TemperatureController(Controller):
Expand All @@ -119,7 +120,9 @@ async def initialise(self):
api = json.loads((await self._connection.send_query("API?\r\n")).strip("\r\n"))

ramps_api = api.pop("Ramps")
self.attributes.update(create_attributes(api))

for name, attribute in create_attributes(api).items():
self.add_attribute(name, attribute)

for idx, ramp_parameters in enumerate(ramps_api):
ramp_controller = TemperatureRampController(
Expand Down
4 changes: 1 addition & 3 deletions src/fastcs/control_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from fastcs.logging import logger as _fastcs_logger
from fastcs.tracer import Tracer
from fastcs.transport import Transport
from fastcs.util import validate_hinted_attributes

tracer = Tracer(name=__name__)
logger = _fastcs_logger.bind(logger_name=__name__)
Expand Down Expand Up @@ -85,8 +84,7 @@ def _stop_scan_tasks(self):

async def serve(self, interactive: bool = True) -> None:
await self._controller.initialise()
validate_hinted_attributes(self._controller)
self._controller.connect_attribute_ios()
self._controller.post_initialise()

self.controller_api = build_controller_api(self._controller)
self._scan_coros, self._initial_coros = (
Expand Down
246 changes: 153 additions & 93 deletions src/fastcs/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,30 @@
from collections import Counter
from collections.abc import Iterator, Mapping, MutableMapping, Sequence
from copy import deepcopy
from typing import get_type_hints
from typing import _GenericAlias, get_args, get_origin, get_type_hints # type: ignore

from fastcs.attribute_io import AttributeIO
from fastcs.attribute_io_ref import AttributeIORefT
from fastcs.attributes import Attribute, AttrR, AttrW
from fastcs.datatypes import T
from fastcs.datatypes import DataType, T
from fastcs.tracer import Tracer


class BaseController(Tracer):
"""Base class for controller."""
"""Base class for controllers

#: Attributes passed from the device at runtime.
attributes: dict[str, Attribute]
root_attribute: Attribute | None = None
Instances of this class can be loaded into FastCS to expose its Attributes to
the transport layer, which can then perform a specific function such as generating a
UI or creating parameters for a control system.

This class is public for type hinting purposes, but should not be inherited to
implement device drivers. Use either ``Controller`` or ``ControllerVector`` instead.

"""

# These class attributes can be overridden on child classes to define default
# behaviour of instantiated controllers
root_attribute: Attribute | None = None
description: str | None = None

def __init__(
Expand All @@ -29,70 +37,48 @@ def __init__(
) -> None:
super().__init__()

if (
description is not None
): # Use the argument over the one class defined description.
if description is not None:
# Use the argument over the one class defined description.
self.description = description

if not hasattr(self, "attributes"):
self.attributes = {}
self._path: list[str] = path or []
self.__sub_controller_tree: dict[str, BaseController] = {}

# Internal state that should not be accessed directly by base classes
self.__attributes: dict[str, Attribute] = {}
self.__sub_controllers: dict[str, BaseController] = {}
self.__hinted_attributes = self._parse_attribute_type_hints()

self._bind_attrs()

ios = ios or []
self._attribute_ref_io_map = {io.ref_type: io for io in ios}
self._validate_io(ios)

async def initialise(self):
"""Hook to dynamically add attributes before building the API"""
pass

def connect_attribute_ios(self) -> None:
"""Connect ``Attribute`` callbacks to ``AttributeIO``s"""
for attr in self.attributes.values():
ref = attr.io_ref if attr.has_io_ref() else None
if ref is None:
def _parse_attribute_type_hints(
self,
) -> dict[str, tuple[type[Attribute], type[DataType]]]:
hinted_attributes = {}
for name, hint in get_type_hints(type(self)).items():
if not isinstance(hint, _GenericAlias): # e.g. AttrR[int]
continue

io = self._attribute_ref_io_map.get(type(ref))
if io is None:
raise ValueError(
f"{self.__class__.__name__} does not have an AttributeIO "
f"to handle {attr.io_ref.__class__.__name__}"
)

if isinstance(attr, AttrW):
attr.set_on_put_callback(io.send)
if isinstance(attr, AttrR):
attr.set_update_callback(io.update)

for controller in self.sub_controllers.values():
controller.connect_attribute_ios()

@property
def path(self) -> list[str]:
"""Path prefix of attributes, recursively including parent Controllers."""
return self._path
origin = get_origin(hint)
if not isinstance(origin, type) or not issubclass(origin, Attribute):
continue

def set_path(self, path: list[str]):
if self._path:
raise ValueError(f"sub controller is already registered under {self.path}")
hinted_attributes[name] = (origin, get_args(hint)[0])

self._path = path
for attribute in self.attributes.values():
attribute.set_path(path)
return hinted_attributes

def _bind_attrs(self) -> None:
"""Search for `Attributes` and `Methods` to bind them to this instance.
"""Search for Attributes and Methods to bind them to this instance.

This method will search the attributes of this controller class to bind them to
this specific instance. For `Attribute`s, this is just a case of copying and
re-assigning to `self` to make it unique across multiple instances of this
controller class. For `Method`s, this requires creating a bound method from a
this specific instance. For Attributes, this is just a case of copying and
re-assigning to ``self`` to make it unique across multiple instances of this
controller class. For Methods, this requires creating a bound method from a
class method and a controller instance, so that it can be called from any
context with the controller instance passed as the `self` argument.
context with the controller instance passed as the ``self`` argument.

"""
# Lazy import to avoid circular references
Expand Down Expand Up @@ -126,74 +112,144 @@ def _validate_io(self, ios: Sequence[AttributeIO[T, AttributeIORefT]]):
f"More than one AttributeIO class handles {ref_type.__name__}"
)

def add_attribute(self, name, attribute: Attribute):
if name in self.attributes and attribute is not self.attributes[name]:
def __repr__(self):
name = self.__class__.__name__
path = ".".join(self.path) or None
sub_controllers = list(self.sub_controllers.keys()) or None

return f"{name}(path={path}, sub_controllers={sub_controllers})"

def __setattr__(self, name, value):
if isinstance(value, Attribute):
self.add_attribute(name, value)
elif isinstance(value, Controller):
self.add_sub_controller(name, value)
else:
super().__setattr__(name, value)

async def initialise(self):
"""Hook for subclasses to dynamically add attributes before building the API"""
pass

def post_initialise(self):
"""Hook to call after all attributes added, before serving the application"""
self._validate_hinted_attributes()
self._connect_attribute_ios()

def _validate_hinted_attributes(self):
"""Validate ``Attribute`` type-hints were introspected during initialisation"""
for name in self.__hinted_attributes:
attr = getattr(self, name, None)
if attr is None or not isinstance(attr, Attribute):
raise RuntimeError(
f"Controller `{self.__class__.__name__}` failed to introspect "
f"hinted attribute `{name}` during initialisation"
)

for subcontroller in self.sub_controllers.values():
subcontroller._validate_hinted_attributes() # noqa: SLF001

def _connect_attribute_ios(self) -> None:
"""Connect ``Attribute`` callbacks to ``AttributeIO``s"""
for attr in self.__attributes.values():
ref = attr.io_ref if attr.has_io_ref() else None
if ref is None:
continue

io = self._attribute_ref_io_map.get(type(ref))
if io is None:
raise ValueError(
f"{self.__class__.__name__} does not have an AttributeIO "
f"to handle {attr.io_ref.__class__.__name__}"
)

if isinstance(attr, AttrW):
attr.set_on_put_callback(io.send)
if isinstance(attr, AttrR):
attr.set_update_callback(io.update)

for controller in self.sub_controllers.values():
controller._connect_attribute_ios() # noqa: SLF001

@property
def path(self) -> list[str]:
"""Path prefix of attributes, recursively including parent Controllers."""
return self._path

def set_path(self, path: list[str]):
if self._path:
raise ValueError(f"sub controller is already registered under {self.path}")

self._path = path
for attribute in self.__attributes.values():
attribute.set_path(path)

def add_attribute(self, name, attr: Attribute):
if name in self.__attributes:
raise ValueError(
f"Cannot add attribute {attribute}. "
f"Cannot add attribute {attr}. "
f"Controller {self} has has existing attribute {name}: "
f"{self.attributes[name]}"
f"{self.__attributes[name]}"
)
elif name in self.__sub_controller_tree.keys():
elif name in self.__hinted_attributes:
attr_class, attr_dtype = self.__hinted_attributes[name]
if not isinstance(attr, attr_class):
raise RuntimeError(
f"Controller '{self.__class__.__name__}' introspection of "
f"hinted attribute '{name}' does not match defined access mode. "
f"Expected '{attr_class.__name__}', got '{type(attr).__name__}'."
)
if attr_dtype is not None and attr_dtype != attr.datatype.dtype:
raise RuntimeError(
f"Controller '{self.__class__.__name__}' introspection of "
f"hinted attribute '{name}' does not match defined datatype. "
f"Expected '{attr_dtype.__name__}', "
f"got '{attr.datatype.dtype.__name__}'."
)
elif name in self.__sub_controllers.keys():
raise ValueError(
f"Cannot add attribute {attribute}. "
f"Cannot add attribute {attr}. "
f"Controller {self} has existing sub controller {name}: "
f"{self.__sub_controller_tree[name]}"
f"{self.__sub_controllers[name]}"
)

attribute.set_name(name)
attribute.set_path(self.path)
self.attributes[name] = attribute
super().__setattr__(name, attribute)
attr.set_name(name)
attr.set_path(self.path)
self.__attributes[name] = attr
super().__setattr__(name, attr)

@property
def attributes(self) -> dict[str, Attribute]:
return self.__attributes

def add_sub_controller(self, name: str, sub_controller: BaseController):
if name in self.__sub_controller_tree.keys():
if name in self.__sub_controllers.keys():
raise ValueError(
f"Cannot add sub controller {sub_controller}. "
f"Controller {self} has existing sub controller {name}: "
f"{self.__sub_controller_tree[name]}"
f"{self.__sub_controllers[name]}"
)
elif name in self.attributes:
elif name in self.__attributes:
raise ValueError(
f"Cannot add sub controller {sub_controller}. "
f"Controller {self} has existing attribute {name}: "
f"{self.attributes[name]}"
f"{self.__attributes[name]}"
)

sub_controller.set_path(self.path + [name])
self.__sub_controller_tree[name] = sub_controller
self.__sub_controllers[name] = sub_controller
super().__setattr__(name, sub_controller)

if isinstance(sub_controller.root_attribute, Attribute):
self.attributes[name] = sub_controller.root_attribute
self.__attributes[name] = sub_controller.root_attribute

@property
def sub_controllers(self) -> dict[str, BaseController]:
return self.__sub_controller_tree

def __repr__(self):
name = self.__class__.__name__
path = ".".join(self.path) or None
sub_controllers = list(self.sub_controllers.keys()) or None

return f"{name}(path={path}, sub_controllers={sub_controllers})"

def __setattr__(self, name, value):
if isinstance(value, Attribute):
self.add_attribute(name, value)
elif isinstance(value, Controller):
self.add_sub_controller(name, value)
else:
super().__setattr__(name, value)
return self.__sub_controllers


class Controller(BaseController):
"""Top-level controller for a device.

This is the primary class for implementing device support in FastCS. Instances of
this class can be loaded into a FastCS to expose its ``Attribute``s to the transport
layer, which can then perform a specific function with the set of ``Attributes``,
such as generating a UI or creating parameters for a control system.
"""
"""Controller containing Attributes and named sub Controllers"""

def __init__(
self,
Expand All @@ -218,8 +274,12 @@ async def disconnect(self) -> None:


class ControllerVector(MutableMapping[int, Controller], BaseController):
"""A controller with a collection of identical sub controllers distinguished
by a numeric value"""
"""Controller containing Attributes and indexed sub Controllers

The sub controllers registered with this Controller should be instances of the same
Controller type, distinguished only by an integer index. The indexes do not need
to be continiguous.
"""

def __init__(
self,
Expand Down
Loading