From 8cb3c2710d0187f0f96d6cbb5c328dfd64a4e2b1 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Mon, 15 Sep 2025 16:48:20 -0700 Subject: [PATCH 1/4] Update worker versioning sample to use deployments --- .gitignore | 1 + worker_versioning/README.md | 28 +++-- worker_versioning/activities.py | 20 ++- worker_versioning/app.py | 137 +++++++++++++++++++++ worker_versioning/constants.py | 7 ++ worker_versioning/example.py | 116 ------------------ worker_versioning/workerv1.py | 40 ++++++ worker_versioning/workerv1_1.py | 40 ++++++ worker_versioning/workerv2.py | 40 ++++++ worker_versioning/workflow_v1.py | 27 ----- worker_versioning/workflow_v1_1.py | 45 ------- worker_versioning/workflow_v2.py | 36 ------ worker_versioning/workflows.py | 189 +++++++++++++++++++++++++++++ 13 files changed, 490 insertions(+), 236 deletions(-) create mode 100644 worker_versioning/app.py create mode 100644 worker_versioning/constants.py delete mode 100644 worker_versioning/example.py create mode 100644 worker_versioning/workerv1.py create mode 100644 worker_versioning/workerv1_1.py create mode 100644 worker_versioning/workerv2.py delete mode 100644 worker_versioning/workflow_v1.py delete mode 100644 worker_versioning/workflow_v1_1.py delete mode 100644 worker_versioning/workflow_v2.py create mode 100644 worker_versioning/workflows.py diff --git a/.gitignore b/.gitignore index 41afe5f8..157a7418 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ __pycache__ .vscode .DS_Store +.claude diff --git a/worker_versioning/README.md b/worker_versioning/README.md index 2fd44bc4..ece87f0d 100644 --- a/worker_versioning/README.md +++ b/worker_versioning/README.md @@ -1,12 +1,24 @@ -# Worker Versioning Sample +## Worker Versioning -This sample shows you how you can use the [Worker Versioning](https://docs.temporal.io/workers#worker-versioning) -feature to deploy incompatible changes to workflow code more easily. +This sample demonstrates how to use Temporal's Worker Versioning feature to safely deploy updates to workflow and activity code. It shows the difference between auto-upgrading and pinned workflows, and how to manage worker deployments with different build IDs. -To run, first see [README.md](../README.md) for prerequisites. Then, run the following from the root directory: +The sample creates multiple worker versions (1.0, 1.1, and 2.0) within one deployment and demonstrates: +- **Auto-upgrading workflows**: Automatically and controllably migrate to newer worker versions +- **Pinned workflows**: Stay on the original worker version throughout their lifecycle +- **Compatible vs incompatible changes**: How to make safe updates using `workflow.patched` - uv run worker_versioning/example.py +### Steps to run this sample: -This will add some Build IDs to a Task Queue, and will also run Workers with those versions to show how you can -mark add versions, mark them as compatible (or not) with one another, and run Workers at specific versions. You'll -see that only the workers only process Workflow Tasks assigned versions they are compatible with. +1) Run a [Temporal service](https://github.com/temporalio/samples-python/tree/main/#how-to-use). + +2) Start the main application (this will guide you through the sample): +```bash +uv run worker_versioning/app.py +``` + +3) Follow the prompts to start workers in separate terminals: + - When prompted, run: `uv run worker_versioning/workerv1.py` + - When prompted, run: `uv run worker_versioning/workerv1_1.py` + - When prompted, run: `uv run worker_versioning/workerv2.py` + +The sample will show how auto-upgrading workflows migrate to newer workers while pinned workflows remain on their original version. \ No newline at end of file diff --git a/worker_versioning/activities.py b/worker_versioning/activities.py index 4115e0fd..febcedf0 100644 --- a/worker_versioning/activities.py +++ b/worker_versioning/activities.py @@ -1,11 +1,23 @@ +from dataclasses import dataclass + from temporalio import activity +@dataclass +class IncompatibleActivityInput: + """Input for the incompatible activity.""" + + called_by: str + more_data: str + + @activity.defn -async def greet(inp: str) -> str: - return f"Hi from {inp}" +async def some_activity(called_by: str) -> str: + """Basic activity for the workflow.""" + return f"SomeActivity called by {called_by}" @activity.defn -async def super_greet(inp: str, some_number: int) -> str: - return f"Hi from {inp} with {some_number}" +async def some_incompatible_activity(input_data: IncompatibleActivityInput) -> str: + """Incompatible activity that takes different input.""" + return f"SomeIncompatibleActivity called by {input_data.called_by} with {input_data.more_data}" diff --git a/worker_versioning/app.py b/worker_versioning/app.py new file mode 100644 index 00000000..1726c429 --- /dev/null +++ b/worker_versioning/app.py @@ -0,0 +1,137 @@ +"""Main application for the worker versioning sample.""" + +import asyncio +import logging +import uuid + +from temporalio.client import Client + +from worker_versioning.constants import DEPLOYMENT_NAME, TASK_QUEUE + +logging.basicConfig(level=logging.INFO) + + +async def main() -> None: + client = await Client.connect("localhost:7233") + + # Wait for v1 worker and set as current version + logging.info( + "Waiting for v1 worker to appear. Run `python worker_versioning/workerv1.py` in another terminal" + ) + await wait_for_worker_and_make_current(client, "1.0") + + # Start auto-upgrading and pinned workflows + auto_upgrade_workflow_id = "worker-versioning-versioning-autoupgrade_" + str( + uuid.uuid4() + ) + auto_upgrade_execution = await client.start_workflow( + "AutoUpgrading", + id=auto_upgrade_workflow_id, + task_queue=TASK_QUEUE, + ) + + pinned_workflow_id = "worker-versioning-versioning-pinned_" + str(uuid.uuid4()) + pinned_execution = await client.start_workflow( + "Pinned", + id=pinned_workflow_id, + task_queue=TASK_QUEUE, + ) + + logging.info("Started auto-upgrading workflow: %s", auto_upgrade_execution.id) + logging.info("Started pinned workflow: %s", pinned_execution.id) + + # Signal both workflows a few times to drive them + await advance_workflows(auto_upgrade_execution, pinned_execution) + + # Now wait for the v1.1 worker to appear and become current + logging.info( + "Waiting for v1.1 worker to appear. Run `python worker_versioning/workerv1_1.py` in another terminal" + ) + await wait_for_worker_and_make_current(client, "1.1") + + # Once it has, we will continue to advance the workflows. + # The auto-upgrade workflow will now make progress on the new worker, while the pinned one will + # keep progressing on the old worker. + await advance_workflows(auto_upgrade_execution, pinned_execution) + + # Finally we'll start the v2 worker, and again it'll become the new current version + logging.info( + "Waiting for v2 worker to appear. Run `python worker_versioning/workerv2.py` in another terminal" + ) + await wait_for_worker_and_make_current(client, "2.0") + + # Once it has we'll start one more new workflow, another pinned one, to demonstrate that new + # pinned workflows start on the current version. + pinned_workflow_2_id = "worker-versioning-versioning-pinned-2_" + str(uuid.uuid4()) + pinned_execution_2 = await client.start_workflow( + "Pinned", + id=pinned_workflow_2_id, + task_queue=TASK_QUEUE, + ) + logging.info("Started pinned workflow v2: %s", pinned_execution_2.id) + + # Now we'll conclude all workflows. You should be able to see in your server UI that the pinned + # workflow always stayed on 1.0, while the auto-upgrading workflow migrated. + for handle in [auto_upgrade_execution, pinned_execution, pinned_execution_2]: + await handle.signal("do_next_signal", "conclude") + await handle.result() + + logging.info("All workflows completed") + + +async def advance_workflows(auto_upgrade_execution, pinned_execution): + """Signal both workflows a few times to drive them.""" + for i in range(3): + await auto_upgrade_execution.signal("do_next_signal", "do-activity") + await pinned_execution.signal("do_next_signal", "some-signal") + + +async def wait_for_worker_and_make_current(client: Client, build_id: str) -> None: + import temporalio.api.workflowservice.v1 as wsv1 + from temporalio.common import WorkerDeploymentVersion + + target_version = WorkerDeploymentVersion( + deployment_name=DEPLOYMENT_NAME, build_id=build_id + ) + + # Wait for the worker to appear + while True: + try: + describe_request = wsv1.DescribeWorkerDeploymentRequest( + namespace=client.namespace, + deployment_name=DEPLOYMENT_NAME, + ) + response = await client.workflow_service.describe_worker_deployment( + describe_request + ) + + # Check if our version is present in the version summaries + for version_summary in response.worker_deployment_info.version_summaries: + if ( + version_summary.deployment_version.deployment_name + == target_version.deployment_name + and version_summary.deployment_version.build_id + == target_version.build_id + ): + break + else: + await asyncio.sleep(1) + continue + + break + + except Exception: + await asyncio.sleep(1) + continue + + # Once the version is available, set it as current + set_request = wsv1.SetWorkerDeploymentCurrentVersionRequest( + namespace=client.namespace, + deployment_name=DEPLOYMENT_NAME, + build_id=target_version.build_id, + ) + await client.workflow_service.set_worker_deployment_current_version(set_request) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/worker_versioning/constants.py b/worker_versioning/constants.py new file mode 100644 index 00000000..5cdaba6f --- /dev/null +++ b/worker_versioning/constants.py @@ -0,0 +1,7 @@ +"""Constants for the worker versioning sample.""" + +# Task queue name +TASK_QUEUE = "worker-versioning" + +# Deployment name +DEPLOYMENT_NAME = "my-deployment" diff --git a/worker_versioning/example.py b/worker_versioning/example.py deleted file mode 100644 index 97354303..00000000 --- a/worker_versioning/example.py +++ /dev/null @@ -1,116 +0,0 @@ -import asyncio -import uuid - -from temporalio.client import BuildIdOpAddNewCompatible, BuildIdOpAddNewDefault, Client -from temporalio.worker import Worker - -from worker_versioning.activities import greet, super_greet -from worker_versioning.workflow_v1 import MyWorkflow as MyWorkflowV1 -from worker_versioning.workflow_v1_1 import MyWorkflow as MyWorkflowV1_1 -from worker_versioning.workflow_v2 import MyWorkflow as MyWorkflowV2 - - -async def main(): - client = await Client.connect("localhost:7233") - task_queue = f"worker-versioning-{uuid.uuid4()}" - - # Start a 1.0 worker - async with Worker( - client, - task_queue=task_queue, - workflows=[MyWorkflowV1], - activities=[greet, super_greet], - build_id="1.0", - use_worker_versioning=True, - ): - # Add 1.0 as the default version for the queue - await client.update_worker_build_id_compatibility( - task_queue, BuildIdOpAddNewDefault("1.0") - ) - - # Start a workflow which will run on the 1.0 worker - handle = await client.start_workflow( - MyWorkflowV1.run, - task_queue=task_queue, - id=f"worker-versioning-v1-{uuid.uuid4()}", - ) - # Signal the workflow to proceed - await handle.signal(MyWorkflowV1.proceeder, "go") - - # Give a chance for the worker to process the signal - # TODO Better? - await asyncio.sleep(1) - - # Add 1.1 as the default version for the queue, compatible with 1.0 - await client.update_worker_build_id_compatibility( - task_queue, BuildIdOpAddNewCompatible("1.1", "1.0") - ) - - # Stop the old worker, and start a 1.1 worker. We do this to speed along the example, since the - # 1.0 worker may continue to process tasks briefly after we make 1.1 the new default. - async with Worker( - client, - task_queue=task_queue, - workflows=[MyWorkflowV1_1], - activities=[greet, super_greet], - build_id="1.1", - use_worker_versioning=True, - ): - # Continue driving the workflow. Take note that the new version of the workflow run by the 1.1 - # worker is the one that takes over! You might see a workflow task timeout, if the 1.0 worker is - # processing a task as the version update happens. That's normal. - await handle.signal(MyWorkflowV1.proceeder, "go") - - # Add a new *incompatible* version to the task queue, which will become the new overall default for the queue. - await client.update_worker_build_id_compatibility( - task_queue, BuildIdOpAddNewDefault("2.0") - ) - - # Start a 2.0 worker - async with Worker( - client, - task_queue=task_queue, - workflows=[MyWorkflowV2], - activities=[greet, super_greet], - build_id="2.0", - use_worker_versioning=True, - ): - # Start a new workflow. Note that it will run on the new 2.0 version, without the client invocation changing - # at all! Note here we can use `MyWorkflowV1.run` because the signature of the workflow has not changed. - handle2 = await client.start_workflow( - MyWorkflowV1.run, - task_queue=task_queue, - id=f"worker-versioning-v2-{uuid.uuid4()}", - ) - - # Drive both workflows once more before concluding them. The first workflow will continue running on the 1.1 - # worker. - await handle.signal(MyWorkflowV1.proceeder, "go") - await handle2.signal(MyWorkflowV1.proceeder, "go") - await handle.signal(MyWorkflowV1.proceeder, "finish") - await handle2.signal(MyWorkflowV1.proceeder, "finish") - - # Wait for both workflows to complete - await handle.result() - await handle2.result() - - # Lastly we'll demonstrate how you can use the gRPC api to determine if certain build IDs are ready to be - # retired. There's more information in the documentation, but here's a quick example that shows us how to - # tell when the 1.0 worker can be retired: - - # There is a 5 minute buffer before we will consider IDs no longer reachable by new workflows, to - # account for replication in multi-cluster setups. Uncomment the following line to wait long enough to see - # the 1.0 worker become unreachable. - # await asyncio.sleep(60 * 5) - reachability = await client.get_worker_task_reachability( - build_ids=["2.0", "1.0", "1.1"] - ) - - if not reachability.build_id_reachability["1.0"].task_queue_reachability[ - task_queue - ]: - print("1.0 is ready to be retired!") - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/worker_versioning/workerv1.py b/worker_versioning/workerv1.py new file mode 100644 index 00000000..df324996 --- /dev/null +++ b/worker_versioning/workerv1.py @@ -0,0 +1,40 @@ +"""Worker v1 for the worker versioning sample.""" + +import asyncio +import logging + +from temporalio.client import Client +from temporalio.common import WorkerDeploymentVersion +from temporalio.worker import Worker, WorkerDeploymentConfig + +from worker_versioning.activities import some_activity, some_incompatible_activity +from worker_versioning.constants import DEPLOYMENT_NAME, TASK_QUEUE +from worker_versioning.workflows import AutoUpgradingWorkflowV1, PinnedWorkflowV1 + +logging.basicConfig(level=logging.INFO) + + +async def main() -> None: + """Run worker v1.""" + client = await Client.connect("localhost:7233") + + # Create worker v1 + worker = Worker( + client, + task_queue=TASK_QUEUE, + workflows=[AutoUpgradingWorkflowV1, PinnedWorkflowV1], + activities=[some_activity, some_incompatible_activity], + deployment_config=WorkerDeploymentConfig( + version=WorkerDeploymentVersion( + deployment_name=DEPLOYMENT_NAME, build_id="1.0" + ), + use_worker_versioning=True, + ), + ) + + logging.info("Starting worker v1 (build 1.0)") + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/worker_versioning/workerv1_1.py b/worker_versioning/workerv1_1.py new file mode 100644 index 00000000..8312522f --- /dev/null +++ b/worker_versioning/workerv1_1.py @@ -0,0 +1,40 @@ +"""Worker v1.1 for the worker versioning sample.""" + +import asyncio +import logging + +from temporalio.client import Client +from temporalio.common import WorkerDeploymentVersion +from temporalio.worker import Worker, WorkerDeploymentConfig + +from worker_versioning.activities import some_activity, some_incompatible_activity +from worker_versioning.constants import DEPLOYMENT_NAME, TASK_QUEUE +from worker_versioning.workflows import AutoUpgradingWorkflowV1b, PinnedWorkflowV1 + +logging.basicConfig(level=logging.INFO) + + +async def main() -> None: + """Run worker v1.1.""" + client = await Client.connect("localhost:7233") + + # Create worker v1.1 + worker = Worker( + client, + task_queue=TASK_QUEUE, + workflows=[AutoUpgradingWorkflowV1b, PinnedWorkflowV1], + activities=[some_activity, some_incompatible_activity], + deployment_config=WorkerDeploymentConfig( + version=WorkerDeploymentVersion( + deployment_name=DEPLOYMENT_NAME, build_id="1.1" + ), + use_worker_versioning=True, + ), + ) + + logging.info("Starting worker v1.1 (build 1.1)") + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/worker_versioning/workerv2.py b/worker_versioning/workerv2.py new file mode 100644 index 00000000..5b7b5f4f --- /dev/null +++ b/worker_versioning/workerv2.py @@ -0,0 +1,40 @@ +"""Worker v2 for the worker versioning sample.""" + +import asyncio +import logging + +from temporalio.client import Client +from temporalio.common import WorkerDeploymentVersion +from temporalio.worker import Worker, WorkerDeploymentConfig + +from worker_versioning.activities import some_activity, some_incompatible_activity +from worker_versioning.constants import DEPLOYMENT_NAME, TASK_QUEUE +from worker_versioning.workflows import AutoUpgradingWorkflowV1b, PinnedWorkflowV2 + +logging.basicConfig(level=logging.INFO) + + +async def main() -> None: + """Run worker v2.""" + client = await Client.connect("localhost:7233") + + # Create worker v2 + worker = Worker( + client, + task_queue=TASK_QUEUE, + workflows=[AutoUpgradingWorkflowV1b, PinnedWorkflowV2], + activities=[some_activity, some_incompatible_activity], + deployment_config=WorkerDeploymentConfig( + version=WorkerDeploymentVersion( + deployment_name=DEPLOYMENT_NAME, build_id="2.0" + ), + use_worker_versioning=True, + ), + ) + + logging.info("Starting worker v2 (build 2.0)") + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/worker_versioning/workflow_v1.py b/worker_versioning/workflow_v1.py deleted file mode 100644 index 1cef9095..00000000 --- a/worker_versioning/workflow_v1.py +++ /dev/null @@ -1,27 +0,0 @@ -from datetime import timedelta - -from temporalio import workflow - -with workflow.unsafe.imports_passed_through(): - from worker_versioning.activities import greet - - -@workflow.defn -class MyWorkflow: - """The 1.0 version of the workflow we'll be making changes to""" - - should_finish: bool = False - - @workflow.run - async def run(self) -> str: - workflow.logger.info("Running workflow V1") - await workflow.wait_condition(lambda: self.should_finish) - return "Concluded workflow on V1" - - @workflow.signal - async def proceeder(self, inp: str): - await workflow.execute_activity( - greet, "V1", start_to_close_timeout=timedelta(seconds=5) - ) - if inp == "finish": - self.should_finish = True diff --git a/worker_versioning/workflow_v1_1.py b/worker_versioning/workflow_v1_1.py deleted file mode 100644 index e2f22943..00000000 --- a/worker_versioning/workflow_v1_1.py +++ /dev/null @@ -1,45 +0,0 @@ -from datetime import timedelta - -from temporalio import workflow - -with workflow.unsafe.imports_passed_through(): - from worker_versioning.activities import greet, super_greet - - -@workflow.defn -class MyWorkflow: - """ - The 1.1 version of the workflow, which is compatible with the first version. - - The compatible changes we've made are: - - Altering the log lines - - Using the `patched` API to properly introduce branching behavior while maintaining - compatibility - """ - - should_finish: bool = False - - @workflow.run - async def run(self) -> str: - workflow.logger.info("Running workflow V1.1") - await workflow.wait_condition(lambda: self.should_finish) - return "Concluded workflow on V1.1" - - @workflow.signal - async def proceeder(self, inp: str): - if workflow.patched("different-activity"): - await workflow.execute_activity( - super_greet, - args=["V1.1", 100], - start_to_close_timeout=timedelta(seconds=5), - ) - else: - # Note it is a valid compatible change to alter the input to an activity. However, because - # we're using the patched API, this branch would only be taken if the workflow was started on - # a v1 worker. - await workflow.execute_activity( - greet, "V1.1", start_to_close_timeout=timedelta(seconds=5) - ) - - if inp == "finish": - self.should_finish = True diff --git a/worker_versioning/workflow_v2.py b/worker_versioning/workflow_v2.py deleted file mode 100644 index dcb0a20e..00000000 --- a/worker_versioning/workflow_v2.py +++ /dev/null @@ -1,36 +0,0 @@ -import asyncio -from datetime import timedelta - -from temporalio import workflow - -with workflow.unsafe.imports_passed_through(): - from worker_versioning.activities import greet - - -@workflow.defn -class MyWorkflow: - """ - The 2.0 version of the workflow, which is fully incompatible with the other workflows, since it - alters the sequence of commands without using `patched`. - """ - - should_finish: bool = False - - @workflow.run - async def run(self) -> str: - workflow.logger.info("Running workflow V2") - await workflow.wait_condition(lambda: self.should_finish) - return "Concluded workflow on V2" - - @workflow.signal - async def proceeder(self, inp: str): - await asyncio.sleep(1) - await workflow.execute_activity( - greet, "V2", start_to_close_timeout=timedelta(seconds=5) - ) - await workflow.execute_activity( - greet, "V2", start_to_close_timeout=timedelta(seconds=5) - ) - - if inp == "finish": - self.should_finish = True diff --git a/worker_versioning/workflows.py b/worker_versioning/workflows.py new file mode 100644 index 00000000..a09c371a --- /dev/null +++ b/worker_versioning/workflows.py @@ -0,0 +1,189 @@ +"""Workflow definitions for the worker versioning sample.""" + +from datetime import timedelta + +from temporalio import common, workflow + +with workflow.unsafe.imports_passed_through(): + from worker_versioning.activities import ( + IncompatibleActivityInput, + some_activity, + some_incompatible_activity, + ) + + +@workflow.defn( + name="AutoUpgrading", versioning_behavior=common.VersioningBehavior.AUTO_UPGRADE +) +class AutoUpgradingWorkflowV1: + """AutoUpgradingWorkflowV1 will automatically move to the latest worker version. We'll be making + changes to it, which must be replay safe. + + Note that generally you won't want or need to include a version number in your workflow name if + you're using the worker versioning feature. This sample does it to illustrate changes to the + same code over time - but really what we're demonstrating here is the evolution of what would + have been one workflow definition. + """ + + def __init__(self) -> None: + self.signals: list[str] = [] + + @workflow.run + async def run(self) -> None: + workflow.logger.info( + "Changing workflow v1 started.", extra={"StartTime": workflow.now()} + ) + + # This workflow will listen for signals from our starter, and upon each signal either run + # an activity, or conclude execution. + while True: + await workflow.wait_condition(lambda: len(self.signals) > 0) + signal = self.signals.pop(0) + + if signal == "do-activity": + workflow.logger.info("Changing workflow v1 running activity") + await workflow.execute_activity( + some_activity, "v1", start_to_close_timeout=timedelta(seconds=10) + ) + else: + workflow.logger.info("Concluding workflow v1") + return + + @workflow.signal + async def do_next_signal(self, signal: str) -> None: + """Signal to perform next action.""" + self.signals.append(signal) + + +@workflow.defn( + name="AutoUpgrading", versioning_behavior=common.VersioningBehavior.AUTO_UPGRADE +) +class AutoUpgradingWorkflowV1b: + """AutoUpgradingWorkflowV1b represents us having made *compatible* changes to + AutoUpgradingWorkflowV1. + + The compatible changes we've made are: + - Altering the log lines + - Using the workflow.patched API to properly introduce branching behavior while maintaining + compatibility + """ + + def __init__(self) -> None: + self.signals: list[str] = [] + + @workflow.run + async def run(self) -> None: + workflow.logger.info( + "Changing workflow v1b started.", extra={"StartTime": workflow.now()} + ) + + # This workflow will listen for signals from our starter, and upon each signal either run + # an activity, or conclude execution. + while True: + await workflow.wait_condition(lambda: len(self.signals) > 0) + signal = self.signals.pop(0) + + if signal == "do-activity": + workflow.logger.info("Changing workflow v1b running activity") + if workflow.patched("DifferentActivity"): + await workflow.execute_activity( + some_incompatible_activity, + IncompatibleActivityInput(called_by="v1b", more_data="hello!"), + start_to_close_timeout=timedelta(seconds=10), + ) + else: + # Note it is a valid compatible change to alter the input to an activity. + # However, because we're using the patched API, this branch will never be + # taken. + await workflow.execute_activity( + some_activity, + "v1b", + start_to_close_timeout=timedelta(seconds=10), + ) + else: + workflow.logger.info("Concluding workflow v1b") + break + + @workflow.signal + async def do_next_signal(self, signal: str) -> None: + """Signal to perform next action.""" + self.signals.append(signal) + + +@workflow.defn(name="Pinned", versioning_behavior=common.VersioningBehavior.PINNED) +class PinnedWorkflowV1: + """PinnedWorkflowV1 demonstrates a workflow that likely has a short lifetime, and we want to always + stay pinned to the same version it began on. + + Note that generally you won't want or need to include a version number in your workflow name if + you're using the worker versioning feature. This sample does it to illustrate changes to the + same code over time - but really what we're demonstrating here is the evolution of what would + have been one workflow definition. + """ + + def __init__(self) -> None: + self.signals: list[str] = [] + + @workflow.run + async def run(self) -> None: + workflow.logger.info( + "Pinned Workflow v1 started.", extra={"StartTime": workflow.now()} + ) + + while True: + await workflow.wait_condition(lambda: len(self.signals) > 0) + signal = self.signals.pop(0) + if signal == "conclude": + break + + await workflow.execute_activity( + some_activity, + "Pinned-v1", + start_to_close_timeout=timedelta(seconds=10), + ) + + @workflow.signal + async def do_next_signal(self, signal: str) -> None: + """Signal to perform next action.""" + self.signals.append(signal) + + +@workflow.defn(name="Pinned", versioning_behavior=common.VersioningBehavior.PINNED) +class PinnedWorkflowV2: + """PinnedWorkflowV2 has changes that would make it incompatible with v1, and aren't protected by + a patch. + """ + + def __init__(self) -> None: + self.signals: list[str] = [] + + @workflow.run + async def run(self) -> None: + workflow.logger.info( + "Pinned Workflow v2 started.", extra={"StartTime": workflow.now()} + ) + + # Here we call an activity where we didn't before, which is an incompatible change. + await workflow.execute_activity( + some_activity, + "Pinned-v2", + start_to_close_timeout=timedelta(seconds=10), + ) + + while True: + await workflow.wait_condition(lambda: len(self.signals) > 0) + signal = self.signals.pop(0) + if signal == "conclude": + break + + # We've also changed the activity type here, another incompatible change + await workflow.execute_activity( + some_incompatible_activity, + IncompatibleActivityInput(called_by="Pinned-v2", more_data="hi"), + start_to_close_timeout=timedelta(seconds=10), + ) + + @workflow.signal + async def do_next_signal(self, signal: str) -> None: + """Signal to perform next action.""" + self.signals.append(signal) From 35949d70e85fa87b3bb2f860f4f0d59ec840e31f Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Tue, 16 Sep 2025 12:13:18 -0700 Subject: [PATCH 2/4] Add clarifying comment about workflow type --- worker_versioning/app.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/worker_versioning/app.py b/worker_versioning/app.py index 1726c429..bb4153bc 100644 --- a/worker_versioning/app.py +++ b/worker_versioning/app.py @@ -20,7 +20,10 @@ async def main() -> None: ) await wait_for_worker_and_make_current(client, "1.0") - # Start auto-upgrading and pinned workflows + # Start auto-upgrading and pinned workflows. Importantly, note that when we start the workflows, + # we are using a workflow type name which does *not* include the version number. We defined them + # with versioned names so we could show changes to the code, but here when the client invokes + # them, we're demonstrating that the client remains version-agnostic. auto_upgrade_workflow_id = "worker-versioning-versioning-autoupgrade_" + str( uuid.uuid4() ) From 83d50d11e32566b459499e75cfd4ed4907166b28 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Tue, 16 Sep 2025 17:05:58 -0700 Subject: [PATCH 3/4] A bit of extra cleanup --- worker_versioning/activities.py | 4 ++-- worker_versioning/app.py | 5 ++--- worker_versioning/constants.py | 7 ------- worker_versioning/workerv1.py | 2 +- worker_versioning/workerv1_1.py | 2 +- worker_versioning/workerv2.py | 2 +- 6 files changed, 7 insertions(+), 15 deletions(-) delete mode 100644 worker_versioning/constants.py diff --git a/worker_versioning/activities.py b/worker_versioning/activities.py index febcedf0..50c0700f 100644 --- a/worker_versioning/activities.py +++ b/worker_versioning/activities.py @@ -14,10 +14,10 @@ class IncompatibleActivityInput: @activity.defn async def some_activity(called_by: str) -> str: """Basic activity for the workflow.""" - return f"SomeActivity called by {called_by}" + return f"some_activity called by {called_by}" @activity.defn async def some_incompatible_activity(input_data: IncompatibleActivityInput) -> str: """Incompatible activity that takes different input.""" - return f"SomeIncompatibleActivity called by {input_data.called_by} with {input_data.more_data}" + return f"some_incompatible_activity called by {input_data.called_by} with {input_data.more_data}" diff --git a/worker_versioning/app.py b/worker_versioning/app.py index bb4153bc..8b32aa94 100644 --- a/worker_versioning/app.py +++ b/worker_versioning/app.py @@ -6,7 +6,8 @@ from temporalio.client import Client -from worker_versioning.constants import DEPLOYMENT_NAME, TASK_QUEUE +TASK_QUEUE = "worker-versioning" +DEPLOYMENT_NAME = "my-deployment" logging.basicConfig(level=logging.INFO) @@ -97,7 +98,6 @@ async def wait_for_worker_and_make_current(client: Client, build_id: str) -> Non deployment_name=DEPLOYMENT_NAME, build_id=build_id ) - # Wait for the worker to appear while True: try: describe_request = wsv1.DescribeWorkerDeploymentRequest( @@ -108,7 +108,6 @@ async def wait_for_worker_and_make_current(client: Client, build_id: str) -> Non describe_request ) - # Check if our version is present in the version summaries for version_summary in response.worker_deployment_info.version_summaries: if ( version_summary.deployment_version.deployment_name diff --git a/worker_versioning/constants.py b/worker_versioning/constants.py deleted file mode 100644 index 5cdaba6f..00000000 --- a/worker_versioning/constants.py +++ /dev/null @@ -1,7 +0,0 @@ -"""Constants for the worker versioning sample.""" - -# Task queue name -TASK_QUEUE = "worker-versioning" - -# Deployment name -DEPLOYMENT_NAME = "my-deployment" diff --git a/worker_versioning/workerv1.py b/worker_versioning/workerv1.py index df324996..13c4ed4b 100644 --- a/worker_versioning/workerv1.py +++ b/worker_versioning/workerv1.py @@ -8,7 +8,7 @@ from temporalio.worker import Worker, WorkerDeploymentConfig from worker_versioning.activities import some_activity, some_incompatible_activity -from worker_versioning.constants import DEPLOYMENT_NAME, TASK_QUEUE +from worker_versioning.app import DEPLOYMENT_NAME, TASK_QUEUE from worker_versioning.workflows import AutoUpgradingWorkflowV1, PinnedWorkflowV1 logging.basicConfig(level=logging.INFO) diff --git a/worker_versioning/workerv1_1.py b/worker_versioning/workerv1_1.py index 8312522f..779db3f9 100644 --- a/worker_versioning/workerv1_1.py +++ b/worker_versioning/workerv1_1.py @@ -8,7 +8,7 @@ from temporalio.worker import Worker, WorkerDeploymentConfig from worker_versioning.activities import some_activity, some_incompatible_activity -from worker_versioning.constants import DEPLOYMENT_NAME, TASK_QUEUE +from worker_versioning.app import DEPLOYMENT_NAME, TASK_QUEUE from worker_versioning.workflows import AutoUpgradingWorkflowV1b, PinnedWorkflowV1 logging.basicConfig(level=logging.INFO) diff --git a/worker_versioning/workerv2.py b/worker_versioning/workerv2.py index 5b7b5f4f..107e1a52 100644 --- a/worker_versioning/workerv2.py +++ b/worker_versioning/workerv2.py @@ -8,7 +8,7 @@ from temporalio.worker import Worker, WorkerDeploymentConfig from worker_versioning.activities import some_activity, some_incompatible_activity -from worker_versioning.constants import DEPLOYMENT_NAME, TASK_QUEUE +from worker_versioning.app import DEPLOYMENT_NAME, TASK_QUEUE from worker_versioning.workflows import AutoUpgradingWorkflowV1b, PinnedWorkflowV2 logging.basicConfig(level=logging.INFO) From 4406a4ef9923153d2e102ddb2b968814691dcc09 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Wed, 17 Sep 2025 14:15:50 -0700 Subject: [PATCH 4/4] Add line about minimum req server version --- worker_versioning/README.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/worker_versioning/README.md b/worker_versioning/README.md index ece87f0d..0ff423f1 100644 --- a/worker_versioning/README.md +++ b/worker_versioning/README.md @@ -10,6 +10,7 @@ The sample creates multiple worker versions (1.0, 1.1, and 2.0) within one deplo ### Steps to run this sample: 1) Run a [Temporal service](https://github.com/temporalio/samples-python/tree/main/#how-to-use). + Ensure that you're using at least Server version 1.28.0 (CLI version 1.4.0). 2) Start the main application (this will guide you through the sample): ```bash @@ -21,4 +22,5 @@ uv run worker_versioning/app.py - When prompted, run: `uv run worker_versioning/workerv1_1.py` - When prompted, run: `uv run worker_versioning/workerv2.py` -The sample will show how auto-upgrading workflows migrate to newer workers while pinned workflows remain on their original version. \ No newline at end of file +The sample will show how auto-upgrading workflows migrate to newer workers while pinned workflows +remain on their original version.