From 5fd2a0e6e752ef0368d0b78a9adeac8027c1547a Mon Sep 17 00:00:00 2001 From: Andy Staples Date: Fri, 5 Dec 2025 11:05:07 -0700 Subject: [PATCH 1/5] Add new_uuid method to client - Update entity messages to use UUIDs as requestIds --- durabletask/internal/helpers.py | 17 ++++++--- durabletask/task.py | 16 +++++++++ durabletask/worker.py | 20 +++++++++-- .../test_dts_orchestration_e2e.py | 36 +++++++++++++++++++ tests/durabletask/test_orchestration_e2e.py | 34 ++++++++++++++++++ 5 files changed, 116 insertions(+), 7 deletions(-) diff --git a/durabletask/internal/helpers.py b/durabletask/internal/helpers.py index ccd8558..88481fa 100644 --- a/durabletask/internal/helpers.py +++ b/durabletask/internal/helpers.py @@ -196,9 +196,14 @@ def new_schedule_task_action(id: int, name: str, encoded_input: Optional[str], )) -def new_call_entity_action(id: int, parent_instance_id: str, entity_id: EntityInstanceId, operation: str, encoded_input: Optional[str]): +def new_call_entity_action(id: int, + parent_instance_id: str, + entity_id: EntityInstanceId, + operation: str, + encoded_input: Optional[str], + request_id: str) -> pb.OrchestratorAction: return pb.OrchestratorAction(id=id, sendEntityMessage=pb.SendEntityMessageAction(entityOperationCalled=pb.EntityOperationCalledEvent( - requestId=f"{parent_instance_id}:{id}", + requestId=request_id, operation=operation, scheduledTime=None, input=get_string_value(encoded_input), @@ -208,9 +213,13 @@ def new_call_entity_action(id: int, parent_instance_id: str, entity_id: EntityIn ))) -def new_signal_entity_action(id: int, entity_id: EntityInstanceId, operation: str, encoded_input: Optional[str]): +def new_signal_entity_action(id: int, + entity_id: EntityInstanceId, + operation: str, + encoded_input: Optional[str], + request_id: str) -> pb.OrchestratorAction: return pb.OrchestratorAction(id=id, sendEntityMessage=pb.SendEntityMessageAction(entityOperationSignaled=pb.EntityOperationSignaledEvent( - requestId=f"{entity_id}:{id}", + requestId=request_id, operation=operation, scheduledTime=None, input=get_string_value(encoded_input), diff --git a/durabletask/task.py b/durabletask/task.py index 3570838..2f763bc 100644 --- a/durabletask/task.py +++ b/durabletask/task.py @@ -258,6 +258,22 @@ def continue_as_new(self, new_input: Any, *, save_events: bool = False) -> None: """ pass + @abstractmethod + def new_uuid(self) -> str: + """Create a new UUID that is safe for replay within an orchestration or operation. + + The default implementation of this method creates a name-based UUID + using the algorithm from RFC 4122 §4.3. The name input used to generate + this value is a combination of the orchestration instance ID and an + internally managed sequence number. + + Returns + ------- + str + New UUID that is safe for replay within an orchestration or operation. + """ + pass + @abstractmethod def _exit_critical_section(self) -> None: pass diff --git a/durabletask/worker.py b/durabletask/worker.py index fae345c..d9384aa 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -13,6 +13,7 @@ from types import GeneratorType from enum import Enum from typing import Any, Generator, Optional, Sequence, TypeVar, Union +import uuid from packaging.version import InvalidVersion, parse import grpc @@ -33,6 +34,7 @@ TInput = TypeVar("TInput") TOutput = TypeVar("TOutput") +DATETIME_STRING_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ' class ConcurrencyOptions: @@ -831,6 +833,7 @@ def __init__(self, instance_id: str, registry: _Registry): # Maps criticalSectionId to task ID self._entity_lock_id_map: dict[str, int] = {} self._sequence_number = 0 + self._new_uuid_counter = 0 self._current_utc_datetime = datetime(1000, 1, 1) self._instance_id = instance_id self._registry = registry @@ -1165,7 +1168,7 @@ def call_entity_function_helper( raise RuntimeError(error_message) encoded_input = shared.to_json(input) if input is not None else None - action = ph.new_call_entity_action(id, self.instance_id, entity_id, operation, encoded_input) + action = ph.new_call_entity_action(id, self.instance_id, entity_id, operation, encoded_input, self.new_uuid()) self._pending_actions[id] = action fn_task = task.CompletableTask() @@ -1188,7 +1191,7 @@ def signal_entity_function_helper( encoded_input = shared.to_json(input) if input is not None else None - action = ph.new_signal_entity_action(id, entity_id, operation, encoded_input) + action = ph.new_signal_entity_action(id, entity_id, operation, encoded_input, self.new_uuid()) self._pending_actions[id] = action def lock_entities_function_helper(self, id: int, entities: list[EntityInstanceId]) -> None: @@ -1199,7 +1202,7 @@ def lock_entities_function_helper(self, id: int, entities: list[EntityInstanceId if not transition_valid: raise RuntimeError(error_message) - critical_section_id = f"{self.instance_id}:{id:04x}" + critical_section_id = self.new_uuid() request, target = self._entity_context.emit_acquire_message(critical_section_id, entities) @@ -1251,6 +1254,17 @@ def continue_as_new(self, new_input, *, save_events: bool = False) -> None: self.set_continued_as_new(new_input, save_events) + def new_uuid(self) -> str: + URL_NAMESPACE: str = "9e952958-5e33-4daf-827f-2fa12937b875" + + uuid_name_value = \ + f"{self._instance_id}" \ + f"_{self.current_utc_datetime.strftime(DATETIME_STRING_FORMAT)}" \ + f"_{self._new_uuid_counter}" + self._new_uuid_counter += 1 + namespace_uuid = uuid.uuid5(uuid.NAMESPACE_OID, URL_NAMESPACE) + return str(uuid.uuid5(namespace_uuid, uuid_name_value)) + class ExecutionResults: actions: list[pb.OrchestratorAction] diff --git a/tests/durabletask-azuremanaged/test_dts_orchestration_e2e.py b/tests/durabletask-azuremanaged/test_dts_orchestration_e2e.py index 4a963fc..be7fab1 100644 --- a/tests/durabletask-azuremanaged/test_dts_orchestration_e2e.py +++ b/tests/durabletask-azuremanaged/test_dts_orchestration_e2e.py @@ -532,3 +532,39 @@ def empty_orchestrator(ctx: task.OrchestrationContext, _): assert state.serialized_input is None assert state.serialized_output is None assert state.serialized_custom_status == "\"foobaz\"" + + +def test_new_uuid(): + def noop(_: task.ActivityContext, _): + pass + + def empty_orchestrator(ctx: task.OrchestrationContext, _): + # Assert that two new_uuid calls return different values + results = [ctx.new_uuid(), ctx.new_uuid()] + yield ctx.call_activity("noop") + # Assert that new_uuid still returns a unique value after replay + results.append(ctx.new_uuid()) + return results + + # Start a worker, which will connect to the sidecar in a background thread + with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, + taskhub=taskhub_name, token_credential=None) as w: + w.add_orchestrator(empty_orchestrator) + w.add_activity(noop) + w.start() + + c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, + taskhub=taskhub_name, token_credential=None) + id = c.schedule_new_orchestration(empty_orchestrator) + state = c.wait_for_orchestration_completion(id, timeout=30) + + assert state is not None + assert state.name == task.get_name(empty_orchestrator) + assert state.instance_id == id + assert state.failure_details is None + assert state.runtime_status == client.OrchestrationStatus.COMPLETED + results = json.loads(state.serialized_output or "\"\"") + assert isinstance(results, list) and len(results) == 3 + assert results[0] != results[1] + assert results[0] != results[2] + assert results[1] != results[2] diff --git a/tests/durabletask/test_orchestration_e2e.py b/tests/durabletask/test_orchestration_e2e.py index 3db608d..39b7b8e 100644 --- a/tests/durabletask/test_orchestration_e2e.py +++ b/tests/durabletask/test_orchestration_e2e.py @@ -499,3 +499,37 @@ def empty_orchestrator(ctx: task.OrchestrationContext, _): assert state.serialized_input is None assert state.serialized_output is None assert state.serialized_custom_status == "\"foobaz\"" + + +def test_new_uuid(): + def noop(_: task.ActivityContext, _): + pass + + def empty_orchestrator(ctx: task.OrchestrationContext, _): + # Assert that two new_uuid calls return different values + results = [ctx.new_uuid(), ctx.new_uuid()] + yield ctx.call_activity("noop") + # Assert that new_uuid still returns a unique value after replay + results.append(ctx.new_uuid()) + return results + + # Start a worker, which will connect to the sidecar in a background thread + with worker.TaskHubGrpcWorker() as w: + w.add_orchestrator(empty_orchestrator) + w.add_activity(noop) + w.start() + + c = client.TaskHubGrpcClient() + id = c.schedule_new_orchestration(empty_orchestrator) + state = c.wait_for_orchestration_completion(id, timeout=30) + + assert state is not None + assert state.name == task.get_name(empty_orchestrator) + assert state.instance_id == id + assert state.failure_details is None + assert state.runtime_status == client.OrchestrationStatus.COMPLETED + results = json.loads(state.serialized_output or "\"\"") + assert isinstance(results, list) and len(results) == 3 + assert results[0] != results[1] + assert results[0] != results[2] + assert results[1] != results[2] From f7676f9290a1d7d8637207de164c6ac8b3ce73e9 Mon Sep 17 00:00:00 2001 From: Andy Staples Date: Fri, 5 Dec 2025 11:15:14 -0700 Subject: [PATCH 2/5] Linter fix --- tests/durabletask-azuremanaged/test_dts_orchestration_e2e.py | 2 +- tests/durabletask/test_orchestration_e2e.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/durabletask-azuremanaged/test_dts_orchestration_e2e.py b/tests/durabletask-azuremanaged/test_dts_orchestration_e2e.py index be7fab1..3d8f54e 100644 --- a/tests/durabletask-azuremanaged/test_dts_orchestration_e2e.py +++ b/tests/durabletask-azuremanaged/test_dts_orchestration_e2e.py @@ -535,7 +535,7 @@ def empty_orchestrator(ctx: task.OrchestrationContext, _): def test_new_uuid(): - def noop(_: task.ActivityContext, _): + def noop(_: task.ActivityContext, _1): pass def empty_orchestrator(ctx: task.OrchestrationContext, _): diff --git a/tests/durabletask/test_orchestration_e2e.py b/tests/durabletask/test_orchestration_e2e.py index 39b7b8e..9547933 100644 --- a/tests/durabletask/test_orchestration_e2e.py +++ b/tests/durabletask/test_orchestration_e2e.py @@ -502,7 +502,7 @@ def empty_orchestrator(ctx: task.OrchestrationContext, _): def test_new_uuid(): - def noop(_: task.ActivityContext, _): + def noop(_: task.ActivityContext, _1): pass def empty_orchestrator(ctx: task.OrchestrationContext, _): From 5bf0cd54336d0c42a8009a4b7c2b1e2ab97cbc97 Mon Sep 17 00:00:00 2001 From: andystaples <77818326+andystaples@users.noreply.github.com> Date: Fri, 5 Dec 2025 11:18:09 -0700 Subject: [PATCH 3/5] Update durabletask/worker.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- durabletask/worker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/durabletask/worker.py b/durabletask/worker.py index d9384aa..866e61e 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -1255,14 +1255,14 @@ def continue_as_new(self, new_input, *, save_events: bool = False) -> None: self.set_continued_as_new(new_input, save_events) def new_uuid(self) -> str: - URL_NAMESPACE: str = "9e952958-5e33-4daf-827f-2fa12937b875" + NAMESPACE_UUID: str = "9e952958-5e33-4daf-827f-2fa12937b875" uuid_name_value = \ f"{self._instance_id}" \ f"_{self.current_utc_datetime.strftime(DATETIME_STRING_FORMAT)}" \ f"_{self._new_uuid_counter}" self._new_uuid_counter += 1 - namespace_uuid = uuid.uuid5(uuid.NAMESPACE_OID, URL_NAMESPACE) + namespace_uuid = uuid.uuid5(uuid.NAMESPACE_OID, NAMESPACE_UUID) return str(uuid.uuid5(namespace_uuid, uuid_name_value)) From 5b539c2f5f996313ccacb4ab68fb1bd8f195f08b Mon Sep 17 00:00:00 2001 From: andystaples <77818326+andystaples@users.noreply.github.com> Date: Fri, 5 Dec 2025 11:18:17 -0700 Subject: [PATCH 4/5] Update durabletask/task.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- durabletask/task.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/durabletask/task.py b/durabletask/task.py index 2f763bc..053f70b 100644 --- a/durabletask/task.py +++ b/durabletask/task.py @@ -264,8 +264,8 @@ def new_uuid(self) -> str: The default implementation of this method creates a name-based UUID using the algorithm from RFC 4122 §4.3. The name input used to generate - this value is a combination of the orchestration instance ID and an - internally managed sequence number. + this value is a combination of the orchestration instance ID, the current UTC datetime, + and an internally managed counter. Returns ------- From f54a00e8a6bd58c65a5a1cabe5ec1882c2688742 Mon Sep 17 00:00:00 2001 From: Andy Staples Date: Fri, 5 Dec 2025 11:20:10 -0700 Subject: [PATCH 5/5] Check valid UUID in test --- .../durabletask-azuremanaged/test_dts_orchestration_e2e.py | 7 ++++--- tests/durabletask/test_orchestration_e2e.py | 7 ++++--- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/tests/durabletask-azuremanaged/test_dts_orchestration_e2e.py b/tests/durabletask-azuremanaged/test_dts_orchestration_e2e.py index 3d8f54e..7a7232e 100644 --- a/tests/durabletask-azuremanaged/test_dts_orchestration_e2e.py +++ b/tests/durabletask-azuremanaged/test_dts_orchestration_e2e.py @@ -5,6 +5,7 @@ import os import threading from datetime import timedelta +import uuid import pytest @@ -565,6 +566,6 @@ def empty_orchestrator(ctx: task.OrchestrationContext, _): assert state.runtime_status == client.OrchestrationStatus.COMPLETED results = json.loads(state.serialized_output or "\"\"") assert isinstance(results, list) and len(results) == 3 - assert results[0] != results[1] - assert results[0] != results[2] - assert results[1] != results[2] + assert uuid.UUID(results[0]) != uuid.UUID(results[1]) + assert uuid.UUID(results[0]) != uuid.UUID(results[2]) + assert uuid.UUID(results[1]) != uuid.UUID(results[2]) diff --git a/tests/durabletask/test_orchestration_e2e.py b/tests/durabletask/test_orchestration_e2e.py index 9547933..997bc50 100644 --- a/tests/durabletask/test_orchestration_e2e.py +++ b/tests/durabletask/test_orchestration_e2e.py @@ -5,6 +5,7 @@ import threading import time from datetime import timedelta +import uuid import pytest @@ -530,6 +531,6 @@ def empty_orchestrator(ctx: task.OrchestrationContext, _): assert state.runtime_status == client.OrchestrationStatus.COMPLETED results = json.loads(state.serialized_output or "\"\"") assert isinstance(results, list) and len(results) == 3 - assert results[0] != results[1] - assert results[0] != results[2] - assert results[1] != results[2] + assert uuid.UUID(results[0]) != uuid.UUID(results[1]) + assert uuid.UUID(results[0]) != uuid.UUID(results[2]) + assert uuid.UUID(results[1]) != uuid.UUID(results[2])