diff --git a/nexus_multiple_args/README.md b/nexus_multiple_args/README.md new file mode 100644 index 00000000..8da0f146 --- /dev/null +++ b/nexus_multiple_args/README.md @@ -0,0 +1,33 @@ +This sample shows how to map a Nexus operation to a handler workflow that takes multiple input arguments. The Nexus operation receives a single input object but unpacks it into multiple arguments when starting the workflow. + +### Sample directory structure + +- [service.py](./service.py) - shared Nexus service definition +- [caller](./caller) - a caller workflow that executes Nexus operations, together with a worker and starter code +- [handler](./handler) - Nexus operation handlers, together with a workflow used by the Nexus operation, and a worker that polls for both workflow and Nexus tasks. + +### Instructions + +Start a Temporal server. (See the main samples repo [README](../README.md)). + +Run the following: + +``` +temporal operator namespace create --namespace nexus-multiple-args-handler-namespace +temporal operator namespace create --namespace nexus-multiple-args-caller-namespace + +temporal operator nexus endpoint create \ + --name nexus-multiple-args-nexus-endpoint \ + --target-namespace nexus-multiple-args-handler-namespace \ + --target-task-queue nexus-multiple-args-handler-task-queue +``` + +In one terminal, run the Temporal worker in the handler namespace: +``` +uv run nexus_multiple_args/handler/worker.py +``` + +In another terminal, run the Temporal worker in the caller namespace and start the caller workflow: +``` +uv run nexus_multiple_args/caller/app.py +``` \ No newline at end of file diff --git a/nexus_multiple_args/__init__.py b/nexus_multiple_args/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/nexus_multiple_args/caller/__init__.py b/nexus_multiple_args/caller/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/nexus_multiple_args/caller/app.py b/nexus_multiple_args/caller/app.py new file mode 100644 index 00000000..88aadbf9 --- /dev/null +++ b/nexus_multiple_args/caller/app.py @@ -0,0 +1,53 @@ +import asyncio +import uuid +from typing import Optional + +from temporalio.client import Client +from temporalio.worker import Worker + +from nexus_multiple_args.caller.workflows import CallerWorkflow + +NAMESPACE = "nexus-multiple-args-caller-namespace" +TASK_QUEUE = "nexus-multiple-args-caller-task-queue" + + +async def execute_caller_workflow( + client: Optional[Client] = None, +) -> tuple[str, str]: + client = client or await Client.connect( + "localhost:7233", + namespace=NAMESPACE, + ) + + async with Worker( + client, + task_queue=TASK_QUEUE, + workflows=[CallerWorkflow], + ): + # Execute workflow with English language + result1 = await client.execute_workflow( + CallerWorkflow.run, + args=["Nexus", "en"], + id=str(uuid.uuid4()), + task_queue=TASK_QUEUE, + ) + + # Execute workflow with Spanish language + result2 = await client.execute_workflow( + CallerWorkflow.run, + args=["Nexus", "es"], + id=str(uuid.uuid4()), + task_queue=TASK_QUEUE, + ) + + return result1, result2 + + +if __name__ == "__main__": + loop = asyncio.new_event_loop() + try: + results = loop.run_until_complete(execute_caller_workflow()) + for result in results: + print(result) + except KeyboardInterrupt: + loop.run_until_complete(loop.shutdown_asyncgens()) diff --git a/nexus_multiple_args/caller/workflows.py b/nexus_multiple_args/caller/workflows.py new file mode 100644 index 00000000..940a032f --- /dev/null +++ b/nexus_multiple_args/caller/workflows.py @@ -0,0 +1,30 @@ +from temporalio import workflow + +with workflow.unsafe.imports_passed_through(): + from nexus_multiple_args.service import HelloInput, MyNexusService + +NEXUS_ENDPOINT = "nexus-multiple-args-nexus-endpoint" + + +# This is a workflow that calls a nexus operation with multiple arguments. +@workflow.defn +class CallerWorkflow: + # An __init__ method is always optional on a workflow class. Here we use it to set the + # nexus client, but that could alternatively be done in the run method. + def __init__(self): + self.nexus_client = workflow.create_nexus_client( + service=MyNexusService, + endpoint=NEXUS_ENDPOINT, + ) + + # The workflow run method demonstrates calling a nexus operation with multiple arguments + # packed into an input object. + @workflow.run + async def run(self, name: str, language: str) -> str: + # Start the nexus operation and wait for the result in one go, using execute_operation. + # The multiple arguments (name and language) are packed into a HelloInput object. + result = await self.nexus_client.execute_operation( + MyNexusService.hello, + HelloInput(name=name, language=language), + ) + return result.message diff --git a/nexus_multiple_args/handler/__init__.py b/nexus_multiple_args/handler/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/nexus_multiple_args/handler/service_handler.py b/nexus_multiple_args/handler/service_handler.py new file mode 100644 index 00000000..c2ddfb92 --- /dev/null +++ b/nexus_multiple_args/handler/service_handler.py @@ -0,0 +1,39 @@ +from __future__ import annotations + +import uuid + +import nexusrpc +from temporalio import nexus + +from nexus_multiple_args.handler.workflows import HelloHandlerWorkflow +from nexus_multiple_args.service import HelloInput, HelloOutput, MyNexusService + + +# @@@SNIPSTART samples-python-nexus-handler-multiargs +@nexusrpc.handler.service_handler(service=MyNexusService) +class MyNexusServiceHandler: + """ + Service handler that demonstrates multiple argument handling in Nexus operations. + """ + + # This is a nexus operation that is backed by a Temporal workflow. + # The key feature here is that it demonstrates how to map a single input object + # (HelloInput) to a workflow that takes multiple individual arguments. + @nexus.workflow_run_operation + async def hello( + self, ctx: nexus.WorkflowRunOperationContext, input: HelloInput + ) -> nexus.WorkflowHandle[HelloOutput]: + """ + Start a workflow with multiple arguments unpacked from the input object. + """ + return await ctx.start_workflow( + HelloHandlerWorkflow.run, + args=[ + input.name, # First argument: name + input.language, # Second argument: language + ], + id=str(uuid.uuid4()), + ) + + +# @@@SNIPEND diff --git a/nexus_multiple_args/handler/worker.py b/nexus_multiple_args/handler/worker.py new file mode 100644 index 00000000..d12a7ee1 --- /dev/null +++ b/nexus_multiple_args/handler/worker.py @@ -0,0 +1,45 @@ +import asyncio +import logging +from typing import Optional + +from temporalio.client import Client +from temporalio.worker import Worker + +from nexus_multiple_args.handler.service_handler import MyNexusServiceHandler +from nexus_multiple_args.handler.workflows import HelloHandlerWorkflow + +interrupt_event = asyncio.Event() + +NAMESPACE = "nexus-multiple-args-handler-namespace" +TASK_QUEUE = "nexus-multiple-args-handler-task-queue" + + +async def main(client: Optional[Client] = None): + logging.basicConfig(level=logging.INFO) + + client = client or await Client.connect( + "localhost:7233", + namespace=NAMESPACE, + ) + + # Start the worker, passing the Nexus service handler instance, in addition to the + # workflow classes that are started by your nexus operations, and any activities + # needed. This Worker will poll for both workflow tasks and Nexus tasks. + async with Worker( + client, + task_queue=TASK_QUEUE, + workflows=[HelloHandlerWorkflow], + nexus_service_handlers=[MyNexusServiceHandler()], + ): + logging.info("Worker started, ctrl+c to exit") + await interrupt_event.wait() + logging.info("Shutting down") + + +if __name__ == "__main__": + loop = asyncio.new_event_loop() + try: + loop.run_until_complete(main()) + except KeyboardInterrupt: + interrupt_event.set() + loop.run_until_complete(loop.shutdown_asyncgens()) diff --git a/nexus_multiple_args/handler/workflows.py b/nexus_multiple_args/handler/workflows.py new file mode 100644 index 00000000..15bd0824 --- /dev/null +++ b/nexus_multiple_args/handler/workflows.py @@ -0,0 +1,32 @@ +from temporalio import workflow + +with workflow.unsafe.imports_passed_through(): + from nexus_multiple_args.service import HelloOutput + + +# This is the workflow that is started by the `hello` nexus operation. +# It demonstrates handling multiple arguments passed from the Nexus service. +@workflow.defn +class HelloHandlerWorkflow: + @workflow.run + async def run(self, name: str, language: str) -> HelloOutput: + """ + Handle the hello workflow with multiple arguments. + + This method receives the individual arguments (name and language) + that were unpacked from the HelloInput in the service handler. + """ + if language == "en": + message = f"Hello {name} πŸ‘‹" + elif language == "fr": + message = f"Bonjour {name} πŸ‘‹" + elif language == "de": + message = f"Hallo {name} πŸ‘‹" + elif language == "es": + message = f"Β‘Hola! {name} πŸ‘‹" + elif language == "tr": + message = f"Merhaba {name} πŸ‘‹" + else: + raise ValueError(f"Unsupported language: {language}") + + return HelloOutput(message=message) diff --git a/nexus_multiple_args/service.py b/nexus_multiple_args/service.py new file mode 100644 index 00000000..ccae11fd --- /dev/null +++ b/nexus_multiple_args/service.py @@ -0,0 +1,34 @@ +""" +This is a Nexus service definition that demonstrates multiple argument handling. + +A service definition defines a Nexus service as a named collection of operations, each +with input and output types. It does not implement operation handling: see the service +handler and operation handlers in nexus_multiple_args.handler.service_handler for that. + +A Nexus service definition is used by Nexus callers (e.g. a Temporal workflow) to create +type-safe clients, and it is used by Nexus handlers to validate that they implement +correctly-named operation handlers with the correct input and output types. + +The service defined in this file features one operation: hello, where hello +demonstrates handling multiple arguments through a single input object. +""" + +from dataclasses import dataclass + +import nexusrpc + + +@dataclass +class HelloInput: + name: str + language: str + + +@dataclass +class HelloOutput: + message: str + + +@nexusrpc.service +class MyNexusService: + hello: nexusrpc.Operation[HelloInput, HelloOutput] diff --git a/tests/hello_nexus/hello_nexus_test.py b/tests/hello_nexus/hello_nexus_test.py index f9a8807d..fecfe17c 100644 --- a/tests/hello_nexus/hello_nexus_test.py +++ b/tests/hello_nexus/hello_nexus_test.py @@ -8,7 +8,7 @@ import hello_nexus.caller.app import hello_nexus.caller.workflows import hello_nexus.handler.worker -from tests.hello_nexus.helpers import create_nexus_endpoint, delete_nexus_endpoint +from tests.helpers.nexus import create_nexus_endpoint, delete_nexus_endpoint async def test_nexus_service_basic(client: Client, env: WorkflowEnvironment): diff --git a/tests/helpers/__init__.py b/tests/helpers/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/hello_nexus/helpers.py b/tests/helpers/nexus.py similarity index 100% rename from tests/hello_nexus/helpers.py rename to tests/helpers/nexus.py diff --git a/tests/nexus_multiple_args/__init__.py b/tests/nexus_multiple_args/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/nexus_multiple_args/nexus_multiple_args_test.py b/tests/nexus_multiple_args/nexus_multiple_args_test.py new file mode 100644 index 00000000..682b8f20 --- /dev/null +++ b/tests/nexus_multiple_args/nexus_multiple_args_test.py @@ -0,0 +1,50 @@ +import asyncio +import sys + +import pytest +from temporalio.client import Client +from temporalio.testing import WorkflowEnvironment + +import nexus_multiple_args.caller.app +import nexus_multiple_args.caller.workflows +import nexus_multiple_args.handler.worker +from tests.helpers.nexus import create_nexus_endpoint, delete_nexus_endpoint + + +async def test_nexus_multiple_args(client: Client, env: WorkflowEnvironment): + if env.supports_time_skipping: + pytest.skip("Nexus tests don't work under the Java test server") + + if sys.version_info[:2] < (3, 10): + pytest.skip("Sample is written for Python >= 3.10") + + create_response = await create_nexus_endpoint( + name=nexus_multiple_args.caller.workflows.NEXUS_ENDPOINT, + task_queue=nexus_multiple_args.handler.worker.TASK_QUEUE, + client=client, + ) + try: + handler_worker_task = asyncio.create_task( + nexus_multiple_args.handler.worker.main( + client, + ) + ) + await asyncio.sleep(1) + results = await nexus_multiple_args.caller.app.execute_caller_workflow( + client, + ) + nexus_multiple_args.handler.worker.interrupt_event.set() + await handler_worker_task + nexus_multiple_args.handler.worker.interrupt_event.clear() + + # Verify the expected output messages + assert results == ( + "Hello Nexus πŸ‘‹", + "Β‘Hola! Nexus πŸ‘‹", + ) + finally: + await delete_nexus_endpoint( + id=create_response.endpoint.id, + version=create_response.endpoint.version, + client=client, + )