diff --git a/README.md b/README.md index 43d2cba6..d9ae57a3 100644 --- a/README.md +++ b/README.md @@ -65,6 +65,7 @@ Some examples require extra dependencies. See each sample's directory for specif * [custom_decorator](custom_decorator) - Custom decorator to auto-heartbeat a long-running activity. * [custom_metric](custom_metric) - Custom metric to record the workflow type in the activity schedule to start latency. * [dsl](dsl) - DSL workflow that executes steps defined in a YAML file. +* [eager_wf_start](eager_wf_start) - Run a workflow using Eager Workflow Start * [encryption](encryption) - Apply end-to-end encryption for all input/output. * [env_config](env_config) - Load client configuration from TOML files with programmatic overrides. * [gevent_async](gevent_async) - Combine gevent and Temporal. diff --git a/eager_wf_start/README.md b/eager_wf_start/README.md new file mode 100644 index 00000000..834c6a55 --- /dev/null +++ b/eager_wf_start/README.md @@ -0,0 +1,16 @@ +# Eager Workflow Start + +This sample shows how to create a workflow that uses Eager Workflow Start. + +The target use case is workflows whose first task needs to execute quickly (ex: payment verification in an online checkout workflow). That work typically can't be done directly in the workflow (ex: using web APIs, databases, etc.), and also needs to avoid the overhead of dispatching another task. Using a Local Activity suffices both needs, which this sample demonstrates. + +You can read more about Eager Workflow Start in our: + +- [Eager Workflow Start blog](https://temporal.io/blog/improving-latency-with-eager-workflow-start) +- [Worker Performance Docs](https://docs.temporal.io/develop/worker-performance#eager-workflow-start) + +To run, first see the main [README.md](../README.md) for prerequisites. + +Then run the sample via: + + uv run eager_wf_start/run.py \ No newline at end of file diff --git a/eager_wf_start/__init__.py b/eager_wf_start/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/eager_wf_start/activities.py b/eager_wf_start/activities.py new file mode 100644 index 00000000..6533140f --- /dev/null +++ b/eager_wf_start/activities.py @@ -0,0 +1,6 @@ +from temporalio import activity + + +@activity.defn() +async def greeting(name: str) -> str: + return f"Hello {name}!" diff --git a/eager_wf_start/run.py b/eager_wf_start/run.py new file mode 100644 index 00000000..c1daf82a --- /dev/null +++ b/eager_wf_start/run.py @@ -0,0 +1,42 @@ +import asyncio +import uuid + +from temporalio.client import Client +from temporalio.worker import Worker + +from eager_wf_start.activities import greeting +from eager_wf_start.workflows import EagerWorkflow + +TASK_QUEUE = "eager-wf-start-task-queue" + + +async def main(): + + # Note that the worker and client run in the same process and share the same client connection. + client = await Client.connect("localhost:7233") + worker = Worker( + client, + task_queue=TASK_QUEUE, + workflows=[EagerWorkflow], + activities=[greeting], + ) + + # Run worker in the background + async with worker: + # Start workflow(s) while worker is running + wf_handle = await client.start_workflow( + EagerWorkflow.run, + "Temporal", + id=f"eager-workflow-id-{uuid.uuid4()}", + task_queue=TASK_QUEUE, + request_eager_start=True, + ) + + # This is an internal flag not intended to be used publicly. + # It is used here purely to display that the workflow was eagerly started. + print(f"Workflow eagerly started: {wf_handle.__temporal_eagerly_started}") + print(await wf_handle.result()) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/eager_wf_start/workflows.py b/eager_wf_start/workflows.py new file mode 100644 index 00000000..9107d402 --- /dev/null +++ b/eager_wf_start/workflows.py @@ -0,0 +1,15 @@ +from datetime import timedelta + +from temporalio import workflow + +with workflow.unsafe.imports_passed_through(): + from eager_wf_start.activities import greeting + + +@workflow.defn +class EagerWorkflow: + @workflow.run + async def run(self, name: str) -> str: + return await workflow.execute_local_activity( + greeting, name, schedule_to_close_timeout=timedelta(seconds=5) + ) diff --git a/tests/conftest.py b/tests/conftest.py index e63a059b..65de246e 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -45,6 +45,8 @@ async def env(request) -> AsyncGenerator[WorkflowEnvironment, None]: dev_server_extra_args=[ "--dynamic-config-value", "frontend.enableExecuteMultiOperation=true", + "--dynamic-config-value", + "system.enableEagerWorkflowStart=true", ] ) elif env_type == "time-skipping": diff --git a/tests/eager_wf_start/__init__.py b/tests/eager_wf_start/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/eager_wf_start/workflow_test.py b/tests/eager_wf_start/workflow_test.py new file mode 100644 index 00000000..1080f5bc --- /dev/null +++ b/tests/eager_wf_start/workflow_test.py @@ -0,0 +1,29 @@ +import uuid + +from temporalio.client import Client +from temporalio.worker import Worker + +from eager_wf_start.activities import greeting +from eager_wf_start.workflows import EagerWorkflow + + +async def test_eager_wf_start(client: Client): + task_queue_name = str(uuid.uuid4()) + + async with Worker( + client, + task_queue=task_queue_name, + workflows=[EagerWorkflow], + activities=[greeting], + ): + handle = await client.start_workflow( + EagerWorkflow.run, + "Temporal", + id=f"workflow-{uuid.uuid4()}", + task_queue=task_queue_name, + request_eager_start=True, + ) + print("HANDLE", handle.__temporal_eagerly_started) + assert handle.__temporal_eagerly_started + result = await handle.result() + assert result == "Hello Temporal!"