Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions nexus_multiple_args/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
This sample shows how to map a Nexus operation to a handler workflow that takes multiple input arguments. The Nexus operation receives a single input object but unpacks it into multiple arguments when starting the workflow.

### Sample directory structure

- [service.py](./service.py) - shared Nexus service definition
- [caller](./caller) - a caller workflow that executes Nexus operations, together with a worker and starter code
- [handler](./handler) - Nexus operation handlers, together with a workflow used by the Nexus operation, and a worker that polls for both workflow and Nexus tasks.

### Instructions

Start a Temporal server. (See the main samples repo [README](../README.md)).

Run the following:

```
temporal operator namespace create --namespace nexus-multiple-args-handler-namespace
temporal operator namespace create --namespace nexus-multiple-args-caller-namespace

temporal operator nexus endpoint create \
--name nexus-multiple-args-nexus-endpoint \
--target-namespace nexus-multiple-args-handler-namespace \
--target-task-queue nexus-multiple-args-handler-task-queue
```

In one terminal, run the Temporal worker in the handler namespace:
```
uv run nexus_multiple_args/handler/worker.py
```

In another terminal, run the Temporal worker in the caller namespace and start the caller workflow:
```
uv run nexus_multiple_args/caller/app.py
```
Empty file.
Empty file.
53 changes: 53 additions & 0 deletions nexus_multiple_args/caller/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import asyncio
import uuid
from typing import Optional

from temporalio.client import Client
from temporalio.worker import Worker

from nexus_multiple_args.caller.workflows import CallerWorkflow

NAMESPACE = "nexus-multiple-args-caller-namespace"
TASK_QUEUE = "nexus-multiple-args-caller-task-queue"


async def execute_caller_workflow(
client: Optional[Client] = None,
) -> tuple[str, str]:
client = client or await Client.connect(
"localhost:7233",
namespace=NAMESPACE,
)

async with Worker(
client,
task_queue=TASK_QUEUE,
workflows=[CallerWorkflow],
):
# Execute workflow with English language
result1 = await client.execute_workflow(
CallerWorkflow.run,
args=["Nexus", "en"],
id=str(uuid.uuid4()),
task_queue=TASK_QUEUE,
)

# Execute workflow with Spanish language
result2 = await client.execute_workflow(
CallerWorkflow.run,
args=["Nexus", "es"],
id=str(uuid.uuid4()),
task_queue=TASK_QUEUE,
)

return result1, result2


if __name__ == "__main__":
loop = asyncio.new_event_loop()
try:
results = loop.run_until_complete(execute_caller_workflow())
for result in results:
print(result)
except KeyboardInterrupt:
loop.run_until_complete(loop.shutdown_asyncgens())
30 changes: 30 additions & 0 deletions nexus_multiple_args/caller/workflows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from temporalio import workflow

with workflow.unsafe.imports_passed_through():
from nexus_multiple_args.service import HelloInput, MyNexusService

NEXUS_ENDPOINT = "nexus-multiple-args-nexus-endpoint"


# This is a workflow that calls a nexus operation with multiple arguments.
@workflow.defn
class CallerWorkflow:
# An __init__ method is always optional on a workflow class. Here we use it to set the
# nexus client, but that could alternatively be done in the run method.
def __init__(self):
self.nexus_client = workflow.create_nexus_client(
service=MyNexusService,
endpoint=NEXUS_ENDPOINT,
)

# The workflow run method demonstrates calling a nexus operation with multiple arguments
# packed into an input object.
@workflow.run
async def run(self, name: str, language: str) -> str:
# Start the nexus operation and wait for the result in one go, using execute_operation.
# The multiple arguments (name and language) are packed into a HelloInput object.
result = await self.nexus_client.execute_operation(
MyNexusService.hello,
HelloInput(name=name, language=language),
)
return result.message
Empty file.
39 changes: 39 additions & 0 deletions nexus_multiple_args/handler/service_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from __future__ import annotations

import uuid

import nexusrpc
from temporalio import nexus

from nexus_multiple_args.handler.workflows import HelloHandlerWorkflow
from nexus_multiple_args.service import HelloInput, HelloOutput, MyNexusService


# @@@SNIPSTART samples-python-nexus-handler-multiargs
@nexusrpc.handler.service_handler(service=MyNexusService)
class MyNexusServiceHandler:
"""
Service handler that demonstrates multiple argument handling in Nexus operations.
"""

# This is a nexus operation that is backed by a Temporal workflow.
# The key feature here is that it demonstrates how to map a single input object
# (HelloInput) to a workflow that takes multiple individual arguments.
@nexus.workflow_run_operation
async def hello(
self, ctx: nexus.WorkflowRunOperationContext, input: HelloInput
) -> nexus.WorkflowHandle[HelloOutput]:
"""
Start a workflow with multiple arguments unpacked from the input object.
"""
return await ctx.start_workflow(
HelloHandlerWorkflow.run,
args=[
input.name, # First argument: name
input.language, # Second argument: language
],
id=str(uuid.uuid4()),
)


# @@@SNIPEND
45 changes: 45 additions & 0 deletions nexus_multiple_args/handler/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import asyncio
import logging
from typing import Optional

from temporalio.client import Client
from temporalio.worker import Worker

from nexus_multiple_args.handler.service_handler import MyNexusServiceHandler
from nexus_multiple_args.handler.workflows import HelloHandlerWorkflow

interrupt_event = asyncio.Event()

NAMESPACE = "nexus-multiple-args-handler-namespace"
TASK_QUEUE = "nexus-multiple-args-handler-task-queue"


async def main(client: Optional[Client] = None):
logging.basicConfig(level=logging.INFO)

client = client or await Client.connect(
"localhost:7233",
namespace=NAMESPACE,
)

# Start the worker, passing the Nexus service handler instance, in addition to the
# workflow classes that are started by your nexus operations, and any activities
# needed. This Worker will poll for both workflow tasks and Nexus tasks.
async with Worker(
client,
task_queue=TASK_QUEUE,
workflows=[HelloHandlerWorkflow],
nexus_service_handlers=[MyNexusServiceHandler()],
):
logging.info("Worker started, ctrl+c to exit")
await interrupt_event.wait()
logging.info("Shutting down")


if __name__ == "__main__":
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
interrupt_event.set()
loop.run_until_complete(loop.shutdown_asyncgens())
32 changes: 32 additions & 0 deletions nexus_multiple_args/handler/workflows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from temporalio import workflow

with workflow.unsafe.imports_passed_through():
from nexus_multiple_args.service import HelloOutput


# This is the workflow that is started by the `hello` nexus operation.
# It demonstrates handling multiple arguments passed from the Nexus service.
@workflow.defn
class HelloHandlerWorkflow:
@workflow.run
async def run(self, name: str, language: str) -> HelloOutput:
"""
Handle the hello workflow with multiple arguments.

This method receives the individual arguments (name and language)
that were unpacked from the HelloInput in the service handler.
"""
if language == "en":
message = f"Hello {name} 👋"
elif language == "fr":
message = f"Bonjour {name} 👋"
elif language == "de":
message = f"Hallo {name} 👋"
elif language == "es":
message = f"¡Hola! {name} 👋"
elif language == "tr":
message = f"Merhaba {name} 👋"
else:
raise ValueError(f"Unsupported language: {language}")

return HelloOutput(message=message)
34 changes: 34 additions & 0 deletions nexus_multiple_args/service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""
This is a Nexus service definition that demonstrates multiple argument handling.

A service definition defines a Nexus service as a named collection of operations, each
with input and output types. It does not implement operation handling: see the service
handler and operation handlers in nexus_multiple_args.handler.service_handler for that.

A Nexus service definition is used by Nexus callers (e.g. a Temporal workflow) to create
type-safe clients, and it is used by Nexus handlers to validate that they implement
correctly-named operation handlers with the correct input and output types.

The service defined in this file features one operation: hello, where hello
demonstrates handling multiple arguments through a single input object.
"""

from dataclasses import dataclass

import nexusrpc


@dataclass
class HelloInput:
name: str
language: str


@dataclass
class HelloOutput:
message: str


@nexusrpc.service
class MyNexusService:
hello: nexusrpc.Operation[HelloInput, HelloOutput]
2 changes: 1 addition & 1 deletion tests/hello_nexus/hello_nexus_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import hello_nexus.caller.app
import hello_nexus.caller.workflows
import hello_nexus.handler.worker
from tests.hello_nexus.helpers import create_nexus_endpoint, delete_nexus_endpoint
from tests.helpers.nexus import create_nexus_endpoint, delete_nexus_endpoint


async def test_nexus_service_basic(client: Client, env: WorkflowEnvironment):
Expand Down
Empty file added tests/helpers/__init__.py
Empty file.
File renamed without changes.
Empty file.
50 changes: 50 additions & 0 deletions tests/nexus_multiple_args/nexus_multiple_args_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import asyncio
import sys

import pytest
from temporalio.client import Client
from temporalio.testing import WorkflowEnvironment

import nexus_multiple_args.caller.app
import nexus_multiple_args.caller.workflows
import nexus_multiple_args.handler.worker
from tests.helpers.nexus import create_nexus_endpoint, delete_nexus_endpoint


async def test_nexus_multiple_args(client: Client, env: WorkflowEnvironment):
if env.supports_time_skipping:
pytest.skip("Nexus tests don't work under the Java test server")

if sys.version_info[:2] < (3, 10):
pytest.skip("Sample is written for Python >= 3.10")

create_response = await create_nexus_endpoint(
name=nexus_multiple_args.caller.workflows.NEXUS_ENDPOINT,
task_queue=nexus_multiple_args.handler.worker.TASK_QUEUE,
client=client,
)
try:
handler_worker_task = asyncio.create_task(
nexus_multiple_args.handler.worker.main(
client,
)
)
await asyncio.sleep(1)
results = await nexus_multiple_args.caller.app.execute_caller_workflow(
client,
)
nexus_multiple_args.handler.worker.interrupt_event.set()
await handler_worker_task
nexus_multiple_args.handler.worker.interrupt_event.clear()

# Verify the expected output messages
assert results == (
"Hello Nexus 👋",
"¡Hola! Nexus 👋",
)
finally:
await delete_nexus_endpoint(
id=create_response.endpoint.id,
version=create_response.endpoint.version,
client=client,
)
Loading