diff --git a/resource_pool/README.md b/resource_pool/README.md
new file mode 100644
index 00000000..143de0f3
--- /dev/null
+++ b/resource_pool/README.md
@@ -0,0 +1,48 @@
+# Resource Pool Sample
+
+This sample shows how to use a long-lived `ResourcePoolWorkflow` to allocate `resources` to `ResourceUserWorkflows`.
+Each `ResourceUserWorkflow` runs several activities while it has ownership of a resource. Note that
+`ResourcePoolWorkflow` makes resource allocation decisions based on in-memory state.
+
+Run the following from this directory to start the worker:
+
+    uv run worker.py
+
+Then, in another terminal, run the following to execute several `ResourceUserWorkflows`:
+
+    uv run starter.py
+
+You should see output indicating that the `ResourcePoolWorkflow` serialized access to each resource.
+
+You can query the set of current resource holders with:
+
+    tctl wf query -w resource_pool --qt get_current_holders
+
+# Other approaches
+
+There are simpler ways to manage concurrent access to resources. Consider using resource-specific workers/task queues,
+and limiting the number of activity slots on the workers. The golang SDK also offers [sessions](https://docs.temporal.io/develop/go/sessions)
+that allow workflows to pin themselves to workers.
+
+The technique in this sample is capable of more complex resource allocation than the options above, but it doesn't scale
+as well. Specifically, it can:
+- Manage access to a set of resources that is decoupled from the set of workers and task queues
+- Run arbitrary code to place workloads on resources as they become available
+
+# Caveats
+
+This sample uses true locking (not leasing!) to avoid the complexity and scaling concerns associated with heartbeating via
+signals. Locking carries a risk: a failure to unlock permanently removes a resource from the pool. However, with
+Temporal's durable execution guarantees, this can only happen if:
+
+- A `ResourceUserWorkflow` times out (prohibited in the sample code)
+- An operator terminates a `ResourceUserWorkflow` (Temporal recommends canceling workflows instead of terminating them whenever possible)
+- You shut down your workers and never restart them (unhandled, but irrelevant)
+
+If a leak were to happen, you could discover the identity of the leaker using the query above, then:
+
+    tctl wf signal -w resource_pool --name release_resource --input '{ "release_key": "" }'
+
+Performance: A single `ResourcePoolWorkflow` scales to tens, but not hundreds, of request/release events per second. It is
+best suited for allocating resources to long-running workflows. Actual performance will depend on your Temporal server's
+persistence layer.
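+
+# Using the pool from workflow code
+
+For reference, here is a minimal sketch of how a workflow can acquire a resource with `ResourcePoolClient`
+(`my_activity` is a hypothetical activity, not part of this sample; see `resource_user_workflow.py` for the
+full version):
+
+    from datetime import timedelta
+
+    from temporalio import workflow
+
+    from resource_pool.pool_client import ResourcePoolClient
+
+    @workflow.defn
+    class MyResourceUserWorkflow:
+        @workflow.run
+        async def run(self) -> None:
+            pool_client = ResourcePoolClient("resource_pool")
+            # Waits (up to max_wait_time) for the pool to signal us back with a resource
+            async with pool_client.acquire_resource() as acquired:
+                # We hold the lock on acquired.resource until this block exits
+                await workflow.execute_activity(
+                    my_activity,  # hypothetical activity for illustration
+                    acquired.resource,
+                    start_to_close_timeout=timedelta(seconds=10),
+                )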
\ No newline at end of file
diff --git a/resource_pool/__init__.py b/resource_pool/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/resource_pool/pool_client/__init__.py b/resource_pool/pool_client/__init__.py
new file mode 100644
index 00000000..b8471d8a
--- /dev/null
+++ b/resource_pool/pool_client/__init__.py
@@ -0,0 +1,2 @@
+from .resource_pool_client import ResourcePoolClient
+from .resource_pool_workflow import ResourcePoolWorkflow
diff --git a/resource_pool/pool_client/resource_pool_client.py b/resource_pool/pool_client/resource_pool_client.py
new file mode 100644
index 00000000..b7183afa
--- /dev/null
+++ b/resource_pool/pool_client/resource_pool_client.py
@@ -0,0 +1,101 @@
+from contextlib import asynccontextmanager
+from datetime import timedelta
+from typing import AsyncGenerator, Optional
+
+from temporalio import workflow
+
+from resource_pool.pool_client.resource_pool_workflow import ResourcePoolWorkflow
+from resource_pool.shared import (
+    AcquiredResource,
+    AcquireRequest,
+    AcquireResponse,
+    DetachedResource,
+)
+
+
+# Use this class in workflow code that needs to run on locked resources.
+class ResourcePoolClient:
+    def __init__(self, pool_workflow_id: str) -> None:
+        self.pool_workflow_id = pool_workflow_id
+        self.acquired_resources: list[AcquiredResource] = []
+
+        signal_name = f"assign_resource_{self.pool_workflow_id}"
+        if workflow.get_signal_handler(signal_name) is None:
+            workflow.set_signal_handler(signal_name, self._handle_acquire_response)
+        else:
+            raise RuntimeError(
+                f"{signal_name} already registered - if you use multiple ResourcePoolClients within the "
+                f"same workflow, they must use different pool_workflow_ids"
+            )
+
+    def _handle_acquire_response(self, response: AcquireResponse) -> None:
+        self.acquired_resources.append(
+            AcquiredResource(
+                resource=response.resource, release_key=response.release_key
+            )
+        )
+
+    async def _send_acquire_signal(self) -> None:
+        await workflow.get_external_workflow_handle_for(
+            ResourcePoolWorkflow.run, self.pool_workflow_id
+        ).signal("acquire_resource", AcquireRequest(workflow.info().workflow_id))
+
+    async def _send_release_signal(self, acquired_resource: AcquiredResource) -> None:
+        await workflow.get_external_workflow_handle_for(
+            ResourcePoolWorkflow.run, self.pool_workflow_id
+        ).signal(
+            "release_resource",
+            AcquireResponse(
+                resource=acquired_resource.resource,
+                release_key=acquired_resource.release_key,
+            ),
+        )
+
+    @asynccontextmanager
+    async def acquire_resource(
+        self,
+        *,
+        reattach: Optional[DetachedResource] = None,
+        max_wait_time: timedelta = timedelta(minutes=5),
+    ) -> AsyncGenerator[AcquiredResource, None]:
+        _warn_when_workflow_has_timeouts()
+
+        if reattach is None:
+            await self._send_acquire_signal()
+            await workflow.wait_condition(
+                lambda: len(self.acquired_resources) > 0, timeout=max_wait_time
+            )
+            resource = self.acquired_resources.pop(0)
+        else:
+            resource = AcquiredResource(
+                resource=reattach.resource, release_key=reattach.release_key
+            )
+
+        # Can't happen, but the typechecker doesn't know about workflow.wait_condition
+        if resource is None:
+            raise RuntimeError("resource was None when it can't be")
+
+        # During the yield, the calling workflow owns the resource. Note that this is a lock, not a lease! Our
+        # finally block will release the resource if an activity fails. This is why we warned about
+        # workflow-level timeouts above - the finally block wouldn't run if there was a timeout.
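+        # (A worker crash during the yield is fine: durable execution resumes the workflow on another
+        # worker, and the finally block still runs. Per the README, only timeouts, termination, or
+        # permanently shutting down all workers can leak the lock.)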
+        try:
+            yield resource
+        finally:
+            if not resource.detached:
+                await self._send_release_signal(resource)
+
+
+def _warn_when_workflow_has_timeouts() -> None:
+    def has_timeout(timeout: Optional[timedelta]) -> bool:
+        # After continue_as_new, timeouts are 0, even if they were None before continue_as_new (and were not set in the
+        # continue_as_new call).
+        return timeout is not None and timeout > timedelta(0)
+
+    if has_timeout(workflow.info().run_timeout):
+        workflow.logger.warning(
+            f"Workflows using ResourcePoolClient cannot have a run_timeout (found {workflow.info().run_timeout}) - this will leak locks"
+        )
+    if has_timeout(workflow.info().execution_timeout):
+        workflow.logger.warning(
+            f"Workflows using ResourcePoolClient cannot have an execution_timeout (found {workflow.info().execution_timeout}) - this will leak locks"
+        )
diff --git a/resource_pool/pool_client/resource_pool_workflow.py b/resource_pool/pool_client/resource_pool_workflow.py
new file mode 100644
index 00000000..2321f82d
--- /dev/null
+++ b/resource_pool/pool_client/resource_pool_workflow.py
@@ -0,0 +1,149 @@
+from dataclasses import dataclass
+from typing import Optional
+
+from temporalio import workflow
+from temporalio.exceptions import ApplicationError
+
+from resource_pool.shared import AcquireRequest, AcquireResponse
+
+
+# Internal to this workflow, we'll associate a randomly generated release key with each granted acquire request.
+@dataclass
+class InternalAcquireRequest(AcquireRequest):
+    release_signal: Optional[str]
+
+
+@dataclass
+class ResourcePoolWorkflowInput:
+    # Key is resource, value is current holder of the resource (None if not held)
+    resources: dict[str, Optional[InternalAcquireRequest]]
+    waiters: list[InternalAcquireRequest]
+
+
+@workflow.defn
+class ResourcePoolWorkflow:
+    @workflow.init
+    def __init__(self, input: ResourcePoolWorkflowInput) -> None:
+        self.resources = input.resources
+        self.waiters = input.waiters
+        self.release_key_to_resource: dict[str, str] = {}
+
+        # Rebuild the release-key index from the holders carried across continue_as_new
+        for resource, holder in self.resources.items():
+            if holder is not None and holder.release_signal is not None:
+                self.release_key_to_resource[holder.release_signal] = resource
+
+    @workflow.signal
+    async def add_resources(self, resources: list[str]) -> None:
+        for resource in resources:
+            if resource in self.resources:
+                workflow.logger.warning(
+                    f"Ignoring attempt to add already-existing resource: {resource}"
+                )
+            else:
+                self.resources[resource] = None
+
+    @workflow.signal
+    async def acquire_resource(self, request: AcquireRequest) -> None:
+        self.waiters.append(
+            InternalAcquireRequest(workflow_id=request.workflow_id, release_signal=None)
+        )
+        workflow.logger.info(
+            f"workflow_id={request.workflow_id} is waiting for a resource"
+        )
+
+    @workflow.signal
+    async def release_resource(self, acquire_response: AcquireResponse) -> None:
+        release_key = acquire_response.release_key
+        resource = self.release_key_to_resource.get(release_key)
+        if resource is None:
+            workflow.logger.warning(f"Ignoring unknown release_key: {release_key}")
+            return
+
+        holder = self.resources[resource]
+        if holder is None:
+            workflow.logger.warning(
+                f"Ignoring request to release resource that is not held: {resource}"
+            )
+            return
+
+        # Remove the current holder
+        workflow.logger.info(
+            f"workflow_id={holder.workflow_id} released resource {resource}"
+        )
+        self.resources[resource] = None
+        del self.release_key_to_resource[release_key]
+
+    @workflow.query
+    def get_current_holders(self) -> dict[str, Optional[InternalAcquireRequest]]:
+        return self.resources
+
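+    # Assignment handshake: signal the waiting workflow with a freshly generated release key, and record
+    # it as the holder only if the signal succeeds. If the waiter no longer exists, the resource stays free.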
+    async def assign_resource(
+        self, resource: str, internal_request: InternalAcquireRequest
+    ) -> None:
+        workflow.logger.info(
+            f"workflow_id={internal_request.workflow_id} acquired resource {resource}"
+        )
+
+        requester = workflow.get_external_workflow_handle(internal_request.workflow_id)
+        try:
+            release_signal = str(workflow.uuid4())
+            await requester.signal(
+                f"assign_resource_{workflow.info().workflow_id}",
+                AcquireResponse(release_key=release_signal, resource=resource),
+            )
+
+            internal_request.release_signal = release_signal
+            self.resources[resource] = internal_request
+            self.release_key_to_resource[release_signal] = resource
+        except ApplicationError as e:
+            if e.type == "ExternalWorkflowExecutionNotFound":
+                workflow.logger.info(
+                    f"Could not assign resource {resource} to {internal_request.workflow_id}: {e.message}"
+                )
+            else:
+                raise
+
+    async def assign_next_resource(self) -> bool:
+        if len(self.waiters) == 0:
+            return False
+
+        next_free_resource = self.get_free_resource()
+        if next_free_resource is None:
+            return False
+
+        next_waiter = self.waiters.pop(0)
+        await self.assign_resource(next_free_resource, next_waiter)
+        return True
+
+    def get_free_resource(self) -> Optional[str]:
+        return next(
+            (resource for resource, holder in self.resources.items() if holder is None),
+            None,
+        )
+
+    def can_assign_resource(self) -> bool:
+        return len(self.waiters) > 0 and self.get_free_resource() is not None
+
+    def should_continue_as_new(self) -> bool:
+        return (
+            workflow.info().is_continue_as_new_suggested()
+            and workflow.all_handlers_finished()
+        )
+
+    @workflow.run
+    async def run(self, _: ResourcePoolWorkflowInput) -> None:
+        while True:
+            await workflow.wait_condition(
+                lambda: self.can_assign_resource() or self.should_continue_as_new()
+            )
+
+            if await self.assign_next_resource():
+                continue
+
+            if self.should_continue_as_new():
+                workflow.continue_as_new(
+                    ResourcePoolWorkflowInput(
+                        resources=self.resources,
+                        waiters=self.waiters,
+                    )
+                )
diff --git a/resource_pool/resource_user_workflow.py b/resource_pool/resource_user_workflow.py
new file mode 100644
index 00000000..80ee7fba
--- /dev/null
+++ b/resource_pool/resource_user_workflow.py
@@ -0,0 +1,88 @@
+import asyncio
+from dataclasses import dataclass, field
+from datetime import timedelta
+from typing import Optional
+
+from temporalio import activity, workflow
+
+from resource_pool.pool_client import ResourcePoolClient
+from resource_pool.shared import DetachedResource
+
+
+@dataclass
+class UseResourceActivityInput:
+    resource: str
+    iteration: str
+
+
+@activity.defn
+async def use_resource(input: UseResourceActivityInput) -> None:
+    info = activity.info()
+    activity.logger.info(
+        f"{info.workflow_id} starts using {input.resource} the {input.iteration} time"
+    )
+    await asyncio.sleep(3)
+    activity.logger.info(
+        f"{info.workflow_id} done using {input.resource} the {input.iteration} time"
+    )
+
+
+@dataclass
+class ResourceUserWorkflowInput:
+    # The id of the resource pool workflow to request a resource from
+    resource_pool_workflow_id: str
+
+    # If set, this workflow will fail after the "first" or "second" activity.
+    iteration_to_fail_after: Optional[str]
+
+    # If True, this workflow will continue as new after the last activity. The next iteration will run more activities,
+    # but will not continue as new.
+    should_continue_as_new: bool
+
+    # Used to transfer resource ownership between iterations during continue_as_new
+    already_acquired_resource: Optional[DetachedResource] = field(default=None)
+
+
+class FailWorkflowException(Exception):
+    pass
+
+
+# Wait this long for a resource before giving up
+MAX_RESOURCE_WAIT_TIME = timedelta(minutes=5)
+
+
+@workflow.defn(failure_exception_types=[FailWorkflowException])
+class ResourceUserWorkflow:
+    @workflow.run
+    async def run(self, input: ResourceUserWorkflowInput) -> None:
+        pool_client = ResourcePoolClient(input.resource_pool_workflow_id)
+
+        async with pool_client.acquire_resource(
+            reattach=input.already_acquired_resource,
+            max_wait_time=MAX_RESOURCE_WAIT_TIME,
+        ) as acquired_resource:
+            for iteration in ["first", "second"]:
+                await workflow.execute_activity(
+                    use_resource,
+                    UseResourceActivityInput(acquired_resource.resource, iteration),
+                    start_to_close_timeout=timedelta(seconds=10),
+                )
+
+                if iteration == input.iteration_to_fail_after:
+                    workflow.logger.info(
+                        f"Failing after iteration {input.iteration_to_fail_after}"
+                    )
+                    raise FailWorkflowException()
+
+            # This workflow only continues as new so it can demonstrate how to pass acquired resources across
+            # iterations. Ordinarily, such a short workflow would not use continue as new.
+            if input.should_continue_as_new:
+                detached_resource = acquired_resource.detach()
+
+                next_input = ResourceUserWorkflowInput(
+                    resource_pool_workflow_id=input.resource_pool_workflow_id,
+                    iteration_to_fail_after=input.iteration_to_fail_after,
+                    should_continue_as_new=False,
+                    already_acquired_resource=detached_resource,
+                )
+
+                workflow.continue_as_new(next_input)
diff --git a/resource_pool/shared.py b/resource_pool/shared.py
new file mode 100644
index 00000000..3930bb72
--- /dev/null
+++ b/resource_pool/shared.py
@@ -0,0 +1,31 @@
+from dataclasses import dataclass, field
+
+RESOURCE_POOL_WORKFLOW_ID = "resource_pool"
+
+
+@dataclass
+class AcquireRequest:
+    workflow_id: str
+
+
+@dataclass
+class AcquireResponse:
+    release_key: str
+    resource: str
+
+
+@dataclass
+class DetachedResource:
+    resource: str
+    release_key: str
+
+
+@dataclass
+class AcquiredResource:
+    resource: str
+    release_key: str
+    detached: bool = field(default=False)
+
+    def detach(self) -> DetachedResource:
+        self.detached = True
+        return DetachedResource(resource=self.resource, release_key=self.release_key)
diff --git a/resource_pool/starter.py b/resource_pool/starter.py
new file mode 100644
index 00000000..2ae1ab44
--- /dev/null
+++ b/resource_pool/starter.py
@@ -0,0 +1,66 @@
+import asyncio
+from typing import Any
+
+from temporalio.client import Client, WorkflowFailureError, WorkflowHandle
+from temporalio.common import WorkflowIDConflictPolicy
+
+from resource_pool.pool_client.resource_pool_workflow import (
+    ResourcePoolWorkflow,
+    ResourcePoolWorkflowInput,
+)
+from resource_pool.resource_user_workflow import (
+    ResourceUserWorkflow,
+    ResourceUserWorkflowInput,
+)
+from resource_pool.shared import RESOURCE_POOL_WORKFLOW_ID
+
+
+async def main() -> None:
+    # Connect client
+    client = await Client.connect("localhost:7233")
+
+    # Initialize the resource pool
+    resource_pool_handle = await client.start_workflow(
+        workflow=ResourcePoolWorkflow.run,
+        arg=ResourcePoolWorkflowInput(
+            resources={"resource_a": None, "resource_b": None},
+            waiters=[],
+        ),
+        id=RESOURCE_POOL_WORKFLOW_ID,
+        task_queue="resource_pool-task-queue",
+        id_conflict_policy=WorkflowIDConflictPolicy.USE_EXISTING,
+    )
+
+    # Start the ResourceUserWorkflows
+    resource_user_handles: list[WorkflowHandle[Any, Any]] = []
+    for i in range(0, 4):
+        input = ResourceUserWorkflowInput(
+            resource_pool_workflow_id=RESOURCE_POOL_WORKFLOW_ID,
+            iteration_to_fail_after=None,
+            should_continue_as_new=False,
+        )
+        if i == 0:
+            input.should_continue_as_new = True
+        if i == 1:
+            input.iteration_to_fail_after = "first"
+
+        handle = await client.start_workflow(
+            workflow=ResourceUserWorkflow.run,
+            arg=input,
+            id=f"resource-user-workflow-{i}",
+            task_queue="resource_pool-task-queue",
+        )
+        resource_user_handles.append(handle)
+
+    for handle in resource_user_handles:
+        try:
+            await handle.result()
+        except WorkflowFailureError:
+            pass
+
+    # Clean up after ourselves. In the real world, the resource pool workflow would run forever.
+    await resource_pool_handle.terminate()
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/resource_pool/worker.py b/resource_pool/worker.py
new file mode 100644
index 00000000..cb3a06dd
--- /dev/null
+++ b/resource_pool/worker.py
@@ -0,0 +1,31 @@
+import asyncio
+import logging
+
+from temporalio.client import Client
+from temporalio.worker import Worker
+
+from resource_pool.pool_client.resource_pool_workflow import ResourcePoolWorkflow
+from resource_pool.resource_user_workflow import ResourceUserWorkflow, use_resource
+
+
+async def main() -> None:
+    logging.basicConfig(level=logging.INFO)
+
+    # Start client
+    client = await Client.connect("localhost:7233")
+
+    # Run a worker for the workflow
+    worker = Worker(
+        client,
+        task_queue="resource_pool-task-queue",
+        workflows=[ResourcePoolWorkflow, ResourceUserWorkflow],
+        activities=[
+            use_resource,
+        ],
+    )
+
+    await worker.run()
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/tests/resource_pool/__init__.py b/tests/resource_pool/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/resource_pool/workflow_test.py b/tests/resource_pool/workflow_test.py
new file mode 100644
index 00000000..42a6fbde
--- /dev/null
+++ b/tests/resource_pool/workflow_test.py
@@ -0,0 +1,120 @@
+import asyncio
+from collections import defaultdict
+from typing import Any, Optional, Sequence
+
+from temporalio import activity
+from temporalio.client import Client, WorkflowFailureError, WorkflowHandle
+from temporalio.common import WorkflowIDConflictPolicy
+from temporalio.worker import Worker
+
+from resource_pool.pool_client.resource_pool_workflow import (
+    ResourcePoolWorkflow,
+    ResourcePoolWorkflowInput,
+)
+from resource_pool.resource_user_workflow import (
+    ResourceUserWorkflow,
+    ResourceUserWorkflowInput,
+    UseResourceActivityInput,
+)
+from resource_pool.shared import RESOURCE_POOL_WORKFLOW_ID
+
+TASK_QUEUE = "resource_pool-task-queue"
+
+
+async def test_resource_pool_workflow(client: Client):
+    # key is resource, value is a description of resource usage
+    resource_usage: defaultdict[str, list[Sequence[str]]] = defaultdict(list)
+
+    # Mock out the activity to record resource usage
+    @activity.defn(name="use_resource")
+    async def use_resource_mock(input: UseResourceActivityInput) -> None:
+        workflow_id = activity.info().workflow_id
+        resource_usage[input.resource].append((workflow_id, "start"))
+        # We need a small sleep here to bait out races
+        await asyncio.sleep(0.05)
+        resource_usage[input.resource].append((workflow_id, "end"))
+
+    async with Worker(
+        client,
+        task_queue=TASK_QUEUE,
+        workflows=[ResourcePoolWorkflow, ResourceUserWorkflow],
+        activities=[use_resource_mock],
+    ):
+        await run_all_workflows(client)
+
+    # Did any workflow run in more than one place?
+    workflow_id_to_resource: dict[str, str] = {}
+    for resource, events in resource_usage.items():
+        for workflow_id, event in events:
+            if workflow_id in workflow_id_to_resource:
+                existing_resource = workflow_id_to_resource[workflow_id]
+                assert (
+                    existing_resource == resource
+                ), f"{workflow_id} ran on both {resource} and {existing_resource}"
+            else:
+                workflow_id_to_resource[workflow_id] = resource
+
+    # Did any resource have more than one workflow on it at a time?
+    for resource, events in resource_usage.items():
+        holder: Optional[str] = None
+        for workflow_id, event in events:
+            if event == "start":
+                assert (
+                    holder is None
+                ), f"{workflow_id} started on {resource} held by {holder}"
+                holder = workflow_id
+            else:
+                assert (
+                    holder == workflow_id
+                ), f"{workflow_id} ended on {resource} held by {holder}"
+                holder = None
+
+    # Are all the resources free, per the query?
+    handle: WorkflowHandle[
+        ResourcePoolWorkflow, None
+    ] = client.get_workflow_handle_for(
+        ResourcePoolWorkflow.run, RESOURCE_POOL_WORKFLOW_ID
+    )
+    query_result = await handle.query(ResourcePoolWorkflow.get_current_holders)
+    assert query_result == {"r_a": None, "r_b": None, "r_c": None}
+
+
+async def run_all_workflows(client: Client):
+    resource_pool_handle = await client.start_workflow(
+        workflow=ResourcePoolWorkflow.run,
+        arg=ResourcePoolWorkflowInput(
+            resources={"r_a": None, "r_b": None, "r_c": None},
+            waiters=[],
+        ),
+        id=RESOURCE_POOL_WORKFLOW_ID,
+        task_queue=TASK_QUEUE,
+        id_conflict_policy=WorkflowIDConflictPolicy.USE_EXISTING,
+    )
+
+    resource_user_handles: list[WorkflowHandle[Any, Any]] = []
+    for i in range(0, 8):
+        input = ResourceUserWorkflowInput(
+            resource_pool_workflow_id=RESOURCE_POOL_WORKFLOW_ID,
+            iteration_to_fail_after=None,
+            should_continue_as_new=False,
+        )
+        if i == 0:
+            input.should_continue_as_new = True
+        if i == 1:
+            input.iteration_to_fail_after = "first"
+
+        handle = await client.start_workflow(
+            workflow=ResourceUserWorkflow.run,
+            arg=input,
+            id=f"resource-user-workflow-{i}",
+            task_queue=TASK_QUEUE,
+        )
+        resource_user_handles.append(handle)
+
+    for handle in resource_user_handles:
+        try:
+            await handle.result()
+        except WorkflowFailureError:
+            pass
+
+    await resource_pool_handle.terminate()