diff --git a/README.md b/README.md
index ba71f8ca..4a4bb829 100644
--- a/README.md
+++ b/README.md
@@ -57,6 +57,7 @@ Some examples require extra dependencies. See each sample's directory for specif
 * [hello_signal](hello/hello_signal.py) - Send signals to a workflow.
 * [activity_worker](activity_worker) - Use Python activities from a workflow in another language.
+* [batch_sliding_window](batch_sliding_window) - Batch processing with a sliding window of child workflows.
 * [bedrock](bedrock) - Orchestrate a chatbot with Amazon Bedrock.
 * [cloud_export_to_parquet](cloud_export_to_parquet) - Set up schedule workflow to process exported files on an hourly basis
 * [context_propagation](context_propagation) - Context propagation through workflows/activities via interceptor.
diff --git a/batch_sliding_window/README.md b/batch_sliding_window/README.md
new file mode 100644
index 00000000..8d573ca3
--- /dev/null
+++ b/batch_sliding_window/README.md
@@ -0,0 +1,38 @@
+# Batch Sliding Window
+
+This sample demonstrates a batch processing workflow that maintains a sliding window of record processing workflows.
+
+A `SlidingWindowWorkflow` starts a configured number (the sliding window size) of `RecordProcessorWorkflow` children in parallel. Each child processes a single record. When a child completes, a new child is started.
+
+The `SlidingWindowWorkflow` calls continue-as-new after starting a preconfigured number of children to keep its history size bounded. A `RecordProcessorWorkflow` reports completion by signaling its parent; unlike a child workflow result, a signal can reach a parent run that has already called continue-as-new.
+
+A single instance of `SlidingWindowWorkflow` supports only a limited window size and throughput. To scale beyond that, multiple instances of `SlidingWindowWorkflow` run in parallel, each covering its own partition of the record range.
+
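+### How the Sliding Window Works
+
+The heart of the pattern is a loop that refuses to start the next child until
+the window has a free slot. Condensed from `sliding_window_workflow.py` (paging
+and continue-as-new bookkeeping omitted):
+
+    for record in records:
+        # Block until the number of running children drops below the window size
+        await workflow.wait_condition(
+            lambda: len(self.current_records) < input.sliding_window_size
+        )
+        await workflow.start_child_workflow(RecordProcessorWorkflow.run, record, ...)
+        self.current_records.add(record.id)
+
+Completion signals from children remove record ids from `current_records`,
+freeing slots for new children.
+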
+### Running This Sample
+
+To run, first see [README.md](../README.md) for prerequisites. Then, run the following from the root directory to start the worker:
+
+    uv run batch_sliding_window/worker.py
+
+Then, in another terminal, run the following to execute the workflow:
+
+    uv run batch_sliding_window/starter.py
+
+The workflow will process 90 records using a sliding window of 10 parallel child workflows across 3 partitions, with a page size of 5 records per continue-as-new iteration.
diff --git a/batch_sliding_window/__init__.py b/batch_sliding_window/__init__.py
new file mode 100644
index 00000000..959ab031
--- /dev/null
+++ b/batch_sliding_window/__init__.py
@@ -0,0 +1,40 @@
+"""Sliding Window Batch Processing Sample.
+
+This sample demonstrates a batch processing workflow that maintains a sliding window
+of record processing workflows. It includes:
+
+- ProcessBatchWorkflow: Main workflow that partitions work across multiple sliding windows
+- SlidingWindowWorkflow: Implements the sliding window pattern with continue-as-new
+- RecordProcessorWorkflow: Processes individual records
+- RecordLoader: Activity for loading records from external sources
+"""
+
+from batch_sliding_window.batch_workflow import (
+    ProcessBatchWorkflow,
+    ProcessBatchWorkflowInput,
+)
+from batch_sliding_window.record_loader_activity import (
+    GetRecordsInput,
+    GetRecordsOutput,
+    RecordLoader,
+    SingleRecord,
+)
+from batch_sliding_window.record_processor_workflow import RecordProcessorWorkflow
+from batch_sliding_window.sliding_window_workflow import (
+    SlidingWindowState,
+    SlidingWindowWorkflow,
+    SlidingWindowWorkflowInput,
+)
+
+__all__ = [
+    "ProcessBatchWorkflow",
+    "ProcessBatchWorkflowInput",
+    "SlidingWindowWorkflow",
+    "SlidingWindowWorkflowInput",
+    "SlidingWindowState",
+    "RecordProcessorWorkflow",
+    "RecordLoader",
+    "GetRecordsInput",
+    "GetRecordsOutput",
+    "SingleRecord",
+]
diff --git a/batch_sliding_window/batch_workflow.py b/batch_sliding_window/batch_workflow.py
new file mode 100644
index 00000000..a8d1d488
--- /dev/null
+++ b/batch_sliding_window/batch_workflow.py
@@ -0,0 +1,112 @@
+import asyncio
+from dataclasses import dataclass
+from datetime import timedelta
+from typing import List
+
+from temporalio import workflow
+from temporalio.common import WorkflowIDReusePolicy
+from temporalio.exceptions import ApplicationError
+
+from batch_sliding_window.record_loader_activity import RecordLoader
+from batch_sliding_window.sliding_window_workflow import (
+    SlidingWindowWorkflow,
+    SlidingWindowWorkflowInput,
+)
+
+
+@dataclass
+class ProcessBatchWorkflowInput:
+    """Input for the ProcessBatchWorkflow.
+
+    A single input structure is preferred to multiple workflow arguments
+    to simplify backward-compatible API changes.
+    """
+
+    page_size: int  # Number of children started by a single sliding window workflow run
+    sliding_window_size: int  # Maximum number of children to run in parallel
+    partitions: int  # How many sliding windows to run in parallel
+
+
+@workflow.defn
+class ProcessBatchWorkflow:
+    """Sample workflow that partitions the data set into contiguous ranges.
+
+    A real application can choose any other way to divide the records
+    into multiple collections.
+    """
+
+    @workflow.run
+    async def run(self, input: ProcessBatchWorkflowInput) -> int:
+        # Get the total record count
+        record_count: int = await workflow.execute_activity_method(
+            RecordLoader.get_record_count,
+            start_to_close_timeout=timedelta(seconds=5),
+        )
+
+        if input.sliding_window_size < input.partitions:
+            raise ApplicationError(
+                "sliding_window_size cannot be less than the number of partitions"
+            )
+
+        partitions = self._divide_into_partitions(record_count, input.partitions)
+        window_sizes = self._divide_into_partitions(
+            input.sliding_window_size, input.partitions
+        )
+
+        workflow.logger.info(
+            "ProcessBatchWorkflow started",
+            extra={
+                "input": input,
+                "record_count": record_count,
+                "partitions": partitions,
+                "window_sizes": window_sizes,
+            },
+        )
+
+        # Start a child workflow for each partition
+        tasks = []
+        offset = 0
+
+        for i in range(input.partitions):
+            # Make the child id more user-friendly
+            child_id = f"{workflow.info().workflow_id}/{i}"
+
+            # Define partition boundaries
+            maximum_partition_offset = offset + partitions[i]
+            if maximum_partition_offset > record_count:
+                maximum_partition_offset = record_count
+
+            child_input = SlidingWindowWorkflowInput(
+                page_size=input.page_size,
+                sliding_window_size=window_sizes[i],
+                offset=offset,  # inclusive
+                maximum_offset=maximum_partition_offset,  # exclusive
+                progress=0,
+                current_records=None,
+            )
+
+            task = workflow.execute_child_workflow(
+                SlidingWindowWorkflow.run,
+                child_input,
+                id=child_id,
+                id_reuse_policy=WorkflowIDReusePolicy.ALLOW_DUPLICATE,
+            )
+            tasks.append(task)
+            offset += partitions[i]
+
+        # Wait for all child workflows to complete
+        results = await asyncio.gather(*tasks)
+        return sum(results)
+
+    def _divide_into_partitions(self, number: int, n: int) -> List[int]:
+        """Divide a number into n partitions as evenly as possible."""
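+        # For example: 90 records across 3 partitions -> [30, 30, 30],
+        # and a window of 10 across 3 partitions -> [4, 3, 3].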
+        base = number // n
+        remainder = number % n
+        partitions = [base] * n
+
+        for i in range(remainder):
+            partitions[i] += 1
+
+        return partitions
diff --git a/batch_sliding_window/record_loader_activity.py b/batch_sliding_window/record_loader_activity.py
new file mode 100644
index 00000000..26ae14b1
--- /dev/null
+++ b/batch_sliding_window/record_loader_activity.py
@@ -0,0 +1,61 @@
+from dataclasses import dataclass
+from typing import List
+
+from temporalio import activity
+
+
+@dataclass
+class GetRecordsInput:
+    """Input for the GetRecords activity."""
+
+    page_size: int
+    offset: int
+    max_offset: int
+
+
+@dataclass
+class SingleRecord:
+    """Represents a single record to be processed."""
+
+    id: int
+
+
+@dataclass
+class GetRecordsOutput:
+    """Output from the GetRecords activity."""
+
+    records: List[SingleRecord]
+
+
+class RecordLoader:
+    """Activities for loading records from an external data source."""
+
+    def __init__(self, record_count: int):
+        self.record_count = record_count
+
+    @activity.defn
+    async def get_record_count(self) -> int:
+        """Get the total record count.
+
+        Used to partition processing across parallel sliding windows.
+        The sample implementation just returns a fake value passed during worker initialization.
+        """
+        return self.record_count
+
+    @activity.defn
+    async def get_records(self, input: GetRecordsInput) -> GetRecordsOutput:
+        """Get records loaded from an external data source.
+
+        The sample returns fake records.
+        """
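+        # For example: offset=0, page_size=5, max_offset=90 returns records
+        # with ids 0..4, since limit = min(0 + 5, 90) = 5.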
+        if input.max_offset > self.record_count:
+            raise ValueError(
+                f"max_offset({input.max_offset}) > record_count({self.record_count})"
+            )
+
+        limit = min(input.offset + input.page_size, input.max_offset)
+        records = [SingleRecord(id=i) for i in range(input.offset, limit)]
+
+        return GetRecordsOutput(records=records)
diff --git a/batch_sliding_window/record_processor_workflow.py b/batch_sliding_window/record_processor_workflow.py
new file mode 100644
index 00000000..8921a808
--- /dev/null
+++ b/batch_sliding_window/record_processor_workflow.py
@@ -0,0 +1,34 @@
+import asyncio
+
+from temporalio import workflow
+
+from batch_sliding_window.record_loader_activity import SingleRecord
+
+
+@workflow.defn
+class RecordProcessorWorkflow:
+    """Workflow that implements processing of a single record."""
+
+    @workflow.run
+    async def run(self, record: SingleRecord) -> None:
+        await self._process_record(record)
+
+        # Notify the parent about completion via a signal
+        parent = workflow.info().parent
+
+        # This workflow is always expected to have a parent,
+        # but for unit testing it is useful to skip the notification if there is none.
+        if parent:
+            # Don't specify run_id because the parent calls continue-as-new
+            handle = workflow.get_external_workflow_handle(parent.workflow_id)
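+            # The signal name must match the "report_completion" handler
+            # declared on SlidingWindowWorkflow.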
+            await handle.signal("report_completion", record.id)
+
+    async def _process_record(self, record: SingleRecord) -> None:
+        """Simulate application-specific record processing."""
+        # Use workflow.random() so the simulated duration is deterministic on replay
+        sleep_duration = workflow.random().randint(1, 10)
+        await asyncio.sleep(sleep_duration)
+
+        workflow.logger.info(f"Processed record {record}")
diff --git a/batch_sliding_window/sliding_window_workflow.py b/batch_sliding_window/sliding_window_workflow.py
new file mode 100644
index 00000000..87e4110d
--- /dev/null
+++ b/batch_sliding_window/sliding_window_workflow.py
@@ -0,0 +1,163 @@
+from dataclasses import dataclass
+from datetime import timedelta
+from typing import List, Optional, Set
+
+from temporalio import workflow
+from temporalio.common import WorkflowIDReusePolicy
+
+from batch_sliding_window.record_loader_activity import (
+    GetRecordsInput,
+    GetRecordsOutput,
+    RecordLoader,
+    SingleRecord,
+)
+from batch_sliding_window.record_processor_workflow import RecordProcessorWorkflow
+
+
+@dataclass
+class SlidingWindowWorkflowInput:
+    """Contains SlidingWindowWorkflow arguments."""
+
+    page_size: int
+    sliding_window_size: int
+    offset: int  # inclusive
+    maximum_offset: int  # exclusive
+    progress: int = 0
+    # The set of record ids currently being processed
+    current_records: Optional[Set[int]] = None
+
+
+@dataclass
+class SlidingWindowState:
+    """Used as the 'state' query result."""
+
+    current_records: List[int]  # record ids currently being processed
+    children_started_by_this_run: int
+    offset: int
+    progress: int
+
+
+@workflow.defn
+class SlidingWindowWorkflow:
+    """Processes a range of records using a bounded number of child workflows.
+
+    As soon as a child workflow completes, a new one is started.
+    """
+
+    def __init__(self) -> None:
+        self.current_records: Set[int] = set()
+        self.children_started_by_this_run: List[workflow.ChildWorkflowHandle] = []
+        self.offset = 0
+        self.progress = 0
+
+    @workflow.run
+    async def run(self, input: SlidingWindowWorkflowInput) -> int:
+        workflow.logger.info(
+            "SlidingWindowWorkflow started",
+            extra={
+                "sliding_window_size": input.sliding_window_size,
+                "page_size": input.page_size,
+                "offset": input.offset,
+                "maximum_offset": input.maximum_offset,
+                "progress": input.progress,
+            },
+        )
+
+        # Initialize state from the input. The "report_completion" signal and
+        # "state" query handlers are declared with decorators below.
+        self.current_records = input.current_records or set()
+        self.offset = input.offset
+        self.progress = input.progress
+
+        return await self._execute(input)
+
+    async def _execute(self, input: SlidingWindowWorkflowInput) -> int:
+        """Main execution logic."""
+        # Get records for this page if we haven't reached the end
+        records = []
+        if self.offset < input.maximum_offset:
+            get_records_input = GetRecordsInput(
+                page_size=input.page_size,
+                offset=self.offset,
+                max_offset=input.maximum_offset,
+            )
+            get_records_output: GetRecordsOutput = (
+                await workflow.execute_activity_method(
+                    RecordLoader.get_records,
+                    get_records_input,
+                    start_to_close_timeout=timedelta(seconds=5),
+                )
+            )
+            records = get_records_output.records
+
+        workflow_id = workflow.info().workflow_id
+
+        # Process records
+        for record in records:
+            # Wait until we have capacity in the sliding window
+            await workflow.wait_condition(
+                lambda: len(self.current_records) < input.sliding_window_size
+            )
+
+            # Start a child workflow for this record
+            child_id = f"{workflow_id}/{record.id}"
+            child_handle = await workflow.start_child_workflow(
+                RecordProcessorWorkflow.run,
+                record,
+                id=child_id,
+                id_reuse_policy=WorkflowIDReusePolicy.ALLOW_DUPLICATE,
+                parent_close_policy=workflow.ParentClosePolicy.ABANDON,
+            )
+
+            self.children_started_by_this_run.append(child_handle)
+            self.current_records.add(record.id)
+
+        return await self._continue_as_new_or_complete(input)
+
+    async def _continue_as_new_or_complete(
+        self, input: SlidingWindowWorkflowInput
+    ) -> int:
+        """Continue-as-new after starting page_size children, or complete if done."""
+        # Update the offset based on the children started in this run
+        new_offset = input.offset + len(self.children_started_by_this_run)
+
+        if new_offset < input.maximum_offset:
+            # In Python, await start_child_workflow() already waits until the
+            # start has been accepted by the server, so no additional wait is needed.
+
+            # Continue-as-new with updated state
+            new_input = SlidingWindowWorkflowInput(
+                page_size=input.page_size,
+                sliding_window_size=input.sliding_window_size,
+                offset=new_offset,
+                maximum_offset=input.maximum_offset,
+                progress=self.progress,
+                current_records=self.current_records,
+            )
+
+            workflow.continue_as_new(new_input)
+
+        # Last run in the continue-as-new chain:
+        # wait for all remaining children to complete
+        await workflow.wait_condition(lambda: len(self.current_records) == 0)
+        return self.progress
+
+    @workflow.signal(name="report_completion")
+    def _handle_completion_signal(self, record_id: int) -> None:
+        """Handle a completion signal from a child workflow."""
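+        # The membership check below also makes the handler idempotent:
+        # a duplicate signal for the same record id is counted only once.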
"""Handle state query for monitoring.""" + current_record_ids = sorted(list(self.current_records)) + return SlidingWindowState( + current_records=current_record_ids, + children_started_by_this_run=len(self.children_started_by_this_run), + offset=self.offset, + progress=self.progress, + ) diff --git a/batch_sliding_window/starter.py b/batch_sliding_window/starter.py new file mode 100644 index 00000000..d9a24971 --- /dev/null +++ b/batch_sliding_window/starter.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python3 +"""Starter for the batch sliding window sample.""" + +import asyncio +import datetime +import logging + +from temporalio.client import Client + +from batch_sliding_window.batch_workflow import ( + ProcessBatchWorkflow, + ProcessBatchWorkflowInput, +) + + +async def main(): + """Start the ProcessBatchWorkflow.""" + # Set up logging + logging.basicConfig(level=logging.INFO) + + # Create client + client = await Client.connect("localhost:7233") + + # Create unique workflow ID with timestamp + timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + workflow_id = f"batch_sliding_window_example_{timestamp}" + + # Define workflow input + workflow_input = ProcessBatchWorkflowInput( + page_size=5, + sliding_window_size=10, + partitions=3, + ) + + print(f"Starting workflow with ID: {workflow_id}") + print(f"Input: {workflow_input}") + + # Start workflow + handle = await client.start_workflow( + ProcessBatchWorkflow.run, + workflow_input, + id=workflow_id, + task_queue="batch_sliding_window_task_queue", + ) + + print(f"Workflow started with ID: {handle.id}") + print(f"Waiting for workflow to complete...") + + # Wait for result + try: + result = await handle.result() + print(f"Workflow completed successfully!") + print(f"Total records processed: {result}") + except Exception as e: + print(f"Workflow failed with error: {e}") + raise + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/batch_sliding_window/worker.py b/batch_sliding_window/worker.py new file mode 100644 index 00000000..c0968bc3 --- /dev/null +++ b/batch_sliding_window/worker.py @@ -0,0 +1,48 @@ +#!/usr/bin/env python3 +"""Worker for the batch sliding window sample.""" + +import asyncio +import logging + +from temporalio import worker +from temporalio.client import Client + +from batch_sliding_window.batch_workflow import ProcessBatchWorkflow +from batch_sliding_window.record_loader_activity import RecordLoader +from batch_sliding_window.record_processor_workflow import RecordProcessorWorkflow +from batch_sliding_window.sliding_window_workflow import SlidingWindowWorkflow + + +async def main(): + """Run the worker that registers all workflows and activities.""" + # Set up logging + logging.basicConfig(level=logging.INFO) + + # Create client + client = await Client.connect("localhost:7233") + + # Create RecordLoader activity with sample data + record_loader = RecordLoader(record_count=90) + + # Create worker + temporal_worker = worker.Worker( + client, + task_queue="batch_sliding_window_task_queue", + workflows=[ + ProcessBatchWorkflow, + SlidingWindowWorkflow, + RecordProcessorWorkflow, + ], + activities=[ + record_loader.get_record_count, + record_loader.get_records, + ], + ) + + print("Starting worker...") + # Run the worker + await temporal_worker.run() + + +if __name__ == "__main__": + asyncio.run(main())