1 change: 1 addition & 0 deletions README.md
@@ -57,6 +57,7 @@ Some examples require extra dependencies. See each sample's directory for specif
* [hello_signal](hello/hello_signal.py) - Send signals to a workflow.
<!-- Keep this list in alphabetical order -->
* [activity_worker](activity_worker) - Use Python activities from a workflow in another language.
* [batch_sliding_window](batch_sliding_window) - Batch processing with a sliding window of child workflows.
* [bedrock](bedrock) - Orchestrate a chatbot with Amazon Bedrock.
* [cloud_export_to_parquet](cloud_export_to_parquet) - Set up schedule workflow to process exported files on an hourly basis
* [context_propagation](context_propagation) - Context propagation through workflows/activities via interceptor.
21 changes: 21 additions & 0 deletions batch_sliding_window/README.md
@@ -0,0 +1,21 @@
# Batch Sliding Window

This sample demonstrates a batch processing workflow that maintains a sliding window of record processing workflows.

A `SlidingWindowWorkflow` starts a configured number (the sliding window size) of `RecordProcessorWorkflow` children in parallel. Each child processes a single record; whenever a child completes, a new one is started in its place.

The `SlidingWindowWorkflow` calls continue-as-new after starting a preconfigured number of children to keep its event history bounded. A `RecordProcessorWorkflow` reports its completion through a signal to its parent; because the signal targets the parent's workflow ID rather than a specific run, the notification still reaches a parent that has called continue-as-new.

A single `SlidingWindowWorkflow` instance supports only a limited window size and throughput. To achieve a larger window size and higher overall throughput, multiple instances run in parallel, each responsible for its own partition of the record set.
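
For orientation, here is a condensed sketch of the pattern. The real implementation lives in `sliding_window_workflow.py` and differs in detail (it resumes from saved progress across continue-as-new runs, among other things):

    from temporalio import workflow
    from temporalio.workflow import ParentClosePolicy

    from batch_sliding_window.record_loader_activity import SingleRecord
    from batch_sliding_window.record_processor_workflow import RecordProcessorWorkflow

    @workflow.defn
    class SlidingWindowSketch:
        """Illustration only -- see sliding_window_workflow.py for the real code."""

        def __init__(self) -> None:
            self.current_records: set = set()

        @workflow.signal
        def report_completion(self, record_id: int) -> None:
            # A completed child frees one slot in the window.
            self.current_records.discard(record_id)

        @workflow.run
        async def run(self, offset: int, maximum_offset: int, window_size: int) -> int:
            while offset < maximum_offset:
                # Block until the window has a free slot.
                await workflow.wait_condition(
                    lambda: len(self.current_records) < window_size
                )
                # Abandon children so they outlive the parent's continue-as-new.
                await workflow.start_child_workflow(
                    RecordProcessorWorkflow.run,
                    SingleRecord(id=offset),
                    id=f"{workflow.info().workflow_id}/{offset}",
                    parent_close_policy=ParentClosePolicy.ABANDON,
                )
                self.current_records.add(offset)
                offset += 1
            # The real workflow calls workflow.continue_as_new after each page
            # of children instead of looping over the whole range like this.
            await workflow.wait_condition(lambda: not self.current_records)
            return offset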

### Running This Sample

To run, first see [README.md](../README.md) for prerequisites. Then, run the following from the root directory to start the worker:

uv run batch_sliding_window/worker.py

Then, in another terminal, run the following to execute the workflow:

uv run batch_sliding_window/starter.py

The workflow will process 90 records with a sliding window of 10 parallel record-processing child workflows spread across 3 partitions, using a page size of 5 records per continue-as-new iteration.
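
`starter.py` is not shown above, but given `ProcessBatchWorkflowInput` it boils down to something like the following sketch (the workflow ID and task queue name are illustrative guesses, not taken from the sample):

    import asyncio

    from temporalio.client import Client

    from batch_sliding_window.batch_workflow import (
        ProcessBatchWorkflow,
        ProcessBatchWorkflowInput,
    )

    async def main() -> None:
        client = await Client.connect("localhost:7233")
        result = await client.execute_workflow(
            ProcessBatchWorkflow.run,
            ProcessBatchWorkflowInput(page_size=5, sliding_window_size=10, partitions=3),
            id="batch-sliding-window",  # illustrative workflow ID
            task_queue="batch-sliding-window-task-queue",  # illustrative name
        )
        print(f"Processed {result} records")

    if __name__ == "__main__":
        asyncio.run(main())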
40 changes: 40 additions & 0 deletions batch_sliding_window/__init__.py
@@ -0,0 +1,40 @@
"""Sliding Window Batch Processing Sample.

This sample demonstrates a batch processing workflow that maintains a sliding window
of record processing workflows. It includes:

- ProcessBatchWorkflow: Main workflow that partitions work across multiple sliding windows
- SlidingWindowWorkflow: Implements the sliding window pattern with continue-as-new
- RecordProcessorWorkflow: Processes individual records
- RecordLoader: Activity for loading records from external sources
"""

from batch_sliding_window.batch_workflow import (
ProcessBatchWorkflow,
ProcessBatchWorkflowInput,
)
from batch_sliding_window.record_loader_activity import (
GetRecordsInput,
GetRecordsOutput,
RecordLoader,
SingleRecord,
)
from batch_sliding_window.record_processor_workflow import RecordProcessorWorkflow
from batch_sliding_window.sliding_window_workflow import (
SlidingWindowState,
SlidingWindowWorkflow,
SlidingWindowWorkflowInput,
)

__all__ = [
"ProcessBatchWorkflow",
"ProcessBatchWorkflowInput",
"SlidingWindowWorkflow",
"SlidingWindowWorkflowInput",
"SlidingWindowState",
"RecordProcessorWorkflow",
"RecordLoader",
"GetRecordsInput",
"GetRecordsOutput",
"SingleRecord",
]
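
# These re-exports let the worker and starter import everything from the
# package root, e.g. `from batch_sliding_window import ProcessBatchWorkflow`.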
110 changes: 110 additions & 0 deletions batch_sliding_window/batch_workflow.py
@@ -0,0 +1,110 @@
import asyncio
from dataclasses import dataclass
from datetime import timedelta
from typing import List

from temporalio import workflow
from temporalio.common import WorkflowIDReusePolicy
from temporalio.exceptions import ApplicationError

from batch_sliding_window.record_loader_activity import RecordLoader
from batch_sliding_window.sliding_window_workflow import (
SlidingWindowWorkflow,
SlidingWindowWorkflowInput,
)


@dataclass
class ProcessBatchWorkflowInput:
"""Input for the ProcessBatchWorkflow.

    A single input structure is preferred over multiple workflow arguments
    to simplify backward-compatible API changes.
"""

page_size: int # Number of children started by a single sliding window workflow run
sliding_window_size: int # Maximum number of children to run in parallel
partitions: int # How many sliding windows to run in parallel


@workflow.defn
class ProcessBatchWorkflow:
"""Sample workflow that partitions the data set into continuous ranges.

A real application can choose any other way to divide the records
into multiple collections.
"""

@workflow.run
async def run(self, input: ProcessBatchWorkflowInput) -> int:
# Get total record count
record_count: int = await workflow.execute_activity_method(
RecordLoader.get_record_count,
start_to_close_timeout=timedelta(seconds=5),
)

if input.sliding_window_size < input.partitions:
raise ApplicationError(
"SlidingWindowSize cannot be less than number of partitions"
)

partitions = self._divide_into_partitions(record_count, input.partitions)
window_sizes = self._divide_into_partitions(
input.sliding_window_size, input.partitions
)
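        # With the starter's configuration (record_count=90, partitions=3,
        # sliding_window_size=10), this yields partitions == [30, 30, 30]
        # and window_sizes == [4, 3, 3].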

workflow.logger.info(
f"ProcessBatchWorkflow started",
extra={
"input": input,
"record_count": record_count,
"partitions": partitions,
"window_sizes": window_sizes,
},
)

# Start child workflows for each partition
tasks = []
offset = 0

for i in range(input.partitions):
# Make child id more user-friendly
child_id = f"{workflow.info().workflow_id}/{i}"

# Define partition boundaries
maximum_partition_offset = offset + partitions[i]
if maximum_partition_offset > record_count:
maximum_partition_offset = record_count
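            # (Defensive clamp: the partition sizes sum exactly to
            # record_count, so the boundary never actually exceeds it.)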

child_input = SlidingWindowWorkflowInput(
page_size=input.page_size,
sliding_window_size=window_sizes[i],
offset=offset, # inclusive
maximum_offset=maximum_partition_offset, # exclusive
progress=0,
current_records=None,
)

task = workflow.execute_child_workflow(
SlidingWindowWorkflow.run,
child_input,
id=child_id,
id_reuse_policy=WorkflowIDReusePolicy.ALLOW_DUPLICATE,
)
tasks.append(task)
offset += partitions[i]

# Wait for all child workflows to complete
results = await asyncio.gather(*tasks)
return sum(results)

def _divide_into_partitions(self, number: int, n: int) -> List[int]:
"""Divide a number into n partitions as evenly as possible."""
base = number // n
remainder = number % n
partitions = [base] * n

for i in range(remainder):
partitions[i] += 1

return partitions
59 changes: 59 additions & 0 deletions batch_sliding_window/record_loader_activity.py
@@ -0,0 +1,59 @@
from dataclasses import dataclass
from typing import List

from temporalio import activity


@dataclass
class GetRecordsInput:
"""Input for the GetRecords activity."""

page_size: int
offset: int
max_offset: int


@dataclass
class SingleRecord:
"""Represents a single record to be processed."""

id: int


@dataclass
class GetRecordsOutput:
"""Output from the GetRecords activity."""

records: List[SingleRecord]


class RecordLoader:
"""Activities for loading records from an external data source."""

def __init__(self, record_count: int):
self.record_count = record_count

@activity.defn
async def get_record_count(self) -> int:
"""Get the total record count.

Used to partition processing across parallel sliding windows.
The sample implementation just returns a fake value passed during worker initialization.
"""
return self.record_count

@activity.defn
async def get_records(self, input: GetRecordsInput) -> GetRecordsOutput:
"""Get records loaded from an external data source.

The sample returns fake records.
"""
if input.max_offset > self.record_count:
raise ValueError(
f"max_offset({input.max_offset}) > record_count({self.record_count})"
)

limit = min(input.offset + input.page_size, input.max_offset)
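        # Example: offset=85, page_size=5, max_offset=90 -> limit=90,
        # so this page holds records 85 through 89.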
records = [SingleRecord(id=i) for i in range(input.offset, limit)]

return GetRecordsOutput(records=records)
33 changes: 33 additions & 0 deletions batch_sliding_window/record_processor_workflow.py
@@ -0,0 +1,33 @@
from temporalio import workflow

from batch_sliding_window.record_loader_activity import SingleRecord


@workflow.defn
class RecordProcessorWorkflow:
"""Workflow that implements processing of a single record."""

@workflow.run
async def run(self, record: SingleRecord) -> None:
await self._process_record(record)

# Notify parent about completion via signal
parent = workflow.info().parent

# This workflow is always expected to have a parent.
# But for unit testing it might be useful to skip the notification if there is none.
if parent:
# Don't specify run_id as parent calls continue-as-new
handle = workflow.get_external_workflow_handle(parent.workflow_id)
await handle.signal("report_completion", record.id)

async def _process_record(self, record: SingleRecord) -> None:
"""Simulate application specific record processing."""
# Use workflow.random() to get a random number to ensure workflow determinism
sleep_duration = workflow.random().randint(1, 10)
await workflow.sleep(sleep_duration)

workflow.logger.info(f"Processed record {record}")