From 2277ce66e74bfd2e4ec10cefd2775f4533931f0e Mon Sep 17 00:00:00 2001 From: Marcell Nagy Date: Mon, 2 Dec 2024 12:29:57 +0000 Subject: [PATCH 01/11] Try to remove background thread --- .gitignore | 1 + pyproject.toml | 3 +- src/fastcs/attributes.py | 7 +- src/fastcs/backend.py | 49 +++--- src/fastcs/launch.py | 100 ++++++----- src/fastcs/transport/adapter.py | 8 +- src/fastcs/transport/epics/adapter.py | 24 ++- src/fastcs/transport/epics/ioc.py | 13 +- src/fastcs/transport/epics/options.py | 1 - src/fastcs/transport/epics/util.py | 6 +- src/fastcs/transport/graphQL/adapter.py | 10 +- src/fastcs/transport/graphQL/graphQL.py | 19 +- src/fastcs/transport/rest/adapter.py | 10 +- src/fastcs/transport/rest/rest.py | 15 +- src/fastcs/transport/tango/adapter.py | 19 +- src/fastcs/transport/tango/dsr.py | 80 +++++++-- tests/benchmarking/compose.yaml | 22 +++ tests/benchmarking/controller.py | 29 ++++ tests/benchmarking/test_benchmarking.py | 172 +++++++++++++++++++ tests/conftest.py | 84 ++++++++- tests/data/{config_full.yaml => config.yaml} | 9 +- tests/data/config_minimal.yaml | 3 - tests/ioc.py | 2 +- tests/test_backend.py | 48 ++---- tests/test_cli.py | 3 +- tests/test_launch.py | 24 +-- tests/transport/epics/test_ioc.py | 2 +- 27 files changed, 558 insertions(+), 205 deletions(-) create mode 100644 tests/benchmarking/compose.yaml create mode 100644 tests/benchmarking/controller.py create mode 100644 tests/benchmarking/test_benchmarking.py rename tests/data/{config_full.yaml => config.yaml} (55%) delete mode 100644 tests/data/config_minimal.yaml diff --git a/.gitignore b/.gitignore index 0f33bf297..6d4d09c6b 100644 --- a/.gitignore +++ b/.gitignore @@ -45,6 +45,7 @@ coverage.xml cov.xml .pytest_cache/ .mypy_cache/ +.benchmarks/ # Translations *.mo diff --git a/pyproject.toml b/pyproject.toml index c1663382a..d84c768b1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,6 +34,7 @@ dev = [ "pydata-sphinx-theme>=0.12", "pyright", "pytest", + "pytest-benchmark", "pytest-cov", "pytest-mock", "pytest-asyncio", @@ -69,7 +70,7 @@ reportMissingImports = false # Ignore missing stubs in imported modules [tool.pytest.ini_options] # Run pytest with all our checkers, and don't spam us with massive tracebacks on error addopts = """ - --tb=native -vv --doctest-modules --doctest-glob="*.rst" + --tb=native -vv --doctest-modules --doctest-glob="*.rst" --benchmark-sort=mean --benchmark-columns="mean, min, max, outliers, ops, rounds" """ # https://iscinumpy.gitlab.io/post/bound-version-constraints/#watch-for-warnings filterwarnings = "error" diff --git a/src/fastcs/attributes.py b/src/fastcs/attributes.py index b99b6b8f0..dc6fcca43 100644 --- a/src/fastcs/attributes.py +++ b/src/fastcs/attributes.py @@ -69,9 +69,10 @@ def __init__( allowed_values: list[T] | None = None, description: str | None = None, ) -> None: - assert ( - datatype.dtype in ATTRIBUTE_TYPES - ), f"Attr type must be one of {ATTRIBUTE_TYPES}, received type {datatype.dtype}" + assert datatype.dtype in ATTRIBUTE_TYPES, ( + f"Attr type must be one of {ATTRIBUTE_TYPES}" + f", received type {datatype.dtype}" + ) self._datatype: DataType[T] = datatype self._access_mode: AttrMode = access_mode self._group = group diff --git a/src/fastcs/backend.py b/src/fastcs/backend.py index 35ff2de0e..9ba8814b4 100644 --- a/src/fastcs/backend.py +++ b/src/fastcs/backend.py @@ -1,11 +1,8 @@ import asyncio from collections import defaultdict from collections.abc import Callable -from concurrent.futures import Future from types import MethodType -from softioc.asyncio_dispatcher import AsyncioDispatcher - from .attributes import AttrR, AttrW, Sender, Updater from .controller import Controller, SingleMapping from .exceptions import FastCSException @@ -15,19 +12,15 @@ class Backend: def __init__( self, controller: Controller, - loop: asyncio.AbstractEventLoop | None = None, + loop: asyncio.AbstractEventLoop, ): - self.dispatcher = AsyncioDispatcher(loop) - self._loop = self.dispatcher.loop + self._loop = loop self._controller = controller self._initial_coros = [controller.connect] - self._scan_futures: set[Future] = set() - - asyncio.run_coroutine_threadsafe( - self._controller.initialise(), self._loop - ).result() + self._scan_tasks: set[asyncio.Task] = set() + loop.run_until_complete(self._controller.initialise()) self._link_process_tasks() def _link_process_tasks(self): @@ -36,28 +29,26 @@ def _link_process_tasks(self): _link_attribute_sender_class(single_mapping) def __del__(self): - self.stop_scan_futures() + self._stop_scan_tasks() - def run(self): - self._run_initial_futures() - self.start_scan_futures() + async def serve(self): + await self._run_initial_tasks() + await self._start_scan_tasks() - def _run_initial_futures(self): + async def _run_initial_tasks(self): for coro in self._initial_coros: - future = asyncio.run_coroutine_threadsafe(coro(), self._loop) - future.result() + await coro() - def start_scan_futures(self): - self._scan_futures = { - asyncio.run_coroutine_threadsafe(coro(), self._loop) - for coro in _get_scan_coros(self._controller) + async def _start_scan_tasks(self): + self._scan_tasks = { + self._loop.create_task(coro()) for coro in _get_scan_coros(self._controller) } - def stop_scan_futures(self): - for future in self._scan_futures: - if not future.done(): + def _stop_scan_tasks(self): + for task in self._scan_tasks: + if not task.done(): try: - future.cancel() + task.cancel() except asyncio.CancelledError: pass @@ -83,9 +74,9 @@ def _link_attribute_sender_class(single_mapping: SingleMapping) -> None: for attr_name, attribute in single_mapping.attributes.items(): match attribute: case AttrW(sender=Sender()): - assert ( - not attribute.has_process_callback() - ), f"Cannot assign both put method and Sender object to {attr_name}" + assert not attribute.has_process_callback(), ( + f"Cannot assign both put method and Sender object to {attr_name}" + ) callback = _create_sender_callback(attribute, single_mapping.controller) attribute.set_process_callback(callback) diff --git a/src/fastcs/launch.py b/src/fastcs/launch.py index d1eb12ddf..7d4d93c83 100644 --- a/src/fastcs/launch.py +++ b/src/fastcs/launch.py @@ -1,3 +1,4 @@ +import asyncio import inspect import json from pathlib import Path @@ -19,7 +20,9 @@ from .transport.tango.options import TangoOptions # Define a type alias for transport options -TransportOptions: TypeAlias = EpicsOptions | TangoOptions | RestOptions | GraphQLOptions +TransportOptions: TypeAlias = list[ + EpicsOptions | TangoOptions | RestOptions | GraphQLOptions +] class FastCS: @@ -28,48 +31,63 @@ def __init__( controller: Controller, transport_options: TransportOptions, ): - self._backend = Backend(controller) - self._transport: TransportAdapter - match transport_options: - case EpicsOptions(): - from .transport.epics.adapter import EpicsTransport - - self._transport = EpicsTransport( - controller, - self._backend.dispatcher, - transport_options, - ) - case GraphQLOptions(): - from .transport.graphQL.adapter import GraphQLTransport - - self._transport = GraphQLTransport( - controller, - transport_options, - ) - case TangoOptions(): - from .transport.tango.adapter import TangoTransport - - self._transport = TangoTransport( - controller, - transport_options, - ) - case RestOptions(): - from .transport.rest.adapter import RestTransport - - self._transport = RestTransport( - controller, - transport_options, - ) + self._loop = asyncio.get_event_loop() + self._backend = Backend(controller, self._loop) + transport: TransportAdapter + self._transports: list[TransportAdapter] = [] + for option in transport_options: + match option: + case EpicsOptions(): + from .transport.epics.adapter import EpicsTransport + + transport = EpicsTransport( + controller, + self._loop, + option, + ) + case TangoOptions(): + from .transport.tango.adapter import TangoTransport + + transport = TangoTransport( + controller, + self._loop, + option, + ) + case RestOptions(): + from .transport.rest.adapter import RestTransport + + transport = RestTransport( + controller, + option, + ) + case GraphQLOptions(): + from .transport.graphQL.adapter import GraphQLTransport + + transport = GraphQLTransport( + controller, + option, + ) + self._transports.append(transport) def create_docs(self) -> None: - self._transport.create_docs() + for transport in self._transports: + if hasattr(transport.options, "docs"): + transport.create_docs() def create_gui(self) -> None: - self._transport.create_gui() + for transport in self._transports: + if hasattr(transport.options, "gui"): + transport.create_docs() - def run(self) -> None: - self._backend.run() - self._transport.run() + def run(self): + self._loop.run_until_complete( + self.serve(), + ) + + async def serve(self) -> None: + coros = [self._backend.serve()] + coros.extend([transport.serve() for transport in self._transports]) + await asyncio.gather(*coros) def launch( @@ -158,10 +176,8 @@ def run( instance_options.transport, ) - if "gui" in options_yaml["transport"]: - instance.create_gui() - if "docs" in options_yaml["transport"]: - instance.create_docs() + instance.create_gui() + instance.create_docs() instance.run() @launch_typer.command(name="version", help=f"{controller_class.__name__} version") diff --git a/src/fastcs/transport/adapter.py b/src/fastcs/transport/adapter.py index bb2ce856e..af18bd2a5 100644 --- a/src/fastcs/transport/adapter.py +++ b/src/fastcs/transport/adapter.py @@ -1,9 +1,15 @@ from abc import ABC, abstractmethod +from typing import Any class TransportAdapter(ABC): + @property @abstractmethod - def run(self) -> None: + def options(self) -> Any: + pass + + @abstractmethod + async def serve(self) -> None: pass @abstractmethod diff --git a/src/fastcs/transport/epics/adapter.py b/src/fastcs/transport/epics/adapter.py index 3162d170f..4383fb579 100644 --- a/src/fastcs/transport/epics/adapter.py +++ b/src/fastcs/transport/epics/adapter.py @@ -1,4 +1,4 @@ -from softioc.asyncio_dispatcher import AsyncioDispatcher +import asyncio from fastcs.controller import Controller from fastcs.transport.adapter import TransportAdapter @@ -13,14 +13,22 @@ class EpicsTransport(TransportAdapter): def __init__( self, controller: Controller, - dispatcher: AsyncioDispatcher, + loop: asyncio.AbstractEventLoop, options: EpicsOptions | None = None, ) -> None: - self.options = options or EpicsOptions() self._controller = controller - self._dispatcher = dispatcher + self._loop = loop + self._options = options or EpicsOptions() self._pv_prefix = self.options.ioc.pv_prefix - self._ioc = EpicsIOC(self.options.ioc.pv_prefix, controller) + self._ioc = EpicsIOC( + self.options.ioc.pv_prefix, + controller, + self._options.ioc, + ) + + @property + def options(self) -> EpicsOptions: + return self._options def create_docs(self) -> None: EpicsDocs(self._controller).create_docs(self.options.docs) @@ -28,5 +36,7 @@ def create_docs(self) -> None: def create_gui(self) -> None: EpicsGUI(self._controller, self._pv_prefix).create_gui(self.options.gui) - def run(self): - self._ioc.run(self._dispatcher) + async def serve(self) -> None: + self._ioc.run(self._loop) + while True: + await asyncio.sleep(1) diff --git a/src/fastcs/transport/epics/ioc.py b/src/fastcs/transport/epics/ioc.py index eca534004..bbec96c14 100644 --- a/src/fastcs/transport/epics/ioc.py +++ b/src/fastcs/transport/epics/ioc.py @@ -1,3 +1,4 @@ +import asyncio from collections.abc import Callable from dataclasses import asdict from types import MethodType @@ -50,7 +51,7 @@ def __init__( controller: Controller, options: EpicsIOCOptions | None = None, ): - self.options = options or EpicsIOCOptions() + self._options = options or EpicsIOCOptions() self._controller = controller _add_pvi_info(f"{pv_prefix}:PVI") _add_sub_controller_pvi_info(pv_prefix, controller) @@ -60,18 +61,12 @@ def __init__( def run( self, - dispatcher: AsyncioDispatcher, + loop: asyncio.AbstractEventLoop, ) -> None: + dispatcher = AsyncioDispatcher(loop) # Needs running loop builder.LoadDatabase() softioc.iocInit(dispatcher) - if self.options.terminal: - context = { - "dispatcher": dispatcher, - "controller": self._controller, - } - softioc.interactive_ioc(context) - def _add_pvi_info( pvi: str, diff --git a/src/fastcs/transport/epics/options.py b/src/fastcs/transport/epics/options.py index c70fa2206..6bcb4d8f9 100644 --- a/src/fastcs/transport/epics/options.py +++ b/src/fastcs/transport/epics/options.py @@ -23,7 +23,6 @@ class EpicsGUIOptions: @dataclass class EpicsIOCOptions: - terminal: bool = True pv_prefix: str = "MY-DEVICE-PREFIX" diff --git a/src/fastcs/transport/epics/util.py b/src/fastcs/transport/epics/util.py index b1ffa6089..c63b3cf85 100644 --- a/src/fastcs/transport/epics/util.py +++ b/src/fastcs/transport/epics/util.py @@ -36,9 +36,9 @@ def attr_is_enum(attribute: Attribute) -> bool: """ match attribute: - case Attribute( - datatype=String(), allowed_values=allowed_values - ) if allowed_values is not None and len(allowed_values) <= MBB_MAX_CHOICES: + case Attribute(datatype=String(), allowed_values=allowed_values) if ( + allowed_values is not None and len(allowed_values) <= MBB_MAX_CHOICES + ): return True case _: return False diff --git a/src/fastcs/transport/graphQL/adapter.py b/src/fastcs/transport/graphQL/adapter.py index 5b573c020..2c7eaec7c 100644 --- a/src/fastcs/transport/graphQL/adapter.py +++ b/src/fastcs/transport/graphQL/adapter.py @@ -11,14 +11,18 @@ def __init__( controller: Controller, options: GraphQLOptions | None = None, ): - self.options = options or GraphQLOptions() + self._options = options or GraphQLOptions() self._server = GraphQLServer(controller) + @property + def options(self) -> GraphQLOptions: + return self._options + def create_docs(self) -> None: raise NotImplementedError def create_gui(self) -> None: raise NotImplementedError - def run(self) -> None: - self._server.run(self.options.gql) + async def serve(self) -> None: + await self._server.serve(self.options.gql) diff --git a/src/fastcs/transport/graphQL/graphQL.py b/src/fastcs/transport/graphQL/graphQL.py index 85bde07b5..da4d676ec 100644 --- a/src/fastcs/transport/graphQL/graphQL.py +++ b/src/fastcs/transport/graphQL/graphQL.py @@ -31,16 +31,17 @@ def _create_app(self) -> GraphQL: return app - def run(self, options: GraphQLServerOptions | None = None) -> None: - if options is None: - options = GraphQLServerOptions() - - uvicorn.run( - self._app, - host=options.host, - port=options.port, - log_level=options.log_level, + async def serve(self, options: GraphQLServerOptions | None = None) -> None: + options = options or GraphQLServerOptions() + self._serv = uvicorn.Server( + uvicorn.Config( + app=self._app, + host=options.host, + port=options.port, + log_level=options.log_level, + ) ) + await self._serv.serve() class GraphQLAPI: diff --git a/src/fastcs/transport/rest/adapter.py b/src/fastcs/transport/rest/adapter.py index dd96ad819..7e98c5da9 100644 --- a/src/fastcs/transport/rest/adapter.py +++ b/src/fastcs/transport/rest/adapter.py @@ -11,14 +11,18 @@ def __init__( controller: Controller, options: RestOptions | None = None, ): - self.options = options or RestOptions() + self._options = options or RestOptions() self._server = RestServer(controller) + @property + def options(self) -> RestOptions: + return self._options + def create_docs(self) -> None: raise NotImplementedError def create_gui(self) -> None: raise NotImplementedError - def run(self) -> None: - self._server.run(self.options.rest) + async def serve(self) -> None: + await self._server.serve(self.options.rest) diff --git a/src/fastcs/transport/rest/rest.py b/src/fastcs/transport/rest/rest.py index 9eb0e9254..3384e5713 100644 --- a/src/fastcs/transport/rest/rest.py +++ b/src/fastcs/transport/rest/rest.py @@ -23,14 +23,17 @@ def _create_app(self): return app - def run(self, options: RestServerOptions | None) -> None: + async def serve(self, options: RestServerOptions | None): options = options or RestServerOptions() - uvicorn.run( - self._app, - host=options.host, - port=options.port, - log_level=options.log_level, + self._serv = uvicorn.Server( + uvicorn.Config( + app=self._app, + host=options.host, + port=options.port, + log_level=options.log_level, + ) ) + await self._serv.serve() def _put_request_body(attribute: AttrW[T]): diff --git a/src/fastcs/transport/tango/adapter.py b/src/fastcs/transport/tango/adapter.py index 7c5f103ad..73841faca 100644 --- a/src/fastcs/transport/tango/adapter.py +++ b/src/fastcs/transport/tango/adapter.py @@ -1,3 +1,5 @@ +import asyncio + from fastcs.controller import Controller from fastcs.transport.adapter import TransportAdapter @@ -9,10 +11,15 @@ class TangoTransport(TransportAdapter): def __init__( self, controller: Controller, + loop: asyncio.AbstractEventLoop | None = None, options: TangoOptions | None = None, ): - self.options = options or TangoOptions() - self._dsr = TangoDSR(controller) + self._options = options or TangoOptions() + self._dsr = TangoDSR(controller, loop) + + @property + def options(self) -> TangoOptions: + return self._options def create_docs(self) -> None: raise NotImplementedError @@ -20,5 +27,9 @@ def create_docs(self) -> None: def create_gui(self) -> None: raise NotImplementedError - def run(self) -> None: - self._dsr.run(self.options.dsr) + async def serve(self) -> None: + coro = asyncio.to_thread( + self._dsr.run, + self.options.dsr, + ) + await coro diff --git a/src/fastcs/transport/tango/dsr.py b/src/fastcs/transport/tango/dsr.py index 9b7f6f716..130b97341 100644 --- a/src/fastcs/transport/tango/dsr.py +++ b/src/fastcs/transport/tango/dsr.py @@ -1,3 +1,4 @@ +import asyncio from collections.abc import Awaitable, Callable from typing import Any @@ -13,7 +14,9 @@ def _wrap_updater_fget( - attr_name: str, attribute: AttrR, controller: BaseController + attr_name: str, + attribute: AttrR, + controller: BaseController, ) -> Callable[[Any], Any]: async def fget(tango_device: Device): tango_device.info_stream(f"called fget method: {attr_name}") @@ -31,16 +34,29 @@ def _tango_display_format(attribute: Attribute) -> str: def _wrap_updater_fset( - attr_name: str, attribute: AttrW, controller: BaseController + attr_name: str, + attribute: AttrW, + controller: BaseController, + loop: asyncio.AbstractEventLoop | None = None, ) -> Callable[[Any, Any], Any]: - async def fset(tango_device: Device, val): - tango_device.info_stream(f"called fset method: {attr_name}") - await attribute.process(val) + if loop: + + async def fset(tango_device: Device, val): + tango_device.info_stream(f"called fset method: {attr_name}") + future = asyncio.run_coroutine_threadsafe(attribute.process(val), loop) + await asyncio.wrap_future(future) + else: + + async def fset(tango_device: Device, val): + tango_device.info_stream(f"called fset method: {attr_name}") + await attribute.process(val) return fset -def _collect_dev_attributes(controller: BaseController) -> dict[str, Any]: +def _collect_dev_attributes( + controller: BaseController, loop: asyncio.AbstractEventLoop | None = None +) -> dict[str, Any]: collection: dict[str, Any] = {} for single_mapping in controller.get_controller_mappings(): path = single_mapping.controller.path @@ -58,7 +74,7 @@ def _collect_dev_attributes(controller: BaseController) -> dict[str, Any]: attr_name, attribute, single_mapping.controller ), fset=_wrap_updater_fset( - attr_name, attribute, single_mapping.controller + attr_name, attribute, single_mapping.controller, loop ), access=AttrWriteType.READ_WRITE, format=_tango_display_format(attribute), @@ -79,7 +95,7 @@ def _collect_dev_attributes(controller: BaseController) -> dict[str, Any]: dtype=attribute.datatype.dtype, access=AttrWriteType.WRITE, fset=_wrap_updater_fset( - attr_name, attribute, single_mapping.controller + attr_name, attribute, single_mapping.controller, loop ), format=_tango_display_format(attribute), ) @@ -88,19 +104,38 @@ def _collect_dev_attributes(controller: BaseController) -> dict[str, Any]: def _wrap_command_f( - method_name: str, method: Callable, controller: BaseController + method_name: str, + method: Callable, + controller: BaseController, + loop: asyncio.AbstractEventLoop | None = None, ) -> Callable[..., Awaitable[None]]: - async def _dynamic_f(tango_device: Device) -> None: - tango_device.info_stream( - f"called {'_'.join(controller.path)} f method: {method_name}" - ) - return await getattr(controller, method.__name__)() + if loop: + + async def _dynamic_f(tango_device: Device) -> None: + tango_device.info_stream( + f"called {'_'.join(controller.path)} f method: {method_name}" + ) + coro = getattr(controller, method.__name__)() + future = asyncio.run_coroutine_threadsafe(coro, loop) + await asyncio.wrap_future(future) + + else: + + async def _dynamic_f(tango_device: Device) -> None: + tango_device.info_stream( + f"called {'_'.join(controller.path)} f method: {method_name}" + ) + coro = getattr(controller, method.__name__)() + await coro _dynamic_f.__name__ = method_name return _dynamic_f -def _collect_dev_commands(controller: BaseController) -> dict[str, Any]: +def _collect_dev_commands( + controller: BaseController, + loop: asyncio.AbstractEventLoop | None = None, +) -> dict[str, Any]: collection: dict[str, Any] = {} for single_mapping in controller.get_controller_mappings(): path = single_mapping.controller.path @@ -109,7 +144,9 @@ def _collect_dev_commands(controller: BaseController) -> dict[str, Any]: cmd_name = name.title().replace("_", "") d_cmd_name = f"{'_'.join(path)}_{cmd_name}" if path else cmd_name collection[d_cmd_name] = server.command( - f=_wrap_command_f(d_cmd_name, method.fn, single_mapping.controller) + f=_wrap_command_f( + d_cmd_name, method.fn, single_mapping.controller, loop + ) ) return collection @@ -146,15 +183,20 @@ def _collect_dsr_args(options: TangoDSROptions) -> list[str]: class TangoDSR: - def __init__(self, controller: BaseController): + def __init__( + self, + controller: BaseController, + loop: asyncio.AbstractEventLoop | None = None, + ): self._controller = controller + self._loop = loop self.dev_class = self._controller.__class__.__name__ self._device = self._create_device() def _create_device(self): class_dict: dict = { - **_collect_dev_attributes(self._controller), - **_collect_dev_commands(self._controller), + **_collect_dev_attributes(self._controller, self._loop), + **_collect_dev_commands(self._controller, self._loop), **_collect_dev_properties(self._controller), **_collect_dev_init(self._controller), **_collect_dev_flags(self._controller), diff --git a/tests/benchmarking/compose.yaml b/tests/benchmarking/compose.yaml new file mode 100644 index 000000000..f9b68df1d --- /dev/null +++ b/tests/benchmarking/compose.yaml @@ -0,0 +1,22 @@ +# Spin up a local minimal tango cs + +version: "3.7" +services: + mysql: + image: registry.gitlab.com/tango-controls/docker/mysql:5 + environment: + - MYSQL_ROOT_PASSWORD=root + + tango-cs: + hostname: localhost + image: registry.gitlab.com/tango-controls/docker/tango-cs:9 + ports: + - "10000:10000" + environment: + - TANGO_HOST=localhost:10000 + - MYSQL_HOST=mysql:3306 + - MYSQL_USER=tango + - MYSQL_PASSWORD=tango + - MYSQL_DATABASE=tango + depends_on: + - mysql diff --git a/tests/benchmarking/controller.py b/tests/benchmarking/controller.py new file mode 100644 index 000000000..107b3f55f --- /dev/null +++ b/tests/benchmarking/controller.py @@ -0,0 +1,29 @@ +from fastcs import FastCS +from fastcs.attributes import AttrR, AttrW +from fastcs.controller import Controller +from fastcs.datatypes import Bool, Int +from fastcs.transport.epics.options import EpicsIOCOptions, EpicsOptions +from fastcs.transport.rest.options import RestOptions, RestServerOptions +from fastcs.transport.tango.options import TangoDSROptions, TangoOptions + + +class TestController(Controller): + read_int: AttrR = AttrR(Int(), initial_value=0) + write_bool: AttrW = AttrW(Bool()) + + +def run(): + transport_options = [ + RestOptions(rest=RestServerOptions(port=8090)), + EpicsOptions(ioc=EpicsIOCOptions(pv_prefix="BENCHMARK-DEVICE")), + TangoOptions(dsr=TangoDSROptions(dev_name="MY/BENCHMARK/DEVICE")), + ] + instance = FastCS( + TestController(), + transport_options, + ) + instance.run() + + +if __name__ == "__main__": + run() diff --git a/tests/benchmarking/test_benchmarking.py b/tests/benchmarking/test_benchmarking.py new file mode 100644 index 000000000..32ce502aa --- /dev/null +++ b/tests/benchmarking/test_benchmarking.py @@ -0,0 +1,172 @@ +import contextlib +import multiprocessing +import os + +import pytest +import requests +import tango +from p4p.client.thread import Context + +FASTCS_BENCHMARKING = os.getenv("FASTCS_BENCHMARKING") == "true" +GET_ENDPOINT = "http://localhost:8090/read-int" +PUT_ENDPOINT = "http://localhost:8090/write-bool" +GET_PV = "BENCHMARK-DEVICE:ReadInt" +PUT_PV = "BENCHMARK-DEVICE:WriteBool" +TANGO_DEVICE = "MY/BENCHMARK/DEVICE" +READ_ATTR = "ReadInt" +WRITE_ATTR = "WriteBool" +REST_CLIENTS = 9 +os.environ["TANGO_HOST"] = "localhost:10000" + + +def rest_get(): + requests.get(GET_ENDPOINT) + + +def rest_put(): + requests.put(PUT_ENDPOINT, json={"value": "false"}) + + +ctx = Context("pva") + + +def ca_get(): + ctx.get(GET_PV) + + +def ca_put(): + ctx.put(PUT_PV, 0) + + +def bg_get(event, url): + """Workload for a rest subprocess.""" + while not event.is_set(): + requests.get(url) + + +def bg_pass(event, url): + """Workload for a rest subprocess.""" + while not event.is_set(): + pass + + +@contextlib.contextmanager +def background_traffic(rest_target): + """Context manager to manage background clients""" + stop_event = multiprocessing.Event() + processes = [ + multiprocessing.Process( + target=bg_get if rest_target else bg_pass, + args=(stop_event, rest_target), + ) + for _ in range(REST_CLIENTS) + ] + for process in processes: + process.start() + + try: + yield processes + finally: + # Signal the processes to stop + stop_event.set() + # Wait for the processes to finish + for process in processes: + process.join() + + +@pytest.mark.skipif(not FASTCS_BENCHMARKING, reason="export FASTCS_BENCHMARKING=true") +@pytest.mark.benchmark(group="test-rest") +def test_rest_get(benchmark, test_controller): + benchmark(rest_get) + + +@pytest.mark.skipif(not FASTCS_BENCHMARKING, reason="export FASTCS_BENCHMARKING=true") +@pytest.mark.benchmark(group="test-rest") +def test_rest_get_loaded_request(benchmark, test_controller): + with background_traffic(GET_ENDPOINT): + benchmark(rest_get) + + +@pytest.mark.skipif(not FASTCS_BENCHMARKING, reason="export FASTCS_BENCHMARKING=true") +@pytest.mark.benchmark(group="test-rest") +def test_rest_get_loaded_baseline(benchmark, test_controller): + with background_traffic(None): + benchmark(rest_get) + + +@pytest.mark.skipif(not FASTCS_BENCHMARKING, reason="export FASTCS_BENCHMARKING=true") +@pytest.mark.benchmark(group="test-rest") +def test_rest_put(benchmark, test_controller): + benchmark(rest_put) + + +@pytest.mark.skipif(not FASTCS_BENCHMARKING, reason="export FASTCS_BENCHMARKING=true") +@pytest.mark.benchmark(group="test-ca") +def test_ca_get(benchmark, test_controller): + benchmark(ca_get) + + +@pytest.mark.skipif(not FASTCS_BENCHMARKING, reason="export FASTCS_BENCHMARKING=true") +@pytest.mark.benchmark(group="test-ca") +def test_ca_get_loaded_request(benchmark, test_controller): + with background_traffic(GET_ENDPOINT): + benchmark(ca_get) + + +@pytest.mark.skipif(not FASTCS_BENCHMARKING, reason="export FASTCS_BENCHMARKING=true") +@pytest.mark.benchmark(group="test-ca") +def test_ca_get_loaded_baseline(benchmark, test_controller): + with background_traffic(None): + benchmark(ca_get) + + +@pytest.mark.skipif(not FASTCS_BENCHMARKING, reason="export FASTCS_BENCHMARKING=true") +@pytest.mark.benchmark(group="test-ca") +def test_ca_put(benchmark, test_controller): + benchmark(ca_put) + + +@pytest.mark.skipif(not FASTCS_BENCHMARKING, reason="export FASTCS_BENCHMARKING=true") +@pytest.mark.benchmark(group="test-tango") +def test_tango_get(benchmark, test_controller): + device = tango.DeviceProxy(TANGO_DEVICE) + + def to_do(): + device.read_attribute(READ_ATTR) + + benchmark(to_do) + + +@pytest.mark.skipif(not FASTCS_BENCHMARKING, reason="export FASTCS_BENCHMARKING=true") +@pytest.mark.benchmark(group="test-tango") +def test_tango_get_loaded_request(benchmark, test_controller): + device = tango.DeviceProxy(TANGO_DEVICE) + + def to_do(): + device.read_attribute(READ_ATTR) + + with background_traffic(GET_ENDPOINT): + benchmark(to_do) + + +@pytest.mark.skipif(not FASTCS_BENCHMARKING, reason="export FASTCS_BENCHMARKING=true") +@pytest.mark.benchmark(group="test-tango") +def test_tango_get_loaded_baseline(benchmark, test_controller): + device = tango.DeviceProxy(TANGO_DEVICE) + + def to_do(): + device.read_attribute(READ_ATTR) + + with background_traffic(None): + benchmark(to_do) + + +@pytest.mark.skipif(not FASTCS_BENCHMARKING, reason="export FASTCS_BENCHMARKING=true") +@pytest.mark.benchmark(group="test-tango") +def test_tango_put(benchmark, test_controller): + device = tango.DeviceProxy(TANGO_DEVICE) + + def to_do(): + device.write_attribute(WRITE_ATTR, 0) + + benchmark(to_do) diff --git a/tests/conftest.py b/tests/conftest.py index fad58597e..831deddab 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,6 +1,7 @@ import copy import os import random +import signal import string import subprocess import time @@ -15,6 +16,7 @@ from fastcs.attributes import AttrR, AttrRW, AttrW, Handler, Sender, Updater from fastcs.controller import Controller, SubController from fastcs.datatypes import Bool, Float, Int, String +from fastcs.transport.tango.dsr import register_dev from fastcs.wrappers import command, scan DATA_PATH = Path(__file__).parent / "data" @@ -168,6 +170,7 @@ def assertable_controller(class_mocker: MockerFixture): @pytest.fixture(scope="module") def ioc(): + TIMEOUT = 10 process = subprocess.Popen( ["python", HERE / "ioc.py"], stdin=subprocess.PIPE, @@ -180,15 +183,84 @@ def ioc(): while "iocRun: All initialization complete" not in ( process.stdout.readline().strip() # type: ignore ): - if time.monotonic() - start_time > 10: + if time.monotonic() - start_time > TIMEOUT: raise TimeoutError("IOC did not start in time") yield # close backend caches before the event loop purge_channel_caches() - try: - print(process.communicate("exit")[0]) - except ValueError: - # Someone else already called communicate - pass + + # Close open files + for f in [process.stdin, process.stdout, process.stderr]: + if f: + f.close() + process.send_signal(signal.SIGINT) + process.wait(TIMEOUT) + + +@pytest.fixture(scope="session") +def tango_system(): + subprocess.run( + ["podman", "compose", "-f", HERE / "benchmarking" / "compose.yaml", "up", "-d"], + check=True, + ) + yield + subprocess.run( + ["podman", "compose", "-f", HERE / "benchmarking" / "compose.yaml", "down"], + check=True, + ) + + +@pytest.fixture(scope="session") +def register_device(): + ATTEMPTS = 10 + SLEEP = 1 + + if not os.getenv("TANGO_HOST"): + raise RuntimeError("TANGO_HOST not defined") + + for attempt in range(1, ATTEMPTS + 1): + try: + register_dev( + dev_name="MY/BENCHMARK/DEVICE", + dev_class="TestController", + dsr_instance="MY_SERVER_INSTANCE", + ) + break + except Exception: + time.sleep(SLEEP) + if attempt == ATTEMPTS: + raise TimeoutError("Tango device could not be registered") + + +@pytest.fixture(scope="session") +def test_controller(tango_system, register_device): + TIMEOUT = 10 + process = subprocess.Popen( + ["python", HERE / "benchmarking" / "controller.py"], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + universal_newlines=True, + ) + + start_time = time.monotonic() + while "Uvicorn running" not in ( + process.stdout.readline().strip() # type: ignore + ): + if time.monotonic() - start_time > TIMEOUT: + raise TimeoutError("Controller did not start in time") + + # close backend caches before the event loop + purge_channel_caches() + + # Stop buffer from getting full and blocking the subprocess + for f in [process.stdin, process.stdout, process.stderr]: + if f: + f.close() + + yield process + + process.send_signal(signal.SIGINT) + process.wait(TIMEOUT) diff --git a/tests/data/config_full.yaml b/tests/data/config.yaml similarity index 55% rename from tests/data/config_full.yaml rename to tests/data/config.yaml index b396f679c..bd53b6945 100644 --- a/tests/data/config_full.yaml +++ b/tests/data/config.yaml @@ -1,7 +1,10 @@ # yaml-language-server: $schema=schema.json transport: - ioc: {} - docs: {} - gui: {} + - ioc: {} + docs: {} + gui: {} + - rest: {} + - dsr: {} + - gql: {} controller: name: controller-name diff --git a/tests/data/config_minimal.yaml b/tests/data/config_minimal.yaml deleted file mode 100644 index 8b887af6a..000000000 --- a/tests/data/config_minimal.yaml +++ /dev/null @@ -1,3 +0,0 @@ -# yaml-language-server: $schema=schema.json -transport: - dsr: {} diff --git a/tests/ioc.py b/tests/ioc.py index 12918a675..a42ef6a29 100644 --- a/tests/ioc.py +++ b/tests/ioc.py @@ -23,7 +23,7 @@ def run(): epics_options = EpicsOptions(ioc=EpicsIOCOptions(pv_prefix="DEVICE")) controller = ParentController() controller.register_sub_controller("Child", ChildController()) - fastcs = FastCS(controller, epics_options) + fastcs = FastCS(controller, [epics_options]) fastcs.run() diff --git a/tests/test_backend.py b/tests/test_backend.py index c9f4f8e61..0a446a522 100644 --- a/tests/test_backend.py +++ b/tests/test_backend.py @@ -1,47 +1,31 @@ import asyncio -import pytest - from fastcs.backend import Backend -class DummyBackend(Backend): - def __init__(self, controller): - super().__init__(controller) - - self.init_task_called = False - self._initial_coros.append(self.init_task) - - async def init_task(self): - self.init_task_called = True - - def _run(self): - asyncio.run_coroutine_threadsafe(asyncio.sleep(0.3), self._loop) +def test_backend(controller): + loop = asyncio.get_event_loop() + backend = Backend(controller, loop) - -@pytest.mark.asyncio -async def test_backend(controller): - backend = DummyBackend(controller) - - # Controller should be initialised by Backend, but not connected + # Controller should be initialised by Backend and not connected assert controller.initialised assert not controller.connected # Controller Attributes with a Sender should have a _process_callback created assert controller.read_write_int.has_process_callback() - backend.run() - - # Controller should have been connected by Backend - assert controller.connected + async def test_wrapper(): + loop.create_task(backend.serve()) + await asyncio.sleep(0) # Yield to task - # Initial tasks should be complete - assert backend.init_task_called + # Controller should have been connected by Backend + assert controller.connected - # Scan tasks should be running - for _ in range(3): - count = controller.count - await asyncio.sleep(0.1) - assert controller.count > count + # Scan tasks should be running + for _ in range(3): + count = controller.count + await asyncio.sleep(0.01) + assert controller.count > count + backend._stop_scan_tasks() - backend.stop_scan_futures() + loop.run_until_complete(test_wrapper()) diff --git a/tests/test_cli.py b/tests/test_cli.py index ee8416e3c..7a16a0bfd 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -6,5 +6,4 @@ def test_cli_version(): cmd = [sys.executable, "-m", "fastcs", "--version"] - info = "INFO: PVXS QSRV2 is loaded, permitted, and ENABLED.\n" - assert subprocess.check_output(cmd).decode().strip() == info + __version__ + assert subprocess.check_output(cmd).decode().strip() == __version__ diff --git a/tests/test_launch.py b/tests/test_launch.py index cafae8742..c2732c88b 100644 --- a/tests/test_launch.py +++ b/tests/test_launch.py @@ -7,7 +7,9 @@ from typer.testing import CliRunner from fastcs.__main__ import __version__ +from fastcs.attributes import AttrR from fastcs.controller import Controller +from fastcs.datatypes import Int from fastcs.exceptions import LaunchError from fastcs.launch import TransportOptions, _launch, launch @@ -28,6 +30,8 @@ def __init__(self, arg): class IsHinted(Controller): + read = AttrR(Int()) + def __init__(self, arg: SomeConfig) -> None: super().__init__() @@ -102,7 +106,7 @@ def test_over_defined_schema(): def test_version(): impl_version = "0.0.1" - expected = f"SingleArg: {impl_version}\n" f"FastCS: {__version__}\n" + expected = f"SingleArg: {impl_version}\nFastCS: {__version__}\n" app = _launch(SingleArg, version=impl_version) result = runner.invoke(app, ["version"]) assert result.exit_code == 0 @@ -117,27 +121,13 @@ def test_no_version(): assert result.stdout == expected -def test_launch_minimal(mocker: MockerFixture, data): - run = mocker.patch("fastcs.launch.FastCS.run") - gui = mocker.patch("fastcs.launch.FastCS.create_gui") - docs = mocker.patch("fastcs.launch.FastCS.create_docs") - - app = _launch(SingleArg) - result = runner.invoke(app, ["run", str(data / "config_minimal.yaml")]) - assert result.exit_code == 0 - - run.assert_called_once() - gui.assert_not_called() - docs.assert_not_called() - - -def test_launch_full(mocker: MockerFixture, data): +def test_launch(mocker: MockerFixture, data): run = mocker.patch("fastcs.launch.FastCS.run") gui = mocker.patch("fastcs.launch.FastCS.create_gui") docs = mocker.patch("fastcs.launch.FastCS.create_docs") app = _launch(IsHinted) - result = runner.invoke(app, ["run", str(data / "config_full.yaml")]) + result = runner.invoke(app, ["run", str(data / "config.yaml")]) assert result.exit_code == 0 run.assert_called_once() diff --git a/tests/transport/epics/test_ioc.py b/tests/transport/epics/test_ioc.py index 516cb906b..3e71891eb 100644 --- a/tests/transport/epics/test_ioc.py +++ b/tests/transport/epics/test_ioc.py @@ -436,7 +436,7 @@ def test_long_pv_names_discarded(mocker: MockerFixture): assert long_name_controller.command_short_name.fastcs_method.enabled long_command_name = ( - "command_with_" "reallyreallyreallyreallyreallyreallyreally_long_name" + "command_with_reallyreallyreallyreallyreallyreallyreally_long_name" ) assert not getattr(long_name_controller, long_command_name).fastcs_method.enabled From c7bea6dfbb8eb1d8d744dcea2049ba81ee4a941a Mon Sep 17 00:00:00 2001 From: Marcell Nagy Date: Tue, 21 Jan 2025 15:46:15 +0000 Subject: [PATCH 02/11] Rename test_ca to test_epics for consistency --- tests/benchmarking/test_benchmarking.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/tests/benchmarking/test_benchmarking.py b/tests/benchmarking/test_benchmarking.py index 32ce502aa..51e09b645 100644 --- a/tests/benchmarking/test_benchmarking.py +++ b/tests/benchmarking/test_benchmarking.py @@ -101,28 +101,28 @@ def test_rest_put(benchmark, test_controller): @pytest.mark.skipif(not FASTCS_BENCHMARKING, reason="export FASTCS_BENCHMARKING=true") -@pytest.mark.benchmark(group="test-ca") -def test_ca_get(benchmark, test_controller): +@pytest.mark.benchmark(group="test-epics") +def test_epics_get(benchmark, test_controller): benchmark(ca_get) @pytest.mark.skipif(not FASTCS_BENCHMARKING, reason="export FASTCS_BENCHMARKING=true") -@pytest.mark.benchmark(group="test-ca") -def test_ca_get_loaded_request(benchmark, test_controller): +@pytest.mark.benchmark(group="test-epics") +def test_epics_get_loaded_request(benchmark, test_controller): with background_traffic(GET_ENDPOINT): benchmark(ca_get) @pytest.mark.skipif(not FASTCS_BENCHMARKING, reason="export FASTCS_BENCHMARKING=true") -@pytest.mark.benchmark(group="test-ca") -def test_ca_get_loaded_baseline(benchmark, test_controller): +@pytest.mark.benchmark(group="test-epics") +def test_epics_get_loaded_baseline(benchmark, test_controller): with background_traffic(None): benchmark(ca_get) @pytest.mark.skipif(not FASTCS_BENCHMARKING, reason="export FASTCS_BENCHMARKING=true") -@pytest.mark.benchmark(group="test-ca") -def test_ca_put(benchmark, test_controller): +@pytest.mark.benchmark(group="test-epics") +def test_epics_put(benchmark, test_controller): benchmark(ca_put) From 375bc0f1e0383aa5f3cc2de44a71690af0217294 Mon Sep 17 00:00:00 2001 From: Marcell Nagy Date: Thu, 23 Jan 2025 11:35:48 +0000 Subject: [PATCH 03/11] Add missing test schema --- tests/data/schema.json | 218 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 218 insertions(+) create mode 100644 tests/data/schema.json diff --git a/tests/data/schema.json b/tests/data/schema.json new file mode 100644 index 000000000..a8dce4d26 --- /dev/null +++ b/tests/data/schema.json @@ -0,0 +1,218 @@ +{ + "$defs": { + "EpicsDocsOptions": { + "properties": { + "path": { + "default": "/home/esq51579/WIP/FastCS", + "format": "path", + "title": "Path", + "type": "string" + }, + "depth": { + "anyOf": [ + { + "type": "integer" + }, + { + "type": "null" + } + ], + "default": null, + "title": "Depth" + } + }, + "title": "EpicsDocsOptions", + "type": "object" + }, + "EpicsGUIFormat": { + "enum": [ + ".bob", + ".edl" + ], + "title": "EpicsGUIFormat", + "type": "string" + }, + "EpicsGUIOptions": { + "properties": { + "output_path": { + "default": "/home/esq51579/WIP/FastCS/output.bob", + "format": "path", + "title": "Output Path", + "type": "string" + }, + "file_format": { + "$ref": "#/$defs/EpicsGUIFormat", + "default": ".bob" + }, + "title": { + "default": "Simple Device", + "title": "Title", + "type": "string" + } + }, + "title": "EpicsGUIOptions", + "type": "object" + }, + "EpicsIOCOptions": { + "properties": { + "pv_prefix": { + "default": "MY-DEVICE-PREFIX", + "title": "Pv Prefix", + "type": "string" + } + }, + "title": "EpicsIOCOptions", + "type": "object" + }, + "EpicsOptions": { + "properties": { + "docs": { + "$ref": "#/$defs/EpicsDocsOptions" + }, + "gui": { + "$ref": "#/$defs/EpicsGUIOptions" + }, + "ioc": { + "$ref": "#/$defs/EpicsIOCOptions" + } + }, + "title": "EpicsOptions", + "type": "object" + }, + "GraphQLOptions": { + "properties": { + "gql": { + "$ref": "#/$defs/GraphQLServerOptions" + } + }, + "title": "GraphQLOptions", + "type": "object" + }, + "GraphQLServerOptions": { + "properties": { + "host": { + "default": "localhost", + "title": "Host", + "type": "string" + }, + "port": { + "default": 8080, + "title": "Port", + "type": "integer" + }, + "log_level": { + "default": "info", + "title": "Log Level", + "type": "string" + } + }, + "title": "GraphQLServerOptions", + "type": "object" + }, + "RestOptions": { + "properties": { + "rest": { + "$ref": "#/$defs/RestServerOptions" + } + }, + "title": "RestOptions", + "type": "object" + }, + "RestServerOptions": { + "properties": { + "host": { + "default": "localhost", + "title": "Host", + "type": "string" + }, + "port": { + "default": 8080, + "title": "Port", + "type": "integer" + }, + "log_level": { + "default": "info", + "title": "Log Level", + "type": "string" + } + }, + "title": "RestServerOptions", + "type": "object" + }, + "SomeConfig": { + "properties": { + "name": { + "title": "Name", + "type": "string" + } + }, + "required": [ + "name" + ], + "title": "SomeConfig", + "type": "object" + }, + "TangoDSROptions": { + "properties": { + "dev_name": { + "default": "MY/DEVICE/NAME", + "title": "Dev Name", + "type": "string" + }, + "dsr_instance": { + "default": "MY_SERVER_INSTANCE", + "title": "Dsr Instance", + "type": "string" + }, + "debug": { + "default": false, + "title": "Debug", + "type": "boolean" + } + }, + "title": "TangoDSROptions", + "type": "object" + }, + "TangoOptions": { + "properties": { + "dsr": { + "$ref": "#/$defs/TangoDSROptions" + } + }, + "title": "TangoOptions", + "type": "object" + } + }, + "additionalProperties": false, + "properties": { + "controller": { + "$ref": "#/$defs/SomeConfig" + }, + "transport": { + "items": { + "anyOf": [ + { + "$ref": "#/$defs/EpicsOptions" + }, + { + "$ref": "#/$defs/TangoOptions" + }, + { + "$ref": "#/$defs/RestOptions" + }, + { + "$ref": "#/$defs/GraphQLOptions" + } + ] + }, + "title": "Transport", + "type": "array" + } + }, + "required": [ + "controller", + "transport" + ], + "title": "IsHinted", + "type": "object" +} \ No newline at end of file From 8ac899ab0be29f3f84a7b8336830ee5d304dffdd Mon Sep 17 00:00:00 2001 From: Marcell Nagy Date: Thu, 23 Jan 2025 11:37:10 +0000 Subject: [PATCH 04/11] Abstract away epics protocol at benchmarking --- tests/benchmarking/test_benchmarking.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/benchmarking/test_benchmarking.py b/tests/benchmarking/test_benchmarking.py index 51e09b645..27f351c5e 100644 --- a/tests/benchmarking/test_benchmarking.py +++ b/tests/benchmarking/test_benchmarking.py @@ -30,11 +30,11 @@ def rest_put(): ctx = Context("pva") -def ca_get(): +def epics_get(): ctx.get(GET_PV) -def ca_put(): +def epics_put(): ctx.put(PUT_PV, 0) @@ -103,27 +103,27 @@ def test_rest_put(benchmark, test_controller): @pytest.mark.skipif(not FASTCS_BENCHMARKING, reason="export FASTCS_BENCHMARKING=true") @pytest.mark.benchmark(group="test-epics") def test_epics_get(benchmark, test_controller): - benchmark(ca_get) + benchmark(epics_get) @pytest.mark.skipif(not FASTCS_BENCHMARKING, reason="export FASTCS_BENCHMARKING=true") @pytest.mark.benchmark(group="test-epics") def test_epics_get_loaded_request(benchmark, test_controller): with background_traffic(GET_ENDPOINT): - benchmark(ca_get) + benchmark(epics_get) @pytest.mark.skipif(not FASTCS_BENCHMARKING, reason="export FASTCS_BENCHMARKING=true") @pytest.mark.benchmark(group="test-epics") def test_epics_get_loaded_baseline(benchmark, test_controller): with background_traffic(None): - benchmark(ca_get) + benchmark(epics_get) @pytest.mark.skipif(not FASTCS_BENCHMARKING, reason="export FASTCS_BENCHMARKING=true") @pytest.mark.benchmark(group="test-epics") def test_epics_put(benchmark, test_controller): - benchmark(ca_put) + benchmark(epics_put) @pytest.mark.skipif(not FASTCS_BENCHMARKING, reason="export FASTCS_BENCHMARKING=true") From 3408d635ff90d6e1c5bf424bf65ed2a4ca3f8309 Mon Sep 17 00:00:00 2001 From: Marcell Nagy Date: Thu, 23 Jan 2025 11:40:51 +0000 Subject: [PATCH 05/11] Rename _serv to _server --- src/fastcs/transport/graphQL/graphQL.py | 4 ++-- src/fastcs/transport/rest/rest.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/fastcs/transport/graphQL/graphQL.py b/src/fastcs/transport/graphQL/graphQL.py index da4d676ec..106edc47d 100644 --- a/src/fastcs/transport/graphQL/graphQL.py +++ b/src/fastcs/transport/graphQL/graphQL.py @@ -33,7 +33,7 @@ def _create_app(self) -> GraphQL: async def serve(self, options: GraphQLServerOptions | None = None) -> None: options = options or GraphQLServerOptions() - self._serv = uvicorn.Server( + self._server = uvicorn.Server( uvicorn.Config( app=self._app, host=options.host, @@ -41,7 +41,7 @@ async def serve(self, options: GraphQLServerOptions | None = None) -> None: log_level=options.log_level, ) ) - await self._serv.serve() + await self._server.serve() class GraphQLAPI: diff --git a/src/fastcs/transport/rest/rest.py b/src/fastcs/transport/rest/rest.py index 3384e5713..cf61a6305 100644 --- a/src/fastcs/transport/rest/rest.py +++ b/src/fastcs/transport/rest/rest.py @@ -25,7 +25,7 @@ def _create_app(self): async def serve(self, options: RestServerOptions | None): options = options or RestServerOptions() - self._serv = uvicorn.Server( + self._server = uvicorn.Server( uvicorn.Config( app=self._app, host=options.host, @@ -33,7 +33,7 @@ async def serve(self, options: RestServerOptions | None): log_level=options.log_level, ) ) - await self._serv.serve() + await self._server.serve() def _put_request_body(attribute: AttrW[T]): From a192058b74fa94af22a7dba55aa6606509193ac4 Mon Sep 17 00:00:00 2001 From: Marcell Nagy Date: Thu, 23 Jan 2025 12:41:44 +0000 Subject: [PATCH 06/11] initial_tasks -> initial_coros --- src/fastcs/backend.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/fastcs/backend.py b/src/fastcs/backend.py index 9ba8814b4..68edf6b99 100644 --- a/src/fastcs/backend.py +++ b/src/fastcs/backend.py @@ -32,10 +32,10 @@ def __del__(self): self._stop_scan_tasks() async def serve(self): - await self._run_initial_tasks() + await self._run_initial_coros() await self._start_scan_tasks() - async def _run_initial_tasks(self): + async def _run_initial_coros(self): for coro in self._initial_coros: await coro() From c3993eaeed42948b503c33756212bb7ae63713d3 Mon Sep 17 00:00:00 2001 From: Marcell Nagy Date: Thu, 23 Jan 2025 13:09:29 +0000 Subject: [PATCH 07/11] Make ruff happy for a file that has nothing to do with it --- tests/data/schema.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/data/schema.json b/tests/data/schema.json index a8dce4d26..847a9abb0 100644 --- a/tests/data/schema.json +++ b/tests/data/schema.json @@ -215,4 +215,4 @@ ], "title": "IsHinted", "type": "object" -} \ No newline at end of file +} From e054b16bfc04392fa2b5e49330da77b1e17fc0c3 Mon Sep 17 00:00:00 2001 From: Marcell Nagy Date: Thu, 23 Jan 2025 14:17:15 +0000 Subject: [PATCH 08/11] Make ruff happy until it changes its mind again --- src/fastcs/attributes.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/fastcs/attributes.py b/src/fastcs/attributes.py index b99b6b8f0..de984a071 100644 --- a/src/fastcs/attributes.py +++ b/src/fastcs/attributes.py @@ -69,9 +69,9 @@ def __init__( allowed_values: list[T] | None = None, description: str | None = None, ) -> None: - assert ( - datatype.dtype in ATTRIBUTE_TYPES - ), f"Attr type must be one of {ATTRIBUTE_TYPES}, received type {datatype.dtype}" + assert datatype.dtype in ATTRIBUTE_TYPES, ( + f"Attr type must be one of {ATTRIBUTE_TYPES}, received type {datatype.dtype}" + ) self._datatype: DataType[T] = datatype self._access_mode: AttrMode = access_mode self._group = group From b901305b0015c29321a4748eff655d7737c41fdc Mon Sep 17 00:00:00 2001 From: Marcell Nagy Date: Thu, 23 Jan 2025 14:21:40 +0000 Subject: [PATCH 09/11] Save ruff from itself --- src/fastcs/attributes.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/fastcs/attributes.py b/src/fastcs/attributes.py index de984a071..dc6fcca43 100644 --- a/src/fastcs/attributes.py +++ b/src/fastcs/attributes.py @@ -70,7 +70,8 @@ def __init__( description: str | None = None, ) -> None: assert datatype.dtype in ATTRIBUTE_TYPES, ( - f"Attr type must be one of {ATTRIBUTE_TYPES}, received type {datatype.dtype}" + f"Attr type must be one of {ATTRIBUTE_TYPES}" + f", received type {datatype.dtype}" ) self._datatype: DataType[T] = datatype self._access_mode: AttrMode = access_mode From 9eb90b702c055f95665df87cbef627ed55c1699e Mon Sep 17 00:00:00 2001 From: Marcell Nagy Date: Fri, 24 Jan 2025 11:42:51 +0000 Subject: [PATCH 10/11] Patch tango methods over branching --- src/fastcs/transport/tango/adapter.py | 2 +- src/fastcs/transport/tango/dsr.py | 62 ++++++++++++--------------- tests/transport/tango/test_dsr.py | 26 +++++++++-- 3 files changed, 50 insertions(+), 40 deletions(-) diff --git a/src/fastcs/transport/tango/adapter.py b/src/fastcs/transport/tango/adapter.py index 73841faca..40018694b 100644 --- a/src/fastcs/transport/tango/adapter.py +++ b/src/fastcs/transport/tango/adapter.py @@ -11,7 +11,7 @@ class TangoTransport(TransportAdapter): def __init__( self, controller: Controller, - loop: asyncio.AbstractEventLoop | None = None, + loop: asyncio.AbstractEventLoop, options: TangoOptions | None = None, ): self._options = options or TangoOptions() diff --git a/src/fastcs/transport/tango/dsr.py b/src/fastcs/transport/tango/dsr.py index 130b97341..7cb3546b7 100644 --- a/src/fastcs/transport/tango/dsr.py +++ b/src/fastcs/transport/tango/dsr.py @@ -1,5 +1,5 @@ import asyncio -from collections.abc import Awaitable, Callable +from collections.abc import Awaitable, Callable, Coroutine from typing import Any import tango @@ -33,29 +33,33 @@ def _tango_display_format(attribute: Attribute) -> str: return "6.2f" # `tango.server.attribute` default for `format` +async def _run_threadsafe_blocking( + coro: Coroutine[Any, Any, Any], loop: asyncio.AbstractEventLoop +) -> None: + """ + Wraps a concurrent.futures.Future object as an + asyncio.Future to make it awaitable and then awaits it + """ + future = asyncio.run_coroutine_threadsafe(coro, loop) + await asyncio.wrap_future(future) + + def _wrap_updater_fset( attr_name: str, attribute: AttrW, controller: BaseController, - loop: asyncio.AbstractEventLoop | None = None, + loop: asyncio.AbstractEventLoop, ) -> Callable[[Any, Any], Any]: - if loop: - - async def fset(tango_device: Device, val): - tango_device.info_stream(f"called fset method: {attr_name}") - future = asyncio.run_coroutine_threadsafe(attribute.process(val), loop) - await asyncio.wrap_future(future) - else: - - async def fset(tango_device: Device, val): - tango_device.info_stream(f"called fset method: {attr_name}") - await attribute.process(val) + async def fset(tango_device: Device, val): + tango_device.info_stream(f"called fset method: {attr_name}") + coro = attribute.process(val) + await _run_threadsafe_blocking(coro, loop) return fset def _collect_dev_attributes( - controller: BaseController, loop: asyncio.AbstractEventLoop | None = None + controller: BaseController, loop: asyncio.AbstractEventLoop ) -> dict[str, Any]: collection: dict[str, Any] = {} for single_mapping in controller.get_controller_mappings(): @@ -107,26 +111,14 @@ def _wrap_command_f( method_name: str, method: Callable, controller: BaseController, - loop: asyncio.AbstractEventLoop | None = None, + loop: asyncio.AbstractEventLoop, ) -> Callable[..., Awaitable[None]]: - if loop: - - async def _dynamic_f(tango_device: Device) -> None: - tango_device.info_stream( - f"called {'_'.join(controller.path)} f method: {method_name}" - ) - coro = getattr(controller, method.__name__)() - future = asyncio.run_coroutine_threadsafe(coro, loop) - await asyncio.wrap_future(future) - - else: - - async def _dynamic_f(tango_device: Device) -> None: - tango_device.info_stream( - f"called {'_'.join(controller.path)} f method: {method_name}" - ) - coro = getattr(controller, method.__name__)() - await coro + async def _dynamic_f(tango_device: Device) -> None: + tango_device.info_stream( + f"called {'_'.join(controller.path)} f method: {method_name}" + ) + coro = getattr(controller, method.__name__)() + await _run_threadsafe_blocking(coro, loop) _dynamic_f.__name__ = method_name return _dynamic_f @@ -134,7 +126,7 @@ async def _dynamic_f(tango_device: Device) -> None: def _collect_dev_commands( controller: BaseController, - loop: asyncio.AbstractEventLoop | None = None, + loop: asyncio.AbstractEventLoop, ) -> dict[str, Any]: collection: dict[str, Any] = {} for single_mapping in controller.get_controller_mappings(): @@ -186,7 +178,7 @@ class TangoDSR: def __init__( self, controller: BaseController, - loop: asyncio.AbstractEventLoop | None = None, + loop: asyncio.AbstractEventLoop, ): self._controller = controller self._loop = loop diff --git a/tests/transport/tango/test_dsr.py b/tests/transport/tango/test_dsr.py index 09646c379..2ce75d0a4 100644 --- a/tests/transport/tango/test_dsr.py +++ b/tests/transport/tango/test_dsr.py @@ -1,17 +1,35 @@ +import asyncio +from collections.abc import Awaitable, Callable +from typing import Any +from unittest import mock + import pytest from tango import DevState +from tango.server import Device from tango.test_context import DeviceTestContext +from fastcs.attributes import AttrW +from fastcs.controller import BaseController from fastcs.transport.tango.adapter import TangoTransport +async def patch_run_threadsafe_blocking(coro, loop): + await coro + + class TestTangoDevice: @pytest.fixture(scope="class") def tango_context(self, assertable_controller): - # https://tango-controls.readthedocs.io/projects/pytango/en/v9.5.1/testing/test_context.html - device = TangoTransport(assertable_controller)._dsr._device - with DeviceTestContext(device, debug=0) as proxy: - yield proxy + with mock.patch( + "fastcs.transport.tango.dsr._run_threadsafe_blocking", + patch_run_threadsafe_blocking, + ): + device = TangoTransport( + assertable_controller, asyncio.AbstractEventLoop() + )._dsr._device + # https://tango-controls.readthedocs.io/projects/pytango/en/v9.5.1/testing/test_context.html + with DeviceTestContext(device, debug=0) as proxy: + yield proxy def test_list_attributes(self, tango_context): assert list(tango_context.get_attribute_list()) == [ From 27acd344afa4145a5666575c829e09f2ced52cee Mon Sep 17 00:00:00 2001 From: Marcell Nagy Date: Fri, 24 Jan 2025 11:46:00 +0000 Subject: [PATCH 11/11] Linting --- tests/transport/tango/test_dsr.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/tests/transport/tango/test_dsr.py b/tests/transport/tango/test_dsr.py index 2ce75d0a4..9f1c6629b 100644 --- a/tests/transport/tango/test_dsr.py +++ b/tests/transport/tango/test_dsr.py @@ -1,15 +1,10 @@ import asyncio -from collections.abc import Awaitable, Callable -from typing import Any from unittest import mock import pytest from tango import DevState -from tango.server import Device from tango.test_context import DeviceTestContext -from fastcs.attributes import AttrW -from fastcs.controller import BaseController from fastcs.transport.tango.adapter import TangoTransport