Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions resource_pool/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Resource Pool Sample

This sample shows how to use a long-lived `ResourcePoolWorkflow` to allocate `resources` to `ResourceUserWorkflows`.
Each `ResourceUserWorkflow` runs several activities while it has ownership of a resource. Note that
`ResourcePoolWorkflow` is making resource allocation decisions based on in-memory state.

Run the following from this directory to start the worker:

uv run worker.py

This will start the worker. Then, in another terminal, run the following to execute several `ResourceUserWorkflows`.

uv run starter.py

You should see output indicating that the `ResourcePoolWorkflow` serialized access to each resource.

You can query the set of current resource resource holders with:

tctl wf query -w resource_pool --qt get_current_holders

# Other approaches

There are simpler ways to manage concurrent access to resources. Consider using resource-specific workers/task queues,
and limiting the number of activity slots on the workers. The golang SDK also [sessions](https://docs.temporal.io/develop/go/sessions)
that allow workflows to pin themselves to workers.

The technique in this sample is capable of more complex resource allocation than the options above, but it doesn't scale
as well. Specifically, it can:
- Manage access to a set of resources that is decoupled from the set of workers and task queues
- Run arbitrary code to place workloads on resources as they become available

# Caveats

This sample uses true locking (not leasing!) to avoid complexity and scaling concerns associated with heartbeating via
signals. Locking carries a risk where failure to unlock permanently removing a resource from the pool. However, with
Temporal's durable execution guarantees, this can only happen if:

- A ResourceUserWorkflows times out (prohibited in the sample code)
- An operator terminates a ResourceUserWorkflows. (Temporal recommends canceling workflows instead of terminating them whenever possible.)
- You shut down your workers and never restart them (unhandled, but irrelevant)

If a leak were to happen, you could discover the identity of the leaker using the query above, then:

tctl wf signal -w resource_pool --name release_resource --input '{ "release_key": "<the key from the query above>" }

Performance: A single ResourcePoolWorkflow scales to tens, but not hundreds, of request/release events per second. It is
best suited for allocating resources to long-running workflows. Actual performance will depend on your temporal server's
persistence layer.
Empty file added resource_pool/__init__.py
Empty file.
2 changes: 2 additions & 0 deletions resource_pool/pool_client/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .resource_pool_client import ResourcePoolClient
from .resource_pool_workflow import ResourcePoolWorkflow
101 changes: 101 additions & 0 deletions resource_pool/pool_client/resource_pool_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
from contextlib import asynccontextmanager
from datetime import timedelta
from typing import AsyncGenerator, Optional

from temporalio import workflow

from resource_pool.pool_client.resource_pool_workflow import ResourcePoolWorkflow
from resource_pool.shared import (
AcquiredResource,
AcquireRequest,
AcquireResponse,
DetachedResource,
)


# Use this class in workflow code that that needs to run on locked resources.
class ResourcePoolClient:
def __init__(self, pool_workflow_id: str) -> None:
self.pool_workflow_id = pool_workflow_id
self.acquired_resources: list[AcquiredResource] = []

signal_name = f"assign_resource_{self.pool_workflow_id}"
if workflow.get_signal_handler(signal_name) is None:
workflow.set_signal_handler(signal_name, self._handle_acquire_response)
else:
raise RuntimeError(
f"{signal_name} already registered - if you use multiple ResourcePoolClients within the "
f"same workflow, they must use different pool_workflow_ids"
)

def _handle_acquire_response(self, response: AcquireResponse) -> None:
self.acquired_resources.append(
AcquiredResource(
resource=response.resource, release_key=response.release_key
)
)

async def _send_acquire_signal(self) -> None:
await workflow.get_external_workflow_handle_for(
ResourcePoolWorkflow.run, self.pool_workflow_id
).signal("acquire_resource", AcquireRequest(workflow.info().workflow_id))

async def _send_release_signal(self, acquired_resource: AcquiredResource) -> None:
await workflow.get_external_workflow_handle_for(
ResourcePoolWorkflow.run, self.pool_workflow_id
).signal(
"release_resource",
AcquireResponse(
resource=acquired_resource.resource,
release_key=acquired_resource.release_key,
),
)

@asynccontextmanager
async def acquire_resource(
self,
*,
reattach: Optional[DetachedResource] = None,
max_wait_time: timedelta = timedelta(minutes=5),
) -> AsyncGenerator[AcquiredResource, None]:
_warn_when_workflow_has_timeouts()

if reattach is None:
await self._send_acquire_signal()
await workflow.wait_condition(
lambda: len(self.acquired_resources) > 0, timeout=max_wait_time
)
resource = self.acquired_resources.pop(0)
else:
resource = AcquiredResource(
resource=reattach.resource, release_key=reattach.release_key
)

# Can't happen, but the typechecker doesn't know about workflow.wait_condition
if resource is None:
raise RuntimeError("resource was None when it can't be")

# During the yield, the calling workflow owns the resource. Note that this is a lock, not a lease! Our
# finally block will release the resource if an activity fails. This is why we asserted the lack of
# workflow-level timeouts above - the finally block wouldn't run if there was a timeout.
try:
yield resource
finally:
if not resource.detached:
await self._send_release_signal(resource)


def _warn_when_workflow_has_timeouts() -> None:
def has_timeout(timeout: Optional[timedelta]) -> bool:
# After continue_as_new, timeouts are 0, even if they were None before continue_as_new (and were not set in the
# continue_as_new call).
return timeout is not None and timeout > timedelta(0)

if has_timeout(workflow.info().run_timeout):
workflow.logger.warning(
f"ResourceLockingWorkflow cannot have a run_timeout (found {workflow.info().run_timeout}) - this will leak locks"
)
if has_timeout(workflow.info().execution_timeout):
workflow.logger.warning(
f"ResourceLockingWorkflow cannot have an execution_timeout (found {workflow.info().execution_timeout}) - this will leak locks"
)
149 changes: 149 additions & 0 deletions resource_pool/pool_client/resource_pool_workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
from dataclasses import dataclass
from typing import Optional

from temporalio import workflow
from temporalio.exceptions import ApplicationError

from resource_pool.shared import AcquireRequest, AcquireResponse


# Internal to this workflow, we'll associate randomly generated release signal names with each acquire request.
@dataclass
class InternalAcquireRequest(AcquireRequest):
release_signal: Optional[str]


@dataclass
class ResourcePoolWorkflowInput:
# Key is resource, value is current holder of the resource (None if not held)
resources: dict[str, Optional[InternalAcquireRequest]]
waiters: list[InternalAcquireRequest]


@workflow.defn
class ResourcePoolWorkflow:
@workflow.init
def __init__(self, input: ResourcePoolWorkflowInput) -> None:
self.resources = input.resources
self.waiters = input.waiters
self.release_key_to_resource: dict[str, str] = {}

for resource, holder in self.resources.items():
if holder is not None and holder.release_signal is not None:
self.release_key_to_resource[holder.release_signal] = resource

@workflow.signal
async def add_resources(self, resources: list[str]) -> None:
for resource in resources:
if resource in self.resources:
workflow.logger.warning(
f"Ignoring attempt to add already-existing resource: {resource}"
)
else:
self.resources[resource] = None

@workflow.signal
async def acquire_resource(self, request: AcquireRequest) -> None:
self.waiters.append(
InternalAcquireRequest(workflow_id=request.workflow_id, release_signal=None)
)
workflow.logger.info(
f"workflow_id={request.workflow_id} is waiting for a resource"
)

@workflow.signal
async def release_resource(self, acquire_response: AcquireResponse) -> None:
release_key = acquire_response.release_key
resource = self.release_key_to_resource.get(release_key)
if resource is None:
workflow.logger.warning(f"Ignoring unknown release_key: {release_key}")
return

holder = self.resources[resource]
if holder is None:
workflow.logger.warning(
f"Ignoring request to release resource that is not held: {resource}"
)
return

# Remove the current holder
workflow.logger.info(
f"workflow_id={holder.workflow_id} released resource {resource}"
)
self.resources[resource] = None
del self.release_key_to_resource[release_key]

@workflow.query
def get_current_holders(self) -> dict[str, Optional[InternalAcquireRequest]]:
return self.resources

async def assign_resource(
self, resource: str, internal_request: InternalAcquireRequest
) -> None:
workflow.logger.info(
f"workflow_id={internal_request.workflow_id} acquired resource {resource}"
)

requester = workflow.get_external_workflow_handle(internal_request.workflow_id)
try:
release_signal = str(workflow.uuid4())
await requester.signal(
f"assign_resource_{workflow.info().workflow_id}",
AcquireResponse(release_key=release_signal, resource=resource),
)

internal_request.release_signal = release_signal
self.resources[resource] = internal_request
self.release_key_to_resource[release_signal] = resource
except ApplicationError as e:
if e.type == "ExternalWorkflowExecutionNotFound":
workflow.logger.info(
f"Could not assign resource {resource} to {internal_request.workflow_id}: {e.message}"
)
else:
raise e

async def assign_next_resource(self) -> bool:
if len(self.waiters) == 0:
return False

next_free_resource = self.get_free_resource()
if next_free_resource is None:
return False

next_waiter = self.waiters.pop(0)
await self.assign_resource(next_free_resource, next_waiter)
return True

def get_free_resource(self) -> Optional[str]:
return next(
(resource for resource, holder in self.resources.items() if holder is None),
None,
)

def can_assign_resource(self) -> bool:
return len(self.waiters) > 0 and self.get_free_resource() is not None

def should_continue_as_new(self) -> bool:
return (
workflow.info().is_continue_as_new_suggested()
and workflow.all_handlers_finished()
)

@workflow.run
async def run(self, _: ResourcePoolWorkflowInput) -> None:
while True:
await workflow.wait_condition(
lambda: self.can_assign_resource() or self.should_continue_as_new()
)

if await self.assign_next_resource():
continue

if self.should_continue_as_new():
workflow.continue_as_new(
ResourcePoolWorkflowInput(
resources=self.resources,
waiters=self.waiters,
)
)
88 changes: 88 additions & 0 deletions resource_pool/resource_user_workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import asyncio
from dataclasses import dataclass, field
from datetime import timedelta
from typing import Optional

from temporalio import activity, workflow

from resource_pool.pool_client import ResourcePoolClient
from resource_pool.shared import DetachedResource


@dataclass
class UseResourceActivityInput:
resource: str
iteration: str


@activity.defn
async def use_resource(input: UseResourceActivityInput) -> None:
info = activity.info()
activity.logger.info(
f"{info.workflow_id} starts using {input.resource} the {input.iteration} time"
)
await asyncio.sleep(3)
activity.logger.info(
f"{info.workflow_id} done using {input.resource} the {input.iteration} time"
)


@dataclass
class ResourceUserWorkflowInput:
# The id of the resource pool workflow to request a resource from
resource_pool_workflow_id: str

# If set, this workflow will fail after the "first" or "second" activity.
iteration_to_fail_after: Optional[str]

# If True, this workflow will continue as new after the last activity. The next iteration will run more activities,
# but will not continue as new.
should_continue_as_new: bool

# Used to transfer resource ownership between iterations during continue_as_new
already_acquired_resource: Optional[DetachedResource] = field(default=None)


class FailWorkflowException(Exception):
pass


# Wait this long for a resource before giving up
MAX_RESOURCE_WAIT_TIME = timedelta(minutes=5)


@workflow.defn(failure_exception_types=[FailWorkflowException])
class ResourceUserWorkflow:
@workflow.run
async def run(self, input: ResourceUserWorkflowInput) -> None:
pool_client = ResourcePoolClient(input.resource_pool_workflow_id)

async with pool_client.acquire_resource(
reattach=input.already_acquired_resource
) as acquired_resource:
for iteration in ["first", "second"]:
await workflow.execute_activity(
use_resource,
UseResourceActivityInput(acquired_resource.resource, iteration),
start_to_close_timeout=timedelta(seconds=10),
)

if iteration == input.iteration_to_fail_after:
workflow.logger.info(
f"Failing after iteration {input.iteration_to_fail_after}"
)
raise FailWorkflowException()

# This workflow only continues as new so it can demonstrate how to pass acquired resources across
# iterations. Ordinarily, such a short workflow would not use continue as new.
if input.should_continue_as_new:
detached_resource = acquired_resource.detach()

next_input = ResourceUserWorkflowInput(
resource_pool_workflow_id=input.resource_pool_workflow_id,
iteration_to_fail_after=input.iteration_to_fail_after,
should_continue_as_new=False,
already_acquired_resource=detached_resource,
)

workflow.continue_as_new(next_input)
Loading
Loading