diff --git a/.gitignore b/.gitignore index 0f33bf297..6d4d09c6b 100644 --- a/.gitignore +++ b/.gitignore @@ -45,6 +45,7 @@ coverage.xml cov.xml .pytest_cache/ .mypy_cache/ +.benchmarks/ # Translations *.mo diff --git a/pyproject.toml b/pyproject.toml index c1663382a..d84c768b1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,6 +34,7 @@ dev = [ "pydata-sphinx-theme>=0.12", "pyright", "pytest", + "pytest-benchmark", "pytest-cov", "pytest-mock", "pytest-asyncio", @@ -69,7 +70,7 @@ reportMissingImports = false # Ignore missing stubs in imported modules [tool.pytest.ini_options] # Run pytest with all our checkers, and don't spam us with massive tracebacks on error addopts = """ - --tb=native -vv --doctest-modules --doctest-glob="*.rst" + --tb=native -vv --doctest-modules --doctest-glob="*.rst" --benchmark-sort=mean --benchmark-columns="mean, min, max, outliers, ops, rounds" """ # https://iscinumpy.gitlab.io/post/bound-version-constraints/#watch-for-warnings filterwarnings = "error" diff --git a/src/fastcs/attributes.py b/src/fastcs/attributes.py index 95426385e..dc6fcca43 100644 --- a/src/fastcs/attributes.py +++ b/src/fastcs/attributes.py @@ -71,7 +71,7 @@ def __init__( ) -> None: assert datatype.dtype in ATTRIBUTE_TYPES, ( f"Attr type must be one of {ATTRIBUTE_TYPES}" - ", received type {datatype.dtype}" + f", received type {datatype.dtype}" ) self._datatype: DataType[T] = datatype self._access_mode: AttrMode = access_mode diff --git a/src/fastcs/backend.py b/src/fastcs/backend.py index 07b26171c..a7e8980fd 100644 --- a/src/fastcs/backend.py +++ b/src/fastcs/backend.py @@ -1,11 +1,8 @@ import asyncio from collections import defaultdict from collections.abc import Callable -from concurrent.futures import Future from types import MethodType -from softioc.asyncio_dispatcher import AsyncioDispatcher - from .attributes import AttrR, AttrW, Sender, Updater from .controller import Controller, SingleMapping from .exceptions import FastCSException @@ -15,19 +12,15 @@ class Backend: def __init__( self, controller: Controller, - loop: asyncio.AbstractEventLoop | None = None, + loop: asyncio.AbstractEventLoop, ): - self.dispatcher = AsyncioDispatcher(loop) - self._loop = self.dispatcher.loop + self._loop = loop self._controller = controller self._initial_coros = [controller.connect] - self._scan_futures: set[Future] = set() - - asyncio.run_coroutine_threadsafe( - self._controller.initialise(), self._loop - ).result() + self._scan_tasks: set[asyncio.Task] = set() + loop.run_until_complete(self._controller.initialise()) self._link_process_tasks() def _link_process_tasks(self): @@ -36,28 +29,26 @@ def _link_process_tasks(self): _link_attribute_sender_class(single_mapping) def __del__(self): - self.stop_scan_futures() + self._stop_scan_tasks() - def run(self): - self._run_initial_futures() - self.start_scan_futures() + async def serve(self): + await self._run_initial_coros() + await self._start_scan_tasks() - def _run_initial_futures(self): + async def _run_initial_coros(self): for coro in self._initial_coros: - future = asyncio.run_coroutine_threadsafe(coro(), self._loop) - future.result() + await coro() - def start_scan_futures(self): - self._scan_futures = { - asyncio.run_coroutine_threadsafe(coro(), self._loop) - for coro in _get_scan_coros(self._controller) + async def _start_scan_tasks(self): + self._scan_tasks = { + self._loop.create_task(coro()) for coro in _get_scan_coros(self._controller) } - def stop_scan_futures(self): - for future in self._scan_futures: - if not future.done(): + def _stop_scan_tasks(self): + for task in self._scan_tasks: + if not task.done(): try: - future.cancel() + task.cancel() except asyncio.CancelledError: pass diff --git a/src/fastcs/launch.py b/src/fastcs/launch.py index d1eb12ddf..7d4d93c83 100644 --- a/src/fastcs/launch.py +++ b/src/fastcs/launch.py @@ -1,3 +1,4 @@ +import asyncio import inspect import json from pathlib import Path @@ -19,7 +20,9 @@ from .transport.tango.options import TangoOptions # Define a type alias for transport options -TransportOptions: TypeAlias = EpicsOptions | TangoOptions | RestOptions | GraphQLOptions +TransportOptions: TypeAlias = list[ + EpicsOptions | TangoOptions | RestOptions | GraphQLOptions +] class FastCS: @@ -28,48 +31,63 @@ def __init__( controller: Controller, transport_options: TransportOptions, ): - self._backend = Backend(controller) - self._transport: TransportAdapter - match transport_options: - case EpicsOptions(): - from .transport.epics.adapter import EpicsTransport - - self._transport = EpicsTransport( - controller, - self._backend.dispatcher, - transport_options, - ) - case GraphQLOptions(): - from .transport.graphQL.adapter import GraphQLTransport - - self._transport = GraphQLTransport( - controller, - transport_options, - ) - case TangoOptions(): - from .transport.tango.adapter import TangoTransport - - self._transport = TangoTransport( - controller, - transport_options, - ) - case RestOptions(): - from .transport.rest.adapter import RestTransport - - self._transport = RestTransport( - controller, - transport_options, - ) + self._loop = asyncio.get_event_loop() + self._backend = Backend(controller, self._loop) + transport: TransportAdapter + self._transports: list[TransportAdapter] = [] + for option in transport_options: + match option: + case EpicsOptions(): + from .transport.epics.adapter import EpicsTransport + + transport = EpicsTransport( + controller, + self._loop, + option, + ) + case TangoOptions(): + from .transport.tango.adapter import TangoTransport + + transport = TangoTransport( + controller, + self._loop, + option, + ) + case RestOptions(): + from .transport.rest.adapter import RestTransport + + transport = RestTransport( + controller, + option, + ) + case GraphQLOptions(): + from .transport.graphQL.adapter import GraphQLTransport + + transport = GraphQLTransport( + controller, + option, + ) + self._transports.append(transport) def create_docs(self) -> None: - self._transport.create_docs() + for transport in self._transports: + if hasattr(transport.options, "docs"): + transport.create_docs() def create_gui(self) -> None: - self._transport.create_gui() + for transport in self._transports: + if hasattr(transport.options, "gui"): + transport.create_docs() - def run(self) -> None: - self._backend.run() - self._transport.run() + def run(self): + self._loop.run_until_complete( + self.serve(), + ) + + async def serve(self) -> None: + coros = [self._backend.serve()] + coros.extend([transport.serve() for transport in self._transports]) + await asyncio.gather(*coros) def launch( @@ -158,10 +176,8 @@ def run( instance_options.transport, ) - if "gui" in options_yaml["transport"]: - instance.create_gui() - if "docs" in options_yaml["transport"]: - instance.create_docs() + instance.create_gui() + instance.create_docs() instance.run() @launch_typer.command(name="version", help=f"{controller_class.__name__} version") diff --git a/src/fastcs/transport/adapter.py b/src/fastcs/transport/adapter.py index bb2ce856e..af18bd2a5 100644 --- a/src/fastcs/transport/adapter.py +++ b/src/fastcs/transport/adapter.py @@ -1,9 +1,15 @@ from abc import ABC, abstractmethod +from typing import Any class TransportAdapter(ABC): + @property @abstractmethod - def run(self) -> None: + def options(self) -> Any: + pass + + @abstractmethod + async def serve(self) -> None: pass @abstractmethod diff --git a/src/fastcs/transport/epics/adapter.py b/src/fastcs/transport/epics/adapter.py index 3162d170f..4383fb579 100644 --- a/src/fastcs/transport/epics/adapter.py +++ b/src/fastcs/transport/epics/adapter.py @@ -1,4 +1,4 @@ -from softioc.asyncio_dispatcher import AsyncioDispatcher +import asyncio from fastcs.controller import Controller from fastcs.transport.adapter import TransportAdapter @@ -13,14 +13,22 @@ class EpicsTransport(TransportAdapter): def __init__( self, controller: Controller, - dispatcher: AsyncioDispatcher, + loop: asyncio.AbstractEventLoop, options: EpicsOptions | None = None, ) -> None: - self.options = options or EpicsOptions() self._controller = controller - self._dispatcher = dispatcher + self._loop = loop + self._options = options or EpicsOptions() self._pv_prefix = self.options.ioc.pv_prefix - self._ioc = EpicsIOC(self.options.ioc.pv_prefix, controller) + self._ioc = EpicsIOC( + self.options.ioc.pv_prefix, + controller, + self._options.ioc, + ) + + @property + def options(self) -> EpicsOptions: + return self._options def create_docs(self) -> None: EpicsDocs(self._controller).create_docs(self.options.docs) @@ -28,5 +36,7 @@ def create_docs(self) -> None: def create_gui(self) -> None: EpicsGUI(self._controller, self._pv_prefix).create_gui(self.options.gui) - def run(self): - self._ioc.run(self._dispatcher) + async def serve(self) -> None: + self._ioc.run(self._loop) + while True: + await asyncio.sleep(1) diff --git a/src/fastcs/transport/epics/ioc.py b/src/fastcs/transport/epics/ioc.py index eca534004..bbec96c14 100644 --- a/src/fastcs/transport/epics/ioc.py +++ b/src/fastcs/transport/epics/ioc.py @@ -1,3 +1,4 @@ +import asyncio from collections.abc import Callable from dataclasses import asdict from types import MethodType @@ -50,7 +51,7 @@ def __init__( controller: Controller, options: EpicsIOCOptions | None = None, ): - self.options = options or EpicsIOCOptions() + self._options = options or EpicsIOCOptions() self._controller = controller _add_pvi_info(f"{pv_prefix}:PVI") _add_sub_controller_pvi_info(pv_prefix, controller) @@ -60,18 +61,12 @@ def __init__( def run( self, - dispatcher: AsyncioDispatcher, + loop: asyncio.AbstractEventLoop, ) -> None: + dispatcher = AsyncioDispatcher(loop) # Needs running loop builder.LoadDatabase() softioc.iocInit(dispatcher) - if self.options.terminal: - context = { - "dispatcher": dispatcher, - "controller": self._controller, - } - softioc.interactive_ioc(context) - def _add_pvi_info( pvi: str, diff --git a/src/fastcs/transport/epics/options.py b/src/fastcs/transport/epics/options.py index b24d11de0..f7131abfe 100644 --- a/src/fastcs/transport/epics/options.py +++ b/src/fastcs/transport/epics/options.py @@ -23,7 +23,6 @@ class EpicsGUIOptions: @dataclass class EpicsIOCOptions: - terminal: bool = True pv_prefix: str = "MY-DEVICE-PREFIX" diff --git a/src/fastcs/transport/graphQL/adapter.py b/src/fastcs/transport/graphQL/adapter.py index 5b573c020..2c7eaec7c 100644 --- a/src/fastcs/transport/graphQL/adapter.py +++ b/src/fastcs/transport/graphQL/adapter.py @@ -11,14 +11,18 @@ def __init__( controller: Controller, options: GraphQLOptions | None = None, ): - self.options = options or GraphQLOptions() + self._options = options or GraphQLOptions() self._server = GraphQLServer(controller) + @property + def options(self) -> GraphQLOptions: + return self._options + def create_docs(self) -> None: raise NotImplementedError def create_gui(self) -> None: raise NotImplementedError - def run(self) -> None: - self._server.run(self.options.gql) + async def serve(self) -> None: + await self._server.serve(self.options.gql) diff --git a/src/fastcs/transport/graphQL/graphQL.py b/src/fastcs/transport/graphQL/graphQL.py index 85bde07b5..106edc47d 100644 --- a/src/fastcs/transport/graphQL/graphQL.py +++ b/src/fastcs/transport/graphQL/graphQL.py @@ -31,16 +31,17 @@ def _create_app(self) -> GraphQL: return app - def run(self, options: GraphQLServerOptions | None = None) -> None: - if options is None: - options = GraphQLServerOptions() - - uvicorn.run( - self._app, - host=options.host, - port=options.port, - log_level=options.log_level, + async def serve(self, options: GraphQLServerOptions | None = None) -> None: + options = options or GraphQLServerOptions() + self._server = uvicorn.Server( + uvicorn.Config( + app=self._app, + host=options.host, + port=options.port, + log_level=options.log_level, + ) ) + await self._server.serve() class GraphQLAPI: diff --git a/src/fastcs/transport/rest/adapter.py b/src/fastcs/transport/rest/adapter.py index dd96ad819..7e98c5da9 100644 --- a/src/fastcs/transport/rest/adapter.py +++ b/src/fastcs/transport/rest/adapter.py @@ -11,14 +11,18 @@ def __init__( controller: Controller, options: RestOptions | None = None, ): - self.options = options or RestOptions() + self._options = options or RestOptions() self._server = RestServer(controller) + @property + def options(self) -> RestOptions: + return self._options + def create_docs(self) -> None: raise NotImplementedError def create_gui(self) -> None: raise NotImplementedError - def run(self) -> None: - self._server.run(self.options.rest) + async def serve(self) -> None: + await self._server.serve(self.options.rest) diff --git a/src/fastcs/transport/rest/rest.py b/src/fastcs/transport/rest/rest.py index 9eb0e9254..cf61a6305 100644 --- a/src/fastcs/transport/rest/rest.py +++ b/src/fastcs/transport/rest/rest.py @@ -23,14 +23,17 @@ def _create_app(self): return app - def run(self, options: RestServerOptions | None) -> None: + async def serve(self, options: RestServerOptions | None): options = options or RestServerOptions() - uvicorn.run( - self._app, - host=options.host, - port=options.port, - log_level=options.log_level, + self._server = uvicorn.Server( + uvicorn.Config( + app=self._app, + host=options.host, + port=options.port, + log_level=options.log_level, + ) ) + await self._server.serve() def _put_request_body(attribute: AttrW[T]): diff --git a/src/fastcs/transport/tango/adapter.py b/src/fastcs/transport/tango/adapter.py index 7c5f103ad..40018694b 100644 --- a/src/fastcs/transport/tango/adapter.py +++ b/src/fastcs/transport/tango/adapter.py @@ -1,3 +1,5 @@ +import asyncio + from fastcs.controller import Controller from fastcs.transport.adapter import TransportAdapter @@ -9,10 +11,15 @@ class TangoTransport(TransportAdapter): def __init__( self, controller: Controller, + loop: asyncio.AbstractEventLoop, options: TangoOptions | None = None, ): - self.options = options or TangoOptions() - self._dsr = TangoDSR(controller) + self._options = options or TangoOptions() + self._dsr = TangoDSR(controller, loop) + + @property + def options(self) -> TangoOptions: + return self._options def create_docs(self) -> None: raise NotImplementedError @@ -20,5 +27,9 @@ def create_docs(self) -> None: def create_gui(self) -> None: raise NotImplementedError - def run(self) -> None: - self._dsr.run(self.options.dsr) + async def serve(self) -> None: + coro = asyncio.to_thread( + self._dsr.run, + self.options.dsr, + ) + await coro diff --git a/src/fastcs/transport/tango/dsr.py b/src/fastcs/transport/tango/dsr.py index 9b7f6f716..7cb3546b7 100644 --- a/src/fastcs/transport/tango/dsr.py +++ b/src/fastcs/transport/tango/dsr.py @@ -1,4 +1,5 @@ -from collections.abc import Awaitable, Callable +import asyncio +from collections.abc import Awaitable, Callable, Coroutine from typing import Any import tango @@ -13,7 +14,9 @@ def _wrap_updater_fget( - attr_name: str, attribute: AttrR, controller: BaseController + attr_name: str, + attribute: AttrR, + controller: BaseController, ) -> Callable[[Any], Any]: async def fget(tango_device: Device): tango_device.info_stream(f"called fget method: {attr_name}") @@ -30,17 +33,34 @@ def _tango_display_format(attribute: Attribute) -> str: return "6.2f" # `tango.server.attribute` default for `format` +async def _run_threadsafe_blocking( + coro: Coroutine[Any, Any, Any], loop: asyncio.AbstractEventLoop +) -> None: + """ + Wraps a concurrent.futures.Future object as an + asyncio.Future to make it awaitable and then awaits it + """ + future = asyncio.run_coroutine_threadsafe(coro, loop) + await asyncio.wrap_future(future) + + def _wrap_updater_fset( - attr_name: str, attribute: AttrW, controller: BaseController + attr_name: str, + attribute: AttrW, + controller: BaseController, + loop: asyncio.AbstractEventLoop, ) -> Callable[[Any, Any], Any]: async def fset(tango_device: Device, val): tango_device.info_stream(f"called fset method: {attr_name}") - await attribute.process(val) + coro = attribute.process(val) + await _run_threadsafe_blocking(coro, loop) return fset -def _collect_dev_attributes(controller: BaseController) -> dict[str, Any]: +def _collect_dev_attributes( + controller: BaseController, loop: asyncio.AbstractEventLoop +) -> dict[str, Any]: collection: dict[str, Any] = {} for single_mapping in controller.get_controller_mappings(): path = single_mapping.controller.path @@ -58,7 +78,7 @@ def _collect_dev_attributes(controller: BaseController) -> dict[str, Any]: attr_name, attribute, single_mapping.controller ), fset=_wrap_updater_fset( - attr_name, attribute, single_mapping.controller + attr_name, attribute, single_mapping.controller, loop ), access=AttrWriteType.READ_WRITE, format=_tango_display_format(attribute), @@ -79,7 +99,7 @@ def _collect_dev_attributes(controller: BaseController) -> dict[str, Any]: dtype=attribute.datatype.dtype, access=AttrWriteType.WRITE, fset=_wrap_updater_fset( - attr_name, attribute, single_mapping.controller + attr_name, attribute, single_mapping.controller, loop ), format=_tango_display_format(attribute), ) @@ -88,19 +108,26 @@ def _collect_dev_attributes(controller: BaseController) -> dict[str, Any]: def _wrap_command_f( - method_name: str, method: Callable, controller: BaseController + method_name: str, + method: Callable, + controller: BaseController, + loop: asyncio.AbstractEventLoop, ) -> Callable[..., Awaitable[None]]: async def _dynamic_f(tango_device: Device) -> None: tango_device.info_stream( f"called {'_'.join(controller.path)} f method: {method_name}" ) - return await getattr(controller, method.__name__)() + coro = getattr(controller, method.__name__)() + await _run_threadsafe_blocking(coro, loop) _dynamic_f.__name__ = method_name return _dynamic_f -def _collect_dev_commands(controller: BaseController) -> dict[str, Any]: +def _collect_dev_commands( + controller: BaseController, + loop: asyncio.AbstractEventLoop, +) -> dict[str, Any]: collection: dict[str, Any] = {} for single_mapping in controller.get_controller_mappings(): path = single_mapping.controller.path @@ -109,7 +136,9 @@ def _collect_dev_commands(controller: BaseController) -> dict[str, Any]: cmd_name = name.title().replace("_", "") d_cmd_name = f"{'_'.join(path)}_{cmd_name}" if path else cmd_name collection[d_cmd_name] = server.command( - f=_wrap_command_f(d_cmd_name, method.fn, single_mapping.controller) + f=_wrap_command_f( + d_cmd_name, method.fn, single_mapping.controller, loop + ) ) return collection @@ -146,15 +175,20 @@ def _collect_dsr_args(options: TangoDSROptions) -> list[str]: class TangoDSR: - def __init__(self, controller: BaseController): + def __init__( + self, + controller: BaseController, + loop: asyncio.AbstractEventLoop, + ): self._controller = controller + self._loop = loop self.dev_class = self._controller.__class__.__name__ self._device = self._create_device() def _create_device(self): class_dict: dict = { - **_collect_dev_attributes(self._controller), - **_collect_dev_commands(self._controller), + **_collect_dev_attributes(self._controller, self._loop), + **_collect_dev_commands(self._controller, self._loop), **_collect_dev_properties(self._controller), **_collect_dev_init(self._controller), **_collect_dev_flags(self._controller), diff --git a/tests/benchmarking/compose.yaml b/tests/benchmarking/compose.yaml new file mode 100644 index 000000000..f9b68df1d --- /dev/null +++ b/tests/benchmarking/compose.yaml @@ -0,0 +1,22 @@ +# Spin up a local minimal tango cs + +version: "3.7" +services: + mysql: + image: registry.gitlab.com/tango-controls/docker/mysql:5 + environment: + - MYSQL_ROOT_PASSWORD=root + + tango-cs: + hostname: localhost + image: registry.gitlab.com/tango-controls/docker/tango-cs:9 + ports: + - "10000:10000" + environment: + - TANGO_HOST=localhost:10000 + - MYSQL_HOST=mysql:3306 + - MYSQL_USER=tango + - MYSQL_PASSWORD=tango + - MYSQL_DATABASE=tango + depends_on: + - mysql diff --git a/tests/benchmarking/controller.py b/tests/benchmarking/controller.py new file mode 100644 index 000000000..107b3f55f --- /dev/null +++ b/tests/benchmarking/controller.py @@ -0,0 +1,29 @@ +from fastcs import FastCS +from fastcs.attributes import AttrR, AttrW +from fastcs.controller import Controller +from fastcs.datatypes import Bool, Int +from fastcs.transport.epics.options import EpicsIOCOptions, EpicsOptions +from fastcs.transport.rest.options import RestOptions, RestServerOptions +from fastcs.transport.tango.options import TangoDSROptions, TangoOptions + + +class TestController(Controller): + read_int: AttrR = AttrR(Int(), initial_value=0) + write_bool: AttrW = AttrW(Bool()) + + +def run(): + transport_options = [ + RestOptions(rest=RestServerOptions(port=8090)), + EpicsOptions(ioc=EpicsIOCOptions(pv_prefix="BENCHMARK-DEVICE")), + TangoOptions(dsr=TangoDSROptions(dev_name="MY/BENCHMARK/DEVICE")), + ] + instance = FastCS( + TestController(), + transport_options, + ) + instance.run() + + +if __name__ == "__main__": + run() diff --git a/tests/benchmarking/test_benchmarking.py b/tests/benchmarking/test_benchmarking.py new file mode 100644 index 000000000..27f351c5e --- /dev/null +++ b/tests/benchmarking/test_benchmarking.py @@ -0,0 +1,172 @@ +import contextlib +import multiprocessing +import os + +import pytest +import requests +import tango +from p4p.client.thread import Context + +FASTCS_BENCHMARKING = os.getenv("FASTCS_BENCHMARKING") == "true" +GET_ENDPOINT = "http://localhost:8090/read-int" +PUT_ENDPOINT = "http://localhost:8090/write-bool" +GET_PV = "BENCHMARK-DEVICE:ReadInt" +PUT_PV = "BENCHMARK-DEVICE:WriteBool" +TANGO_DEVICE = "MY/BENCHMARK/DEVICE" +READ_ATTR = "ReadInt" +WRITE_ATTR = "WriteBool" +REST_CLIENTS = 9 +os.environ["TANGO_HOST"] = "localhost:10000" + + +def rest_get(): + requests.get(GET_ENDPOINT) + + +def rest_put(): + requests.put(PUT_ENDPOINT, json={"value": "false"}) + + +ctx = Context("pva") + + +def epics_get(): + ctx.get(GET_PV) + + +def epics_put(): + ctx.put(PUT_PV, 0) + + +def bg_get(event, url): + """Workload for a rest subprocess.""" + while not event.is_set(): + requests.get(url) + + +def bg_pass(event, url): + """Workload for a rest subprocess.""" + while not event.is_set(): + pass + + +@contextlib.contextmanager +def background_traffic(rest_target): + """Context manager to manage background clients""" + stop_event = multiprocessing.Event() + processes = [ + multiprocessing.Process( + target=bg_get if rest_target else bg_pass, + args=(stop_event, rest_target), + ) + for _ in range(REST_CLIENTS) + ] + for process in processes: + process.start() + + try: + yield processes + finally: + # Signal the processes to stop + stop_event.set() + # Wait for the processes to finish + for process in processes: + process.join() + + +@pytest.mark.skipif(not FASTCS_BENCHMARKING, reason="export FASTCS_BENCHMARKING=true") +@pytest.mark.benchmark(group="test-rest") +def test_rest_get(benchmark, test_controller): + benchmark(rest_get) + + +@pytest.mark.skipif(not FASTCS_BENCHMARKING, reason="export FASTCS_BENCHMARKING=true") +@pytest.mark.benchmark(group="test-rest") +def test_rest_get_loaded_request(benchmark, test_controller): + with background_traffic(GET_ENDPOINT): + benchmark(rest_get) + + +@pytest.mark.skipif(not FASTCS_BENCHMARKING, reason="export FASTCS_BENCHMARKING=true") +@pytest.mark.benchmark(group="test-rest") +def test_rest_get_loaded_baseline(benchmark, test_controller): + with background_traffic(None): + benchmark(rest_get) + + +@pytest.mark.skipif(not FASTCS_BENCHMARKING, reason="export FASTCS_BENCHMARKING=true") +@pytest.mark.benchmark(group="test-rest") +def test_rest_put(benchmark, test_controller): + benchmark(rest_put) + + +@pytest.mark.skipif(not FASTCS_BENCHMARKING, reason="export FASTCS_BENCHMARKING=true") +@pytest.mark.benchmark(group="test-epics") +def test_epics_get(benchmark, test_controller): + benchmark(epics_get) + + +@pytest.mark.skipif(not FASTCS_BENCHMARKING, reason="export FASTCS_BENCHMARKING=true") +@pytest.mark.benchmark(group="test-epics") +def test_epics_get_loaded_request(benchmark, test_controller): + with background_traffic(GET_ENDPOINT): + benchmark(epics_get) + + +@pytest.mark.skipif(not FASTCS_BENCHMARKING, reason="export FASTCS_BENCHMARKING=true") +@pytest.mark.benchmark(group="test-epics") +def test_epics_get_loaded_baseline(benchmark, test_controller): + with background_traffic(None): + benchmark(epics_get) + + +@pytest.mark.skipif(not FASTCS_BENCHMARKING, reason="export FASTCS_BENCHMARKING=true") +@pytest.mark.benchmark(group="test-epics") +def test_epics_put(benchmark, test_controller): + benchmark(epics_put) + + +@pytest.mark.skipif(not FASTCS_BENCHMARKING, reason="export FASTCS_BENCHMARKING=true") +@pytest.mark.benchmark(group="test-tango") +def test_tango_get(benchmark, test_controller): + device = tango.DeviceProxy(TANGO_DEVICE) + + def to_do(): + device.read_attribute(READ_ATTR) + + benchmark(to_do) + + +@pytest.mark.skipif(not FASTCS_BENCHMARKING, reason="export FASTCS_BENCHMARKING=true") +@pytest.mark.benchmark(group="test-tango") +def test_tango_get_loaded_request(benchmark, test_controller): + device = tango.DeviceProxy(TANGO_DEVICE) + + def to_do(): + device.read_attribute(READ_ATTR) + + with background_traffic(GET_ENDPOINT): + benchmark(to_do) + + +@pytest.mark.skipif(not FASTCS_BENCHMARKING, reason="export FASTCS_BENCHMARKING=true") +@pytest.mark.benchmark(group="test-tango") +def test_tango_get_loaded_baseline(benchmark, test_controller): + device = tango.DeviceProxy(TANGO_DEVICE) + + def to_do(): + device.read_attribute(READ_ATTR) + + with background_traffic(None): + benchmark(to_do) + + +@pytest.mark.skipif(not FASTCS_BENCHMARKING, reason="export FASTCS_BENCHMARKING=true") +@pytest.mark.benchmark(group="test-tango") +def test_tango_put(benchmark, test_controller): + device = tango.DeviceProxy(TANGO_DEVICE) + + def to_do(): + device.write_attribute(WRITE_ATTR, 0) + + benchmark(to_do) diff --git a/tests/conftest.py b/tests/conftest.py index fad58597e..831deddab 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,6 +1,7 @@ import copy import os import random +import signal import string import subprocess import time @@ -15,6 +16,7 @@ from fastcs.attributes import AttrR, AttrRW, AttrW, Handler, Sender, Updater from fastcs.controller import Controller, SubController from fastcs.datatypes import Bool, Float, Int, String +from fastcs.transport.tango.dsr import register_dev from fastcs.wrappers import command, scan DATA_PATH = Path(__file__).parent / "data" @@ -168,6 +170,7 @@ def assertable_controller(class_mocker: MockerFixture): @pytest.fixture(scope="module") def ioc(): + TIMEOUT = 10 process = subprocess.Popen( ["python", HERE / "ioc.py"], stdin=subprocess.PIPE, @@ -180,15 +183,84 @@ def ioc(): while "iocRun: All initialization complete" not in ( process.stdout.readline().strip() # type: ignore ): - if time.monotonic() - start_time > 10: + if time.monotonic() - start_time > TIMEOUT: raise TimeoutError("IOC did not start in time") yield # close backend caches before the event loop purge_channel_caches() - try: - print(process.communicate("exit")[0]) - except ValueError: - # Someone else already called communicate - pass + + # Close open files + for f in [process.stdin, process.stdout, process.stderr]: + if f: + f.close() + process.send_signal(signal.SIGINT) + process.wait(TIMEOUT) + + +@pytest.fixture(scope="session") +def tango_system(): + subprocess.run( + ["podman", "compose", "-f", HERE / "benchmarking" / "compose.yaml", "up", "-d"], + check=True, + ) + yield + subprocess.run( + ["podman", "compose", "-f", HERE / "benchmarking" / "compose.yaml", "down"], + check=True, + ) + + +@pytest.fixture(scope="session") +def register_device(): + ATTEMPTS = 10 + SLEEP = 1 + + if not os.getenv("TANGO_HOST"): + raise RuntimeError("TANGO_HOST not defined") + + for attempt in range(1, ATTEMPTS + 1): + try: + register_dev( + dev_name="MY/BENCHMARK/DEVICE", + dev_class="TestController", + dsr_instance="MY_SERVER_INSTANCE", + ) + break + except Exception: + time.sleep(SLEEP) + if attempt == ATTEMPTS: + raise TimeoutError("Tango device could not be registered") + + +@pytest.fixture(scope="session") +def test_controller(tango_system, register_device): + TIMEOUT = 10 + process = subprocess.Popen( + ["python", HERE / "benchmarking" / "controller.py"], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + universal_newlines=True, + ) + + start_time = time.monotonic() + while "Uvicorn running" not in ( + process.stdout.readline().strip() # type: ignore + ): + if time.monotonic() - start_time > TIMEOUT: + raise TimeoutError("Controller did not start in time") + + # close backend caches before the event loop + purge_channel_caches() + + # Stop buffer from getting full and blocking the subprocess + for f in [process.stdin, process.stdout, process.stderr]: + if f: + f.close() + + yield process + + process.send_signal(signal.SIGINT) + process.wait(TIMEOUT) diff --git a/tests/data/config_full.yaml b/tests/data/config.yaml similarity index 55% rename from tests/data/config_full.yaml rename to tests/data/config.yaml index b396f679c..bd53b6945 100644 --- a/tests/data/config_full.yaml +++ b/tests/data/config.yaml @@ -1,7 +1,10 @@ # yaml-language-server: $schema=schema.json transport: - ioc: {} - docs: {} - gui: {} + - ioc: {} + docs: {} + gui: {} + - rest: {} + - dsr: {} + - gql: {} controller: name: controller-name diff --git a/tests/data/config_minimal.yaml b/tests/data/config_minimal.yaml deleted file mode 100644 index 8b887af6a..000000000 --- a/tests/data/config_minimal.yaml +++ /dev/null @@ -1,3 +0,0 @@ -# yaml-language-server: $schema=schema.json -transport: - dsr: {} diff --git a/tests/data/schema.json b/tests/data/schema.json new file mode 100644 index 000000000..847a9abb0 --- /dev/null +++ b/tests/data/schema.json @@ -0,0 +1,218 @@ +{ + "$defs": { + "EpicsDocsOptions": { + "properties": { + "path": { + "default": "/home/esq51579/WIP/FastCS", + "format": "path", + "title": "Path", + "type": "string" + }, + "depth": { + "anyOf": [ + { + "type": "integer" + }, + { + "type": "null" + } + ], + "default": null, + "title": "Depth" + } + }, + "title": "EpicsDocsOptions", + "type": "object" + }, + "EpicsGUIFormat": { + "enum": [ + ".bob", + ".edl" + ], + "title": "EpicsGUIFormat", + "type": "string" + }, + "EpicsGUIOptions": { + "properties": { + "output_path": { + "default": "/home/esq51579/WIP/FastCS/output.bob", + "format": "path", + "title": "Output Path", + "type": "string" + }, + "file_format": { + "$ref": "#/$defs/EpicsGUIFormat", + "default": ".bob" + }, + "title": { + "default": "Simple Device", + "title": "Title", + "type": "string" + } + }, + "title": "EpicsGUIOptions", + "type": "object" + }, + "EpicsIOCOptions": { + "properties": { + "pv_prefix": { + "default": "MY-DEVICE-PREFIX", + "title": "Pv Prefix", + "type": "string" + } + }, + "title": "EpicsIOCOptions", + "type": "object" + }, + "EpicsOptions": { + "properties": { + "docs": { + "$ref": "#/$defs/EpicsDocsOptions" + }, + "gui": { + "$ref": "#/$defs/EpicsGUIOptions" + }, + "ioc": { + "$ref": "#/$defs/EpicsIOCOptions" + } + }, + "title": "EpicsOptions", + "type": "object" + }, + "GraphQLOptions": { + "properties": { + "gql": { + "$ref": "#/$defs/GraphQLServerOptions" + } + }, + "title": "GraphQLOptions", + "type": "object" + }, + "GraphQLServerOptions": { + "properties": { + "host": { + "default": "localhost", + "title": "Host", + "type": "string" + }, + "port": { + "default": 8080, + "title": "Port", + "type": "integer" + }, + "log_level": { + "default": "info", + "title": "Log Level", + "type": "string" + } + }, + "title": "GraphQLServerOptions", + "type": "object" + }, + "RestOptions": { + "properties": { + "rest": { + "$ref": "#/$defs/RestServerOptions" + } + }, + "title": "RestOptions", + "type": "object" + }, + "RestServerOptions": { + "properties": { + "host": { + "default": "localhost", + "title": "Host", + "type": "string" + }, + "port": { + "default": 8080, + "title": "Port", + "type": "integer" + }, + "log_level": { + "default": "info", + "title": "Log Level", + "type": "string" + } + }, + "title": "RestServerOptions", + "type": "object" + }, + "SomeConfig": { + "properties": { + "name": { + "title": "Name", + "type": "string" + } + }, + "required": [ + "name" + ], + "title": "SomeConfig", + "type": "object" + }, + "TangoDSROptions": { + "properties": { + "dev_name": { + "default": "MY/DEVICE/NAME", + "title": "Dev Name", + "type": "string" + }, + "dsr_instance": { + "default": "MY_SERVER_INSTANCE", + "title": "Dsr Instance", + "type": "string" + }, + "debug": { + "default": false, + "title": "Debug", + "type": "boolean" + } + }, + "title": "TangoDSROptions", + "type": "object" + }, + "TangoOptions": { + "properties": { + "dsr": { + "$ref": "#/$defs/TangoDSROptions" + } + }, + "title": "TangoOptions", + "type": "object" + } + }, + "additionalProperties": false, + "properties": { + "controller": { + "$ref": "#/$defs/SomeConfig" + }, + "transport": { + "items": { + "anyOf": [ + { + "$ref": "#/$defs/EpicsOptions" + }, + { + "$ref": "#/$defs/TangoOptions" + }, + { + "$ref": "#/$defs/RestOptions" + }, + { + "$ref": "#/$defs/GraphQLOptions" + } + ] + }, + "title": "Transport", + "type": "array" + } + }, + "required": [ + "controller", + "transport" + ], + "title": "IsHinted", + "type": "object" +} diff --git a/tests/ioc.py b/tests/ioc.py index 12918a675..a42ef6a29 100644 --- a/tests/ioc.py +++ b/tests/ioc.py @@ -23,7 +23,7 @@ def run(): epics_options = EpicsOptions(ioc=EpicsIOCOptions(pv_prefix="DEVICE")) controller = ParentController() controller.register_sub_controller("Child", ChildController()) - fastcs = FastCS(controller, epics_options) + fastcs = FastCS(controller, [epics_options]) fastcs.run() diff --git a/tests/test_backend.py b/tests/test_backend.py index c9f4f8e61..0a446a522 100644 --- a/tests/test_backend.py +++ b/tests/test_backend.py @@ -1,47 +1,31 @@ import asyncio -import pytest - from fastcs.backend import Backend -class DummyBackend(Backend): - def __init__(self, controller): - super().__init__(controller) - - self.init_task_called = False - self._initial_coros.append(self.init_task) - - async def init_task(self): - self.init_task_called = True - - def _run(self): - asyncio.run_coroutine_threadsafe(asyncio.sleep(0.3), self._loop) +def test_backend(controller): + loop = asyncio.get_event_loop() + backend = Backend(controller, loop) - -@pytest.mark.asyncio -async def test_backend(controller): - backend = DummyBackend(controller) - - # Controller should be initialised by Backend, but not connected + # Controller should be initialised by Backend and not connected assert controller.initialised assert not controller.connected # Controller Attributes with a Sender should have a _process_callback created assert controller.read_write_int.has_process_callback() - backend.run() - - # Controller should have been connected by Backend - assert controller.connected + async def test_wrapper(): + loop.create_task(backend.serve()) + await asyncio.sleep(0) # Yield to task - # Initial tasks should be complete - assert backend.init_task_called + # Controller should have been connected by Backend + assert controller.connected - # Scan tasks should be running - for _ in range(3): - count = controller.count - await asyncio.sleep(0.1) - assert controller.count > count + # Scan tasks should be running + for _ in range(3): + count = controller.count + await asyncio.sleep(0.01) + assert controller.count > count + backend._stop_scan_tasks() - backend.stop_scan_futures() + loop.run_until_complete(test_wrapper()) diff --git a/tests/test_cli.py b/tests/test_cli.py index ee8416e3c..7a16a0bfd 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -6,5 +6,4 @@ def test_cli_version(): cmd = [sys.executable, "-m", "fastcs", "--version"] - info = "INFO: PVXS QSRV2 is loaded, permitted, and ENABLED.\n" - assert subprocess.check_output(cmd).decode().strip() == info + __version__ + assert subprocess.check_output(cmd).decode().strip() == __version__ diff --git a/tests/test_launch.py b/tests/test_launch.py index ce26573f6..c2732c88b 100644 --- a/tests/test_launch.py +++ b/tests/test_launch.py @@ -7,7 +7,9 @@ from typer.testing import CliRunner from fastcs.__main__ import __version__ +from fastcs.attributes import AttrR from fastcs.controller import Controller +from fastcs.datatypes import Int from fastcs.exceptions import LaunchError from fastcs.launch import TransportOptions, _launch, launch @@ -28,6 +30,8 @@ def __init__(self, arg): class IsHinted(Controller): + read = AttrR(Int()) + def __init__(self, arg: SomeConfig) -> None: super().__init__() @@ -117,27 +121,13 @@ def test_no_version(): assert result.stdout == expected -def test_launch_minimal(mocker: MockerFixture, data): - run = mocker.patch("fastcs.launch.FastCS.run") - gui = mocker.patch("fastcs.launch.FastCS.create_gui") - docs = mocker.patch("fastcs.launch.FastCS.create_docs") - - app = _launch(SingleArg) - result = runner.invoke(app, ["run", str(data / "config_minimal.yaml")]) - assert result.exit_code == 0 - - run.assert_called_once() - gui.assert_not_called() - docs.assert_not_called() - - -def test_launch_full(mocker: MockerFixture, data): +def test_launch(mocker: MockerFixture, data): run = mocker.patch("fastcs.launch.FastCS.run") gui = mocker.patch("fastcs.launch.FastCS.create_gui") docs = mocker.patch("fastcs.launch.FastCS.create_docs") app = _launch(IsHinted) - result = runner.invoke(app, ["run", str(data / "config_full.yaml")]) + result = runner.invoke(app, ["run", str(data / "config.yaml")]) assert result.exit_code == 0 run.assert_called_once() diff --git a/tests/transport/tango/test_dsr.py b/tests/transport/tango/test_dsr.py index 09646c379..9f1c6629b 100644 --- a/tests/transport/tango/test_dsr.py +++ b/tests/transport/tango/test_dsr.py @@ -1,3 +1,6 @@ +import asyncio +from unittest import mock + import pytest from tango import DevState from tango.test_context import DeviceTestContext @@ -5,13 +8,23 @@ from fastcs.transport.tango.adapter import TangoTransport +async def patch_run_threadsafe_blocking(coro, loop): + await coro + + class TestTangoDevice: @pytest.fixture(scope="class") def tango_context(self, assertable_controller): - # https://tango-controls.readthedocs.io/projects/pytango/en/v9.5.1/testing/test_context.html - device = TangoTransport(assertable_controller)._dsr._device - with DeviceTestContext(device, debug=0) as proxy: - yield proxy + with mock.patch( + "fastcs.transport.tango.dsr._run_threadsafe_blocking", + patch_run_threadsafe_blocking, + ): + device = TangoTransport( + assertable_controller, asyncio.AbstractEventLoop() + )._dsr._device + # https://tango-controls.readthedocs.io/projects/pytango/en/v9.5.1/testing/test_context.html + with DeviceTestContext(device, debug=0) as proxy: + yield proxy def test_list_attributes(self, tango_context): assert list(tango_context.get_attribute_list()) == [