Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion hello_nexus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ call the operations from a workflow.

Start a Temporal server. (See the main samples repo [README](../README.md)).

Run the following:
Run the following to create the caller and handler namespaces, and the Nexus endpoint:

```
temporal operator namespace create --namespace hello-nexus-basic-handler-namespace
Expand Down
3 changes: 2 additions & 1 deletion hello_nexus/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
type-safe clients, and it is used by Nexus handlers to validate that they implement
correctly-named operation handlers with the correct input and output types.

The service defined in this file features two operations: echo and hello.
The service defined in this file exposes two operations: my_sync_operation and
my_workflow_run_operation.
"""

from dataclasses import dataclass
Expand Down
13 changes: 7 additions & 6 deletions message_passing/introduction/starter.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
GetLanguagesInput,
GreetingWorkflow,
Language,
SetLanguageInput,
)


Expand All @@ -28,20 +29,20 @@ async def main(client: Optional[Client] = None):

# 👉 Execute an Update
previous_language = await wf_handle.execute_update(
GreetingWorkflow.set_language, Language.CHINESE
GreetingWorkflow.set_language, SetLanguageInput(language=Language.CHINESE)
)
current_language = await wf_handle.query(GreetingWorkflow.get_language)
print(f"language changed: {previous_language.name} -> {current_language.name}")
assert await wf_handle.query(GreetingWorkflow.get_language) == Language.CHINESE
print(f"language changed: {previous_language.name} -> {Language.CHINESE.name}")

# 👉 Start an Update and then wait for it to complete
update_handle = await wf_handle.start_update(
GreetingWorkflow.set_language_using_activity,
Language.ARABIC,
SetLanguageInput(language=Language.ARABIC),
wait_for_stage=WorkflowUpdateStage.ACCEPTED,
)
previous_language = await update_handle.result()
current_language = await wf_handle.query(GreetingWorkflow.get_language)
print(f"language changed: {previous_language.name} -> {current_language.name}")
assert await wf_handle.query(GreetingWorkflow.get_language) == Language.ARABIC
print(f"language changed: {previous_language.name} -> {Language.ARABIC.name}")

# 👉 Send a Signal
await wf_handle.signal(GreetingWorkflow.approve, ApproveInput(name=""))
Expand Down
27 changes: 16 additions & 11 deletions message_passing/introduction/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ class GetLanguagesInput:
include_unsupported: bool


@dataclass
class SetLanguageInput:
language: Language


@dataclass
class ApproveInput:
name: str
Expand Down Expand Up @@ -74,29 +79,29 @@ def approve(self, input: ApproveInput) -> None:
self.approver_name = input.name

@workflow.update
def set_language(self, language: Language) -> Language:
def set_language(self, input: SetLanguageInput) -> Language:
# 👉 An Update handler can mutate the Workflow state and return a value.
previous_language, self.language = self.language, language
previous_language, self.language = self.language, input.language
return previous_language

@set_language.validator
def validate_language(self, language: Language) -> None:
if language not in self.greetings:
def validate_language(self, input: SetLanguageInput) -> None:
if input.language not in self.greetings:
# 👉 In an Update validator you raise any exception to reject the Update.
raise ValueError(f"{language.name} is not supported")
raise ValueError(f"{input.language.name} is not supported")

@workflow.update
async def set_language_using_activity(self, language: Language) -> Language:
async def set_language_using_activity(self, input: SetLanguageInput) -> Language:
# 👉 This update handler is async, so it can execute an activity.
if language not in self.greetings:
if input.language not in self.greetings:
# 👉 We use a lock so that, if this handler is executed multiple
# times, each execution can schedule the activity only when the
# previously scheduled activity has completed. This ensures that
# multiple calls to set_language are processed in order.
async with self.lock:
greeting = await workflow.execute_activity(
call_greeting_service,
language,
input.language,
start_to_close_timeout=timedelta(seconds=10),
)
# 👉 The requested language might not be supported by the remote
Expand All @@ -108,10 +113,10 @@ async def set_language_using_activity(self, language: Language) -> Language:
# this purpose.)
if greeting is None:
raise ApplicationError(
f"Greeting service does not support {language.name}"
f"Greeting service does not support {input.language.name}"
)
self.greetings[language] = greeting
previous_language, self.language = self.language, language
self.greetings[input.language] = greeting
previous_language, self.language = self.language, input.language
return previous_language

@workflow.query
Expand Down
39 changes: 39 additions & 0 deletions nexus_sync_operations/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
This sample shows how to create a Nexus service that is backed by a long-running workflow and
exposes operations that execute updates and queries against that workflow. The long-running
workflow, and the updates/queries are private implementation detail of the nexus service: the caller
does not know how the operations are implemented.

### Sample directory structure

- [service.py](./service.py) - shared Nexus service definition
- [caller](./caller) - a caller workflow that executes Nexus operations, together with a worker and starter code
- [handler](./handler) - Nexus operation handlers, together with a workflow used by one of the Nexus operations, and a worker that polls for both workflow, activity, and Nexus tasks.


### Instructions

Start a Temporal server. (See the main samples repo [README](../README.md)).

Run the following to create the caller and handler namespaces, and the Nexus endpoint:

```
temporal operator namespace create --namespace nexus-sync-operations-handler-namespace
temporal operator namespace create --namespace nexus-sync-operations-caller-namespace

temporal operator nexus endpoint create \
--name nexus-sync-operations-nexus-endpoint \
--target-namespace nexus-sync-operations-handler-namespace \
--target-task-queue nexus-sync-operations-handler-task-queue \
--description-file nexus_sync_operations/endpoint_description.md
```

In one terminal, run the Temporal worker in the handler namespace:
```
uv run nexus_sync_operations/handler/worker.py
```

In another terminal, run the Temporal worker in the caller namespace and start the caller
workflow:
```
uv run nexus_sync_operations/caller/app.py
```
Empty file.
Empty file.
41 changes: 41 additions & 0 deletions nexus_sync_operations/caller/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import asyncio
import uuid
from typing import Optional

from temporalio.client import Client
from temporalio.worker import Worker

from nexus_sync_operations.caller.workflows import CallerWorkflow

NAMESPACE = "nexus-sync-operations-caller-namespace"
TASK_QUEUE = "nexus-sync-operations-caller-task-queue"


async def execute_caller_workflow(
client: Optional[Client] = None,
) -> None:
client = client or await Client.connect(
"localhost:7233",
namespace=NAMESPACE,
)

async with Worker(
client,
task_queue=TASK_QUEUE,
workflows=[CallerWorkflow],
):
log = await client.execute_workflow(
CallerWorkflow.run,
id=str(uuid.uuid4()),
task_queue=TASK_QUEUE,
)
for line in log:
print(line)


if __name__ == "__main__":
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(execute_caller_workflow())
except KeyboardInterrupt:
loop.run_until_complete(loop.shutdown_asyncgens())
46 changes: 46 additions & 0 deletions nexus_sync_operations/caller/workflows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
"""
This is a workflow that calls nexus operations. The caller does not have information about how these
operations are implemented by the nexus service.
"""

from temporalio import workflow

from message_passing.introduction import Language
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We had traditionally had all samples self-contained in all of our SDKs (i.e. they didn't reference each other), but maybe that's no longer what we want?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, let's allow this one to reference. IMO it is educational / instructive: it's showing users that nexus allows a "service" to be transplanted without changes and given a nexus interface callable from another namespace.

from message_passing.introduction.workflows import GetLanguagesInput, SetLanguageInput

with workflow.unsafe.imports_passed_through():
from nexus_sync_operations.service import GreetingService

NEXUS_ENDPOINT = "nexus-sync-operations-nexus-endpoint"


@workflow.defn
class CallerWorkflow:
@workflow.run
async def run(self) -> list[str]:
log = []
nexus_client = workflow.create_nexus_client(
service=GreetingService,
endpoint=NEXUS_ENDPOINT,
)

# Get supported languages
supported_languages = await nexus_client.execute_operation(
GreetingService.get_languages, GetLanguagesInput(include_unsupported=False)
)
log.append(f"supported languages: {supported_languages}")

# Set language
previous_language = await nexus_client.execute_operation(
GreetingService.set_language,
SetLanguageInput(language=Language.ARABIC),
)
assert (
await nexus_client.execute_operation(GreetingService.get_language, None)
== Language.ARABIC
)
log.append(
f"language changed: {previous_language.name} -> {Language.ARABIC.name}"
)

return log
4 changes: 4 additions & 0 deletions nexus_sync_operations/endpoint_description.md
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was under the impression we are only doing this "description" markdown on the "hello" Nexus sample? Do we need to start doing this for all samples?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I think all nexus endpoints in samples should have a description. The feature exists and we want to encourage users to use it, and it is nicely rendered in the UI.

Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
## Service: [GreetingService](https://github.com/temporalio/samples-python/blob/main/nexus_sync_operations/service.py)
- operation: `get_languages`
- operation: `get_language`
- operation: `set_language`
Empty file.
83 changes: 83 additions & 0 deletions nexus_sync_operations/handler/service_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
"""
This file demonstrates how to implement a Nexus service that is backed by a long-running workflow
and exposes operations that perform updates and queries against that workflow.
"""

from __future__ import annotations

import nexusrpc
from temporalio import nexus
from temporalio.client import Client, WorkflowHandle
from temporalio.common import WorkflowIDConflictPolicy

from message_passing.introduction import Language
from message_passing.introduction.workflows import (
GetLanguagesInput,
GreetingWorkflow,
SetLanguageInput,
)
from nexus_sync_operations.service import GreetingService


@nexusrpc.handler.service_handler(service=GreetingService)
class GreetingServiceHandler:
def __init__(self, workflow_id: str):
self.workflow_id = workflow_id

@classmethod
async def create(
cls, workflow_id: str, client: Client, task_queue: str
) -> GreetingServiceHandler:
# Start the long-running "entity" workflow, if it is not already running.
await client.start_workflow(
GreetingWorkflow.run,
id=workflow_id,
task_queue=task_queue,
id_conflict_policy=WorkflowIDConflictPolicy.USE_EXISTING,
)
return cls(workflow_id)

@property
def greeting_workflow_handle(self) -> WorkflowHandle[GreetingWorkflow, str]:
# In nexus operation handler code, nexus.client() is always available, returning a client
# connected to the handler namespace (it's the same client instance that your nexus worker
# is using to poll the server for nexus tasks). This client can be used to interact with the
# handler namespace, for example to send signals, queries, or updates. Remember however,
# that a sync_operation handler must return quickly (no more than a few seconds). To do
# long-running work in a nexus operation handler, use
# temporalio.nexus.workflow_run_operation (see the hello_nexus sample).
return nexus.client().get_workflow_handle_for(
GreetingWorkflow.run, self.workflow_id
)

# 👉 This is a handler for a nexus operation whose internal implementation involves executing a
# query against a long-running workflow that is private to the nexus service.
@nexusrpc.handler.sync_operation
async def get_languages(
self, ctx: nexusrpc.handler.StartOperationContext, input: GetLanguagesInput
) -> list[Language]:
return await self.greeting_workflow_handle.query(
GreetingWorkflow.get_languages, input
)

# 👉 This is a handler for a nexus operation whose internal implementation involves executing a
# query against a long-running workflow that is private to the nexus service.
@nexusrpc.handler.sync_operation
async def get_language(
self, ctx: nexusrpc.handler.StartOperationContext, input: None
) -> Language:
return await self.greeting_workflow_handle.query(GreetingWorkflow.get_language)

# 👉 This is a handler for a nexus operation whose internal implementation involves executing an
# update against a long-running workflow that is private to the nexus service. Although updates
# can run for an arbitrarily long time, when exposing an update via a nexus sync operation the
# update should execute quickly (sync operations must complete in under 10s).
@nexusrpc.handler.sync_operation
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In theory an update can be long running and we should demonstrate async operation with it, but I understand if we need some kind of server side support for update callbacks or something for that. May be worth noting though, up to you.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a note explaining that updates must be fast when used in a sync operation. Yes, async nexus operations backed by updates do not exist yet and would require server-side nexus callback support in the server-side update state machine.

async def set_language(
self,
ctx: nexusrpc.handler.StartOperationContext,
input: SetLanguageInput,
) -> Language:
return await self.greeting_workflow_handle.execute_update(
GreetingWorkflow.set_language_using_activity, input
)
50 changes: 50 additions & 0 deletions nexus_sync_operations/handler/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import asyncio
import logging
from typing import Optional

from temporalio.client import Client
from temporalio.worker import Worker

from message_passing.introduction.activities import call_greeting_service
from message_passing.introduction.workflows import GreetingWorkflow
from nexus_sync_operations.handler.service_handler import GreetingServiceHandler

interrupt_event = asyncio.Event()

NAMESPACE = "nexus-sync-operations-handler-namespace"
TASK_QUEUE = "nexus-sync-operations-handler-task-queue"


async def main(client: Optional[Client] = None):
logging.basicConfig(level=logging.INFO)

client = client or await Client.connect(
"localhost:7233",
namespace=NAMESPACE,
)

# Create the nexus service handler instance, starting the long-running entity workflow that
# backs the Nexus service
greeting_service_handler = await GreetingServiceHandler.create(
"nexus-sync-operations-greeting-workflow", client, TASK_QUEUE
)

async with Worker(
client,
task_queue=TASK_QUEUE,
workflows=[GreetingWorkflow],
activities=[call_greeting_service],
nexus_service_handlers=[greeting_service_handler],
):
logging.info("Worker started, ctrl+c to exit")
await interrupt_event.wait()
logging.info("Shutting down")


if __name__ == "__main__":
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
interrupt_event.set()
loop.run_until_complete(loop.shutdown_asyncgens())
Loading
Loading