diff --git a/src/nexusrpc/syncio/__init__.py b/src/nexusrpc/syncio/__init__.py deleted file mode 100644 index 7de40be..0000000 --- a/src/nexusrpc/syncio/__init__.py +++ /dev/null @@ -1,19 +0,0 @@ -""" -Components for implementing Nexus handlers that use synchronous I/O. - -By default the components of the nexusrpc library use asynchronous I/O (`async def`). -This module provides alternative components based on traditional synchronous I/O -(`def`). - -Server/worker authors will use this module to create top-level Nexus handlers that -expose `def` methods such as `start_operation` and `cancel_operation`. - -Nexus service/operation authors will use this module to obtain a synchronous I/O -version of the `sync_operation` decorator. -""" - -from ._serializer import LazyValue - -__all__ = [ - "LazyValue", -] diff --git a/src/nexusrpc/syncio/_serializer.py b/src/nexusrpc/syncio/_serializer.py deleted file mode 100644 index 3cb9b18..0000000 --- a/src/nexusrpc/syncio/_serializer.py +++ /dev/null @@ -1,58 +0,0 @@ -from __future__ import annotations - -from typing import ( - Any, - Iterable, - Mapping, - Optional, - Type, -) - -from nexusrpc._serializer import Content, LazyValueT, Serializer - - -class LazyValue(LazyValueT): - """ - A container for a value encoded in an underlying stream. - - It is used to stream inputs and outputs in the various client and server APIs. - - For the `async def` version of this class, see :py:class:`nexusrpc.LazyValue`. - """ - - def __init__( - self, - serializer: Serializer, - headers: Mapping[str, str], - stream: Optional[Iterable[bytes]] = None, - ) -> None: - """ - Args: - serializer: The serializer to use for consuming the value. - headers: Headers that include information on how to process the stream's content. - Headers constructed by the framework always have lower case keys. - User provided keys are treated case-insensitively. - stream: Iterable that contains request or response data. None means empty data. - """ - self.serializer = serializer - self.headers = headers - self.stream = stream - - def consume(self, as_type: Optional[Type[Any]] = None) -> Any: - """ - Consume the underlying reader stream, deserializing via the embedded serializer. - """ - if self.stream is None: - return self.serializer.deserialize( - Content(headers=self.headers, data=b""), as_type=as_type - ) - elif not isinstance(self.stream, Iterable): - raise ValueError("When using consume_sync, stream must be an Iterable") - - return self.serializer.deserialize( - Content( - headers=self.headers, - data=b"".join([c for c in self.stream]), - ), - as_type=as_type, - ) diff --git a/src/nexusrpc/syncio/handler/__init__.py b/src/nexusrpc/syncio/handler/__init__.py deleted file mode 100644 index 20dda3b..0000000 --- a/src/nexusrpc/syncio/handler/__init__.py +++ /dev/null @@ -1,18 +0,0 @@ -""" -Components for implementing Nexus handlers that use synchronous I/O. - -See :py:mod:`nexusrpc.handler` for the asynchronous I/O version of this module. - -Server/worker authors will use this module to create the top-level Nexus handlers -responsible for dispatching requests to Nexus operations. - -Nexus service/operation authors will use this module to implement operation handler -methods within service handler classes. -""" - -from ._core import Handler, sync_operation - -__all__ = [ - "Handler", - "sync_operation", -] diff --git a/src/nexusrpc/syncio/handler/_core.py b/src/nexusrpc/syncio/handler/_core.py deleted file mode 100644 index f378d20..0000000 --- a/src/nexusrpc/syncio/handler/_core.py +++ /dev/null @@ -1,292 +0,0 @@ -from __future__ import annotations - -import concurrent.futures -from typing import ( - Any, - Callable, - Optional, - Sequence, - Union, - overload, -) - -from typing_extensions import TypeGuard - -import nexusrpc -from nexusrpc import InputT, OperationInfo, OutputT -from nexusrpc._common import ServiceHandlerT -from nexusrpc._serializer import LazyValueT -from nexusrpc._util import ( - get_callable_name, - is_async_callable, - set_operation_definition, - set_operation_factory, -) -from nexusrpc.handler._common import ( - CancelOperationContext, - FetchOperationInfoContext, - FetchOperationResultContext, - StartOperationContext, - StartOperationResultAsync, - StartOperationResultSync, -) -from nexusrpc.handler._core import BaseServiceCollectionHandler -from nexusrpc.handler._util import get_start_method_input_and_output_type_annotations - -from ...handler._operation_handler import OperationHandler - - -class Handler(BaseServiceCollectionHandler): - """ - A Nexus handler with non-async `def` methods. - - This class does not support user operation handlers that are `async def` methods. - For a Handler class with `async def` methods that supports `async def` and `def` - user operation handlers, see :py:class:`nexusrpc.handler.Handler`. - - A Nexus handler manages a collection of Nexus service handlers. - - Operation requests are dispatched to a :py:class:`ServiceHandler` based on the - service name in the operation context. - - Example: - .. code-block:: python - - import concurrent.futures - import nexusrpc.syncio.handler - - # Create service handler instances with sync operations - my_service = MySyncServiceHandler() - - # Create executor for running sync operations - executor = concurrent.futures.ThreadPoolExecutor(max_workers=10) - - # Create syncio handler (requires executor) - handler = nexusrpc.syncio.handler.Handler([my_service], executor=executor) - - # Use handler to process requests (methods are non-async) - result = handler.start_operation(ctx, input_lazy_value) - - """ - - executor: concurrent.futures.Executor # type: ignore[assignment] - - def __init__( - self, - user_service_handlers: Sequence[Any], - executor: concurrent.futures.Executor, - ): - super().__init__(user_service_handlers, executor) - self._validate_all_operation_handlers_are_sync() - if not self.executor: - raise RuntimeError("A syncio Handler must be initialized with an executor.") - - def start_operation( - self, - ctx: StartOperationContext, - input: LazyValueT, - ) -> Union[ - StartOperationResultSync[Any], - StartOperationResultAsync, - ]: - """Handle a Start Operation request. - - Args: - ctx: The operation context. - input: The input to the operation, as a LazyValue. - """ - service_handler = self._get_service_handler(ctx.service) - op_handler = service_handler._get_operation_handler(ctx.operation) - op = service_handler.service.operations[ctx.operation] - deserialized_input = input.consume(as_type=op.input_type) - assert self._assert_not_async_callable(op_handler.start) - # TODO(preview): apply middleware stack - return self.executor.submit(op_handler.start, ctx, deserialized_input).result() - - def cancel_operation(self, ctx: CancelOperationContext, token: str) -> None: - """Handle a Cancel Operation request. - - Args: - ctx: The operation context. - token: The operation token. - """ - service_handler = self._get_service_handler(ctx.service) - op_handler = service_handler._get_operation_handler(ctx.operation) - assert self._assert_not_async_callable(op_handler.cancel) - if not self.executor: - raise RuntimeError( - "Operation cancel handler method is not an `async def` method but " - "no executor was provided to the Handler constructor." - ) - return self.executor.submit(op_handler.cancel, ctx, token).result() - - def fetch_operation_info( - self, ctx: FetchOperationInfoContext, token: str - ) -> OperationInfo: - raise NotImplementedError - - def fetch_operation_result( - self, ctx: FetchOperationResultContext, token: str - ) -> Any: - raise NotImplementedError - - def _validate_all_operation_handlers_are_sync(self) -> None: - for service_handler in self.service_handlers.values(): - for op_handler in service_handler.operation_handlers.values(): - self._assert_not_async_callable(op_handler.start) - self._assert_not_async_callable(op_handler.cancel) - self._assert_not_async_callable(op_handler.fetch_info) - self._assert_not_async_callable(op_handler.fetch_result) - - def _assert_not_async_callable( - self, method: Callable[..., Any] - ) -> TypeGuard[Callable[..., Any]]: - if is_async_callable(method): - raise RuntimeError( - f"Operation handler method {method} is an `async def` method, " - "but you are using nexusrpc.syncio.handler.Handler, " - "which is for `def` methods. Use nexusrpc.handler.Handler instead." - ) - return True - - -class SyncOperationHandler(OperationHandler[InputT, OutputT]): - """ - An :py:class:`nexusrpc.handler.OperationHandler` that is limited to responding synchronously. - - The name 'SyncOperationHandler' means that it responds synchronously, in the - sense that the start method delivers the final operation result as its return - value, rather than returning an operation token representing an in-progress - operation. - - This version of the class uses `def` methods. For the async version, see - :py:class:`nexusrpc.handler.SyncOperationHandler`. - """ - - def __init__(self, start: Callable[[StartOperationContext, InputT], OutputT]): - if is_async_callable(start): - raise RuntimeError( - f"{start} is an `async def` method. " - "SyncOperationHandler must be initialized with a `def` method. " - "To use `async def` methods, use nexusrpc.handler.SyncOperationHandler." - ) - self._start = start - if start.__doc__: - if start_func := getattr(self.start, "__func__", None): - start_func.__doc__ = start.__doc__ - - def start( - self, ctx: StartOperationContext, input: InputT - ) -> StartOperationResultSync[OutputT]: - """ - Start the operation and return its final result synchronously. - """ - return StartOperationResultSync(self._start(ctx, input)) - - def fetch_info(self, ctx: FetchOperationInfoContext, token: str) -> OperationInfo: - raise NotImplementedError( - "Cannot fetch operation info for an operation that responded synchronously." - ) - - def fetch_result(self, ctx: FetchOperationResultContext, token: str) -> OutputT: - raise NotImplementedError( - "Cannot fetch the result of an operation that responded synchronously." - ) - - def cancel(self, ctx: CancelOperationContext, token: str) -> None: - raise NotImplementedError( - "An operation that responded synchronously cannot be cancelled." - ) - - -@overload -def sync_operation( - start: Callable[[ServiceHandlerT, StartOperationContext, InputT], OutputT], -) -> Callable[[ServiceHandlerT, StartOperationContext, InputT], OutputT]: ... - - -@overload -def sync_operation( - *, - name: Optional[str] = None, -) -> Callable[ - [Callable[[ServiceHandlerT, StartOperationContext, InputT], OutputT]], - Callable[[ServiceHandlerT, StartOperationContext, InputT], OutputT], -]: ... - - -def sync_operation( - start: Optional[ - Callable[[ServiceHandlerT, StartOperationContext, InputT], OutputT] - ] = None, - *, - name: Optional[str] = None, -) -> Union[ - Callable[[ServiceHandlerT, StartOperationContext, InputT], OutputT], - Callable[ - [Callable[[ServiceHandlerT, StartOperationContext, InputT], OutputT]], - Callable[[ServiceHandlerT, StartOperationContext, InputT], OutputT], - ], -]: - """ - Decorator marking a method as the start method for a synchronous operation. - - This is the synchronous I/O version using `def` methods. - - Example: - .. code-block:: python - - import requests - from nexusrpc.handler import service_handler - from nexusrpc.syncio.handler import sync_operation - - @service_handler - class MySyncServiceHandler: - @sync_operation - def process_data( - self, ctx: StartOperationContext, input: str - ) -> str: - # You can use synchronous I/O libraries - response = requests.get("https://api.example.com/data") - data = response.json() - return f"Processed: {data}" - """ - if is_async_callable(start): - raise TypeError( - "syncio sync_operation decorator must be used on a `def` operation method" - ) - - def decorator( - start: Callable[[ServiceHandlerT, StartOperationContext, InputT], OutputT], - ) -> Callable[[ServiceHandlerT, StartOperationContext, InputT], OutputT]: - def operation_handler_factory( - self: ServiceHandlerT, - ) -> OperationHandler[InputT, OutputT]: - def _start(ctx: StartOperationContext, input: InputT) -> OutputT: - return start(self, ctx, input) - - _start.__doc__ = start.__doc__ - return SyncOperationHandler(_start) - - input_type, output_type = get_start_method_input_and_output_type_annotations( - start - ) - - method_name = get_callable_name(start) - set_operation_definition( - operation_handler_factory, - nexusrpc.Operation( - name=name or method_name, - method_name=method_name, - input_type=input_type, - output_type=output_type, - ), - ) - - set_operation_factory(start, operation_handler_factory) - return start - - if start is None: - return decorator - - return decorator(start) diff --git a/tests/handler/test_handler_syncio.py b/tests/handler/test_handler_syncio.py deleted file mode 100644 index 92af6be..0000000 --- a/tests/handler/test_handler_syncio.py +++ /dev/null @@ -1,56 +0,0 @@ -from concurrent.futures import ThreadPoolExecutor -from dataclasses import dataclass -from typing import Any, Optional, Type - -import pytest - -from nexusrpc import Content -from nexusrpc.handler import ( - StartOperationContext, - StartOperationResultSync, - service_handler, -) -from nexusrpc.syncio import LazyValue -from nexusrpc.syncio.handler import Handler, sync_operation - - -class _TestCase: - user_service_handler: Any - - -class SyncHandlerHappyPath: - @service_handler - class MyService: - @sync_operation - def incr(self, ctx: StartOperationContext, input: int) -> int: - return input + 1 - - user_service_handler = MyService() - - -@pytest.mark.parametrize("test_case", [SyncHandlerHappyPath]) -def test_sync_handler_happy_path(test_case: Type[_TestCase]): - handler = Handler( - user_service_handlers=[test_case.user_service_handler], - executor=ThreadPoolExecutor(max_workers=1), - ) - ctx = StartOperationContext( - service="MyService", - operation="incr", - headers={}, - request_id="request_id", - ) - result = handler.start_operation(ctx, LazyValue(DummySerializer(1), headers={})) - assert isinstance(result, StartOperationResultSync) - assert result.value == 2 - - -@dataclass -class DummySerializer: - value: int - - def serialize(self, value: Any) -> Content: - raise NotImplementedError - - def deserialize(self, content: Content, as_type: Optional[Type[Any]] = None) -> Any: - return self.value diff --git a/tests/handler/test_invalid_usage.py b/tests/handler/test_invalid_usage.py index 8cdfb2a..4b3fb22 100644 --- a/tests/handler/test_invalid_usage.py +++ b/tests/handler/test_invalid_usage.py @@ -3,24 +3,18 @@ handler implementations. """ -import concurrent.futures from typing import Any, Callable import pytest import nexusrpc from nexusrpc.handler import ( - Handler, StartOperationContext, service_handler, sync_operation, ) from nexusrpc.handler._decorators import operation_handler from nexusrpc.handler._operation_handler import OperationHandler -from nexusrpc.syncio.handler import ( - Handler as SyncioHandler, - sync_operation as syncio_sync_operation, -) class _TestCase: @@ -115,49 +109,6 @@ def my_op(self, ctx: StartOperationContext, input: None) -> None: ... ) -class SyncioDecoratorWithAsyncioMethod(_TestCase): - @staticmethod - def build(): - @nexusrpc.service - class SD: - my_op: nexusrpc.Operation[None, None] - - @service_handler(service=SD) - class SH: - @syncio_sync_operation - async def my_op(self, ctx: StartOperationContext, input: None) -> None: ... - - error_message = ( - "syncio sync_operation decorator must be used on a `def` operation method" - ) - - -class AsyncioHandlerWithSyncioOperation(_TestCase): - @staticmethod - def build(): - @service_handler - class SH: - @syncio_sync_operation - def my_op(self, ctx: StartOperationContext, input: None) -> None: ... - - Handler([SH()]) - - error_message = "Use nexusrpc.syncio.handler.Handler instead" - - -class SyncioHandlerWithAsyncioOperation(_TestCase): - @staticmethod - def build(): - @service_handler - class SH: - @sync_operation - async def my_op(self, ctx: StartOperationContext, input: None) -> None: ... - - SyncioHandler([SH()], concurrent.futures.ThreadPoolExecutor()) - - error_message = "Use nexusrpc.handler.Handler instead" - - class ServiceDefinitionHasDuplicateMethodNames(_TestCase): @staticmethod def build(): @@ -197,10 +148,6 @@ def op(self) -> OperationHandler: ... ServiceDefinitionOperationHasNoTypeParams, ServiceDefinitionHasExtraOp, ServiceHandlerHasExtraOp, - AsyncioDecoratorWithSyncioMethod, - SyncioDecoratorWithAsyncioMethod, - AsyncioHandlerWithSyncioOperation, - SyncioHandlerWithAsyncioOperation, ServiceDefinitionHasDuplicateMethodNames, OperationHandlerNoInputOutputTypeAnnotationsWithoutServiceDefinition, ], diff --git a/tests/handler/test_service_handler_decorator_validates_against_service_contract.py b/tests/handler/test_service_handler_decorator_validates_against_service_contract.py index 3f54784..894b9c0 100644 --- a/tests/handler/test_service_handler_decorator_validates_against_service_contract.py +++ b/tests/handler/test_service_handler_decorator_validates_against_service_contract.py @@ -280,7 +280,7 @@ def test_service_decorator_enforces_interface_implementation( assert test_case.error_message in str(err) else: if expected_warning := getattr(test_case, "expected_warning", None): - [warning] = test_case.captured_warnings + [warning] = getattr(test_case, "captured_warnings", []) assert expected_warning in str(warning.message) assert issubclass(warning.category, UserWarning) diff --git a/tests/handler/test_service_handler_from_user_instance.py b/tests/handler/test_service_handler_from_user_instance.py deleted file mode 100644 index 83c93a1..0000000 --- a/tests/handler/test_service_handler_from_user_instance.py +++ /dev/null @@ -1,32 +0,0 @@ -from __future__ import annotations - -import pytest - -from nexusrpc.handler import StartOperationContext, service_handler -from nexusrpc.syncio import handler as syncio_handler - -if False: - - @service_handler - class MyServiceHandlerWithCallableInstance: - class SyncOperationWithCallableInstance: - def __call__( - self, - _handler: MyServiceHandlerWithCallableInstance, - ctx: StartOperationContext, - input: int, - ) -> int: - return input - - sync_operation_with_callable_instance = syncio_handler.sync_operation( - name="sync_operation_with_callable_instance", - )( - SyncOperationWithCallableInstance(), - ) - - -@pytest.mark.skip(reason="TODO(preview): support callable instance") -def test_service_handler_from_user_instance(): - # service_handler = MyServiceHandlerWithCallableInstance() - # ServiceHandler.from_user_instance(service_handler) - pass diff --git a/tests/handler/test_sync_operation_handler_decorator_creates_valid_operation_handler.py b/tests/handler/test_sync_operation_handler_decorator_creates_valid_operation_handler.py index 535dd57..0423596 100644 --- a/tests/handler/test_sync_operation_handler_decorator_creates_valid_operation_handler.py +++ b/tests/handler/test_sync_operation_handler_decorator_creates_valid_operation_handler.py @@ -9,7 +9,6 @@ service_handler, sync_operation, ) -from nexusrpc.syncio import handler as syncio_handler @service_handler @@ -17,14 +16,6 @@ class MyServiceHandler: def __init__(self): self.mutable_container = [] - @syncio_handler.sync_operation - def my_def_op(self, ctx: StartOperationContext, input: int) -> int: - """ - This is the docstring for the `my_def_op` sync operation. - """ - self.mutable_container.append(input) - return input + 1 - @sync_operation(name="foo") async def my_async_def_op(self, ctx: StartOperationContext, input: int) -> int: """ @@ -34,24 +25,6 @@ async def my_async_def_op(self, ctx: StartOperationContext, input: int) -> int: return input + 2 -def test_def_sync_handler(): - user_instance = MyServiceHandler() - op_handler_factory, _ = get_operation_factory(user_instance.my_def_op) - assert op_handler_factory - op_handler = op_handler_factory(user_instance) - assert not is_async_callable(op_handler.start) - assert ( - str(op_handler.start.__doc__).strip() - == "This is the docstring for the `my_def_op` sync operation." - ) - assert not user_instance.mutable_container - ctx = mock.Mock(spec=StartOperationContext) - result = op_handler.start(ctx, 1) - assert isinstance(result, StartOperationResultSync) - assert result.value == 2 - assert user_instance.mutable_container == [1] - - @pytest.mark.asyncio async def test_async_def_sync_handler(): user_instance = MyServiceHandler()