From 7ee50fd3a3bf426f00d5cf0989c3e892568bd124 Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Wed, 29 Oct 2025 12:00:24 -0700 Subject: [PATCH 01/26] Add abstract base class and a no op implementation to enable task cancellation in operation handlers --- src/nexusrpc/handler/_cancellation.py | 52 ++++++++++ src/nexusrpc/handler/_common.py | 10 ++ tests/handler/test_cancellation.py | 135 ++++++++++++++++++++++++++ 3 files changed, 197 insertions(+) create mode 100644 src/nexusrpc/handler/_cancellation.py create mode 100644 tests/handler/test_cancellation.py diff --git a/src/nexusrpc/handler/_cancellation.py b/src/nexusrpc/handler/_cancellation.py new file mode 100644 index 0000000..1d68e67 --- /dev/null +++ b/src/nexusrpc/handler/_cancellation.py @@ -0,0 +1,52 @@ +from abc import ABC, abstractmethod +from typing import Optional + + +class OperationTaskCancellation(ABC): + """ + Indicates whether a a Nexus task has been cancelled during a sync operation or before an async operation has + returned a token. + + Nexus worker implementations are expected to provide an implementation that enables + cooperative cancellation for both sync and async operation handlers. + """ + + @abstractmethod + def is_cancelled(self) -> bool: + """Return True if the associated task has been cancelled.""" + raise NotImplementedError + + @abstractmethod + def cancellation_details(self) -> Optional[str]: + """Provide additional context for the cancellation, if available.""" + raise NotImplementedError + + @abstractmethod + def wait_until_cancelled(self, timeout: Optional[float] = None) -> bool: + """Block until cancellation occurs or the optional timeout elapses.""" + raise NotImplementedError + + @abstractmethod + async def wait_until_cancelled_async(self): + """Await cancellation using async primitives.""" + raise NotImplementedError + + +class UncancellableOperationTaskCancellation(OperationTaskCancellation): + """An :py:class:`OperationTaskCancellation` that never cancels. Used by default if a Nexus worker implementation does not implement task cancellation.""" + + def is_cancelled(self) -> bool: + """Always report not cancelled.""" + return False + + def cancellation_details(self) -> Optional[str]: + """Return None because no cancellation data is ever available.""" + return None + + def wait_until_cancelled(self, timeout: Optional[float] = None): + """Never block because cancellation will not occur.""" + return False + + async def wait_until_cancelled_async(self): + """Never await cancellation because it cannot be triggered.""" + pass diff --git a/src/nexusrpc/handler/_common.py b/src/nexusrpc/handler/_common.py index 8fcbc59..56f71f3 100644 --- a/src/nexusrpc/handler/_common.py +++ b/src/nexusrpc/handler/_common.py @@ -6,6 +6,10 @@ from typing import Any, Generic, Optional from nexusrpc._common import Link, OutputT +from nexusrpc.handler._cancellation import ( + OperationTaskCancellation, + UncancellableOperationTaskCancellation, +) @dataclass(frozen=True) @@ -35,6 +39,12 @@ def __new__(cls, *args: Any, **kwargs: Any): """ Optional header fields sent by the caller. """ + task_cancellation: OperationTaskCancellation = field( + default_factory=UncancellableOperationTaskCancellation, kw_only=True + ) + """ + Task cancellation information indicating that a running task should be interrupted. This is distinct from operation cancellation. + """ @dataclass(frozen=True) diff --git a/tests/handler/test_cancellation.py b/tests/handler/test_cancellation.py new file mode 100644 index 0000000..e11ba21 --- /dev/null +++ b/tests/handler/test_cancellation.py @@ -0,0 +1,135 @@ +import asyncio +import threading +from typing import Optional + +import pytest + +from nexusrpc import LazyValue +from nexusrpc.handler import ( + CancelOperationContext, + Handler, + OperationHandler, + StartOperationContext, + StartOperationResultAsync, + service_handler, +) +from nexusrpc.handler._cancellation import OperationTaskCancellation +from nexusrpc.handler._common import StartOperationResultSync +from nexusrpc.handler._decorators import operation_handler, sync_operation +from tests.helpers import DummySerializer + + +class CancellableAsyncOperationHandler(OperationHandler[None, None]): + async def start( + self, ctx: StartOperationContext, input: None + ) -> StartOperationResultAsync: + try: + await asyncio.wait_for( + ctx.task_cancellation.wait_until_cancelled_async(), timeout=1 + ) + except TimeoutError as err: + raise RuntimeError("Expected cancellation") from err + + details = ctx.task_cancellation.cancellation_details() + if not details: + raise RuntimeError("Expected cancellation details") + + # normally you return a token but for this test + # we use the token to indicate success by returning the expected + # cancellation details + return StartOperationResultAsync(details) + + async def cancel(self, ctx: CancelOperationContext, token: str) -> None: + pass + + +@service_handler +class MyService: + @operation_handler + def cancellable_async(self) -> OperationHandler[None, None]: + return CancellableAsyncOperationHandler() + + @sync_operation + async def cancellable_sync(self, ctx: StartOperationContext, input: None) -> str: + cancelled = ctx.task_cancellation.wait_until_cancelled(1) + if not cancelled: + raise RuntimeError("Expected cancellation") + + details = ctx.task_cancellation.cancellation_details() + if not details: + raise RuntimeError("Expected cancellation details") + + return details + + +class TestOperationTaskCancellation(OperationTaskCancellation): + # A naive implementation of cancellation for use in tests + def __init__(self): + self._details = None + self._evt = threading.Event() + self._lock = threading.Lock() + + def is_cancelled(self) -> bool: + return self._evt.is_set() + + def cancellation_details(self) -> Optional[str]: + with self._lock: + return self._details + + def wait_until_cancelled(self, timeout: float | None = None) -> bool: + return self._evt.wait(timeout) + + async def wait_until_cancelled_async(self): + while not self.is_cancelled(): + await asyncio.sleep(0.05) + + def cancel(self): + with self._lock: + self._details = "test cancellation occurred" + self._evt.set() + + +@pytest.mark.asyncio +async def test_cancellation_sync_operation(): + handler = Handler(user_service_handlers=[MyService()]) + cancellation = TestOperationTaskCancellation() + start_ctx = StartOperationContext( + service="MyService", + operation="cancellable_sync", + headers={}, + request_id="request_id", + task_cancellation=cancellation, + ) + + operation_task = asyncio.create_task( + handler.start_operation( + start_ctx, LazyValue(serializer=DummySerializer(None), headers={}) + ) + ) + + cancellation.cancel() + result = await operation_task + assert result == StartOperationResultSync("test cancellation occurred") + + +@pytest.mark.asyncio +async def test_cancellation_async_operation(): + handler = Handler(user_service_handlers=[MyService()]) + cancellation = TestOperationTaskCancellation() + start_ctx = StartOperationContext( + service="MyService", + operation="cancellable_async", + headers={}, + request_id="request_id", + task_cancellation=cancellation, + ) + + operation_task = asyncio.create_task( + handler.start_operation( + start_ctx, LazyValue(serializer=DummySerializer(None), headers={}) + ) + ) + + cancellation.cancel() + result = await operation_task + assert result == StartOperationResultAsync("test cancellation occurred") From dbaca99c970a5482b1d624242978ffb6d977ed72 Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Wed, 29 Oct 2025 12:11:49 -0700 Subject: [PATCH 02/26] fix some linter errors --- src/nexusrpc/handler/_cancellation.py | 2 +- tests/handler/test_cancellation.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/nexusrpc/handler/_cancellation.py b/src/nexusrpc/handler/_cancellation.py index 1d68e67..1353df7 100644 --- a/src/nexusrpc/handler/_cancellation.py +++ b/src/nexusrpc/handler/_cancellation.py @@ -27,7 +27,7 @@ def wait_until_cancelled(self, timeout: Optional[float] = None) -> bool: raise NotImplementedError @abstractmethod - async def wait_until_cancelled_async(self): + async def wait_until_cancelled_async(self) -> None: """Await cancellation using async primitives.""" raise NotImplementedError diff --git a/tests/handler/test_cancellation.py b/tests/handler/test_cancellation.py index e11ba21..21c1501 100644 --- a/tests/handler/test_cancellation.py +++ b/tests/handler/test_cancellation.py @@ -50,7 +50,7 @@ def cancellable_async(self) -> OperationHandler[None, None]: return CancellableAsyncOperationHandler() @sync_operation - async def cancellable_sync(self, ctx: StartOperationContext, input: None) -> str: + async def cancellable_sync(self, ctx: StartOperationContext, _input: None) -> str: cancelled = ctx.task_cancellation.wait_until_cancelled(1) if not cancelled: raise RuntimeError("Expected cancellation") From 847d9cb903d962e95d147de26b02fb1e412ee7cb Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Wed, 29 Oct 2025 14:39:57 -0700 Subject: [PATCH 03/26] Some PR feedback. Up min python version to 3.10 --- .github/workflows/ci.yml | 4 +- pyproject.toml | 50 +++++++------- src/nexusrpc/handler/__init__.py | 5 +- src/nexusrpc/handler/_cancellation.py | 52 -------------- src/nexusrpc/handler/_common.py | 40 ++++++++--- tests/handler/test_async_operation.py | 4 +- tests/handler/test_cancellation.py | 45 +++--------- tests/handler/test_request_routing.py | 5 +- ...rrectly_functioning_operation_factories.py | 2 + tests/helpers.py | 32 +++++++++ uv.lock | 68 +------------------ 11 files changed, 111 insertions(+), 196 deletions(-) delete mode 100644 src/nexusrpc/handler/_cancellation.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 14bf0b4..0b04811 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -10,7 +10,7 @@ jobs: runs-on: ${{ matrix.os }} strategy: matrix: - python-version: ['3.9', '3.13', '3.14'] + python-version: ['3.10', '3.13', '3.14'] os: [ubuntu-latest, macos-latest, windows-latest] steps: @@ -55,7 +55,7 @@ jobs: - name: Install uv uses: astral-sh/setup-uv@v6 with: - python-version: '3.9' + python-version: '3.10' - name: Install dependencies run: uv sync diff --git a/pyproject.toml b/pyproject.toml index 47e06e6..31bb98b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,26 +8,24 @@ name = "nexus-rpc" version = "1.1.0" description = "Nexus Python SDK" readme = "README.md" -authors = [ - { name = "Temporal Technologies", email = "sdk@temporal.io" } -] -requires-python = ">=3.9" -dependencies = [ - "typing-extensions>=4.12.2", -] +authors = [{ name = "Temporal Technologies", email = "sdk@temporal.io" }] +requires-python = ">=3.10" +license = "MIT" +license-files = ["LICENSE"] +dependencies = ["typing-extensions>=4.12.2"] [dependency-groups] dev = [ - "basedpyright>=1.30.1", - "mypy>=1.15.0", - "poethepoet>=0.35.0", - "pydoctor>=25.4.0", - "pyright>=1.1", - "pytest>=8.3.5", - "pytest-asyncio>=0.26.0", - "pytest-cov>=6.1.1", - "pytest-pretty>=1.3.0", - "ruff>=0.12.0", + "basedpyright>=1.30.1", + "mypy>=1.15.0", + "poethepoet>=0.35.0", + "pydoctor>=25.4.0", + "pyright>=1.1", + "pytest>=8.3.5", + "pytest-asyncio>=0.26.0", + "pytest-cov>=6.1.1", + "pytest-pretty>=1.3.0", + "ruff>=0.12.0", ] [build-system] @@ -39,19 +37,17 @@ packages = ["src/nexusrpc"] [tool.poe.tasks] lint = [ - {cmd = "uv run basedpyright"}, - {cmd = "uv run pyright"}, - {cmd = "uv run mypy --check-untyped-defs src"}, - {cmd = "uv run ruff check --select I"}, - {cmd = "uv run ruff format --check"}, + { cmd = "uv run basedpyright" }, + { cmd = "uv run pyright" }, + { cmd = "uv run mypy --check-untyped-defs src" }, + { cmd = "uv run ruff check --select I" }, + { cmd = "uv run ruff format --check" }, ] format = [ - {cmd = "uv run ruff check --select I --fix"}, - {cmd = "uv run ruff format"}, -] -docs = [ - {cmd = "uv run pydoctor src/nexusrpc"}, + { cmd = "uv run ruff check --select I --fix" }, + { cmd = "uv run ruff format" }, ] +docs = [{ cmd = "uv run pydoctor src/nexusrpc" }] [tool.pyright] # https://microsoft.github.io/pyright/#/configuration?id=type-check-rule-overrides diff --git a/src/nexusrpc/handler/__init__.py b/src/nexusrpc/handler/__init__.py index 795868d..9b9fdf2 100644 --- a/src/nexusrpc/handler/__init__.py +++ b/src/nexusrpc/handler/__init__.py @@ -13,12 +13,13 @@ from ._common import ( CancelOperationContext, OperationContext, + OperationTaskCancellation, StartOperationContext, StartOperationResultAsync, StartOperationResultSync, ) from ._core import Handler as Handler -from ._decorators import service_handler, sync_operation +from ._decorators import operation_handler, service_handler, sync_operation from ._operation_handler import OperationHandler as OperationHandler __all__ = [ @@ -26,9 +27,11 @@ "Handler", "OperationContext", "OperationHandler", + "OperationTaskCancellation", "service_handler", "StartOperationContext", "StartOperationResultAsync", "StartOperationResultSync", "sync_operation", + "operation_handler", ] diff --git a/src/nexusrpc/handler/_cancellation.py b/src/nexusrpc/handler/_cancellation.py deleted file mode 100644 index 1353df7..0000000 --- a/src/nexusrpc/handler/_cancellation.py +++ /dev/null @@ -1,52 +0,0 @@ -from abc import ABC, abstractmethod -from typing import Optional - - -class OperationTaskCancellation(ABC): - """ - Indicates whether a a Nexus task has been cancelled during a sync operation or before an async operation has - returned a token. - - Nexus worker implementations are expected to provide an implementation that enables - cooperative cancellation for both sync and async operation handlers. - """ - - @abstractmethod - def is_cancelled(self) -> bool: - """Return True if the associated task has been cancelled.""" - raise NotImplementedError - - @abstractmethod - def cancellation_details(self) -> Optional[str]: - """Provide additional context for the cancellation, if available.""" - raise NotImplementedError - - @abstractmethod - def wait_until_cancelled(self, timeout: Optional[float] = None) -> bool: - """Block until cancellation occurs or the optional timeout elapses.""" - raise NotImplementedError - - @abstractmethod - async def wait_until_cancelled_async(self) -> None: - """Await cancellation using async primitives.""" - raise NotImplementedError - - -class UncancellableOperationTaskCancellation(OperationTaskCancellation): - """An :py:class:`OperationTaskCancellation` that never cancels. Used by default if a Nexus worker implementation does not implement task cancellation.""" - - def is_cancelled(self) -> bool: - """Always report not cancelled.""" - return False - - def cancellation_details(self) -> Optional[str]: - """Return None because no cancellation data is ever available.""" - return None - - def wait_until_cancelled(self, timeout: Optional[float] = None): - """Never block because cancellation will not occur.""" - return False - - async def wait_until_cancelled_async(self): - """Never await cancellation because it cannot be triggered.""" - pass diff --git a/src/nexusrpc/handler/_common.py b/src/nexusrpc/handler/_common.py index 56f71f3..4aa4345 100644 --- a/src/nexusrpc/handler/_common.py +++ b/src/nexusrpc/handler/_common.py @@ -1,15 +1,41 @@ from __future__ import annotations -from abc import ABC +from abc import ABC, abstractmethod from collections.abc import Mapping, Sequence from dataclasses import dataclass, field from typing import Any, Generic, Optional from nexusrpc._common import Link, OutputT -from nexusrpc.handler._cancellation import ( - OperationTaskCancellation, - UncancellableOperationTaskCancellation, -) + + +class OperationTaskCancellation(ABC): + """ + Indicates whether a a Nexus task has been cancelled during a sync operation or before an async operation has + returned a token. + + Nexus worker implementations are expected to provide an implementation that enables + cooperative cancellation for both sync and async operation handlers. + """ + + @abstractmethod + def is_cancelled(self) -> bool: + """Return True if the associated task has been cancelled.""" + raise NotImplementedError + + @abstractmethod + def cancellation_reason(self) -> Optional[str]: + """Provide additional context for the cancellation, if available.""" + raise NotImplementedError + + @abstractmethod + def wait_until_cancelled_sync(self, timeout: Optional[float] = None) -> bool: + """Block until cancellation occurs or the optional timeout elapses.""" + raise NotImplementedError + + @abstractmethod + async def wait_until_cancelled(self) -> None: + """Await cancellation using async primitives.""" + raise NotImplementedError @dataclass(frozen=True) @@ -39,9 +65,7 @@ def __new__(cls, *args: Any, **kwargs: Any): """ Optional header fields sent by the caller. """ - task_cancellation: OperationTaskCancellation = field( - default_factory=UncancellableOperationTaskCancellation, kw_only=True - ) + task_cancellation: OperationTaskCancellation """ Task cancellation information indicating that a running task should be interrupted. This is distinct from operation cancellation. """ diff --git a/tests/handler/test_async_operation.py b/tests/handler/test_async_operation.py index 7cd99a0..bbb99a8 100644 --- a/tests/handler/test_async_operation.py +++ b/tests/handler/test_async_operation.py @@ -12,7 +12,7 @@ service_handler, ) from nexusrpc.handler._decorators import operation_handler -from tests.helpers import DummySerializer +from tests.helpers import DummySerializer, TestOperationTaskCancellation _operation_results: dict[str, int] = {} @@ -44,6 +44,7 @@ async def test_async_operation_happy_path(): operation="incr", headers={}, request_id="request_id", + task_cancellation=TestOperationTaskCancellation(), ) start_result = await handler.start_operation( start_ctx, LazyValue(DummySerializer(1), headers={}) @@ -55,6 +56,7 @@ async def test_async_operation_happy_path(): service="MyService", operation="incr", headers={}, + task_cancellation=TestOperationTaskCancellation(), ) await handler.cancel_operation(cancel_ctx, start_result.token) assert start_result.token not in _operation_results diff --git a/tests/handler/test_cancellation.py b/tests/handler/test_cancellation.py index 21c1501..626ecd2 100644 --- a/tests/handler/test_cancellation.py +++ b/tests/handler/test_cancellation.py @@ -1,6 +1,4 @@ import asyncio -import threading -from typing import Optional import pytest @@ -11,12 +9,12 @@ OperationHandler, StartOperationContext, StartOperationResultAsync, + StartOperationResultSync, + operation_handler, service_handler, + sync_operation, ) -from nexusrpc.handler._cancellation import OperationTaskCancellation -from nexusrpc.handler._common import StartOperationResultSync -from nexusrpc.handler._decorators import operation_handler, sync_operation -from tests.helpers import DummySerializer +from tests.helpers import DummySerializer, TestOperationTaskCancellation class CancellableAsyncOperationHandler(OperationHandler[None, None]): @@ -25,12 +23,12 @@ async def start( ) -> StartOperationResultAsync: try: await asyncio.wait_for( - ctx.task_cancellation.wait_until_cancelled_async(), timeout=1 + ctx.task_cancellation.wait_until_cancelled(), timeout=1 ) except TimeoutError as err: raise RuntimeError("Expected cancellation") from err - details = ctx.task_cancellation.cancellation_details() + details = ctx.task_cancellation.cancellation_reason() if not details: raise RuntimeError("Expected cancellation details") @@ -51,44 +49,17 @@ def cancellable_async(self) -> OperationHandler[None, None]: @sync_operation async def cancellable_sync(self, ctx: StartOperationContext, _input: None) -> str: - cancelled = ctx.task_cancellation.wait_until_cancelled(1) + cancelled = ctx.task_cancellation.wait_until_cancelled_sync(1) if not cancelled: raise RuntimeError("Expected cancellation") - details = ctx.task_cancellation.cancellation_details() + details = ctx.task_cancellation.cancellation_reason() if not details: raise RuntimeError("Expected cancellation details") return details -class TestOperationTaskCancellation(OperationTaskCancellation): - # A naive implementation of cancellation for use in tests - def __init__(self): - self._details = None - self._evt = threading.Event() - self._lock = threading.Lock() - - def is_cancelled(self) -> bool: - return self._evt.is_set() - - def cancellation_details(self) -> Optional[str]: - with self._lock: - return self._details - - def wait_until_cancelled(self, timeout: float | None = None) -> bool: - return self._evt.wait(timeout) - - async def wait_until_cancelled_async(self): - while not self.is_cancelled(): - await asyncio.sleep(0.05) - - def cancel(self): - with self._lock: - self._details = "test cancellation occurred" - self._evt.set() - - @pytest.mark.asyncio async def test_cancellation_sync_operation(): handler = Handler(user_service_handlers=[MyService()]) diff --git a/tests/handler/test_request_routing.py b/tests/handler/test_request_routing.py index 485458e..64b54bc 100644 --- a/tests/handler/test_request_routing.py +++ b/tests/handler/test_request_routing.py @@ -13,8 +13,7 @@ sync_operation, ) from nexusrpc.handler._common import StartOperationResultSync - -from ..helpers import DummySerializer +from tests.helpers import DummySerializer, TestOperationTaskCancellation @dataclass_transform() @@ -114,6 +113,7 @@ async def test_request_routing_with_service_definition( operation=request_op, headers={}, request_id="request-id", + task_cancellation=TestOperationTaskCancellation(), ) handler = Handler(user_service_handlers=[test_case.UserServiceHandler()]) result = await handler.start_operation( @@ -181,6 +181,7 @@ async def test_request_routing_without_service_definition( operation=request_op, headers={}, request_id="request-id", + task_cancellation=TestOperationTaskCancellation(), ) handler = Handler(user_service_handlers=[test_case.UserServiceHandler()]) result = await handler.start_operation( diff --git a/tests/handler/test_service_handler_decorator_results_in_correctly_functioning_operation_factories.py b/tests/handler/test_service_handler_decorator_results_in_correctly_functioning_operation_factories.py index 15e592e..bbe1c0c 100644 --- a/tests/handler/test_service_handler_decorator_results_in_correctly_functioning_operation_factories.py +++ b/tests/handler/test_service_handler_decorator_results_in_correctly_functioning_operation_factories.py @@ -23,6 +23,7 @@ from nexusrpc.handler._operation_handler import ( collect_operation_handler_factories_by_method_name, ) +from tests.helpers import TestOperationTaskCancellation @dataclass_transform() @@ -89,6 +90,7 @@ async def test_collected_operation_factories_match_service_definition( operation="operation", headers={}, request_id="request_id", + task_cancellation=TestOperationTaskCancellation(), ) async def execute( diff --git a/tests/helpers.py b/tests/helpers.py index 4338511..021d55d 100644 --- a/tests/helpers.py +++ b/tests/helpers.py @@ -1,7 +1,10 @@ +import asyncio +import threading from dataclasses import dataclass from typing import Any, Optional from nexusrpc import Content +from nexusrpc.handler import OperationTaskCancellation @dataclass @@ -17,3 +20,32 @@ async def deserialize( as_type: Optional[type[Any]] = None, # pyright: ignore[reportUnusedParameter] ) -> Any: return self.value + + +class TestOperationTaskCancellation(OperationTaskCancellation): + __test__ = False + + # A naive implementation of cancellation for use in tests + def __init__(self): + self._details = None + self._evt = threading.Event() + self._lock = threading.Lock() + + def is_cancelled(self) -> bool: + return self._evt.is_set() + + def cancellation_reason(self) -> Optional[str]: + with self._lock: + return self._details + + def wait_until_cancelled_sync(self, timeout: float | None = None) -> bool: + return self._evt.wait(timeout) + + async def wait_until_cancelled(self): + while not self.is_cancelled(): + await asyncio.sleep(0.05) + + def cancel(self): + with self._lock: + self._details = "test cancellation occurred" + self._evt.set() diff --git a/uv.lock b/uv.lock index 71fe358..1ddde42 100644 --- a/uv.lock +++ b/uv.lock @@ -1,11 +1,6 @@ version = 1 -revision = 2 -requires-python = ">=3.9" -resolution-markers = [ - "python_full_version >= '3.11'", - "python_full_version == '3.10.*'", - "python_full_version < '3.10'", -] +revision = 3 +requires-python = ">=3.10" [[package]] name = "attrs" @@ -20,9 +15,6 @@ wheels = [ name = "automat" version = "25.4.16" source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "typing-extensions", marker = "python_full_version < '3.10'" }, -] sdist = { url = "https://files.pythonhosted.org/packages/e3/0f/d40bbe294bbf004d436a8bcbcfaadca8b5140d39ad0ad3d73d1a8ba15f14/automat-25.4.16.tar.gz", hash = "sha256:0017591a5477066e90d26b0e696ddc143baafd87b588cfac8100bc6be9634de0", size = 129977, upload-time = "2025-04-16T20:12:16.002Z" } wheels = [ { url = "https://files.pythonhosted.org/packages/02/ff/1175b0b7371e46244032d43a56862d0af455823b5280a50c63d99cc50f18/automat-25.4.16-py3-none-any.whl", hash = "sha256:04e9bce696a8d5671ee698005af6e5a9fa15354140a87f4870744604dcdd3ba1", size = 42842, upload-time = "2025-04-16T20:12:14.447Z" }, @@ -125,19 +117,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/78/be/8392efc43487ac051eee6c36d5fbd63032d78f7728cb37aebcc98191f1ff/charset_normalizer-3.4.2-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:4a476b06fbcf359ad25d34a057b7219281286ae2477cc5ff5e3f70a246971148", size = 149166, upload-time = "2025-05-02T08:33:15.458Z" }, { url = "https://files.pythonhosted.org/packages/44/96/392abd49b094d30b91d9fbda6a69519e95802250b777841cf3bda8fe136c/charset_normalizer-3.4.2-cp313-cp313-win32.whl", hash = "sha256:aaeeb6a479c7667fbe1099af9617c83aaca22182d6cf8c53966491a0f1b7ffb7", size = 98064, upload-time = "2025-05-02T08:33:17.06Z" }, { url = "https://files.pythonhosted.org/packages/e9/b0/0200da600134e001d91851ddc797809e2fe0ea72de90e09bec5a2fbdaccb/charset_normalizer-3.4.2-cp313-cp313-win_amd64.whl", hash = "sha256:aa6af9e7d59f9c12b33ae4e9450619cf2488e2bbe9b44030905877f0b2324980", size = 105641, upload-time = "2025-05-02T08:33:18.753Z" }, - { url = "https://files.pythonhosted.org/packages/28/f8/dfb01ff6cc9af38552c69c9027501ff5a5117c4cc18dcd27cb5259fa1888/charset_normalizer-3.4.2-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:005fa3432484527f9732ebd315da8da8001593e2cf46a3d817669f062c3d9ed4", size = 201671, upload-time = "2025-05-02T08:34:12.696Z" }, - { url = "https://files.pythonhosted.org/packages/32/fb/74e26ee556a9dbfe3bd264289b67be1e6d616329403036f6507bb9f3f29c/charset_normalizer-3.4.2-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:e92fca20c46e9f5e1bb485887d074918b13543b1c2a1185e69bb8d17ab6236a7", size = 144744, upload-time = "2025-05-02T08:34:14.665Z" }, - { url = "https://files.pythonhosted.org/packages/ad/06/8499ee5aa7addc6f6d72e068691826ff093329fe59891e83b092ae4c851c/charset_normalizer-3.4.2-cp39-cp39-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:50bf98d5e563b83cc29471fa114366e6806bc06bc7a25fd59641e41445327836", size = 154993, upload-time = "2025-05-02T08:34:17.134Z" }, - { url = "https://files.pythonhosted.org/packages/f1/a2/5e4c187680728219254ef107a6949c60ee0e9a916a5dadb148c7ae82459c/charset_normalizer-3.4.2-cp39-cp39-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:721c76e84fe669be19c5791da68232ca2e05ba5185575086e384352e2c309597", size = 147382, upload-time = "2025-05-02T08:34:19.081Z" }, - { url = "https://files.pythonhosted.org/packages/4c/fe/56aca740dda674f0cc1ba1418c4d84534be51f639b5f98f538b332dc9a95/charset_normalizer-3.4.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:82d8fd25b7f4675d0c47cf95b594d4e7b158aca33b76aa63d07186e13c0e0ab7", size = 149536, upload-time = "2025-05-02T08:34:21.073Z" }, - { url = "https://files.pythonhosted.org/packages/53/13/db2e7779f892386b589173dd689c1b1e304621c5792046edd8a978cbf9e0/charset_normalizer-3.4.2-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:b3daeac64d5b371dea99714f08ffc2c208522ec6b06fbc7866a450dd446f5c0f", size = 151349, upload-time = "2025-05-02T08:34:23.193Z" }, - { url = "https://files.pythonhosted.org/packages/69/35/e52ab9a276186f729bce7a0638585d2982f50402046e4b0faa5d2c3ef2da/charset_normalizer-3.4.2-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:dccab8d5fa1ef9bfba0590ecf4d46df048d18ffe3eec01eeb73a42e0d9e7a8ba", size = 146365, upload-time = "2025-05-02T08:34:25.187Z" }, - { url = "https://files.pythonhosted.org/packages/a6/d8/af7333f732fc2e7635867d56cb7c349c28c7094910c72267586947561b4b/charset_normalizer-3.4.2-cp39-cp39-musllinux_1_2_i686.whl", hash = "sha256:aaf27faa992bfee0264dc1f03f4c75e9fcdda66a519db6b957a3f826e285cf12", size = 154499, upload-time = "2025-05-02T08:34:27.359Z" }, - { url = "https://files.pythonhosted.org/packages/7a/3d/a5b2e48acef264d71e036ff30bcc49e51bde80219bb628ba3e00cf59baac/charset_normalizer-3.4.2-cp39-cp39-musllinux_1_2_ppc64le.whl", hash = "sha256:eb30abc20df9ab0814b5a2524f23d75dcf83cde762c161917a2b4b7b55b1e518", size = 157735, upload-time = "2025-05-02T08:34:29.798Z" }, - { url = "https://files.pythonhosted.org/packages/85/d8/23e2c112532a29f3eef374375a8684a4f3b8e784f62b01da931186f43494/charset_normalizer-3.4.2-cp39-cp39-musllinux_1_2_s390x.whl", hash = "sha256:c72fbbe68c6f32f251bdc08b8611c7b3060612236e960ef848e0a517ddbe76c5", size = 154786, upload-time = "2025-05-02T08:34:31.858Z" }, - { url = "https://files.pythonhosted.org/packages/c7/57/93e0169f08ecc20fe82d12254a200dfaceddc1c12a4077bf454ecc597e33/charset_normalizer-3.4.2-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:982bb1e8b4ffda883b3d0a521e23abcd6fd17418f6d2c4118d257a10199c0ce3", size = 150203, upload-time = "2025-05-02T08:34:33.88Z" }, - { url = "https://files.pythonhosted.org/packages/2c/9d/9bf2b005138e7e060d7ebdec7503d0ef3240141587651f4b445bdf7286c2/charset_normalizer-3.4.2-cp39-cp39-win32.whl", hash = "sha256:43e0933a0eff183ee85833f341ec567c0980dae57c464d8a508e1b2ceb336471", size = 98436, upload-time = "2025-05-02T08:34:35.907Z" }, - { url = "https://files.pythonhosted.org/packages/6d/24/5849d46cf4311bbf21b424c443b09b459f5b436b1558c04e45dbb7cc478b/charset_normalizer-3.4.2-cp39-cp39-win_amd64.whl", hash = "sha256:d11b54acf878eef558599658b0ffca78138c8c3655cf4f3a4a673c437e67732e", size = 105772, upload-time = "2025-05-02T08:34:37.935Z" }, { url = "https://files.pythonhosted.org/packages/20/94/c5790835a017658cbfabd07f3bfb549140c3ac458cfc196323996b10095a/charset_normalizer-3.4.2-py3-none-any.whl", hash = "sha256:7f56930ab0abd1c45cd15be65cc741c28b1c9a34876ce8c17a2fa107810c0af0", size = 52626, upload-time = "2025-05-02T08:34:40.053Z" }, ] @@ -228,16 +207,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/50/63/2d624ac7d7ccd4ebbd3c6a9eba9d7fc4491a1226071360d59dd84928ccb2/coverage-7.8.2-cp313-cp313t-win32.whl", hash = "sha256:3f5673888d3676d0a745c3d0e16da338c5eea300cb1f4ada9c872981265e76d8", size = 215109, upload-time = "2025-05-23T11:39:26.722Z" }, { url = "https://files.pythonhosted.org/packages/22/5e/7053b71462e970e869111c1853afd642212568a350eba796deefdfbd0770/coverage-7.8.2-cp313-cp313t-win_amd64.whl", hash = "sha256:2c08b05ee8d7861e45dc5a2cc4195c8c66dca5ac613144eb6ebeaff2d502e73d", size = 216268, upload-time = "2025-05-23T11:39:28.429Z" }, { url = "https://files.pythonhosted.org/packages/07/69/afa41aa34147655543dbe96994f8a246daf94b361ccf5edfd5df62ce066a/coverage-7.8.2-cp313-cp313t-win_arm64.whl", hash = "sha256:1e1448bb72b387755e1ff3ef1268a06617afd94188164960dba8d0245a46004b", size = 214071, upload-time = "2025-05-23T11:39:30.55Z" }, - { url = "https://files.pythonhosted.org/packages/71/1e/388267ad9c6aa126438acc1ceafede3bb746afa9872e3ec5f0691b7d5efa/coverage-7.8.2-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:496948261eaac5ac9cf43f5d0a9f6eb7a6d4cb3bedb2c5d294138142f5c18f2a", size = 211566, upload-time = "2025-05-23T11:39:32.333Z" }, - { url = "https://files.pythonhosted.org/packages/8f/a5/acc03e5cf0bba6357f5e7c676343de40fbf431bb1e115fbebf24b2f7f65e/coverage-7.8.2-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:eacd2de0d30871eff893bab0b67840a96445edcb3c8fd915e6b11ac4b2f3fa6d", size = 211996, upload-time = "2025-05-23T11:39:34.512Z" }, - { url = "https://files.pythonhosted.org/packages/5b/a2/0fc0a9f6b7c24fa4f1d7210d782c38cb0d5e692666c36eaeae9a441b6755/coverage-7.8.2-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b039ffddc99ad65d5078ef300e0c7eed08c270dc26570440e3ef18beb816c1ca", size = 240741, upload-time = "2025-05-23T11:39:36.252Z" }, - { url = "https://files.pythonhosted.org/packages/e6/da/1c6ba2cf259710eed8916d4fd201dccc6be7380ad2b3b9f63ece3285d809/coverage-7.8.2-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:0e49824808d4375ede9dd84e9961a59c47f9113039f1a525e6be170aa4f5c34d", size = 238672, upload-time = "2025-05-23T11:39:38.03Z" }, - { url = "https://files.pythonhosted.org/packages/ac/51/c8fae0dc3ca421e6e2509503696f910ff333258db672800c3bdef256265a/coverage-7.8.2-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b069938961dfad881dc2f8d02b47645cd2f455d3809ba92a8a687bf513839787", size = 239769, upload-time = "2025-05-23T11:39:40.24Z" }, - { url = "https://files.pythonhosted.org/packages/59/8e/b97042ae92c59f40be0c989df090027377ba53f2d6cef73c9ca7685c26a6/coverage-7.8.2-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:de77c3ba8bb686d1c411e78ee1b97e6e0b963fb98b1637658dd9ad2c875cf9d7", size = 239555, upload-time = "2025-05-23T11:39:42.3Z" }, - { url = "https://files.pythonhosted.org/packages/47/35/b8893e682d6e96b1db2af5997fc13ef62219426fb17259d6844c693c5e00/coverage-7.8.2-cp39-cp39-musllinux_1_2_i686.whl", hash = "sha256:1676628065a498943bd3f64f099bb573e08cf1bc6088bbe33cf4424e0876f4b3", size = 237768, upload-time = "2025-05-23T11:39:44.069Z" }, - { url = "https://files.pythonhosted.org/packages/03/6c/023b0b9a764cb52d6243a4591dcb53c4caf4d7340445113a1f452bb80591/coverage-7.8.2-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:8e1a26e7e50076e35f7afafde570ca2b4d7900a491174ca357d29dece5aacee7", size = 238757, upload-time = "2025-05-23T11:39:46.195Z" }, - { url = "https://files.pythonhosted.org/packages/03/ed/3af7e4d721bd61a8df7de6de9e8a4271e67f3d9e086454558fd9f48eb4f6/coverage-7.8.2-cp39-cp39-win32.whl", hash = "sha256:6782a12bf76fa61ad9350d5a6ef5f3f020b57f5e6305cbc663803f2ebd0f270a", size = 214166, upload-time = "2025-05-23T11:39:47.934Z" }, - { url = "https://files.pythonhosted.org/packages/9d/30/ee774b626773750dc6128354884652507df3c59d6aa8431526107e595227/coverage-7.8.2-cp39-cp39-win_amd64.whl", hash = "sha256:1efa4166ba75ccefd647f2d78b64f53f14fb82622bc94c5a5cb0a622f50f1c9e", size = 215050, upload-time = "2025-05-23T11:39:50.252Z" }, { url = "https://files.pythonhosted.org/packages/69/2f/572b29496d8234e4a7773200dd835a0d32d9e171f2d974f3fe04a9dbc271/coverage-7.8.2-pp39.pp310.pp311-none-any.whl", hash = "sha256:ec455eedf3ba0bbdf8f5a570012617eb305c63cb9f03428d39bf544cb2b94837", size = 203636, upload-time = "2025-05-23T11:39:52.002Z" }, { url = "https://files.pythonhosted.org/packages/a0/1a/0b9c32220ad694d66062f571cc5cedfa9997b64a591e8a500bb63de1bd40/coverage-7.8.2-py3-none-any.whl", hash = "sha256:726f32ee3713f7359696331a18daf0c3b3a70bb0ae71141b9d3c52be7c595e32", size = 203623, upload-time = "2025-05-23T11:39:53.846Z" }, ] @@ -397,17 +366,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/7a/40/631c238f1f338eb09f4acb0f34ab5862c4e9d7eda11c1b685471a4c5ea37/msgpack-1.1.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:b4c01941fd2ff87c2a934ee6055bda4ed353a7846b8d4f341c428109e9fcde8c", size = 399082, upload-time = "2024-09-10T04:25:18.398Z" }, { url = "https://files.pythonhosted.org/packages/e9/1b/fa8a952be252a1555ed39f97c06778e3aeb9123aa4cccc0fd2acd0b4e315/msgpack-1.1.0-cp313-cp313-win32.whl", hash = "sha256:7c9a35ce2c2573bada929e0b7b3576de647b0defbd25f5139dcdaba0ae35a4cc", size = 69037, upload-time = "2024-09-10T04:24:52.798Z" }, { url = "https://files.pythonhosted.org/packages/b6/bc/8bd826dd03e022153bfa1766dcdec4976d6c818865ed54223d71f07862b3/msgpack-1.1.0-cp313-cp313-win_amd64.whl", hash = "sha256:bce7d9e614a04d0883af0b3d4d501171fbfca038f12c77fa838d9f198147a23f", size = 75140, upload-time = "2024-09-10T04:24:31.288Z" }, - { url = "https://files.pythonhosted.org/packages/f7/3b/544a5c5886042b80e1f4847a4757af3430f60d106d8d43bb7be72c9e9650/msgpack-1.1.0-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:53258eeb7a80fc46f62fd59c876957a2d0e15e6449a9e71842b6d24419d88ca1", size = 150713, upload-time = "2024-09-10T04:25:23.397Z" }, - { url = "https://files.pythonhosted.org/packages/93/af/d63f25bcccd3d6f06fd518ba4a321f34a4370c67b579ca5c70b4a37721b4/msgpack-1.1.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:7e7b853bbc44fb03fbdba34feb4bd414322180135e2cb5164f20ce1c9795ee48", size = 84277, upload-time = "2024-09-10T04:24:34.656Z" }, - { url = "https://files.pythonhosted.org/packages/92/9b/5c0dfb0009b9f96328664fecb9f8e4e9c8a1ae919e6d53986c1b813cb493/msgpack-1.1.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:f3e9b4936df53b970513eac1758f3882c88658a220b58dcc1e39606dccaaf01c", size = 81357, upload-time = "2024-09-10T04:24:56.603Z" }, - { url = "https://files.pythonhosted.org/packages/d1/7c/3a9ee6ec9fc3e47681ad39b4d344ee04ff20a776b594fba92d88d8b68356/msgpack-1.1.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:46c34e99110762a76e3911fc923222472c9d681f1094096ac4102c18319e6468", size = 371256, upload-time = "2024-09-10T04:25:11.473Z" }, - { url = "https://files.pythonhosted.org/packages/f7/0a/8a213cecea7b731c540f25212ba5f9a818f358237ac51a44d448bd753690/msgpack-1.1.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8a706d1e74dd3dea05cb54580d9bd8b2880e9264856ce5068027eed09680aa74", size = 377868, upload-time = "2024-09-10T04:25:24.535Z" }, - { url = "https://files.pythonhosted.org/packages/1b/94/a82b0db0981e9586ed5af77d6cfb343da05d7437dceaae3b35d346498110/msgpack-1.1.0-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:534480ee5690ab3cbed89d4c8971a5c631b69a8c0883ecfea96c19118510c846", size = 363370, upload-time = "2024-09-10T04:24:21.812Z" }, - { url = "https://files.pythonhosted.org/packages/93/fc/6c7f0dcc1c913e14861e16eaf494c07fc1dde454ec726ff8cebcf348ae53/msgpack-1.1.0-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:8cf9e8c3a2153934a23ac160cc4cba0ec035f6867c8013cc6077a79823370346", size = 358970, upload-time = "2024-09-10T04:24:24.741Z" }, - { url = "https://files.pythonhosted.org/packages/1f/c6/e4a04c0089deace870dabcdef5c9f12798f958e2e81d5012501edaff342f/msgpack-1.1.0-cp39-cp39-musllinux_1_2_i686.whl", hash = "sha256:3180065ec2abbe13a4ad37688b61b99d7f9e012a535b930e0e683ad6bc30155b", size = 366358, upload-time = "2024-09-10T04:25:45.955Z" }, - { url = "https://files.pythonhosted.org/packages/b6/54/7d8317dac590cf16b3e08e3fb74d2081e5af44eb396f0effa13f17777f30/msgpack-1.1.0-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:c5a91481a3cc573ac8c0d9aace09345d989dc4a0202b7fcb312c88c26d4e71a8", size = 370336, upload-time = "2024-09-10T04:24:26.918Z" }, - { url = "https://files.pythonhosted.org/packages/dc/6f/a5a1f43b6566831e9630e5bc5d86034a8884386297302be128402555dde1/msgpack-1.1.0-cp39-cp39-win32.whl", hash = "sha256:f80bc7d47f76089633763f952e67f8214cb7b3ee6bfa489b3cb6a84cfac114cd", size = 68683, upload-time = "2024-09-10T04:24:32.984Z" }, - { url = "https://files.pythonhosted.org/packages/5f/e8/2162621e18dbc36e2bc8492fd0e97b3975f5d89fe0472ae6d5f7fbdd8cf7/msgpack-1.1.0-cp39-cp39-win_amd64.whl", hash = "sha256:4d1b7ff2d6146e16e8bd665ac726a89c74163ef8cd39fa8c1087d4e52d3a2325", size = 74787, upload-time = "2024-09-10T04:25:14.524Z" }, ] [[package]] @@ -445,12 +403,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/d2/8b/801aa06445d2de3895f59e476f38f3f8d610ef5d6908245f07d002676cbf/mypy-1.15.0-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:c43a7682e24b4f576d93072216bf56eeff70d9140241f9edec0c104d0c515036", size = 12402541, upload-time = "2025-02-05T03:49:57.623Z" }, { url = "https://files.pythonhosted.org/packages/c7/67/5a4268782eb77344cc613a4cf23540928e41f018a9a1ec4c6882baf20ab8/mypy-1.15.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:baefc32840a9f00babd83251560e0ae1573e2f9d1b067719479bfb0e987c6357", size = 12494348, upload-time = "2025-02-05T03:48:52.361Z" }, { url = "https://files.pythonhosted.org/packages/83/3e/57bb447f7bbbfaabf1712d96f9df142624a386d98fb026a761532526057e/mypy-1.15.0-cp313-cp313-win_amd64.whl", hash = "sha256:b9378e2c00146c44793c98b8d5a61039a048e31f429fb0eb546d93f4b000bedf", size = 9373648, upload-time = "2025-02-05T03:49:11.395Z" }, - { url = "https://files.pythonhosted.org/packages/5a/fa/79cf41a55b682794abe71372151dbbf856e3008f6767057229e6649d294a/mypy-1.15.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:e601a7fa172c2131bff456bb3ee08a88360760d0d2f8cbd7a75a65497e2df078", size = 10737129, upload-time = "2025-02-05T03:50:24.509Z" }, - { url = "https://files.pythonhosted.org/packages/d3/33/dd8feb2597d648de29e3da0a8bf4e1afbda472964d2a4a0052203a6f3594/mypy-1.15.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:712e962a6357634fef20412699a3655c610110e01cdaa6180acec7fc9f8513ba", size = 9856335, upload-time = "2025-02-05T03:49:36.398Z" }, - { url = "https://files.pythonhosted.org/packages/e4/b5/74508959c1b06b96674b364ffeb7ae5802646b32929b7701fc6b18447592/mypy-1.15.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:f95579473af29ab73a10bada2f9722856792a36ec5af5399b653aa28360290a5", size = 11611935, upload-time = "2025-02-05T03:49:14.154Z" }, - { url = "https://files.pythonhosted.org/packages/6c/53/da61b9d9973efcd6507183fdad96606996191657fe79701b2c818714d573/mypy-1.15.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:8f8722560a14cde92fdb1e31597760dc35f9f5524cce17836c0d22841830fd5b", size = 12365827, upload-time = "2025-02-05T03:48:59.458Z" }, - { url = "https://files.pythonhosted.org/packages/c1/72/965bd9ee89540c79a25778cc080c7e6ef40aa1eeac4d52cec7eae6eb5228/mypy-1.15.0-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:1fbb8da62dc352133d7d7ca90ed2fb0e9d42bb1a32724c287d3c76c58cbaa9c2", size = 12541924, upload-time = "2025-02-05T03:50:03.12Z" }, - { url = "https://files.pythonhosted.org/packages/46/d0/f41645c2eb263e6c77ada7d76f894c580c9ddb20d77f0c24d34273a4dab2/mypy-1.15.0-cp39-cp39-win_amd64.whl", hash = "sha256:d10d994b41fb3497719bbf866f227b3489048ea4bbbb5015357db306249f7980", size = 9271176, upload-time = "2025-02-05T03:50:10.86Z" }, { url = "https://files.pythonhosted.org/packages/09/4e/a7d65c7322c510de2c409ff3828b03354a7c43f5a8ed458a7a131b41c7b9/mypy-1.15.0-py3-none-any.whl", hash = "sha256:5469affef548bd1895d86d3bf10ce2b44e33d86923c29e4d675b3e323437ea3e", size = 2221777, upload-time = "2025-02-05T03:50:08.348Z" }, ] @@ -643,7 +595,6 @@ version = "0.26.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "pytest" }, - { name = "typing-extensions", marker = "python_full_version < '3.10'" }, ] sdist = { url = "https://files.pythonhosted.org/packages/8e/c4/453c52c659521066969523e87d85d54139bbd17b78f09532fb8eb8cdb58e/pytest_asyncio-0.26.0.tar.gz", hash = "sha256:c4df2a697648241ff39e7f0e4a73050b03f123f760673956cf0d72a4990e312f", size = 54156, upload-time = "2025-03-25T06:22:28.883Z" } wheels = [ @@ -718,15 +669,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/fe/0f/25911a9f080464c59fab9027482f822b86bf0608957a5fcc6eaac85aa515/PyYAML-6.0.2-cp313-cp313-musllinux_1_1_x86_64.whl", hash = "sha256:68ccc6023a3400877818152ad9a1033e3db8625d899c72eacb5a668902e4d652", size = 751597, upload-time = "2024-08-06T20:32:56.985Z" }, { url = "https://files.pythonhosted.org/packages/14/0d/e2c3b43bbce3cf6bd97c840b46088a3031085179e596d4929729d8d68270/PyYAML-6.0.2-cp313-cp313-win32.whl", hash = "sha256:bc2fa7c6b47d6bc618dd7fb02ef6fdedb1090ec036abab80d4681424b84c1183", size = 140527, upload-time = "2024-08-06T20:33:03.001Z" }, { url = "https://files.pythonhosted.org/packages/fa/de/02b54f42487e3d3c6efb3f89428677074ca7bf43aae402517bc7cca949f3/PyYAML-6.0.2-cp313-cp313-win_amd64.whl", hash = "sha256:8388ee1976c416731879ac16da0aff3f63b286ffdd57cdeb95f3f2e085687563", size = 156446, upload-time = "2024-08-06T20:33:04.33Z" }, - { url = "https://files.pythonhosted.org/packages/65/d8/b7a1db13636d7fb7d4ff431593c510c8b8fca920ade06ca8ef20015493c5/PyYAML-6.0.2-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:688ba32a1cffef67fd2e9398a2efebaea461578b0923624778664cc1c914db5d", size = 184777, upload-time = "2024-08-06T20:33:25.896Z" }, - { url = "https://files.pythonhosted.org/packages/0a/02/6ec546cd45143fdf9840b2c6be8d875116a64076218b61d68e12548e5839/PyYAML-6.0.2-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:a8786accb172bd8afb8be14490a16625cbc387036876ab6ba70912730faf8e1f", size = 172318, upload-time = "2024-08-06T20:33:27.212Z" }, - { url = "https://files.pythonhosted.org/packages/0e/9a/8cc68be846c972bda34f6c2a93abb644fb2476f4dcc924d52175786932c9/PyYAML-6.0.2-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d8e03406cac8513435335dbab54c0d385e4a49e4945d2909a581c83647ca0290", size = 720891, upload-time = "2024-08-06T20:33:28.974Z" }, - { url = "https://files.pythonhosted.org/packages/e9/6c/6e1b7f40181bc4805e2e07f4abc10a88ce4648e7e95ff1abe4ae4014a9b2/PyYAML-6.0.2-cp39-cp39-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:f753120cb8181e736c57ef7636e83f31b9c0d1722c516f7e86cf15b7aa57ff12", size = 722614, upload-time = "2024-08-06T20:33:34.157Z" }, - { url = "https://files.pythonhosted.org/packages/3d/32/e7bd8535d22ea2874cef6a81021ba019474ace0d13a4819c2a4bce79bd6a/PyYAML-6.0.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3b1fdb9dc17f5a7677423d508ab4f243a726dea51fa5e70992e59a7411c89d19", size = 737360, upload-time = "2024-08-06T20:33:35.84Z" }, - { url = "https://files.pythonhosted.org/packages/d7/12/7322c1e30b9be969670b672573d45479edef72c9a0deac3bb2868f5d7469/PyYAML-6.0.2-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:0b69e4ce7a131fe56b7e4d770c67429700908fc0752af059838b1cfb41960e4e", size = 699006, upload-time = "2024-08-06T20:33:37.501Z" }, - { url = "https://files.pythonhosted.org/packages/82/72/04fcad41ca56491995076630c3ec1e834be241664c0c09a64c9a2589b507/PyYAML-6.0.2-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:a9f8c2e67970f13b16084e04f134610fd1d374bf477b17ec1599185cf611d725", size = 723577, upload-time = "2024-08-06T20:33:39.389Z" }, - { url = "https://files.pythonhosted.org/packages/ed/5e/46168b1f2757f1fcd442bc3029cd8767d88a98c9c05770d8b420948743bb/PyYAML-6.0.2-cp39-cp39-win32.whl", hash = "sha256:6395c297d42274772abc367baaa79683958044e5d3835486c16da75d2a694631", size = 144593, upload-time = "2024-08-06T20:33:46.63Z" }, - { url = "https://files.pythonhosted.org/packages/19/87/5124b1c1f2412bb95c59ec481eaf936cd32f0fe2a7b16b97b81c4c017a6a/PyYAML-6.0.2-cp39-cp39-win_amd64.whl", hash = "sha256:39693e1f8320ae4f43943590b49779ffb98acb81f788220ea932a6b6c51004d8", size = 162312, upload-time = "2024-08-06T20:33:49.073Z" }, ] [[package]] @@ -909,10 +851,4 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/b8/f6/54548df6dc73e30ac6c8a7ff1da73ac9007ba38f866397091d5a82237bd3/zope.interface-7.2-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:eb23f58a446a7f09db85eda09521a498e109f137b85fb278edb2e34841055398", size = 259237, upload-time = "2024-11-28T08:48:31.71Z" }, { url = "https://files.pythonhosted.org/packages/b6/66/ac05b741c2129fdf668b85631d2268421c5cd1a9ff99be1674371139d665/zope.interface-7.2-cp313-cp313-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a71a5b541078d0ebe373a81a3b7e71432c61d12e660f1d67896ca62d9628045b", size = 264696, upload-time = "2024-11-28T08:48:41.161Z" }, { url = "https://files.pythonhosted.org/packages/0a/2f/1bccc6f4cc882662162a1158cda1a7f616add2ffe322b28c99cb031b4ffc/zope.interface-7.2-cp313-cp313-win_amd64.whl", hash = "sha256:4893395d5dd2ba655c38ceb13014fd65667740f09fa5bb01caa1e6284e48c0cd", size = 212472, upload-time = "2024-11-28T08:49:56.587Z" }, - { url = "https://files.pythonhosted.org/packages/8c/2c/1f49dc8b4843c4f0848d8e43191aed312bad946a1563d1bf9e46cf2816ee/zope.interface-7.2-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:7bd449c306ba006c65799ea7912adbbfed071089461a19091a228998b82b1fdb", size = 208349, upload-time = "2024-11-28T08:49:28.872Z" }, - { url = "https://files.pythonhosted.org/packages/ed/7d/83ddbfc8424c69579a90fc8edc2b797223da2a8083a94d8dfa0e374c5ed4/zope.interface-7.2-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:a19a6cc9c6ce4b1e7e3d319a473cf0ee989cbbe2b39201d7c19e214d2dfb80c7", size = 208799, upload-time = "2024-11-28T08:49:30.616Z" }, - { url = "https://files.pythonhosted.org/packages/36/22/b1abd91854c1be03f5542fe092e6a745096d2eca7704d69432e119100583/zope.interface-7.2-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:72cd1790b48c16db85d51fbbd12d20949d7339ad84fd971427cf00d990c1f137", size = 254267, upload-time = "2024-11-28T09:18:21.059Z" }, - { url = "https://files.pythonhosted.org/packages/2a/dd/fcd313ee216ad0739ae00e6126bc22a0af62a74f76a9ca668d16cd276222/zope.interface-7.2-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:52e446f9955195440e787596dccd1411f543743c359eeb26e9b2c02b077b0519", size = 248614, upload-time = "2024-11-28T08:48:41.953Z" }, - { url = "https://files.pythonhosted.org/packages/88/d4/4ba1569b856870527cec4bf22b91fe704b81a3c1a451b2ccf234e9e0666f/zope.interface-7.2-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2ad9913fd858274db8dd867012ebe544ef18d218f6f7d1e3c3e6d98000f14b75", size = 253800, upload-time = "2024-11-28T08:48:46.637Z" }, - { url = "https://files.pythonhosted.org/packages/69/da/c9cfb384c18bd3a26d9fc6a9b5f32ccea49ae09444f097eaa5ca9814aff9/zope.interface-7.2-cp39-cp39-win_amd64.whl", hash = "sha256:1090c60116b3da3bfdd0c03406e2f14a1ff53e5771aebe33fec1edc0a350175d", size = 211980, upload-time = "2024-11-28T08:50:35.681Z" }, ] From 13fde5b3f46c143187b5911db688e1d79fe8d766 Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Mon, 3 Nov 2025 08:57:38 -0800 Subject: [PATCH 04/26] Update some docs to more clearly highlight expected behavior of operation handlers and the potential race condition if but wait_until_.. and is_cancelled are used at the same time --- src/nexusrpc/handler/_common.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/nexusrpc/handler/_common.py b/src/nexusrpc/handler/_common.py index 4aa4345..819f645 100644 --- a/src/nexusrpc/handler/_common.py +++ b/src/nexusrpc/handler/_common.py @@ -15,6 +15,8 @@ class OperationTaskCancellation(ABC): Nexus worker implementations are expected to provide an implementation that enables cooperative cancellation for both sync and async operation handlers. + + Operation Handler implementations are expected to periodically check :py:attr:`is_cancelled` or use :py:attr:`wait_until_cancelled` / :py:attr:`wait_until_cancelled_sync` to cancel in flight work when appropriate. """ @abstractmethod @@ -29,12 +31,12 @@ def cancellation_reason(self) -> Optional[str]: @abstractmethod def wait_until_cancelled_sync(self, timeout: Optional[float] = None) -> bool: - """Block until cancellation occurs or the optional timeout elapses.""" + """Block until cancellation occurs or the optional timeout elapses. Nexus worker implementations may return `True` for :py:attr:`is_cancelled` before this method returns and therefore may cause a race condition if both are used in tandem.""" raise NotImplementedError @abstractmethod async def wait_until_cancelled(self) -> None: - """Await cancellation using async primitives.""" + """Await cancellation using async primitives. Nexus worker implementations may return `True` for :py:attr:`is_cancelled` before this method returns and therefore may cause a race condition if both are used in tandem.""" raise NotImplementedError From b18edccb2f73602dbbfc2986897aa1f8e412c1db Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Fri, 7 Nov 2025 11:28:53 -0800 Subject: [PATCH 05/26] Simple logging interceptor working with an InterceptedOperationHandler concept --- src/nexusrpc/handler/__init__.py | 7 +- src/nexusrpc/handler/_common.py | 20 +++- src/nexusrpc/handler/_core.py | 78 ++++++++++++--- src/nexusrpc/handler/_operation_handler.py | 23 +++-- tests/handler/test_interceptors.py | 108 +++++++++++++++++++++ 5 files changed, 209 insertions(+), 27 deletions(-) create mode 100644 tests/handler/test_interceptors.py diff --git a/src/nexusrpc/handler/__init__.py b/src/nexusrpc/handler/__init__.py index 9b9fdf2..a8de902 100644 --- a/src/nexusrpc/handler/__init__.py +++ b/src/nexusrpc/handler/__init__.py @@ -12,24 +12,29 @@ from ._common import ( CancelOperationContext, + CancelOperationResult, OperationContext, OperationTaskCancellation, + StartOperationResult, StartOperationContext, StartOperationResultAsync, StartOperationResultSync, ) -from ._core import Handler as Handler +from ._core import Handler as Handler, OperationHandlerInterceptor from ._decorators import operation_handler, service_handler, sync_operation from ._operation_handler import OperationHandler as OperationHandler __all__ = [ "CancelOperationContext", + "CancelOperationResult", "Handler", "OperationContext", "OperationHandler", "OperationTaskCancellation", + "OperationHandlerInterceptor", "service_handler", "StartOperationContext", + "StartOperationResult", "StartOperationResultAsync", "StartOperationResultSync", "sync_operation", diff --git a/src/nexusrpc/handler/_common.py b/src/nexusrpc/handler/_common.py index 819f645..c32ed11 100644 --- a/src/nexusrpc/handler/_common.py +++ b/src/nexusrpc/handler/_common.py @@ -3,7 +3,8 @@ from abc import ABC, abstractmethod from collections.abc import Mapping, Sequence from dataclasses import dataclass, field -from typing import Any, Generic, Optional +from datetime import datetime +from typing import Any, Awaitable, Generic, Optional from nexusrpc._common import Link, OutputT @@ -72,6 +73,12 @@ def __new__(cls, *args: Any, **kwargs: Any): Task cancellation information indicating that a running task should be interrupted. This is distinct from operation cancellation. """ + request_deadline: Optional[datetime] = field(default=None, kw_only=True) + """ + Get the deadline for the operation handler method. Note that this is the time by which the + current _request_ should complete, not the _operation_'s deadline. + """ + @dataclass(frozen=True) class StartOperationContext(OperationContext): @@ -143,3 +150,14 @@ class StartOperationResultAsync: A token representing the in-progress operation that the caller can submit with subsequent ``fetch_info``, ``fetch_result``, or ``cancel`` requests. """ + + +StartOperationResult = ( + StartOperationResultSync[OutputT] + | Awaitable[StartOperationResultSync[OutputT]] + | StartOperationResultAsync + | Awaitable[StartOperationResultAsync] + | Awaitable[StartOperationResultSync[OutputT] | StartOperationResultAsync] +) + +CancelOperationResult = None | Awaitable[None] diff --git a/src/nexusrpc/handler/_core.py b/src/nexusrpc/handler/_core.py index c39aa57..c064700 100644 --- a/src/nexusrpc/handler/_core.py +++ b/src/nexusrpc/handler/_core.py @@ -102,7 +102,7 @@ from abc import ABC, abstractmethod from collections.abc import Awaitable, Mapping, Sequence from dataclasses import dataclass -from typing import Any, Callable, Optional, Union +from typing import Any, Callable, Optional, Union, cast from typing_extensions import Self, TypeGuard @@ -119,6 +119,7 @@ ) from ._operation_handler import ( OperationHandler, + InterceptedOperationHandler, collect_operation_handler_factories_by_method_name, ) @@ -211,6 +212,41 @@ def _get_service_handler(self, service_name: str) -> ServiceHandler: return service +class OperationHandlerInterceptor(ABC): + def intercept_operation_handler( + self, next: InterceptedOperationHandler[Any, Any] + ) -> InterceptedOperationHandler[Any, Any]: + return next + + +class _InterceptedOperationHandler(InterceptedOperationHandler[Any, Any]): + def __init__( + self, + executor: _Executor | None, + op_handler: OperationHandler[Any, Any], + ): + self._executor = executor + self._op_handler = op_handler + + async def start( + self, ctx: StartOperationContext, input: Any + ) -> StartOperationResultSync[Any] | StartOperationResultAsync: + if is_async_callable(self._op_handler.start): + return await self._op_handler.start(ctx, input) + else: + assert self._executor + return await self._executor.submit_to_event_loop( + self._op_handler.start, ctx, input + ) + + async def cancel(self, ctx: CancelOperationContext, token: str) -> None: + if is_async_callable(self._op_handler.cancel): + return await self._op_handler.cancel(ctx, token) + else: + assert self._executor + return self._executor.submit(self._op_handler.cancel, ctx, token).result() + + class Handler(BaseServiceCollectionHandler): """ A Nexus handler manages a collection of Nexus service handlers. @@ -248,7 +284,11 @@ def __init__( self, user_service_handlers: Sequence[Any], executor: Optional[concurrent.futures.Executor] = None, + interceptors: Sequence[OperationHandlerInterceptor] | None = None, ): + self._interceptors = cast( + Sequence[OperationHandlerInterceptor], interceptors or [] + ) super().__init__(user_service_handlers, executor=executor) if not self.executor: self._validate_all_operation_handlers_are_async() @@ -268,17 +308,28 @@ async def start_operation( input: The input to the operation, as a LazyValue. """ service_handler = self._get_service_handler(ctx.service) - op_handler = service_handler._get_operation_handler(ctx.operation) # pyright: ignore[reportPrivateUsage] + op_handler = self._get_operation_handler(ctx.service, ctx.operation) + # op_handler = service_handler._get_operation_handler(ctx.operation) # pyright: ignore[reportPrivateUsage] + op_defn = service_handler.service.operation_definitions[ctx.operation] deserialized_input = await input.consume(as_type=op_defn.input_type) - # TODO(preview): apply middleware stack - if is_async_callable(op_handler.start): - return await op_handler.start(ctx, deserialized_input) - else: - assert self.executor - return await self.executor.submit_to_event_loop( - op_handler.start, ctx, deserialized_input + return await op_handler.start(ctx, deserialized_input) + + def _get_operation_handler( + self, service_name: str, operation: str + ) -> InterceptedOperationHandler[Any, Any]: + service_handler = self._get_service_handler(service_name) + op_handler: InterceptedOperationHandler[Any, Any] = ( + _InterceptedOperationHandler( + self.executor, service_handler._get_operation_handler(operation) ) + ) # pyright: ignore[reportPrivateUsage] + + for interceptor in reversed(self._interceptors): + op_handler = interceptor.intercept_operation_handler(op_handler) + op_handler = _InterceptedOperationHandler(self.executor, op_handler) + + return op_handler async def cancel_operation(self, ctx: CancelOperationContext, token: str) -> None: """Handle a Cancel Operation request. @@ -287,13 +338,8 @@ async def cancel_operation(self, ctx: CancelOperationContext, token: str) -> Non ctx: The operation context. token: The operation token. """ - service_handler = self._get_service_handler(ctx.service) - op_handler = service_handler._get_operation_handler(ctx.operation) # pyright: ignore[reportPrivateUsage] - if is_async_callable(op_handler.cancel): - return await op_handler.cancel(ctx, token) - else: - assert self.executor - return self.executor.submit(op_handler.cancel, ctx, token).result() + op_handler = self._get_operation_handler(ctx.service, ctx.operation) # pyright: ignore[reportPrivateUsage] + return await op_handler.cancel(ctx, token) def _validate_all_operation_handlers_are_async(self) -> None: for service_handler in self.service_handlers.values(): diff --git a/src/nexusrpc/handler/_operation_handler.py b/src/nexusrpc/handler/_operation_handler.py index 02a55b9..da32cfd 100644 --- a/src/nexusrpc/handler/_operation_handler.py +++ b/src/nexusrpc/handler/_operation_handler.py @@ -19,6 +19,8 @@ CancelOperationContext, StartOperationContext, StartOperationResultAsync, + StartOperationResult, + CancelOperationResult, StartOperationResultSync, ) @@ -39,12 +41,7 @@ class OperationHandler(ABC, Generic[InputT, OutputT]): @abstractmethod def start( self, ctx: StartOperationContext, input: InputT - ) -> Union[ - StartOperationResultSync[OutputT], - Awaitable[StartOperationResultSync[OutputT]], - StartOperationResultAsync, - Awaitable[StartOperationResultAsync], - ]: + ) -> StartOperationResult[OutputT]: """ Start the operation, completing either synchronously or asynchronously. @@ -54,9 +51,7 @@ def start( ... @abstractmethod - def cancel( - self, ctx: CancelOperationContext, token: str - ) -> Union[None, Awaitable[None]]: + def cancel(self, ctx: CancelOperationContext, token: str) -> CancelOperationResult: """ Cancel the operation. """ @@ -104,6 +99,16 @@ async def cancel(self, ctx: CancelOperationContext, token: str) -> None: ) +class InterceptedOperationHandler(OperationHandler[InputT, OutputT]): + @abstractmethod + async def start( + self, ctx: StartOperationContext, input: InputT + ) -> StartOperationResultSync[OutputT] | StartOperationResultAsync: ... + + @abstractmethod + async def cancel(self, ctx: CancelOperationContext, token: str) -> None: ... + + def collect_operation_handler_factories_by_method_name( user_service_cls: type[ServiceHandlerT], service: Optional[ServiceDefinition], diff --git a/tests/handler/test_interceptors.py b/tests/handler/test_interceptors.py new file mode 100644 index 0000000..d28cb1d --- /dev/null +++ b/tests/handler/test_interceptors.py @@ -0,0 +1,108 @@ +import logging +from typing import Any +import uuid + +import pytest + +from nexusrpc import LazyValue +from nexusrpc.handler import ( + CancelOperationResult, + CancelOperationContext, + Handler, + OperationHandler, + OperationHandlerInterceptor, + StartOperationResult, + StartOperationContext, + StartOperationResultAsync, + service_handler, +) +from nexusrpc.handler._common import StartOperationResultSync +from nexusrpc.handler._decorators import operation_handler +from nexusrpc.handler._operation_handler import InterceptedOperationHandler +from tests.helpers import DummySerializer + +_operation_results: dict[str, int] = {} + +logger = logging.getLogger() + + +class MyAsyncOperationHandler(OperationHandler[int, int]): + async def start( + self, ctx: StartOperationContext, input: int + ) -> StartOperationResultAsync: + token = str(uuid.uuid4()) + _operation_results[token] = input + 1 + return StartOperationResultAsync(token) + + async def cancel(self, ctx: CancelOperationContext, token: str) -> None: + del _operation_results[token] + + +@service_handler +class MyService: + @operation_handler + def incr(self) -> OperationHandler[int, int]: + return MyAsyncOperationHandler() + + +class MyInterceptor(OperationHandlerInterceptor): + def intercept_operation_handler( + self, next: InterceptedOperationHandler[Any, Any] + ) -> InterceptedOperationHandler[Any, Any]: + return LoggingOperationHandler(next) + + +class LoggingOperationHandler(InterceptedOperationHandler[Any, Any]): + def __init__(self, next: InterceptedOperationHandler[Any, Any]) -> None: + self._next = next + + async def start( + self, ctx: StartOperationContext, input: Any + ) -> StartOperationResultSync[Any] | StartOperationResultAsync: + logger.info("%s.%s: start operation", ctx.service, ctx.operation) + result = await self._next.start(ctx, input) + if isinstance(result, StartOperationResultAsync): + logger.info( + "%s.%s: start operation completed async. token=%s", + ctx.service, + ctx.operation, + result.token, + ) + else: + logger.info( + "%s.%s: start operation completed sync. value=%s", + ctx.service, + ctx.operation, + result.value, + ) + return result + + async def cancel(self, ctx: CancelOperationContext, token: str) -> None: + logger.info("%s.%s: cancel token=%s", ctx.service, ctx.operation, token) + return await self._next.cancel(ctx, token) + + +@pytest.mark.asyncio +async def test_async_operation_happy_path(): + handler = Handler( + user_service_handlers=[MyService()], interceptors=[MyInterceptor()] + ) + start_ctx = StartOperationContext( + service="MyService", + operation="incr", + headers={}, + request_id="request_id", + ) + start_result = await handler.start_operation( + start_ctx, LazyValue(DummySerializer(1), headers={}) + ) + assert isinstance(start_result, StartOperationResultAsync) + assert start_result.token + + cancel_ctx = CancelOperationContext( + service="MyService", + operation="incr", + headers={}, + ) + await handler.cancel_operation(cancel_ctx, start_result.token) + assert start_result.token not in _operation_results From 0f76a23271895a6b15ddf2c869dbb7b8171813a5 Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Fri, 7 Nov 2025 12:09:11 -0800 Subject: [PATCH 06/26] Update test to confirm interceptors are applied in the order provided. Add test to confirm interceptors work for sync operation handlers --- tests/handler/test_interceptors.py | 87 +++++++++++++++++++++++++++--- 1 file changed, 81 insertions(+), 6 deletions(-) diff --git a/tests/handler/test_interceptors.py b/tests/handler/test_interceptors.py index d28cb1d..94b939b 100644 --- a/tests/handler/test_interceptors.py +++ b/tests/handler/test_interceptors.py @@ -15,6 +15,7 @@ StartOperationContext, StartOperationResultAsync, service_handler, + sync_operation, ) from nexusrpc.handler._common import StartOperationResultSync from nexusrpc.handler._decorators import operation_handler @@ -45,20 +46,65 @@ def incr(self) -> OperationHandler[int, int]: return MyAsyncOperationHandler() -class MyInterceptor(OperationHandlerInterceptor): +@service_handler +class MyServiceSync: + @sync_operation + async def incr(self, ctx: StartOperationContext, input: int) -> int: + return input + 1 + + +class CountingInterceptor(OperationHandlerInterceptor): + def __init__(self) -> None: + self.num_start = 0 + self.num_cancel = 0 + def intercept_operation_handler( self, next: InterceptedOperationHandler[Any, Any] ) -> InterceptedOperationHandler[Any, Any]: - return LoggingOperationHandler(next) + return CountingOperationHandler(next, self) -class LoggingOperationHandler(InterceptedOperationHandler[Any, Any]): - def __init__(self, next: InterceptedOperationHandler[Any, Any]) -> None: +class CountingOperationHandler(InterceptedOperationHandler[Any, Any]): + def __init__( + self, + next: InterceptedOperationHandler[Any, Any], + interceptor: CountingInterceptor, + ) -> None: self._next = next + self._interceptor = interceptor async def start( self, ctx: StartOperationContext, input: Any ) -> StartOperationResultSync[Any] | StartOperationResultAsync: + self._interceptor.num_start += 1 + return await self._next.start(ctx, input) + + async def cancel(self, ctx: CancelOperationContext, token: str) -> None: + self._interceptor.num_cancel += 1 + return await self._next.cancel(ctx, token) + + +class AssertingInterceptor(OperationHandlerInterceptor): + def __init__(self, counter: CountingInterceptor) -> None: + self._counter = counter + + def intercept_operation_handler( + self, next: InterceptedOperationHandler[Any, Any] + ) -> InterceptedOperationHandler[Any, Any]: + return AssertingOperationHandler(next, self._counter) + + +class AssertingOperationHandler(InterceptedOperationHandler[Any, Any]): + def __init__( + self, next: InterceptedOperationHandler[Any, Any], counter: CountingInterceptor + ) -> None: + self._next = next + self._counter = counter + + async def start( + self, ctx: StartOperationContext, input: Any + ) -> StartOperationResultSync[Any] | StartOperationResultAsync: + assert self._counter.num_start == 0 logger.info("%s.%s: start operation", ctx.service, ctx.operation) result = await self._next.start(ctx, input) if isinstance(result, StartOperationResultAsync): @@ -78,14 +124,17 @@ async def start( return result async def cancel(self, ctx: CancelOperationContext, token: str) -> None: + assert self._counter.num_cancel == 0 logger.info("%s.%s: cancel token=%s", ctx.service, ctx.operation, token) return await self._next.cancel(ctx, token) @pytest.mark.asyncio -async def test_async_operation_happy_path(): +async def test_async_operation_interceptors_applied(): + counting_interceptor = CountingInterceptor() handler = Handler( - user_service_handlers=[MyService()], interceptors=[MyInterceptor()] + user_service_handlers=[MyService()], + interceptors=[AssertingInterceptor(counting_interceptor), counting_interceptor], ) start_ctx = StartOperationContext( service="MyService", @@ -106,3 +155,29 @@ async def test_async_operation_happy_path(): ) await handler.cancel_operation(cancel_ctx, start_result.token) assert start_result.token not in _operation_results + + assert counting_interceptor.num_start == 1 + assert counting_interceptor.num_cancel == 1 + + +@pytest.mark.asyncio +async def test_sync_operation_interceptors_applied(): + counting_interceptor = CountingInterceptor() + handler = Handler( + user_service_handlers=[MyServiceSync()], + interceptors=[AssertingInterceptor(counting_interceptor), counting_interceptor], + ) + start_ctx = StartOperationContext( + service="MyServiceSync", + operation="incr", + headers={}, + request_id="request_id", + ) + start_result = await handler.start_operation( + start_ctx, LazyValue(DummySerializer(1), headers={}) + ) + assert isinstance(start_result, StartOperationResultSync) + assert start_result.value == 2 + + assert counting_interceptor.num_start == 1 + assert counting_interceptor.num_cancel == 0 From 40b4eb320485c6caa61ae52c1a296cc33e31237a Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Fri, 7 Nov 2025 14:40:35 -0800 Subject: [PATCH 07/26] Do some renaming. Add some doc strings. remove type aliases that wound up not being very useful. Update sync test to force use of the executor. --- src/nexusrpc/handler/__init__.py | 8 +- src/nexusrpc/handler/_common.py | 11 -- src/nexusrpc/handler/_core.py | 141 +++++++++++++-------- src/nexusrpc/handler/_operation_handler.py | 35 +++-- tests/handler/test_interceptors.py | 55 +++++--- 5 files changed, 152 insertions(+), 98 deletions(-) diff --git a/src/nexusrpc/handler/__init__.py b/src/nexusrpc/handler/__init__.py index a8de902..8dd67b6 100644 --- a/src/nexusrpc/handler/__init__.py +++ b/src/nexusrpc/handler/__init__.py @@ -12,29 +12,27 @@ from ._common import ( CancelOperationContext, - CancelOperationResult, OperationContext, OperationTaskCancellation, - StartOperationResult, StartOperationContext, StartOperationResultAsync, StartOperationResultSync, ) from ._core import Handler as Handler, OperationHandlerInterceptor from ._decorators import operation_handler, service_handler, sync_operation -from ._operation_handler import OperationHandler as OperationHandler +from ._operation_handler import AwaitableOperationHandler, OperationHandler __all__ = [ + "AwaitableOperationHandler", "CancelOperationContext", - "CancelOperationResult", "Handler", "OperationContext", "OperationHandler", "OperationTaskCancellation", "OperationHandlerInterceptor", + "operation_handler", "service_handler", "StartOperationContext", - "StartOperationResult", "StartOperationResultAsync", "StartOperationResultSync", "sync_operation", diff --git a/src/nexusrpc/handler/_common.py b/src/nexusrpc/handler/_common.py index c32ed11..fc6a03c 100644 --- a/src/nexusrpc/handler/_common.py +++ b/src/nexusrpc/handler/_common.py @@ -150,14 +150,3 @@ class StartOperationResultAsync: A token representing the in-progress operation that the caller can submit with subsequent ``fetch_info``, ``fetch_result``, or ``cancel`` requests. """ - - -StartOperationResult = ( - StartOperationResultSync[OutputT] - | Awaitable[StartOperationResultSync[OutputT]] - | StartOperationResultAsync - | Awaitable[StartOperationResultAsync] - | Awaitable[StartOperationResultSync[OutputT] | StartOperationResultAsync] -) - -CancelOperationResult = None | Awaitable[None] diff --git a/src/nexusrpc/handler/_core.py b/src/nexusrpc/handler/_core.py index c064700..324dd7e 100644 --- a/src/nexusrpc/handler/_core.py +++ b/src/nexusrpc/handler/_core.py @@ -118,8 +118,8 @@ StartOperationResultSync, ) from ._operation_handler import ( + AwaitableOperationHandler, OperationHandler, - InterceptedOperationHandler, collect_operation_handler_factories_by_method_name, ) @@ -212,41 +212,6 @@ def _get_service_handler(self, service_name: str) -> ServiceHandler: return service -class OperationHandlerInterceptor(ABC): - def intercept_operation_handler( - self, next: InterceptedOperationHandler[Any, Any] - ) -> InterceptedOperationHandler[Any, Any]: - return next - - -class _InterceptedOperationHandler(InterceptedOperationHandler[Any, Any]): - def __init__( - self, - executor: _Executor | None, - op_handler: OperationHandler[Any, Any], - ): - self._executor = executor - self._op_handler = op_handler - - async def start( - self, ctx: StartOperationContext, input: Any - ) -> StartOperationResultSync[Any] | StartOperationResultAsync: - if is_async_callable(self._op_handler.start): - return await self._op_handler.start(ctx, input) - else: - assert self._executor - return await self._executor.submit_to_event_loop( - self._op_handler.start, ctx, input - ) - - async def cancel(self, ctx: CancelOperationContext, token: str) -> None: - if is_async_callable(self._op_handler.cancel): - return await self._op_handler.cancel(ctx, token) - else: - assert self._executor - return self._executor.submit(self._op_handler.cancel, ctx, token).result() - - class Handler(BaseServiceCollectionHandler): """ A Nexus handler manages a collection of Nexus service handlers. @@ -308,29 +273,12 @@ async def start_operation( input: The input to the operation, as a LazyValue. """ service_handler = self._get_service_handler(ctx.service) - op_handler = self._get_operation_handler(ctx.service, ctx.operation) - # op_handler = service_handler._get_operation_handler(ctx.operation) # pyright: ignore[reportPrivateUsage] + op_handler = self._get_operation_handler(service_handler, ctx.operation) op_defn = service_handler.service.operation_definitions[ctx.operation] deserialized_input = await input.consume(as_type=op_defn.input_type) return await op_handler.start(ctx, deserialized_input) - def _get_operation_handler( - self, service_name: str, operation: str - ) -> InterceptedOperationHandler[Any, Any]: - service_handler = self._get_service_handler(service_name) - op_handler: InterceptedOperationHandler[Any, Any] = ( - _InterceptedOperationHandler( - self.executor, service_handler._get_operation_handler(operation) - ) - ) # pyright: ignore[reportPrivateUsage] - - for interceptor in reversed(self._interceptors): - op_handler = interceptor.intercept_operation_handler(op_handler) - op_handler = _InterceptedOperationHandler(self.executor, op_handler) - - return op_handler - async def cancel_operation(self, ctx: CancelOperationContext, token: str) -> None: """Handle a Cancel Operation request. @@ -338,9 +286,27 @@ async def cancel_operation(self, ctx: CancelOperationContext, token: str) -> Non ctx: The operation context. token: The operation token. """ - op_handler = self._get_operation_handler(ctx.service, ctx.operation) # pyright: ignore[reportPrivateUsage] + service_handler = self._get_service_handler(ctx.service) + op_handler = self._get_operation_handler(service_handler, ctx.operation) return await op_handler.cancel(ctx, token) + def _get_operation_handler( + self, service_handler: ServiceHandler, operation: str + ) -> AwaitableOperationHandler[Any, Any]: + """ + Get the specified handler for the specified operation from the given service_handler and apply all interceptors. + """ + op_handler: AwaitableOperationHandler[Any, Any] = ( + _EnsuredAwaitableOperationHandler( + self.executor, service_handler.get_operation_handler(operation) + ) + ) + + for interceptor in reversed(self._interceptors): + op_handler = interceptor.intercept_operation_handler(op_handler) + + return op_handler + def _validate_all_operation_handlers_are_async(self) -> None: for service_handler in self.service_handlers.values(): for op_handler in service_handler.operation_handlers.values(): @@ -406,7 +372,7 @@ def from_user_instance(cls, user_instance: Any) -> Self: operation_handlers=op_handlers, ) - def _get_operation_handler(self, operation_name: str) -> OperationHandler[Any, Any]: + def get_operation_handler(self, operation_name: str) -> OperationHandler[Any, Any]: """Return an operation handler, given the operation name.""" if operation_name not in self.service.operation_definitions: raise HandlerError( @@ -447,3 +413,66 @@ def submit( self, fn: Callable[..., Any], *args: Any ) -> concurrent.futures.Future[Any]: return self._executor.submit(fn, *args) + + +class OperationHandlerInterceptor(ABC): + """ + Interceptor for operation handlers. + + This should be extended by any operation handler interceptors. + """ + + def intercept_operation_handler( + self, next: AwaitableOperationHandler[Any, Any] + ) -> AwaitableOperationHandler[Any, Any]: + """ + Method called for intercepting operation handlers. + + Args: + next: The underlying operation handler that this interceptor + should delegate to. + + Returns: + The new interceptor that will be used to invoke + :py:attr:`OperationHandler.start` or :py:attr:`OperationHandler.cancel`. + """ + return next + + +class _EnsuredAwaitableOperationHandler(AwaitableOperationHandler[Any, Any]): + """ + An :py:class:`AwaitableOperationHandler` that wraps an :py:class:`OperationHandler` and uses an :py:class:`_Executor` to ensure + that the `:py:attr:`start` and :py:attr:`cancel` methods are awaitable. + """ + + def __init__( + self, + executor: _Executor | None, + op_handler: OperationHandler[Any, Any], + ): + self._executor = executor + self._op_handler = op_handler + + async def start( + self, ctx: StartOperationContext, input: Any + ) -> StartOperationResultSync[Any] | StartOperationResultAsync: + """ + Start the operation using the wrapped :py:class:`OperationHandler`. + """ + if is_async_callable(self._op_handler.start): + return await self._op_handler.start(ctx, input) + else: + assert self._executor + return await self._executor.submit_to_event_loop( + self._op_handler.start, ctx, input + ) + + async def cancel(self, ctx: CancelOperationContext, token: str) -> None: + """ + Cancel an operation using the wrapped :py:class:`OperationHandler`. + """ + if is_async_callable(self._op_handler.cancel): + return await self._op_handler.cancel(ctx, token) + else: + assert self._executor + return self._executor.submit(self._op_handler.cancel, ctx, token).result() diff --git a/src/nexusrpc/handler/_operation_handler.py b/src/nexusrpc/handler/_operation_handler.py index da32cfd..e9f1176 100644 --- a/src/nexusrpc/handler/_operation_handler.py +++ b/src/nexusrpc/handler/_operation_handler.py @@ -3,7 +3,7 @@ import inspect from abc import ABC, abstractmethod from collections.abc import Awaitable -from typing import Any, Callable, Generic, Optional, Union +from typing import Any, Callable, Generic, Optional from nexusrpc._common import InputT, OutputT, ServiceHandlerT from nexusrpc._service import Operation, OperationDefinition, ServiceDefinition @@ -19,8 +19,6 @@ CancelOperationContext, StartOperationContext, StartOperationResultAsync, - StartOperationResult, - CancelOperationResult, StartOperationResultSync, ) @@ -41,7 +39,13 @@ class OperationHandler(ABC, Generic[InputT, OutputT]): @abstractmethod def start( self, ctx: StartOperationContext, input: InputT - ) -> StartOperationResult[OutputT]: + ) -> ( + StartOperationResultSync[OutputT] + | Awaitable[StartOperationResultSync[OutputT]] + | StartOperationResultAsync + | Awaitable[StartOperationResultAsync] + | Awaitable[StartOperationResultSync[OutputT] | StartOperationResultAsync] + ): """ Start the operation, completing either synchronously or asynchronously. @@ -51,7 +55,7 @@ def start( ... @abstractmethod - def cancel(self, ctx: CancelOperationContext, token: str) -> CancelOperationResult: + def cancel(self, ctx: CancelOperationContext, token: str) -> None | Awaitable[None]: """ Cancel the operation. """ @@ -99,14 +103,29 @@ async def cancel(self, ctx: CancelOperationContext, token: str) -> None: ) -class InterceptedOperationHandler(OperationHandler[InputT, OutputT]): +class AwaitableOperationHandler(OperationHandler[InputT, OutputT]): + """ + An :py:class:`OperationHandler` where :py:method:`start` and :py:method:`cancel` + can be awaited by an async runtime. It can produce a result synchronously by returning + :py:class:`StartOperationResultSync` or asynchronously by returning :py:class:`StartOperationResultAsync` + in the same fashion that :py:class:`OperationHandler` does. + """ + @abstractmethod async def start( self, ctx: StartOperationContext, input: InputT - ) -> StartOperationResultSync[OutputT] | StartOperationResultAsync: ... + ) -> StartOperationResultSync[OutputT] | StartOperationResultAsync: + """ + Start the operation and return it's result or an async token. + """ + ... @abstractmethod - async def cancel(self, ctx: CancelOperationContext, token: str) -> None: ... + async def cancel(self, ctx: CancelOperationContext, token: str) -> None: + """ + Cancel an in progress operation identified by the given token. + """ + ... def collect_operation_handler_factories_by_method_name( diff --git a/tests/handler/test_interceptors.py b/tests/handler/test_interceptors.py index 94b939b..c975cd1 100644 --- a/tests/handler/test_interceptors.py +++ b/tests/handler/test_interceptors.py @@ -1,25 +1,24 @@ +import concurrent.futures import logging -from typing import Any import uuid +from typing import Any import pytest from nexusrpc import LazyValue from nexusrpc.handler import ( - CancelOperationResult, + AwaitableOperationHandler, CancelOperationContext, Handler, OperationHandler, OperationHandlerInterceptor, - StartOperationResult, StartOperationContext, StartOperationResultAsync, + StartOperationResultSync, service_handler, sync_operation, ) -from nexusrpc.handler._common import StartOperationResultSync from nexusrpc.handler._decorators import operation_handler -from nexusrpc.handler._operation_handler import InterceptedOperationHandler from tests.helpers import DummySerializer _operation_results: dict[str, int] = {} @@ -49,7 +48,7 @@ def incr(self) -> OperationHandler[int, int]: @service_handler class MyServiceSync: @sync_operation - async def incr(self, ctx: StartOperationContext, input: int) -> int: + def incr(self, ctx: StartOperationContext, input: int) -> int: return input + 1 @@ -59,15 +58,20 @@ def __init__(self) -> None: self.num_cancel = 0 def intercept_operation_handler( - self, next: InterceptedOperationHandler[Any, Any] - ) -> InterceptedOperationHandler[Any, Any]: + self, next: AwaitableOperationHandler[Any, Any] + ) -> AwaitableOperationHandler[Any, Any]: return CountingOperationHandler(next, self) -class CountingOperationHandler(InterceptedOperationHandler[Any, Any]): +class CountingOperationHandler(AwaitableOperationHandler[Any, Any]): + """ + An :py:class:`AwaitableOperationHandler` that wraps a counting interceptor + that counts the number of calls to each handler method. + """ + def __init__( self, - next: InterceptedOperationHandler[Any, Any], + next: AwaitableOperationHandler[Any, Any], interceptor: CountingInterceptor, ) -> None: self._next = next @@ -84,19 +88,24 @@ async def cancel(self, ctx: CancelOperationContext, token: str) -> None: return await self._next.cancel(ctx, token) -class AssertingInterceptor(OperationHandlerInterceptor): +class MustBeFirstInterceptor(OperationHandlerInterceptor): def __init__(self, counter: CountingInterceptor) -> None: self._counter = counter def intercept_operation_handler( - self, next: InterceptedOperationHandler[Any, Any] - ) -> InterceptedOperationHandler[Any, Any]: - return AssertingOperationHandler(next, self._counter) + self, next: AwaitableOperationHandler[Any, Any] + ) -> AwaitableOperationHandler[Any, Any]: + return MustBeFirstOperationHandler(next, self._counter) + +class MustBeFirstOperationHandler(AwaitableOperationHandler[Any, Any]): + """ + An :py:class:`AwaitableOperationHandler` that wraps a counting interceptor + and asserts that the wrapped interceptor has a count of 0 for each handler method + """ -class AssertingOperationHandler(InterceptedOperationHandler[Any, Any]): def __init__( - self, next: InterceptedOperationHandler[Any, Any], counter: CountingInterceptor + self, next: AwaitableOperationHandler[Any, Any], counter: CountingInterceptor ) -> None: self._next = next self._counter = counter @@ -106,7 +115,9 @@ async def start( ) -> StartOperationResultSync[Any] | StartOperationResultAsync: assert self._counter.num_start == 0 logger.info("%s.%s: start operation", ctx.service, ctx.operation) + result = await self._next.start(ctx, input) + if isinstance(result, StartOperationResultAsync): logger.info( "%s.%s: start operation completed async. token=%s", @@ -121,6 +132,7 @@ async def start( ctx.operation, result.value, ) + return result async def cancel(self, ctx: CancelOperationContext, token: str) -> None: @@ -134,7 +146,10 @@ async def test_async_operation_interceptors_applied(): counting_interceptor = CountingInterceptor() handler = Handler( user_service_handlers=[MyService()], - interceptors=[AssertingInterceptor(counting_interceptor), counting_interceptor], + interceptors=[ + MustBeFirstInterceptor(counting_interceptor), + counting_interceptor, + ], ) start_ctx = StartOperationContext( service="MyService", @@ -165,7 +180,11 @@ async def test_sync_operation_interceptors_applied(): counting_interceptor = CountingInterceptor() handler = Handler( user_service_handlers=[MyServiceSync()], - interceptors=[AssertingInterceptor(counting_interceptor), counting_interceptor], + executor=concurrent.futures.ThreadPoolExecutor(), + interceptors=[ + MustBeFirstInterceptor(counting_interceptor), + counting_interceptor, + ], ) start_ctx = StartOperationContext( service="MyServiceSync", From 139173bb4327eabe874585894bdac0b392f6cf97 Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Fri, 7 Nov 2025 14:42:07 -0800 Subject: [PATCH 08/26] Remove request_deadline as that's part of a different PR --- src/nexusrpc/handler/_common.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/nexusrpc/handler/_common.py b/src/nexusrpc/handler/_common.py index fc6a03c..0488396 100644 --- a/src/nexusrpc/handler/_common.py +++ b/src/nexusrpc/handler/_common.py @@ -73,12 +73,6 @@ def __new__(cls, *args: Any, **kwargs: Any): Task cancellation information indicating that a running task should be interrupted. This is distinct from operation cancellation. """ - request_deadline: Optional[datetime] = field(default=None, kw_only=True) - """ - Get the deadline for the operation handler method. Note that this is the time by which the - current _request_ should complete, not the _operation_'s deadline. - """ - @dataclass(frozen=True) class StartOperationContext(OperationContext): From 6561cd3c96b593baa61232f278e3ac906ded646b Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Fri, 7 Nov 2025 14:46:09 -0800 Subject: [PATCH 09/26] remove some unused imports --- src/nexusrpc/handler/_common.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/nexusrpc/handler/_common.py b/src/nexusrpc/handler/_common.py index 0488396..819f645 100644 --- a/src/nexusrpc/handler/_common.py +++ b/src/nexusrpc/handler/_common.py @@ -3,8 +3,7 @@ from abc import ABC, abstractmethod from collections.abc import Mapping, Sequence from dataclasses import dataclass, field -from datetime import datetime -from typing import Any, Awaitable, Generic, Optional +from typing import Any, Generic, Optional from nexusrpc._common import Link, OutputT From 0c8100ee285af47597eff11eca22524870d8b4c1 Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Fri, 7 Nov 2025 15:03:42 -0800 Subject: [PATCH 10/26] Use public export in tests --- tests/handler/test_interceptors.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/handler/test_interceptors.py b/tests/handler/test_interceptors.py index c975cd1..648267e 100644 --- a/tests/handler/test_interceptors.py +++ b/tests/handler/test_interceptors.py @@ -15,10 +15,10 @@ StartOperationContext, StartOperationResultAsync, StartOperationResultSync, + operation_handler, service_handler, sync_operation, ) -from nexusrpc.handler._decorators import operation_handler from tests.helpers import DummySerializer _operation_results: dict[str, int] = {} From 0d37c349bb18ea86eaaece0aafaf4cc39e5c944d Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Fri, 7 Nov 2025 15:10:04 -0800 Subject: [PATCH 11/26] Fix some linter errors --- src/nexusrpc/handler/_operation_handler.py | 2 +- tests/handler/test_interceptors.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/nexusrpc/handler/_operation_handler.py b/src/nexusrpc/handler/_operation_handler.py index e9f1176..cc7a359 100644 --- a/src/nexusrpc/handler/_operation_handler.py +++ b/src/nexusrpc/handler/_operation_handler.py @@ -103,7 +103,7 @@ async def cancel(self, ctx: CancelOperationContext, token: str) -> None: ) -class AwaitableOperationHandler(OperationHandler[InputT, OutputT]): +class AwaitableOperationHandler(OperationHandler[InputT, OutputT], ABC): """ An :py:class:`OperationHandler` where :py:method:`start` and :py:method:`cancel` can be awaited by an async runtime. It can produce a result synchronously by returning diff --git a/tests/handler/test_interceptors.py b/tests/handler/test_interceptors.py index 648267e..1d095e8 100644 --- a/tests/handler/test_interceptors.py +++ b/tests/handler/test_interceptors.py @@ -48,7 +48,7 @@ def incr(self) -> OperationHandler[int, int]: @service_handler class MyServiceSync: @sync_operation - def incr(self, ctx: StartOperationContext, input: int) -> int: + def incr(self, ctx: StartOperationContext, input: int) -> int: # type: ignore[reportUnusedParameter] return input + 1 From 60ef746c67bdf01654747e1af517aa957d9d4b76 Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Fri, 7 Nov 2025 15:15:01 -0800 Subject: [PATCH 12/26] use cancellation in tests after rebasing to support new python --- tests/handler/test_interceptors.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/handler/test_interceptors.py b/tests/handler/test_interceptors.py index 1d095e8..d0ab9d6 100644 --- a/tests/handler/test_interceptors.py +++ b/tests/handler/test_interceptors.py @@ -19,7 +19,7 @@ service_handler, sync_operation, ) -from tests.helpers import DummySerializer +from tests.helpers import DummySerializer, TestOperationTaskCancellation _operation_results: dict[str, int] = {} @@ -156,6 +156,7 @@ async def test_async_operation_interceptors_applied(): operation="incr", headers={}, request_id="request_id", + task_cancellation=TestOperationTaskCancellation(), ) start_result = await handler.start_operation( start_ctx, LazyValue(DummySerializer(1), headers={}) @@ -167,6 +168,7 @@ async def test_async_operation_interceptors_applied(): service="MyService", operation="incr", headers={}, + task_cancellation=TestOperationTaskCancellation(), ) await handler.cancel_operation(cancel_ctx, start_result.token) assert start_result.token not in _operation_results @@ -191,6 +193,7 @@ async def test_sync_operation_interceptors_applied(): operation="incr", headers={}, request_id="request_id", + task_cancellation=TestOperationTaskCancellation(), ) start_result = await handler.start_operation( start_ctx, LazyValue(DummySerializer(1), headers={}) From be5d42abaaed027862ce57b54ca3d89bef4d5277 Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Fri, 7 Nov 2025 15:25:24 -0800 Subject: [PATCH 13/26] fix docstring errors --- src/nexusrpc/handler/_core.py | 2 +- src/nexusrpc/handler/_operation_handler.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/nexusrpc/handler/_core.py b/src/nexusrpc/handler/_core.py index 324dd7e..a85bfb8 100644 --- a/src/nexusrpc/handler/_core.py +++ b/src/nexusrpc/handler/_core.py @@ -442,7 +442,7 @@ def intercept_operation_handler( class _EnsuredAwaitableOperationHandler(AwaitableOperationHandler[Any, Any]): """ An :py:class:`AwaitableOperationHandler` that wraps an :py:class:`OperationHandler` and uses an :py:class:`_Executor` to ensure - that the `:py:attr:`start` and :py:attr:`cancel` methods are awaitable. + that the :py:attr:`start` and :py:attr:`cancel` methods are awaitable. """ def __init__( diff --git a/src/nexusrpc/handler/_operation_handler.py b/src/nexusrpc/handler/_operation_handler.py index cc7a359..99f0ea0 100644 --- a/src/nexusrpc/handler/_operation_handler.py +++ b/src/nexusrpc/handler/_operation_handler.py @@ -105,7 +105,7 @@ async def cancel(self, ctx: CancelOperationContext, token: str) -> None: class AwaitableOperationHandler(OperationHandler[InputT, OutputT], ABC): """ - An :py:class:`OperationHandler` where :py:method:`start` and :py:method:`cancel` + An :py:class:`OperationHandler` where :py:attr:`start` and :py:attr:`cancel` can be awaited by an async runtime. It can produce a result synchronously by returning :py:class:`StartOperationResultSync` or asynchronously by returning :py:class:`StartOperationResultAsync` in the same fashion that :py:class:`OperationHandler` does. From 2935dbcf45c1da19a396e18fff15468f1f84998e Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Mon, 10 Nov 2025 16:14:20 -0800 Subject: [PATCH 14/26] rename interceptor to middleware. Expose operation context to middleware --- src/nexusrpc/handler/__init__.py | 4 +-- src/nexusrpc/handler/_core.py | 20 +++++++------ tests/handler/test_async_operation.py | 2 +- tests/handler/test_cancellation.py | 1 - ...est_interceptors.py => test_middleware.py} | 29 ++++++++++--------- 5 files changed, 29 insertions(+), 27 deletions(-) rename tests/handler/{test_interceptors.py => test_middleware.py} (89%) diff --git a/src/nexusrpc/handler/__init__.py b/src/nexusrpc/handler/__init__.py index 3ac42fd..6ca6f2f 100644 --- a/src/nexusrpc/handler/__init__.py +++ b/src/nexusrpc/handler/__init__.py @@ -18,7 +18,7 @@ StartOperationResultAsync, StartOperationResultSync, ) -from ._core import Handler, OperationHandlerInterceptor +from ._core import Handler, OperationHandlerMiddleware from ._decorators import operation_handler, service_handler, sync_operation from ._operation_handler import AwaitableOperationHandler, OperationHandler @@ -29,7 +29,7 @@ "OperationContext", "OperationHandler", "OperationTaskCancellation", - "OperationHandlerInterceptor", + "OperationHandlerMiddleware", "operation_handler", "service_handler", "StartOperationContext", diff --git a/src/nexusrpc/handler/_core.py b/src/nexusrpc/handler/_core.py index a85bfb8..6d431e9 100644 --- a/src/nexusrpc/handler/_core.py +++ b/src/nexusrpc/handler/_core.py @@ -113,6 +113,7 @@ from ._common import ( CancelOperationContext, + OperationContext, StartOperationContext, StartOperationResultAsync, StartOperationResultSync, @@ -249,10 +250,10 @@ def __init__( self, user_service_handlers: Sequence[Any], executor: Optional[concurrent.futures.Executor] = None, - interceptors: Sequence[OperationHandlerInterceptor] | None = None, + interceptors: Sequence[OperationHandlerMiddleware] | None = None, ): self._interceptors = cast( - Sequence[OperationHandlerInterceptor], interceptors or [] + Sequence[OperationHandlerMiddleware], interceptors or [] ) super().__init__(user_service_handlers, executor=executor) if not self.executor: @@ -273,7 +274,7 @@ async def start_operation( input: The input to the operation, as a LazyValue. """ service_handler = self._get_service_handler(ctx.service) - op_handler = self._get_operation_handler(service_handler, ctx.operation) + op_handler = self._get_operation_handler(ctx, service_handler, ctx.operation) op_defn = service_handler.service.operation_definitions[ctx.operation] deserialized_input = await input.consume(as_type=op_defn.input_type) @@ -287,11 +288,11 @@ async def cancel_operation(self, ctx: CancelOperationContext, token: str) -> Non token: The operation token. """ service_handler = self._get_service_handler(ctx.service) - op_handler = self._get_operation_handler(service_handler, ctx.operation) + op_handler = self._get_operation_handler(ctx, service_handler, ctx.operation) return await op_handler.cancel(ctx, token) def _get_operation_handler( - self, service_handler: ServiceHandler, operation: str + self, ctx: OperationContext, service_handler: ServiceHandler, operation: str ) -> AwaitableOperationHandler[Any, Any]: """ Get the specified handler for the specified operation from the given service_handler and apply all interceptors. @@ -303,7 +304,7 @@ def _get_operation_handler( ) for interceptor in reversed(self._interceptors): - op_handler = interceptor.intercept_operation_handler(op_handler) + op_handler = interceptor.intercept(ctx, op_handler) return op_handler @@ -415,20 +416,21 @@ def submit( return self._executor.submit(fn, *args) -class OperationHandlerInterceptor(ABC): +class OperationHandlerMiddleware(ABC): """ Interceptor for operation handlers. This should be extended by any operation handler interceptors. """ - def intercept_operation_handler( - self, next: AwaitableOperationHandler[Any, Any] + def intercept( + self, ctx: OperationContext, next: AwaitableOperationHandler[Any, Any] ) -> AwaitableOperationHandler[Any, Any]: """ Method called for intercepting operation handlers. Args: + ctx: The :py:class:`OperationContext` that will be passed to the operation handler. next: The underlying operation handler that this interceptor should delegate to. diff --git a/tests/handler/test_async_operation.py b/tests/handler/test_async_operation.py index bbb99a8..867de88 100644 --- a/tests/handler/test_async_operation.py +++ b/tests/handler/test_async_operation.py @@ -9,9 +9,9 @@ OperationHandler, StartOperationContext, StartOperationResultAsync, + operation_handler, service_handler, ) -from nexusrpc.handler._decorators import operation_handler from tests.helpers import DummySerializer, TestOperationTaskCancellation _operation_results: dict[str, int] = {} diff --git a/tests/handler/test_cancellation.py b/tests/handler/test_cancellation.py index 626ecd2..f599157 100644 --- a/tests/handler/test_cancellation.py +++ b/tests/handler/test_cancellation.py @@ -1,5 +1,4 @@ import asyncio - import pytest from nexusrpc import LazyValue diff --git a/tests/handler/test_interceptors.py b/tests/handler/test_middleware.py similarity index 89% rename from tests/handler/test_interceptors.py rename to tests/handler/test_middleware.py index d0ab9d6..deea151 100644 --- a/tests/handler/test_interceptors.py +++ b/tests/handler/test_middleware.py @@ -8,10 +8,11 @@ from nexusrpc import LazyValue from nexusrpc.handler import ( AwaitableOperationHandler, + OperationContext, CancelOperationContext, Handler, OperationHandler, - OperationHandlerInterceptor, + OperationHandlerMiddleware, StartOperationContext, StartOperationResultAsync, StartOperationResultSync, @@ -52,13 +53,13 @@ def incr(self, ctx: StartOperationContext, input: int) -> int: # type: ignore[r return input + 1 -class CountingInterceptor(OperationHandlerInterceptor): +class CountingMiddleware(OperationHandlerMiddleware): def __init__(self) -> None: self.num_start = 0 self.num_cancel = 0 - def intercept_operation_handler( - self, next: AwaitableOperationHandler[Any, Any] + def intercept( + self, ctx: OperationContext, next: AwaitableOperationHandler[Any, Any] ) -> AwaitableOperationHandler[Any, Any]: return CountingOperationHandler(next, self) @@ -72,7 +73,7 @@ class CountingOperationHandler(AwaitableOperationHandler[Any, Any]): def __init__( self, next: AwaitableOperationHandler[Any, Any], - interceptor: CountingInterceptor, + interceptor: CountingMiddleware, ) -> None: self._next = next self._interceptor = interceptor @@ -88,12 +89,12 @@ async def cancel(self, ctx: CancelOperationContext, token: str) -> None: return await self._next.cancel(ctx, token) -class MustBeFirstInterceptor(OperationHandlerInterceptor): - def __init__(self, counter: CountingInterceptor) -> None: +class MustBeFirstMiddleware(OperationHandlerMiddleware): + def __init__(self, counter: CountingMiddleware) -> None: self._counter = counter - def intercept_operation_handler( - self, next: AwaitableOperationHandler[Any, Any] + def intercept( + self, ctx: OperationContext, next: AwaitableOperationHandler[Any, Any] ) -> AwaitableOperationHandler[Any, Any]: return MustBeFirstOperationHandler(next, self._counter) @@ -105,7 +106,7 @@ class MustBeFirstOperationHandler(AwaitableOperationHandler[Any, Any]): """ def __init__( - self, next: AwaitableOperationHandler[Any, Any], counter: CountingInterceptor + self, next: AwaitableOperationHandler[Any, Any], counter: CountingMiddleware ) -> None: self._next = next self._counter = counter @@ -143,11 +144,11 @@ async def cancel(self, ctx: CancelOperationContext, token: str) -> None: @pytest.mark.asyncio async def test_async_operation_interceptors_applied(): - counting_interceptor = CountingInterceptor() + counting_interceptor = CountingMiddleware() handler = Handler( user_service_handlers=[MyService()], interceptors=[ - MustBeFirstInterceptor(counting_interceptor), + MustBeFirstMiddleware(counting_interceptor), counting_interceptor, ], ) @@ -179,12 +180,12 @@ async def test_async_operation_interceptors_applied(): @pytest.mark.asyncio async def test_sync_operation_interceptors_applied(): - counting_interceptor = CountingInterceptor() + counting_interceptor = CountingMiddleware() handler = Handler( user_service_handlers=[MyServiceSync()], executor=concurrent.futures.ThreadPoolExecutor(), interceptors=[ - MustBeFirstInterceptor(counting_interceptor), + MustBeFirstMiddleware(counting_interceptor), counting_interceptor, ], ) From 74ca843b6873825f6bd2edf07d866bef45f20011 Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Mon, 10 Nov 2025 16:18:07 -0800 Subject: [PATCH 15/26] fix formatting and linter errors --- src/nexusrpc/handler/_core.py | 4 +++- tests/handler/test_cancellation.py | 1 + tests/handler/test_middleware.py | 2 +- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/nexusrpc/handler/_core.py b/src/nexusrpc/handler/_core.py index 6d431e9..8d203da 100644 --- a/src/nexusrpc/handler/_core.py +++ b/src/nexusrpc/handler/_core.py @@ -424,7 +424,9 @@ class OperationHandlerMiddleware(ABC): """ def intercept( - self, ctx: OperationContext, next: AwaitableOperationHandler[Any, Any] + self, + ctx: OperationContext, # type: ignore[reportUnusedParameter] + next: AwaitableOperationHandler[Any, Any], ) -> AwaitableOperationHandler[Any, Any]: """ Method called for intercepting operation handlers. diff --git a/tests/handler/test_cancellation.py b/tests/handler/test_cancellation.py index f599157..626ecd2 100644 --- a/tests/handler/test_cancellation.py +++ b/tests/handler/test_cancellation.py @@ -1,4 +1,5 @@ import asyncio + import pytest from nexusrpc import LazyValue diff --git a/tests/handler/test_middleware.py b/tests/handler/test_middleware.py index deea151..8c992ac 100644 --- a/tests/handler/test_middleware.py +++ b/tests/handler/test_middleware.py @@ -8,9 +8,9 @@ from nexusrpc import LazyValue from nexusrpc.handler import ( AwaitableOperationHandler, - OperationContext, CancelOperationContext, Handler, + OperationContext, OperationHandler, OperationHandlerMiddleware, StartOperationContext, From 835d43d48af3bcd4e923ae155a289aad96a381c6 Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Fri, 14 Nov 2025 08:49:46 -0800 Subject: [PATCH 16/26] Remove return repetitive types in OperationHandler.start. Make OperationHandlerMiddleware.intercept an abstract method. --- src/nexusrpc/handler/_core.py | 3 ++- src/nexusrpc/handler/_operation_handler.py | 2 -- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/nexusrpc/handler/_core.py b/src/nexusrpc/handler/_core.py index 8d203da..04800e5 100644 --- a/src/nexusrpc/handler/_core.py +++ b/src/nexusrpc/handler/_core.py @@ -423,6 +423,7 @@ class OperationHandlerMiddleware(ABC): This should be extended by any operation handler interceptors. """ + @abstractmethod def intercept( self, ctx: OperationContext, # type: ignore[reportUnusedParameter] @@ -440,7 +441,7 @@ def intercept( The new interceptor that will be used to invoke :py:attr:`OperationHandler.start` or :py:attr:`OperationHandler.cancel`. """ - return next + ... class _EnsuredAwaitableOperationHandler(AwaitableOperationHandler[Any, Any]): diff --git a/src/nexusrpc/handler/_operation_handler.py b/src/nexusrpc/handler/_operation_handler.py index 99f0ea0..9d28f2c 100644 --- a/src/nexusrpc/handler/_operation_handler.py +++ b/src/nexusrpc/handler/_operation_handler.py @@ -41,9 +41,7 @@ def start( self, ctx: StartOperationContext, input: InputT ) -> ( StartOperationResultSync[OutputT] - | Awaitable[StartOperationResultSync[OutputT]] | StartOperationResultAsync - | Awaitable[StartOperationResultAsync] | Awaitable[StartOperationResultSync[OutputT] | StartOperationResultAsync] ): """ From 4205c931d133017d321f9fe1630b465f35d27156 Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Fri, 14 Nov 2025 09:01:32 -0800 Subject: [PATCH 17/26] Move deploy-docs to it's own workflow that runs on push to main --- .github/workflows/ci.yml | 37 +++---------------------------- .github/workflows/deploy-docs.yml | 37 +++++++++++++++++++++++++++++++ 2 files changed, 40 insertions(+), 34 deletions(-) create mode 100644 .github/workflows/deploy-docs.yml diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0b04811..e1582f1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -3,14 +3,15 @@ name: CI on: pull_request: push: - branches: [ main ] + branches: + - main jobs: lint-test-docs: runs-on: ${{ matrix.os }} strategy: matrix: - python-version: ['3.10', '3.13', '3.14'] + python-version: ['3.10', '3.14'] os: [ubuntu-latest, macos-latest, windows-latest] steps: @@ -38,35 +39,3 @@ jobs: with: name: coverage-html-report-${{ matrix.os }}-${{ matrix.python-version }} path: coverage_html_report/ - - deploy-docs: - runs-on: ubuntu-latest - needs: lint-test-docs - # TODO(preview): deploy on releases only - permissions: - contents: read - pages: write - id-token: write - - steps: - - name: Checkout repository - uses: actions/checkout@v4 - - - name: Install uv - uses: astral-sh/setup-uv@v6 - with: - python-version: '3.10' - - - name: Install dependencies - run: uv sync - - - name: Build API docs - run: uv run poe docs - - - name: Upload docs to GitHub Pages - uses: actions/upload-pages-artifact@v3 - with: - path: apidocs - - - name: Deploy to GitHub Pages - uses: actions/deploy-pages@v4 diff --git a/.github/workflows/deploy-docs.yml b/.github/workflows/deploy-docs.yml new file mode 100644 index 0000000..1c89669 --- /dev/null +++ b/.github/workflows/deploy-docs.yml @@ -0,0 +1,37 @@ +name: CI + +on: + push: + branches: + - main + +jobs: + deploy-docs: + runs-on: ubuntu-latest + permissions: + contents: read + pages: write + id-token: write + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Install uv + uses: astral-sh/setup-uv@v6 + with: + python-version: '3.10' + + - name: Install dependencies + run: uv sync + + - name: Build API docs + run: uv run poe docs + + - name: Upload docs to GitHub Pages + uses: actions/upload-pages-artifact@v3 + with: + path: apidocs + + - name: Deploy to GitHub Pages + uses: actions/deploy-pages@v4 From d933f839b553ace845ac49215885b6944cc24a3b Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Fri, 14 Nov 2025 09:02:00 -0800 Subject: [PATCH 18/26] Fix workflow name in deploy-docs --- .github/workflows/deploy-docs.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/deploy-docs.yml b/.github/workflows/deploy-docs.yml index 1c89669..ddff9d6 100644 --- a/.github/workflows/deploy-docs.yml +++ b/.github/workflows/deploy-docs.yml @@ -1,4 +1,4 @@ -name: CI +name: Deploy Docs on: push: From 96ddecf77fcc75e19af74837670e1aee4df21bbb Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Fri, 14 Nov 2025 09:05:51 -0800 Subject: [PATCH 19/26] export LazyValueT and Serializer from _serializer.py --- src/nexusrpc/__init__.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/nexusrpc/__init__.py b/src/nexusrpc/__init__.py index bc02763..595ce5c 100644 --- a/src/nexusrpc/__init__.py +++ b/src/nexusrpc/__init__.py @@ -25,7 +25,7 @@ OperationErrorState, OutputT, ) -from ._serializer import Content, LazyValue +from ._serializer import Content, LazyValue, LazyValueT, Serializer from ._service import Operation, OperationDefinition, ServiceDefinition, service from ._util import ( get_operation, @@ -42,12 +42,14 @@ "HandlerErrorType", "InputT", "LazyValue", + "LazyValueT", "Link", "Operation", "OperationDefinition", "OperationError", "OperationErrorState", "OutputT", + "Serializer", "service", "ServiceDefinition", "set_operation", From 59c96d8ba127de82db3025376c994ed85024d512 Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Fri, 14 Nov 2025 09:07:52 -0800 Subject: [PATCH 20/26] remove the work 'docs' from the 'lint-test' job --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e1582f1..72943ab 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -7,7 +7,7 @@ on: - main jobs: - lint-test-docs: + lint-test: runs-on: ${{ matrix.os }} strategy: matrix: From 5f2a39989c9860bb879126cee33de16cac40d3db Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Fri, 14 Nov 2025 16:23:01 -0800 Subject: [PATCH 21/26] Rename AwaitableOperationHandler to MiddlewareSafeOperationHandler --- src/nexusrpc/handler/__init__.py | 4 ++-- src/nexusrpc/handler/_core.py | 12 ++++++------ src/nexusrpc/handler/_operation_handler.py | 2 +- tests/handler/test_middleware.py | 20 +++++++++++--------- 4 files changed, 20 insertions(+), 18 deletions(-) diff --git a/src/nexusrpc/handler/__init__.py b/src/nexusrpc/handler/__init__.py index 6ca6f2f..40567d2 100644 --- a/src/nexusrpc/handler/__init__.py +++ b/src/nexusrpc/handler/__init__.py @@ -20,10 +20,10 @@ ) from ._core import Handler, OperationHandlerMiddleware from ._decorators import operation_handler, service_handler, sync_operation -from ._operation_handler import AwaitableOperationHandler, OperationHandler +from ._operation_handler import MiddlewareSafeOperationHandler, OperationHandler __all__ = [ - "AwaitableOperationHandler", + "MiddlewareSafeOperationHandler", "CancelOperationContext", "Handler", "OperationContext", diff --git a/src/nexusrpc/handler/_core.py b/src/nexusrpc/handler/_core.py index 04800e5..f6e2e21 100644 --- a/src/nexusrpc/handler/_core.py +++ b/src/nexusrpc/handler/_core.py @@ -119,7 +119,7 @@ StartOperationResultSync, ) from ._operation_handler import ( - AwaitableOperationHandler, + MiddlewareSafeOperationHandler, OperationHandler, collect_operation_handler_factories_by_method_name, ) @@ -293,11 +293,11 @@ async def cancel_operation(self, ctx: CancelOperationContext, token: str) -> Non def _get_operation_handler( self, ctx: OperationContext, service_handler: ServiceHandler, operation: str - ) -> AwaitableOperationHandler[Any, Any]: + ) -> MiddlewareSafeOperationHandler[Any, Any]: """ Get the specified handler for the specified operation from the given service_handler and apply all interceptors. """ - op_handler: AwaitableOperationHandler[Any, Any] = ( + op_handler: MiddlewareSafeOperationHandler[Any, Any] = ( _EnsuredAwaitableOperationHandler( self.executor, service_handler.get_operation_handler(operation) ) @@ -427,8 +427,8 @@ class OperationHandlerMiddleware(ABC): def intercept( self, ctx: OperationContext, # type: ignore[reportUnusedParameter] - next: AwaitableOperationHandler[Any, Any], - ) -> AwaitableOperationHandler[Any, Any]: + next: MiddlewareSafeOperationHandler[Any, Any], + ) -> MiddlewareSafeOperationHandler[Any, Any]: """ Method called for intercepting operation handlers. @@ -444,7 +444,7 @@ def intercept( ... -class _EnsuredAwaitableOperationHandler(AwaitableOperationHandler[Any, Any]): +class _EnsuredAwaitableOperationHandler(MiddlewareSafeOperationHandler[Any, Any]): """ An :py:class:`AwaitableOperationHandler` that wraps an :py:class:`OperationHandler` and uses an :py:class:`_Executor` to ensure that the :py:attr:`start` and :py:attr:`cancel` methods are awaitable. diff --git a/src/nexusrpc/handler/_operation_handler.py b/src/nexusrpc/handler/_operation_handler.py index 9d28f2c..85659e8 100644 --- a/src/nexusrpc/handler/_operation_handler.py +++ b/src/nexusrpc/handler/_operation_handler.py @@ -101,7 +101,7 @@ async def cancel(self, ctx: CancelOperationContext, token: str) -> None: ) -class AwaitableOperationHandler(OperationHandler[InputT, OutputT], ABC): +class MiddlewareSafeOperationHandler(OperationHandler[InputT, OutputT], ABC): """ An :py:class:`OperationHandler` where :py:attr:`start` and :py:attr:`cancel` can be awaited by an async runtime. It can produce a result synchronously by returning diff --git a/tests/handler/test_middleware.py b/tests/handler/test_middleware.py index 8c992ac..afa5318 100644 --- a/tests/handler/test_middleware.py +++ b/tests/handler/test_middleware.py @@ -7,7 +7,7 @@ from nexusrpc import LazyValue from nexusrpc.handler import ( - AwaitableOperationHandler, + MiddlewareSafeOperationHandler, CancelOperationContext, Handler, OperationContext, @@ -59,12 +59,12 @@ def __init__(self) -> None: self.num_cancel = 0 def intercept( - self, ctx: OperationContext, next: AwaitableOperationHandler[Any, Any] - ) -> AwaitableOperationHandler[Any, Any]: + self, ctx: OperationContext, next: MiddlewareSafeOperationHandler[Any, Any] + ) -> MiddlewareSafeOperationHandler[Any, Any]: return CountingOperationHandler(next, self) -class CountingOperationHandler(AwaitableOperationHandler[Any, Any]): +class CountingOperationHandler(MiddlewareSafeOperationHandler[Any, Any]): """ An :py:class:`AwaitableOperationHandler` that wraps a counting interceptor that counts the number of calls to each handler method. @@ -72,7 +72,7 @@ class CountingOperationHandler(AwaitableOperationHandler[Any, Any]): def __init__( self, - next: AwaitableOperationHandler[Any, Any], + next: MiddlewareSafeOperationHandler[Any, Any], interceptor: CountingMiddleware, ) -> None: self._next = next @@ -94,19 +94,21 @@ def __init__(self, counter: CountingMiddleware) -> None: self._counter = counter def intercept( - self, ctx: OperationContext, next: AwaitableOperationHandler[Any, Any] - ) -> AwaitableOperationHandler[Any, Any]: + self, ctx: OperationContext, next: MiddlewareSafeOperationHandler[Any, Any] + ) -> MiddlewareSafeOperationHandler[Any, Any]: return MustBeFirstOperationHandler(next, self._counter) -class MustBeFirstOperationHandler(AwaitableOperationHandler[Any, Any]): +class MustBeFirstOperationHandler(MiddlewareSafeOperationHandler[Any, Any]): """ An :py:class:`AwaitableOperationHandler` that wraps a counting interceptor and asserts that the wrapped interceptor has a count of 0 for each handler method """ def __init__( - self, next: AwaitableOperationHandler[Any, Any], counter: CountingMiddleware + self, + next: MiddlewareSafeOperationHandler[Any, Any], + counter: CountingMiddleware, ) -> None: self._next = next self._counter = counter From ae3b58f011585f0fa913793f489d02ca58abdaf2 Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Fri, 14 Nov 2025 16:24:04 -0800 Subject: [PATCH 22/26] Run formatter --- tests/handler/test_middleware.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/handler/test_middleware.py b/tests/handler/test_middleware.py index afa5318..d855467 100644 --- a/tests/handler/test_middleware.py +++ b/tests/handler/test_middleware.py @@ -7,9 +7,9 @@ from nexusrpc import LazyValue from nexusrpc.handler import ( - MiddlewareSafeOperationHandler, CancelOperationContext, Handler, + MiddlewareSafeOperationHandler, OperationContext, OperationHandler, OperationHandlerMiddleware, From b42c0e955f47a9bd89f2d29fe4a948babdeaf06e Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Thu, 20 Nov 2025 09:41:09 -0800 Subject: [PATCH 23/26] remove generic args in MiddlewareSafeOperationHandler since it by definition, must always be OperationHandler[Any,Any] --- src/nexusrpc/handler/_operation_handler.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/nexusrpc/handler/_operation_handler.py b/src/nexusrpc/handler/_operation_handler.py index 85659e8..5a9c16a 100644 --- a/src/nexusrpc/handler/_operation_handler.py +++ b/src/nexusrpc/handler/_operation_handler.py @@ -101,7 +101,7 @@ async def cancel(self, ctx: CancelOperationContext, token: str) -> None: ) -class MiddlewareSafeOperationHandler(OperationHandler[InputT, OutputT], ABC): +class MiddlewareSafeOperationHandler(OperationHandler[Any, Any], ABC): """ An :py:class:`OperationHandler` where :py:attr:`start` and :py:attr:`cancel` can be awaited by an async runtime. It can produce a result synchronously by returning @@ -111,8 +111,8 @@ class MiddlewareSafeOperationHandler(OperationHandler[InputT, OutputT], ABC): @abstractmethod async def start( - self, ctx: StartOperationContext, input: InputT - ) -> StartOperationResultSync[OutputT] | StartOperationResultAsync: + self, ctx: StartOperationContext, input: Any + ) -> StartOperationResultSync[Any] | StartOperationResultAsync: """ Start the operation and return it's result or an async token. """ From 0f37ce88ffd8b43426caa4decad6b73cbbe7f4cb Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Thu, 20 Nov 2025 09:47:55 -0800 Subject: [PATCH 24/26] Finish removing generic args from MiddlewareSafeOperationHandler --- src/nexusrpc/handler/_core.py | 14 ++++++-------- tests/handler/test_middleware.py | 16 ++++++++-------- 2 files changed, 14 insertions(+), 16 deletions(-) diff --git a/src/nexusrpc/handler/_core.py b/src/nexusrpc/handler/_core.py index f6e2e21..885b71e 100644 --- a/src/nexusrpc/handler/_core.py +++ b/src/nexusrpc/handler/_core.py @@ -293,14 +293,12 @@ async def cancel_operation(self, ctx: CancelOperationContext, token: str) -> Non def _get_operation_handler( self, ctx: OperationContext, service_handler: ServiceHandler, operation: str - ) -> MiddlewareSafeOperationHandler[Any, Any]: + ) -> MiddlewareSafeOperationHandler: """ Get the specified handler for the specified operation from the given service_handler and apply all interceptors. """ - op_handler: MiddlewareSafeOperationHandler[Any, Any] = ( - _EnsuredAwaitableOperationHandler( - self.executor, service_handler.get_operation_handler(operation) - ) + op_handler: MiddlewareSafeOperationHandler = _EnsuredAwaitableOperationHandler( + self.executor, service_handler.get_operation_handler(operation) ) for interceptor in reversed(self._interceptors): @@ -427,8 +425,8 @@ class OperationHandlerMiddleware(ABC): def intercept( self, ctx: OperationContext, # type: ignore[reportUnusedParameter] - next: MiddlewareSafeOperationHandler[Any, Any], - ) -> MiddlewareSafeOperationHandler[Any, Any]: + next: MiddlewareSafeOperationHandler, + ) -> MiddlewareSafeOperationHandler: """ Method called for intercepting operation handlers. @@ -444,7 +442,7 @@ def intercept( ... -class _EnsuredAwaitableOperationHandler(MiddlewareSafeOperationHandler[Any, Any]): +class _EnsuredAwaitableOperationHandler(MiddlewareSafeOperationHandler): """ An :py:class:`AwaitableOperationHandler` that wraps an :py:class:`OperationHandler` and uses an :py:class:`_Executor` to ensure that the :py:attr:`start` and :py:attr:`cancel` methods are awaitable. diff --git a/tests/handler/test_middleware.py b/tests/handler/test_middleware.py index d855467..fe6ec90 100644 --- a/tests/handler/test_middleware.py +++ b/tests/handler/test_middleware.py @@ -59,12 +59,12 @@ def __init__(self) -> None: self.num_cancel = 0 def intercept( - self, ctx: OperationContext, next: MiddlewareSafeOperationHandler[Any, Any] - ) -> MiddlewareSafeOperationHandler[Any, Any]: + self, ctx: OperationContext, next: MiddlewareSafeOperationHandler + ) -> MiddlewareSafeOperationHandler: return CountingOperationHandler(next, self) -class CountingOperationHandler(MiddlewareSafeOperationHandler[Any, Any]): +class CountingOperationHandler(MiddlewareSafeOperationHandler): """ An :py:class:`AwaitableOperationHandler` that wraps a counting interceptor that counts the number of calls to each handler method. @@ -72,7 +72,7 @@ class CountingOperationHandler(MiddlewareSafeOperationHandler[Any, Any]): def __init__( self, - next: MiddlewareSafeOperationHandler[Any, Any], + next: MiddlewareSafeOperationHandler, interceptor: CountingMiddleware, ) -> None: self._next = next @@ -94,12 +94,12 @@ def __init__(self, counter: CountingMiddleware) -> None: self._counter = counter def intercept( - self, ctx: OperationContext, next: MiddlewareSafeOperationHandler[Any, Any] - ) -> MiddlewareSafeOperationHandler[Any, Any]: + self, ctx: OperationContext, next: MiddlewareSafeOperationHandler + ) -> MiddlewareSafeOperationHandler: return MustBeFirstOperationHandler(next, self._counter) -class MustBeFirstOperationHandler(MiddlewareSafeOperationHandler[Any, Any]): +class MustBeFirstOperationHandler(MiddlewareSafeOperationHandler): """ An :py:class:`AwaitableOperationHandler` that wraps a counting interceptor and asserts that the wrapped interceptor has a count of 0 for each handler method @@ -107,7 +107,7 @@ class MustBeFirstOperationHandler(MiddlewareSafeOperationHandler[Any, Any]): def __init__( self, - next: MiddlewareSafeOperationHandler[Any, Any], + next: MiddlewareSafeOperationHandler, counter: CountingMiddleware, ) -> None: self._next = next From bde8337f83b0ba50690632edfa4ac06b3928c322 Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Mon, 24 Nov 2025 13:17:14 -0800 Subject: [PATCH 25/26] Update old reference from 'interceptors' -> 'middleware' --- src/nexusrpc/handler/_core.py | 4 ++-- tests/handler/test_middleware.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/nexusrpc/handler/_core.py b/src/nexusrpc/handler/_core.py index 885b71e..4a4a199 100644 --- a/src/nexusrpc/handler/_core.py +++ b/src/nexusrpc/handler/_core.py @@ -250,10 +250,10 @@ def __init__( self, user_service_handlers: Sequence[Any], executor: Optional[concurrent.futures.Executor] = None, - interceptors: Sequence[OperationHandlerMiddleware] | None = None, + middleware: Sequence[OperationHandlerMiddleware] | None = None, ): self._interceptors = cast( - Sequence[OperationHandlerMiddleware], interceptors or [] + Sequence[OperationHandlerMiddleware], middleware or [] ) super().__init__(user_service_handlers, executor=executor) if not self.executor: diff --git a/tests/handler/test_middleware.py b/tests/handler/test_middleware.py index fe6ec90..cc926f5 100644 --- a/tests/handler/test_middleware.py +++ b/tests/handler/test_middleware.py @@ -149,7 +149,7 @@ async def test_async_operation_interceptors_applied(): counting_interceptor = CountingMiddleware() handler = Handler( user_service_handlers=[MyService()], - interceptors=[ + middleware=[ MustBeFirstMiddleware(counting_interceptor), counting_interceptor, ], @@ -186,7 +186,7 @@ async def test_sync_operation_interceptors_applied(): handler = Handler( user_service_handlers=[MyServiceSync()], executor=concurrent.futures.ThreadPoolExecutor(), - interceptors=[ + middleware=[ MustBeFirstMiddleware(counting_interceptor), counting_interceptor, ], From 875e2eafc1dcbb39eb2c31c17d89741c58ddb02d Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Mon, 24 Nov 2025 13:21:27 -0800 Subject: [PATCH 26/26] Remove _all_ reference to interceptors --- src/nexusrpc/handler/_core.py | 18 +++++++-------- tests/handler/test_middleware.py | 38 ++++++++++++++++---------------- 2 files changed, 27 insertions(+), 29 deletions(-) diff --git a/src/nexusrpc/handler/_core.py b/src/nexusrpc/handler/_core.py index 4a4a199..eb3b01a 100644 --- a/src/nexusrpc/handler/_core.py +++ b/src/nexusrpc/handler/_core.py @@ -252,9 +252,7 @@ def __init__( executor: Optional[concurrent.futures.Executor] = None, middleware: Sequence[OperationHandlerMiddleware] | None = None, ): - self._interceptors = cast( - Sequence[OperationHandlerMiddleware], middleware or [] - ) + self._middleware = cast(Sequence[OperationHandlerMiddleware], middleware or []) super().__init__(user_service_handlers, executor=executor) if not self.executor: self._validate_all_operation_handlers_are_async() @@ -295,14 +293,14 @@ def _get_operation_handler( self, ctx: OperationContext, service_handler: ServiceHandler, operation: str ) -> MiddlewareSafeOperationHandler: """ - Get the specified handler for the specified operation from the given service_handler and apply all interceptors. + Get the specified handler for the specified operation from the given service_handler and apply all middleware. """ op_handler: MiddlewareSafeOperationHandler = _EnsuredAwaitableOperationHandler( self.executor, service_handler.get_operation_handler(operation) ) - for interceptor in reversed(self._interceptors): - op_handler = interceptor.intercept(ctx, op_handler) + for middleware in reversed(self._middleware): + op_handler = middleware.intercept(ctx, op_handler) return op_handler @@ -416,9 +414,9 @@ def submit( class OperationHandlerMiddleware(ABC): """ - Interceptor for operation handlers. + Middleware for operation handlers. - This should be extended by any operation handler interceptors. + This should be extended by any operation handler middelware. """ @abstractmethod @@ -432,11 +430,11 @@ def intercept( Args: ctx: The :py:class:`OperationContext` that will be passed to the operation handler. - next: The underlying operation handler that this interceptor + next: The underlying operation handler that this middleware should delegate to. Returns: - The new interceptor that will be used to invoke + The new middleware that will be used to invoke :py:attr:`OperationHandler.start` or :py:attr:`OperationHandler.cancel`. """ ... diff --git a/tests/handler/test_middleware.py b/tests/handler/test_middleware.py index cc926f5..f5636b3 100644 --- a/tests/handler/test_middleware.py +++ b/tests/handler/test_middleware.py @@ -66,26 +66,26 @@ def intercept( class CountingOperationHandler(MiddlewareSafeOperationHandler): """ - An :py:class:`AwaitableOperationHandler` that wraps a counting interceptor + An :py:class:`AwaitableOperationHandler` that wraps a counting middleware that counts the number of calls to each handler method. """ def __init__( self, next: MiddlewareSafeOperationHandler, - interceptor: CountingMiddleware, + middleware: CountingMiddleware, ) -> None: self._next = next - self._interceptor = interceptor + self._middleware = middleware async def start( self, ctx: StartOperationContext, input: Any ) -> StartOperationResultSync[Any] | StartOperationResultAsync: - self._interceptor.num_start += 1 + self._middleware.num_start += 1 return await self._next.start(ctx, input) async def cancel(self, ctx: CancelOperationContext, token: str) -> None: - self._interceptor.num_cancel += 1 + self._middleware.num_cancel += 1 return await self._next.cancel(ctx, token) @@ -101,8 +101,8 @@ def intercept( class MustBeFirstOperationHandler(MiddlewareSafeOperationHandler): """ - An :py:class:`AwaitableOperationHandler` that wraps a counting interceptor - and asserts that the wrapped interceptor has a count of 0 for each handler method + An :py:class:`AwaitableOperationHandler` that wraps a counting middleware + and asserts that the wrapped middleware has a count of 0 for each handler method """ def __init__( @@ -145,13 +145,13 @@ async def cancel(self, ctx: CancelOperationContext, token: str) -> None: @pytest.mark.asyncio -async def test_async_operation_interceptors_applied(): - counting_interceptor = CountingMiddleware() +async def test_async_operation_middleware_applied(): + counting_middleware = CountingMiddleware() handler = Handler( user_service_handlers=[MyService()], middleware=[ - MustBeFirstMiddleware(counting_interceptor), - counting_interceptor, + MustBeFirstMiddleware(counting_middleware), + counting_middleware, ], ) start_ctx = StartOperationContext( @@ -176,19 +176,19 @@ async def test_async_operation_interceptors_applied(): await handler.cancel_operation(cancel_ctx, start_result.token) assert start_result.token not in _operation_results - assert counting_interceptor.num_start == 1 - assert counting_interceptor.num_cancel == 1 + assert counting_middleware.num_start == 1 + assert counting_middleware.num_cancel == 1 @pytest.mark.asyncio -async def test_sync_operation_interceptors_applied(): - counting_interceptor = CountingMiddleware() +async def test_sync_operation_middleware_applied(): + counting_middleware = CountingMiddleware() handler = Handler( user_service_handlers=[MyServiceSync()], executor=concurrent.futures.ThreadPoolExecutor(), middleware=[ - MustBeFirstMiddleware(counting_interceptor), - counting_interceptor, + MustBeFirstMiddleware(counting_middleware), + counting_middleware, ], ) start_ctx = StartOperationContext( @@ -204,5 +204,5 @@ async def test_sync_operation_interceptors_applied(): assert isinstance(start_result, StartOperationResultSync) assert start_result.value == 2 - assert counting_interceptor.num_start == 1 - assert counting_interceptor.num_cancel == 0 + assert counting_middleware.num_start == 1 + assert counting_middleware.num_cancel == 0