Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
__pycache__
.vscode
.DS_Store
.claude
30 changes: 22 additions & 8 deletions worker_versioning/README.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,26 @@
# Worker Versioning Sample
## Worker Versioning

This sample shows you how you can use the [Worker Versioning](https://docs.temporal.io/workers#worker-versioning)
feature to deploy incompatible changes to workflow code more easily.
This sample demonstrates how to use Temporal's Worker Versioning feature to safely deploy updates to workflow and activity code. It shows the difference between auto-upgrading and pinned workflows, and how to manage worker deployments with different build IDs.

To run, first see [README.md](../README.md) for prerequisites. Then, run the following from the root directory:
The sample creates multiple worker versions (1.0, 1.1, and 2.0) within one deployment and demonstrates:
- **Auto-upgrading workflows**: Automatically and controllably migrate to newer worker versions
- **Pinned workflows**: Stay on the original worker version throughout their lifecycle
- **Compatible vs incompatible changes**: How to make safe updates using `workflow.patched`

uv run worker_versioning/example.py
### Steps to run this sample:

This will add some Build IDs to a Task Queue, and will also run Workers with those versions to show how you can
mark add versions, mark them as compatible (or not) with one another, and run Workers at specific versions. You'll
see that only the workers only process Workflow Tasks assigned versions they are compatible with.
1) Run a [Temporal service](https://github.com/temporalio/samples-python/tree/main/#how-to-use).
Ensure that you're using at least Server version 1.28.0 (CLI version 1.4.0).

2) Start the main application (this will guide you through the sample):
```bash
uv run worker_versioning/app.py
```

3) Follow the prompts to start workers in separate terminals:
- When prompted, run: `uv run worker_versioning/workerv1.py`
- When prompted, run: `uv run worker_versioning/workerv1_1.py`
- When prompted, run: `uv run worker_versioning/workerv2.py`

The sample will show how auto-upgrading workflows migrate to newer workers while pinned workflows
remain on their original version.
20 changes: 16 additions & 4 deletions worker_versioning/activities.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,23 @@
from dataclasses import dataclass

from temporalio import activity


@dataclass
class IncompatibleActivityInput:
"""Input for the incompatible activity."""

called_by: str
more_data: str


@activity.defn
async def greet(inp: str) -> str:
return f"Hi from {inp}"
async def some_activity(called_by: str) -> str:
"""Basic activity for the workflow."""
return f"some_activity called by {called_by}"


@activity.defn
async def super_greet(inp: str, some_number: int) -> str:
return f"Hi from {inp} with {some_number}"
async def some_incompatible_activity(input_data: IncompatibleActivityInput) -> str:
"""Incompatible activity that takes different input."""
return f"some_incompatible_activity called by {input_data.called_by} with {input_data.more_data}"
139 changes: 139 additions & 0 deletions worker_versioning/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
"""Main application for the worker versioning sample."""

import asyncio
import logging
import uuid

from temporalio.client import Client

TASK_QUEUE = "worker-versioning"
DEPLOYMENT_NAME = "my-deployment"

logging.basicConfig(level=logging.INFO)


async def main() -> None:
client = await Client.connect("localhost:7233")

# Wait for v1 worker and set as current version
logging.info(
"Waiting for v1 worker to appear. Run `python worker_versioning/workerv1.py` in another terminal"
)
await wait_for_worker_and_make_current(client, "1.0")

# Start auto-upgrading and pinned workflows. Importantly, note that when we start the workflows,
# we are using a workflow type name which does *not* include the version number. We defined them
# with versioned names so we could show changes to the code, but here when the client invokes
# them, we're demonstrating that the client remains version-agnostic.
auto_upgrade_workflow_id = "worker-versioning-versioning-autoupgrade_" + str(
uuid.uuid4()
)
auto_upgrade_execution = await client.start_workflow(
"AutoUpgrading",
id=auto_upgrade_workflow_id,
task_queue=TASK_QUEUE,
)

pinned_workflow_id = "worker-versioning-versioning-pinned_" + str(uuid.uuid4())
pinned_execution = await client.start_workflow(
"Pinned",
id=pinned_workflow_id,
task_queue=TASK_QUEUE,
)

logging.info("Started auto-upgrading workflow: %s", auto_upgrade_execution.id)
logging.info("Started pinned workflow: %s", pinned_execution.id)

# Signal both workflows a few times to drive them
await advance_workflows(auto_upgrade_execution, pinned_execution)

# Now wait for the v1.1 worker to appear and become current
logging.info(
"Waiting for v1.1 worker to appear. Run `python worker_versioning/workerv1_1.py` in another terminal"
)
await wait_for_worker_and_make_current(client, "1.1")

# Once it has, we will continue to advance the workflows.
# The auto-upgrade workflow will now make progress on the new worker, while the pinned one will
# keep progressing on the old worker.
await advance_workflows(auto_upgrade_execution, pinned_execution)

# Finally we'll start the v2 worker, and again it'll become the new current version
logging.info(
"Waiting for v2 worker to appear. Run `python worker_versioning/workerv2.py` in another terminal"
)
await wait_for_worker_and_make_current(client, "2.0")

# Once it has we'll start one more new workflow, another pinned one, to demonstrate that new
# pinned workflows start on the current version.
pinned_workflow_2_id = "worker-versioning-versioning-pinned-2_" + str(uuid.uuid4())
pinned_execution_2 = await client.start_workflow(
"Pinned",
id=pinned_workflow_2_id,
task_queue=TASK_QUEUE,
)
logging.info("Started pinned workflow v2: %s", pinned_execution_2.id)

# Now we'll conclude all workflows. You should be able to see in your server UI that the pinned
# workflow always stayed on 1.0, while the auto-upgrading workflow migrated.
for handle in [auto_upgrade_execution, pinned_execution, pinned_execution_2]:
await handle.signal("do_next_signal", "conclude")
await handle.result()

logging.info("All workflows completed")


async def advance_workflows(auto_upgrade_execution, pinned_execution):
"""Signal both workflows a few times to drive them."""
for i in range(3):
await auto_upgrade_execution.signal("do_next_signal", "do-activity")
await pinned_execution.signal("do_next_signal", "some-signal")


async def wait_for_worker_and_make_current(client: Client, build_id: str) -> None:
import temporalio.api.workflowservice.v1 as wsv1
from temporalio.common import WorkerDeploymentVersion

target_version = WorkerDeploymentVersion(
deployment_name=DEPLOYMENT_NAME, build_id=build_id
)

while True:
try:
describe_request = wsv1.DescribeWorkerDeploymentRequest(
namespace=client.namespace,
deployment_name=DEPLOYMENT_NAME,
)
response = await client.workflow_service.describe_worker_deployment(
describe_request
)

for version_summary in response.worker_deployment_info.version_summaries:
if (
version_summary.deployment_version.deployment_name
== target_version.deployment_name
and version_summary.deployment_version.build_id
== target_version.build_id
):
break
else:
await asyncio.sleep(1)
continue

break

except Exception:
await asyncio.sleep(1)
continue

# Once the version is available, set it as current
set_request = wsv1.SetWorkerDeploymentCurrentVersionRequest(
namespace=client.namespace,
deployment_name=DEPLOYMENT_NAME,
build_id=target_version.build_id,
)
await client.workflow_service.set_worker_deployment_current_version(set_request)


if __name__ == "__main__":
asyncio.run(main())
116 changes: 0 additions & 116 deletions worker_versioning/example.py

This file was deleted.

40 changes: 40 additions & 0 deletions worker_versioning/workerv1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""Worker v1 for the worker versioning sample."""

import asyncio
import logging

from temporalio.client import Client
from temporalio.common import WorkerDeploymentVersion
from temporalio.worker import Worker, WorkerDeploymentConfig

from worker_versioning.activities import some_activity, some_incompatible_activity
from worker_versioning.app import DEPLOYMENT_NAME, TASK_QUEUE
from worker_versioning.workflows import AutoUpgradingWorkflowV1, PinnedWorkflowV1

logging.basicConfig(level=logging.INFO)


async def main() -> None:
"""Run worker v1."""
client = await Client.connect("localhost:7233")

# Create worker v1
worker = Worker(
client,
task_queue=TASK_QUEUE,
workflows=[AutoUpgradingWorkflowV1, PinnedWorkflowV1],
activities=[some_activity, some_incompatible_activity],
deployment_config=WorkerDeploymentConfig(
version=WorkerDeploymentVersion(
deployment_name=DEPLOYMENT_NAME, build_id="1.0"
),
use_worker_versioning=True,
),
)

logging.info("Starting worker v1 (build 1.0)")
await worker.run()


if __name__ == "__main__":
asyncio.run(main())
Loading
Loading