From 8ead5a4bf8f1bec9909aabfc43c7da4be5952b4b Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Tue, 6 Jan 2026 11:36:23 -0800 Subject: [PATCH] Update to nexus-rpc that allows creating nexusrpc.HandlerError from an error type string. Refactor converter to use the string constructor rather than determining the nexusrpc.HandlerErrorType. Update references to nexusrpc.HandlerError to use the renamed error_type field. --- pyproject.toml | 5 +++- temporalio/converter.py | 11 ++------ temporalio/nexus/_operation_handlers.py | 4 +-- temporalio/worker/_nexus.py | 28 +++++++++---------- tests/nexus/test_handler.py | 2 +- ...llation_types_when_cancel_handler_fails.py | 2 +- .../test_workflow_caller_error_chains.py | 14 +++++----- tests/nexus/test_workflow_caller_errors.py | 6 ++-- uv.lock | 8 ++---- 9 files changed, 36 insertions(+), 44 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index d660bfb46..70b16dd4c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -97,7 +97,7 @@ lint-docs = "uv run pydocstyle --ignore-decorators=overload" lint-types = [ { cmd = "uv run pyright" }, { cmd = "uv run mypy --namespace-packages --check-untyped-defs ." }, - { cmd = "uv run basedpyright" } + { cmd = "uv run basedpyright" }, ] run-bench = "uv run python scripts/run_bench.py" test = "uv run pytest" @@ -229,3 +229,6 @@ exclude = ["temporalio/bridge/target/**/*"] [tool.uv] # Prevent uv commands from building the package by default package = false + +[tool.uv.sources] +nexus-rpc = { git = "https://github.com/nexus-rpc/sdk-python.git", branch = "preserve-raw-handler-error-type" } diff --git a/temporalio/converter.py b/temporalio/converter.py index 3849a47f4..f05d4ec7f 100644 --- a/temporalio/converter.py +++ b/temporalio/converter.py @@ -1079,7 +1079,7 @@ def _nexus_handler_error_to_failure( if error.__cause__: self.to_failure(error.__cause__, payload_converter, failure.cause) failure.nexus_handler_failure_info.SetInParent() - failure.nexus_handler_failure_info.type = error.type.name + failure.nexus_handler_failure_info.type = error.error_type.name failure.nexus_handler_failure_info.retry_behavior = temporalio.api.enums.v1.NexusHandlerErrorRetryBehavior.ValueType( temporalio.api.enums.v1.NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_RETRYABLE if error.retryable_override is True @@ -1184,13 +1184,6 @@ def from_failure( ) elif failure.HasField("nexus_handler_failure_info"): nexus_handler_failure_info = failure.nexus_handler_failure_info - try: - _type = nexusrpc.HandlerErrorType[nexus_handler_failure_info.type] - except KeyError: - logger.warning( - f"Unknown Nexus HandlerErrorType: {nexus_handler_failure_info.type}" - ) - _type = nexusrpc.HandlerErrorType.INTERNAL retryable_override = ( True if ( @@ -1206,7 +1199,7 @@ def from_failure( ) err = nexusrpc.HandlerError( failure.message or "Nexus handler error", - type=_type, + error_type=nexus_handler_failure_info.type, retryable_override=retryable_override, ) elif failure.HasField("nexus_operation_execution_failure_info"): diff --git a/temporalio/nexus/_operation_handlers.py b/temporalio/nexus/_operation_handlers.py index 68035ca41..2ff008f87 100644 --- a/temporalio/nexus/_operation_handlers.py +++ b/temporalio/nexus/_operation_handlers.py @@ -98,7 +98,7 @@ async def _cancel_workflow( raise HandlerError( "Failed to decode operation token as a workflow operation token. " "Canceling non-workflow operations is not supported.", - type=HandlerErrorType.NOT_FOUND, + error_type=HandlerErrorType.NOT_FOUND, ) from err ctx = _temporal_cancel_operation_context.get() @@ -109,6 +109,6 @@ async def _cancel_workflow( except Exception as err: raise HandlerError( "Failed to construct workflow handle from workflow operation token", - type=HandlerErrorType.NOT_FOUND, + error_type=HandlerErrorType.NOT_FOUND, ) from err await client_workflow_handle.cancel(**kwargs) diff --git a/temporalio/worker/_nexus.py b/temporalio/worker/_nexus.py index 8f6226ea3..0eee8afd7 100644 --- a/temporalio/worker/_nexus.py +++ b/temporalio/worker/_nexus.py @@ -450,7 +450,7 @@ async def _handler_error_to_proto( else temporalio.api.enums.v1.NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_UNSPECIFIED ) return temporalio.api.nexus.v1.HandlerError( - error_type=handler_error.type.value, + error_type=handler_error.error_type.value, failure=await self._nexus_error_to_nexus_failure_proto(handler_error), retry_behavior=retry_behavior, ) @@ -480,7 +480,7 @@ async def deserialize( except Exception as err: raise nexusrpc.HandlerError( "Data converter failed to decode Nexus operation input", - type=nexusrpc.HandlerErrorType.BAD_REQUEST, + error_type=nexusrpc.HandlerErrorType.BAD_REQUEST, retryable_override=False, ) from err @@ -509,20 +509,20 @@ def _exception_to_handler_error(err: BaseException) -> nexusrpc.HandlerError: handler_err = nexusrpc.HandlerError( # TODO(nexus-preview): confirm what we want as message here err.message, - type=nexusrpc.HandlerErrorType.INTERNAL, + error_type=nexusrpc.HandlerErrorType.INTERNAL, retryable_override=not err.non_retryable, ) elif isinstance(err, WorkflowAlreadyStartedError): handler_err = nexusrpc.HandlerError( err.message, - type=nexusrpc.HandlerErrorType.INTERNAL, + error_type=nexusrpc.HandlerErrorType.INTERNAL, retryable_override=False, ) elif isinstance(err, RPCError): if err.status == RPCStatusCode.INVALID_ARGUMENT: handler_err = nexusrpc.HandlerError( err.message, - type=nexusrpc.HandlerErrorType.BAD_REQUEST, + error_type=nexusrpc.HandlerErrorType.BAD_REQUEST, ) elif err.status in [ RPCStatusCode.ALREADY_EXISTS, @@ -531,13 +531,13 @@ def _exception_to_handler_error(err: BaseException) -> nexusrpc.HandlerError: ]: handler_err = nexusrpc.HandlerError( err.message, - type=nexusrpc.HandlerErrorType.INTERNAL, + error_type=nexusrpc.HandlerErrorType.INTERNAL, retryable_override=False, ) elif err.status in [RPCStatusCode.ABORTED, RPCStatusCode.UNAVAILABLE]: handler_err = nexusrpc.HandlerError( err.message, - type=nexusrpc.HandlerErrorType.UNAVAILABLE, + error_type=nexusrpc.HandlerErrorType.UNAVAILABLE, ) elif err.status in [ RPCStatusCode.CANCELLED, @@ -552,35 +552,35 @@ def _exception_to_handler_error(err: BaseException) -> nexusrpc.HandlerError: # when the handler fails to auth with Temporal and should be considered # retryable. handler_err = nexusrpc.HandlerError( - err.message, type=nexusrpc.HandlerErrorType.INTERNAL + err.message, error_type=nexusrpc.HandlerErrorType.INTERNAL ) elif err.status == RPCStatusCode.NOT_FOUND: handler_err = nexusrpc.HandlerError( - err.message, type=nexusrpc.HandlerErrorType.NOT_FOUND + err.message, error_type=nexusrpc.HandlerErrorType.NOT_FOUND ) elif err.status == RPCStatusCode.RESOURCE_EXHAUSTED: handler_err = nexusrpc.HandlerError( err.message, - type=nexusrpc.HandlerErrorType.RESOURCE_EXHAUSTED, + error_type=nexusrpc.HandlerErrorType.RESOURCE_EXHAUSTED, ) elif err.status == RPCStatusCode.UNIMPLEMENTED: handler_err = nexusrpc.HandlerError( err.message, - type=nexusrpc.HandlerErrorType.NOT_IMPLEMENTED, + error_type=nexusrpc.HandlerErrorType.NOT_IMPLEMENTED, ) elif err.status == RPCStatusCode.DEADLINE_EXCEEDED: handler_err = nexusrpc.HandlerError( err.message, - type=nexusrpc.HandlerErrorType.UPSTREAM_TIMEOUT, + error_type=nexusrpc.HandlerErrorType.UPSTREAM_TIMEOUT, ) else: handler_err = nexusrpc.HandlerError( f"Unhandled RPC error status: {err.status}", - type=nexusrpc.HandlerErrorType.INTERNAL, + error_type=nexusrpc.HandlerErrorType.INTERNAL, ) else: handler_err = nexusrpc.HandlerError( - str(err), type=nexusrpc.HandlerErrorType.INTERNAL + str(err), error_type=nexusrpc.HandlerErrorType.INTERNAL ) handler_err.__cause__ = err return handler_err diff --git a/tests/nexus/test_handler.py b/tests/nexus/test_handler.py index 407e61caf..3fab5c6c7 100644 --- a/tests/nexus/test_handler.py +++ b/tests/nexus/test_handler.py @@ -179,7 +179,7 @@ async def handler_error_internal( ) -> Output: raise HandlerError( message="deliberate internal handler error", - type=HandlerErrorType.INTERNAL, + error_type=HandlerErrorType.INTERNAL, retryable_override=False, ) from RuntimeError("cause message") diff --git a/tests/nexus/test_workflow_caller_cancellation_types_when_cancel_handler_fails.py b/tests/nexus/test_workflow_caller_cancellation_types_when_cancel_handler_fails.py index 37aa986ed..764c2dc3f 100644 --- a/tests/nexus/test_workflow_caller_cancellation_types_when_cancel_handler_fails.py +++ b/tests/nexus/test_workflow_caller_cancellation_types_when_cancel_handler_fails.py @@ -106,7 +106,7 @@ async def cancel( test_context.cancel_handler_released.set_result(datetime.now(timezone.utc)) raise nexusrpc.HandlerError( "Deliberate non-retryable error in cancel handler", - type=nexusrpc.HandlerErrorType.BAD_REQUEST, + error_type=nexusrpc.HandlerErrorType.BAD_REQUEST, ) diff --git a/tests/nexus/test_workflow_caller_error_chains.py b/tests/nexus/test_workflow_caller_error_chains.py index 07a575d60..1964319bc 100644 --- a/tests/nexus/test_workflow_caller_error_chains.py +++ b/tests/nexus/test_workflow_caller_error_chains.py @@ -97,7 +97,7 @@ def action_in_nexus_operation(): # that of the ApplicationError. The server prepends 'handler error # (INTERNAL):' "message": "handler error (INTERNAL): application-error-message", - "type": nexusrpc.HandlerErrorType.INTERNAL, + "error_type": nexusrpc.HandlerErrorType.INTERNAL, "retryable": False, }, ), @@ -147,7 +147,7 @@ def action_in_nexus_operation(): except RuntimeError as err: raise nexusrpc.HandlerError( "handler-error-message", - type=nexusrpc.HandlerErrorType.NOT_FOUND, + error_type=nexusrpc.HandlerErrorType.NOT_FOUND, ) from err expected_exception_chain_in_workflow = [ @@ -165,7 +165,7 @@ def action_in_nexus_operation(): # was no need to synthesize a wrapping HandlerError The server prepends # 'handler error (INTERNAL):' "message": "handler error (NOT_FOUND): handler-error-message", - "type": nexusrpc.HandlerErrorType.NOT_FOUND, + "error_type": nexusrpc.HandlerErrorType.NOT_FOUND, # The following HandlerError types should be considered non-retryable: # BAD_REQUEST, UNAUTHENTICATED, UNAUTHORIZED, NOT_FOUND, and # RESOURCE_EXHAUSTED. In this test case, the handler does not set the @@ -200,7 +200,7 @@ def action_in_nexus_operation(): except CustomError as err: raise nexusrpc.HandlerError( "handler-error-message", - type=nexusrpc.HandlerErrorType.NOT_FOUND, + error_type=nexusrpc.HandlerErrorType.NOT_FOUND, ) from err expected_exception_chain_in_workflow = ( @@ -228,12 +228,12 @@ def action_in_nexus_operation(): try: raise nexusrpc.HandlerError( "handler-error-message-2", - type=nexusrpc.HandlerErrorType.UNAVAILABLE, + error_type=nexusrpc.HandlerErrorType.UNAVAILABLE, ) except nexusrpc.HandlerError as err: raise nexusrpc.HandlerError( "handler-error-message", - type=nexusrpc.HandlerErrorType.NOT_FOUND, + error_type=nexusrpc.HandlerErrorType.NOT_FOUND, ) from err expected_exception_chain_in_workflow = ( @@ -243,7 +243,7 @@ def action_in_nexus_operation(): nexusrpc.HandlerError, { "message": "handler-error-message-2", - "type": nexusrpc.HandlerErrorType.UNAVAILABLE, + "error_type": nexusrpc.HandlerErrorType.UNAVAILABLE, "retryable": True, }, ) diff --git a/tests/nexus/test_workflow_caller_errors.py b/tests/nexus/test_workflow_caller_errors.py index 4872de9ca..35932d8d9 100644 --- a/tests/nexus/test_workflow_caller_errors.py +++ b/tests/nexus/test_workflow_caller_errors.py @@ -84,7 +84,7 @@ def retried_due_to_resource_exhausted_handler_error( operation_invocation_counts[input.id] += 1 raise nexusrpc.HandlerError( "handler-error-message", - type=nexusrpc.HandlerErrorType.RESOURCE_EXHAUSTED, + error_type=nexusrpc.HandlerErrorType.RESOURCE_EXHAUSTED, ) @nexusrpc.handler.sync_operation @@ -94,7 +94,7 @@ def retried_due_to_internal_handler_error( operation_invocation_counts[input.id] += 1 raise nexusrpc.HandlerError( "handler-error-message", - type=nexusrpc.HandlerErrorType.INTERNAL, + error_type=nexusrpc.HandlerErrorType.INTERNAL, ) @nexusrpc.handler.sync_operation @@ -226,7 +226,7 @@ async def test_nexus_operation_fails_without_retry_as_handler_error( handler_error = err.__cause__.__cause__ assert isinstance(handler_error, nexusrpc.HandlerError) assert not handler_error.retryable - assert handler_error.type == handler_error_type + assert handler_error.error_type == handler_error_type assert handler_error_message in str(handler_error) else: pytest.fail("Unreachable") diff --git a/uv.lock b/uv.lock index e29cf27db..1c0d96d36 100644 --- a/uv.lock +++ b/uv.lock @@ -1773,14 +1773,10 @@ wheels = [ [[package]] name = "nexus-rpc" version = "1.3.0" -source = { registry = "https://pypi.org/simple" } +source = { git = "https://github.com/nexus-rpc/sdk-python.git?branch=preserve-raw-handler-error-type#03ec090da33f4674ac024797872bdf36470c0887" } dependencies = [ { name = "typing-extensions" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/2e/f2/d54f5c03d8f4672ccc0875787a385f53dcb61f98a8ae594b5620e85b9cb3/nexus_rpc-1.3.0.tar.gz", hash = "sha256:e56d3b57b60d707ce7a72f83f23f106b86eca1043aa658e44582ab5ff30ab9ad", size = 75650, upload-time = "2025-12-08T22:59:13.002Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/d6/74/0afd841de3199c148146c1d43b4bfb5605b2f1dc4c9a9087fe395091ea5a/nexus_rpc-1.3.0-py3-none-any.whl", hash = "sha256:aee0707b4861b22d8124ecb3f27d62dafbe8777dc50c66c91e49c006f971b92d", size = 28873, upload-time = "2025-12-08T22:59:12.024Z" }, -] [[package]] name = "nh3" @@ -3004,7 +3000,7 @@ dev = [ requires-dist = [ { name = "grpcio", marker = "extra == 'grpc'", specifier = ">=1.48.2,<2" }, { name = "mcp", marker = "extra == 'openai-agents'", specifier = ">=1.9.4,<2" }, - { name = "nexus-rpc", specifier = "==1.3.0" }, + { name = "nexus-rpc", git = "https://github.com/nexus-rpc/sdk-python.git?branch=preserve-raw-handler-error-type" }, { name = "openai-agents", marker = "extra == 'openai-agents'", specifier = ">=0.3,<0.7" }, { name = "opentelemetry-api", marker = "extra == 'opentelemetry'", specifier = ">=1.11.1,<2" }, { name = "opentelemetry-sdk", marker = "extra == 'opentelemetry'", specifier = ">=1.11.1,<2" },