From bfc170c57b3c6c59e241d1e771db2a727c0f466c Mon Sep 17 00:00:00 2001 From: Bill Richards Date: Wed, 16 Jul 2025 12:04:59 -0700 Subject: [PATCH 1/8] translated Go sample of sliding window to be writting in python --- batch-sliding-window/README.md | 29 ++++ batch-sliding-window/__init__.py | 37 +++++ batch-sliding-window/batch_workflow.py | 103 ++++++++++++ .../record_loader_activity.py | 54 ++++++ .../record_processor_workflow.py | 34 ++++ batch-sliding-window/run_starter.py | 51 ++++++ batch-sliding-window/run_worker.py | 53 ++++++ .../sliding_window_workflow.py | 156 ++++++++++++++++++ 8 files changed, 517 insertions(+) create mode 100644 batch-sliding-window/README.md create mode 100644 batch-sliding-window/__init__.py create mode 100644 batch-sliding-window/batch_workflow.py create mode 100644 batch-sliding-window/record_loader_activity.py create mode 100644 batch-sliding-window/record_processor_workflow.py create mode 100644 batch-sliding-window/run_starter.py create mode 100644 batch-sliding-window/run_worker.py create mode 100644 batch-sliding-window/sliding_window_workflow.py diff --git a/batch-sliding-window/README.md b/batch-sliding-window/README.md new file mode 100644 index 00000000..b4ea259b --- /dev/null +++ b/batch-sliding-window/README.md @@ -0,0 +1,29 @@ +## Sliding Window Batch Sample + +A sample implementation of a batch processing Workflow that maintains a sliding window of record processing Workflows. + +A SlidingWindowWorkflow starts a configured number (sliding window size) of RecordProcessorWorkflow children in parallel. +Each child processes a single record. When a child completes a new child is started. + +A SlidingWindowWorkflow calls continue-as-new after starting a preconfigured number of children to keep its history size bounded. +A RecordProcessorWorkflow reports its completion through a Signal to its parent. +This allows to notify a parent that called continue-as-new. + +A single instance of SlidingWindowWorkflow has limited window size and throughput. +To support larger window size and overall throughput multiple instances of SlidingWindowWorkflow run in parallel. + +#### Running the Sliding Window Batch Sample + +Make sure the [Temporal Server is running locally](https://learn.temporal.io/getting_started/python/dev_environment/#set-up-a-local-temporal-service-for-development-with-temporal-cli). + +From the root of the project, start a Worker: + +```bash +python python-samples/batch-sliding-window/worker.py +``` + +Start the Workflow Execution: + +```bash +python python-samples/batch-sliding-window/starter.py +``` \ No newline at end of file diff --git a/batch-sliding-window/__init__.py b/batch-sliding-window/__init__.py new file mode 100644 index 00000000..77364f2a --- /dev/null +++ b/batch-sliding-window/__init__.py @@ -0,0 +1,37 @@ +"""Sliding Window Batch Processing Sample. + +This sample demonstrates a batch processing workflow that maintains a sliding window +of record processing workflows. It includes: + +- ProcessBatchWorkflow: Main workflow that partitions work across multiple sliding windows +- SlidingWindowWorkflow: Implements the sliding window pattern with continue-as-new +- RecordProcessorWorkflow: Processes individual records +- RecordLoader: Activity for loading records from external sources +""" + +from .batch_workflow import ProcessBatchWorkflow, ProcessBatchWorkflowInput +from .sliding_window_workflow import ( + SlidingWindowWorkflow, + SlidingWindowWorkflowInput, + SlidingWindowState, +) +from .record_processor_workflow import RecordProcessorWorkflow +from .record_loader_activity import ( + RecordLoader, + GetRecordsInput, + GetRecordsOutput, + SingleRecord, +) + +__all__ = [ + "ProcessBatchWorkflow", + "ProcessBatchWorkflowInput", + "SlidingWindowWorkflow", + "SlidingWindowWorkflowInput", + "SlidingWindowState", + "RecordProcessorWorkflow", + "RecordLoader", + "GetRecordsInput", + "GetRecordsOutput", + "SingleRecord", +] \ No newline at end of file diff --git a/batch-sliding-window/batch_workflow.py b/batch-sliding-window/batch_workflow.py new file mode 100644 index 00000000..effffb22 --- /dev/null +++ b/batch-sliding-window/batch_workflow.py @@ -0,0 +1,103 @@ +from dataclasses import dataclass +from typing import List +import asyncio + +from temporalio import workflow +from temporalio.common import WorkflowIDReusePolicy +from temporalio.exceptions import ApplicationError + +from .record_loader_activity import RecordLoader +from .sliding_window_workflow import SlidingWindowWorkflow, SlidingWindowWorkflowInput + + +@dataclass +class ProcessBatchWorkflowInput: + """Input for the ProcessBatchWorkflow. + + A single input structure is preferred to multiple workflow arguments + to simplify backward compatible API changes. + """ + page_size: int # Number of children started by a single sliding window workflow run + sliding_window_size: int # Maximum number of children to run in parallel + partitions: int # How many sliding windows to run in parallel + + +@workflow.defn +class ProcessBatchWorkflow: + """Sample workflow that partitions the data set into continuous ranges. + + A real application can choose any other way to divide the records + into multiple collections. + """ + + @workflow.run + async def run(self, input: ProcessBatchWorkflowInput) -> int: + # Get total record count + record_count = await workflow.execute_activity( + RecordLoader.get_record_count, + start_to_close_timeout=workflow.timedelta(seconds=5), + ) + + if input.sliding_window_size < input.partitions: + raise ApplicationError( + "SlidingWindowSize cannot be less than number of partitions" + ) + + partitions = self._divide_into_partitions(record_count, input.partitions) + window_sizes = self._divide_into_partitions(input.sliding_window_size, input.partitions) + + workflow.logger.info( + f"ProcessBatchWorkflow started", + extra={ + "input": input, + "record_count": record_count, + "partitions": partitions, + "window_sizes": window_sizes, + } + ) + + # Start child workflows for each partition + tasks = [] + offset = 0 + + for i in range(input.partitions): + # Make child id more user-friendly + child_id = f"{workflow.info().workflow_id}/{i}" + + # Define partition boundaries + maximum_partition_offset = offset + partitions[i] + if maximum_partition_offset > record_count: + maximum_partition_offset = record_count + + child_input = SlidingWindowWorkflowInput( + page_size=input.page_size, + sliding_window_size=window_sizes[i], + offset=offset, # inclusive + maximum_offset=maximum_partition_offset, # exclusive + progress=0, + current_records=None, + ) + + task = workflow.execute_child_workflow( + SlidingWindowWorkflow.run, + child_input, + id=child_id, + id_reuse_policy=WorkflowIDReusePolicy.ALLOW_DUPLICATE, + ) + tasks.append(task) + offset += partitions[i] + + # Wait for all child workflows to complete + results = await asyncio.gather(*tasks) + return sum(results) + + def _divide_into_partitions(self, number: int, n: int) -> List[int]: + """Divide a number into n partitions as evenly as possible.""" + base = number // n + remainder = number % n + partitions = [base] * n + + for i in range(remainder): + partitions[i] += 1 + + return partitions \ No newline at end of file diff --git a/batch-sliding-window/record_loader_activity.py b/batch-sliding-window/record_loader_activity.py new file mode 100644 index 00000000..ced72551 --- /dev/null +++ b/batch-sliding-window/record_loader_activity.py @@ -0,0 +1,54 @@ +from dataclasses import dataclass +from typing import List + +from temporalio import activity + + +@dataclass +class GetRecordsInput: + """Input for the GetRecords activity.""" + page_size: int + offset: int + max_offset: int + + +@dataclass +class SingleRecord: + """Represents a single record to be processed.""" + id: int + + +@dataclass +class GetRecordsOutput: + """Output from the GetRecords activity.""" + records: List[SingleRecord] + + +class RecordLoader: + """Activities for loading records from an external data source.""" + + def __init__(self, record_count: int): + self.record_count = record_count + + @activity.defn + async def get_record_count(self) -> int: + """Get the total record count. + + Used to partition processing across parallel sliding windows. + The sample implementation just returns a fake value passed during worker initialization. + """ + return self.record_count + + @activity.defn + async def get_records(self, input: GetRecordsInput) -> GetRecordsOutput: + """Get records loaded from an external data source. + + The sample returns fake records. + """ + if input.max_offset > self.record_count: + raise ValueError(f"max_offset({input.max_offset}) > record_count({self.record_count})") + + limit = min(input.offset + input.page_size, input.max_offset) + records = [SingleRecord(id=i) for i in range(input.offset, limit)] + + return GetRecordsOutput(records=records) \ No newline at end of file diff --git a/batch-sliding-window/record_processor_workflow.py b/batch-sliding-window/record_processor_workflow.py new file mode 100644 index 00000000..39c3b015 --- /dev/null +++ b/batch-sliding-window/record_processor_workflow.py @@ -0,0 +1,34 @@ +import asyncio +import random + +from temporalio import workflow + +from .record_loader_activity import SingleRecord + + +@workflow.defn +class RecordProcessorWorkflow: + """Workflow that implements processing of a single record.""" + + @workflow.run + async def run(self, record: SingleRecord) -> None: + await self._process_record(record) + + # Notify parent about completion via signal + parent = workflow.info().parent + + # This workflow is always expected to have a parent. + # But for unit testing it might be useful to skip the notification if there is none. + if parent: + # Don't specify run_id as parent calls continue-as-new + await workflow.external_workflow_handle(parent.workflow_id).signal( + "report_completion", record.id + ) + + async def _process_record(self, record: SingleRecord) -> None: + """Simulate application specific record processing.""" + # Use workflow.random() to get a random number to ensure workflow determinism + sleep_duration = workflow.random().randint(1, 10) + await workflow.sleep(sleep_duration) + + workflow.logger.info(f"Processed record {record}") \ No newline at end of file diff --git a/batch-sliding-window/run_starter.py b/batch-sliding-window/run_starter.py new file mode 100644 index 00000000..101f5495 --- /dev/null +++ b/batch-sliding-window/run_starter.py @@ -0,0 +1,51 @@ +#!/usr/bin/env python3 +"""Standalone starter for the batch sliding window sample.""" + +import asyncio +import logging +import sys +from pathlib import Path + +# Add the python-samples directory to the path to enable imports +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from temporalio.client import Client + +from batch_sliding_window.batch_workflow import ProcessBatchWorkflow, ProcessBatchWorkflowInput + + +async def main(): + """Start the ProcessBatchWorkflow.""" + # Set up logging + logging.basicConfig(level=logging.INFO) + + # Create client + client = await Client.connect("localhost:7233") + + # Create workflow input + workflow_input = ProcessBatchWorkflowInput( + page_size=5, + sliding_window_size=10, + partitions=3, + ) + + print(f"Starting workflow with input: {workflow_input}") + + # Start workflow + handle = await client.start_workflow( + ProcessBatchWorkflow.run, + workflow_input, + id="batch-sliding-window-example", + task_queue="batch-sliding-window", + ) + + print(f"Started workflow: {handle.id}") + + # Wait for workflow completion + # This is rarely needed in real use cases as batch workflows are usually long-running + result = await handle.result() + print(f"Workflow completed. Total records processed: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/batch-sliding-window/run_worker.py b/batch-sliding-window/run_worker.py new file mode 100644 index 00000000..3fa75bc8 --- /dev/null +++ b/batch-sliding-window/run_worker.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python3 +"""Standalone worker runner for the batch sliding window sample.""" + +import asyncio +import logging +import sys +from pathlib import Path + +# Add the python-samples directory to the path to enable imports +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from temporalio import worker +from temporalio.client import Client + +from batch_sliding_window.batch_workflow import ProcessBatchWorkflow +from batch_sliding_window.sliding_window_workflow import SlidingWindowWorkflow +from batch_sliding_window.record_processor_workflow import RecordProcessorWorkflow +from batch_sliding_window.record_loader_activity import RecordLoader + + +async def main(): + """Run the worker that registers all workflows and activities.""" + # Set up logging + logging.basicConfig(level=logging.INFO) + + # Create client + client = await Client.connect("localhost:7233") + + # Create RecordLoader activity with sample data + record_loader = RecordLoader(record_count=90) + + # Create worker + temporal_worker = worker.Worker( + client, + task_queue="batch-sliding-window", + workflows=[ + ProcessBatchWorkflow, + SlidingWindowWorkflow, + RecordProcessorWorkflow, + ], + activities=[ + record_loader.get_record_count, + record_loader.get_records, + ], + ) + + print("Starting worker...") + # Run the worker + await temporal_worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/batch-sliding-window/sliding_window_workflow.py b/batch-sliding-window/sliding_window_workflow.py new file mode 100644 index 00000000..a5f26209 --- /dev/null +++ b/batch-sliding-window/sliding_window_workflow.py @@ -0,0 +1,156 @@ +import asyncio +from dataclasses import dataclass +from typing import Dict, List, Optional, Set + +from temporalio import workflow +from temporalio.common import WorkflowIDReusePolicy + +from .record_loader_activity import RecordLoader, GetRecordsInput, SingleRecord +from .record_processor_workflow import RecordProcessorWorkflow + + +@dataclass +class SlidingWindowWorkflowInput: + """Contains SlidingWindowWorkflow arguments.""" + page_size: int + sliding_window_size: int + offset: int # inclusive + maximum_offset: int # exclusive + progress: int = 0 + # The set of record ids currently being processed + current_records: Optional[Set[int]] = None + + +@dataclass +class SlidingWindowState: + """Used as a 'state' query result.""" + current_records: List[int] # record ids currently being processed + children_started_by_this_run: int + offset: int + progress: int + + +@workflow.defn +class SlidingWindowWorkflow: + """Workflow processes a range of records using a requested number of child workflows. + + As soon as a child workflow completes a new one is started. + """ + + def __init__(self): + self.current_records: Set[int] = set() + self.children_started_by_this_run: List[workflow.ChildWorkflowHandle] = [] + self.offset = 0 + self.progress = 0 + self._completion_signals_received = 0 + + @workflow.run + async def run(self, input: SlidingWindowWorkflowInput) -> int: + workflow.logger.info( + f"SlidingWindowWorkflow started", + extra={ + "sliding_window_size": input.sliding_window_size, + "page_size": input.page_size, + "offset": input.offset, + "maximum_offset": input.maximum_offset, + "progress": input.progress, + } + ) + + # Initialize state from input + self.current_records = input.current_records or set() + self.offset = input.offset + self.progress = input.progress + + # Set up query handler + workflow.set_query_handler("state", self._handle_state_query) + + # Set up signal handler for completion notifications + workflow.set_signal_handler("report_completion", self._handle_completion_signal) + + return await self._execute(input) + + async def _execute(self, input: SlidingWindowWorkflowInput) -> int: + """Main execution logic.""" + # Get records for this page if we haven't reached the end + records = [] + if self.offset < input.maximum_offset: + get_records_input = GetRecordsInput( + page_size=input.page_size, + offset=self.offset, + max_offset=input.maximum_offset, + ) + get_records_output = await workflow.execute_activity( + RecordLoader.get_records, + get_records_input, + start_to_close_timeout=workflow.timedelta(seconds=5), + ) + records = get_records_output.records + + workflow_id = workflow.info().workflow_id + + # Process records + for record in records: + # Wait until we have capacity in the sliding window + await workflow.wait_condition( + lambda: len(self.current_records) < input.sliding_window_size + ) + + # Start child workflow for this record + child_id = f"{workflow_id}/{record.id}" + child_handle = await workflow.start_child_workflow( + RecordProcessorWorkflow.run, + record, + id=child_id, + id_reuse_policy=WorkflowIDReusePolicy.ALLOW_DUPLICATE, + parent_close_policy=workflow.ParentClosePolicy.ABANDON, + ) + + self.children_started_by_this_run.append(child_handle) + self.current_records.add(record.id) + + return await self._continue_as_new_or_complete(input) + + async def _continue_as_new_or_complete(self, input: SlidingWindowWorkflowInput) -> int: + """Continue-as-new after starting page_size children or complete if done.""" + # Update offset based on children started in this run + new_offset = input.offset + len(self.children_started_by_this_run) + + if new_offset < input.maximum_offset: + # Wait for all children started by this run to begin execution + for child in self.children_started_by_this_run: + await child.get_workflow_execution() + + # Continue-as-new with updated state + new_input = SlidingWindowWorkflowInput( + page_size=input.page_size, + sliding_window_size=input.sliding_window_size, + offset=new_offset, + maximum_offset=input.maximum_offset, + progress=self.progress, + current_records=self.current_records, + ) + + workflow.continue_as_new(new_input) + + # Last run in the continue-as-new chain + # Wait for all children to complete + await workflow.wait_condition(lambda: len(self.current_records) == 0) + return self.progress + + def _handle_completion_signal(self, record_id: int) -> None: + """Handle completion signal from child workflow.""" + # Check for duplicate signals + if record_id in self.current_records: + self.current_records.remove(record_id) + self.progress += 1 + + def _handle_state_query(self) -> SlidingWindowState: + """Handle state query for monitoring.""" + current_record_ids = sorted(list(self.current_records)) + return SlidingWindowState( + current_records=current_record_ids, + children_started_by_this_run=len(self.children_started_by_this_run), + offset=self.offset, + progress=self.progress, + ) \ No newline at end of file From a2ca8cc250dac3a30f2939273fcd43d5a3c8492d Mon Sep 17 00:00:00 2001 From: Bill Richards Date: Thu, 17 Jul 2025 09:30:41 -0700 Subject: [PATCH 2/8] refactor, change folder name, fix imports and update readme to match style --- README.md | 1 + batch-sliding-window/README.md | 29 ------------------- batch_sliding_window/README.md | 21 ++++++++++++++ .../__init__.py | 8 ++--- .../batch_workflow.py | 4 +-- .../record_loader_activity.py | 0 .../record_processor_workflow.py | 2 +- .../run_starter.py | 11 ++----- .../run_worker.py | 9 ++---- .../sliding_window_workflow.py | 4 +-- 10 files changed, 36 insertions(+), 53 deletions(-) delete mode 100644 batch-sliding-window/README.md create mode 100644 batch_sliding_window/README.md rename {batch-sliding-window => batch_sliding_window}/__init__.py (75%) rename {batch-sliding-window => batch_sliding_window}/batch_workflow.py (95%) rename {batch-sliding-window => batch_sliding_window}/record_loader_activity.py (100%) rename {batch-sliding-window => batch_sliding_window}/record_processor_workflow.py (94%) rename {batch-sliding-window => batch_sliding_window}/run_starter.py (77%) rename {batch-sliding-window => batch_sliding_window}/run_worker.py (82%) rename {batch-sliding-window => batch_sliding_window}/sliding_window_workflow.py (96%) diff --git a/README.md b/README.md index 5c1941d1..1b7c6aef 100644 --- a/README.md +++ b/README.md @@ -56,6 +56,7 @@ Some examples require extra dependencies. See each sample's directory for specif * [hello_signal](hello/hello_signal.py) - Send signals to a workflow. * [activity_worker](activity_worker) - Use Python activities from a workflow in another language. +* [batch_sliding_window](batch_sliding_window) - Batch processing with a sliding window of child workflows. * [bedrock](bedrock) - Orchestrate a chatbot with Amazon Bedrock. * [cloud_export_to_parquet](cloud_export_to_parquet) - Set up schedule workflow to process exported files on an hourly basis * [context_propagation](context_propagation) - Context propagation through workflows/activities via interceptor. diff --git a/batch-sliding-window/README.md b/batch-sliding-window/README.md deleted file mode 100644 index b4ea259b..00000000 --- a/batch-sliding-window/README.md +++ /dev/null @@ -1,29 +0,0 @@ -## Sliding Window Batch Sample - -A sample implementation of a batch processing Workflow that maintains a sliding window of record processing Workflows. - -A SlidingWindowWorkflow starts a configured number (sliding window size) of RecordProcessorWorkflow children in parallel. -Each child processes a single record. When a child completes a new child is started. - -A SlidingWindowWorkflow calls continue-as-new after starting a preconfigured number of children to keep its history size bounded. -A RecordProcessorWorkflow reports its completion through a Signal to its parent. -This allows to notify a parent that called continue-as-new. - -A single instance of SlidingWindowWorkflow has limited window size and throughput. -To support larger window size and overall throughput multiple instances of SlidingWindowWorkflow run in parallel. - -#### Running the Sliding Window Batch Sample - -Make sure the [Temporal Server is running locally](https://learn.temporal.io/getting_started/python/dev_environment/#set-up-a-local-temporal-service-for-development-with-temporal-cli). - -From the root of the project, start a Worker: - -```bash -python python-samples/batch-sliding-window/worker.py -``` - -Start the Workflow Execution: - -```bash -python python-samples/batch-sliding-window/starter.py -``` \ No newline at end of file diff --git a/batch_sliding_window/README.md b/batch_sliding_window/README.md new file mode 100644 index 00000000..8256af3b --- /dev/null +++ b/batch_sliding_window/README.md @@ -0,0 +1,21 @@ +# Batch Sliding Window + +This sample demonstrates a batch processing workflow that maintains a sliding window of record processing workflows. + +A `SlidingWindowWorkflow` starts a configured number (sliding window size) of `RecordProcessorWorkflow` children in parallel. Each child processes a single record. When a child completes, a new child is started. + +The `SlidingWindowWorkflow` calls continue-as-new after starting a preconfigured number of children to keep its history size bounded. A `RecordProcessorWorkflow` reports its completion through a signal to its parent, which allows notification of a parent that called continue-as-new. + +A single instance of `SlidingWindowWorkflow` has limited window size and throughput. To support larger window size and overall throughput, multiple instances of `SlidingWindowWorkflow` run in parallel. + +### Running This Sample + +To run, first see [README.md](../README.md) for prerequisites. Then, run the following from this directory to start the worker: + + uv run run_worker.py + +This will start the worker. Then, in another terminal, run the following to execute the workflow: + + uv run run_starter.py + +The workflow will process 90 records using a sliding window of 10 parallel workers across 3 partitions, with a page size of 5 records per continue-as-new iteration. \ No newline at end of file diff --git a/batch-sliding-window/__init__.py b/batch_sliding_window/__init__.py similarity index 75% rename from batch-sliding-window/__init__.py rename to batch_sliding_window/__init__.py index 77364f2a..9e473f22 100644 --- a/batch-sliding-window/__init__.py +++ b/batch_sliding_window/__init__.py @@ -9,14 +9,14 @@ - RecordLoader: Activity for loading records from external sources """ -from .batch_workflow import ProcessBatchWorkflow, ProcessBatchWorkflowInput -from .sliding_window_workflow import ( +from batch_sliding_window.batch_workflow import ProcessBatchWorkflow, ProcessBatchWorkflowInput +from batch_sliding_window.sliding_window_workflow import ( SlidingWindowWorkflow, SlidingWindowWorkflowInput, SlidingWindowState, ) -from .record_processor_workflow import RecordProcessorWorkflow -from .record_loader_activity import ( +from batch_sliding_window.record_processor_workflow import RecordProcessorWorkflow +from batch_sliding_window.record_loader_activity import ( RecordLoader, GetRecordsInput, GetRecordsOutput, diff --git a/batch-sliding-window/batch_workflow.py b/batch_sliding_window/batch_workflow.py similarity index 95% rename from batch-sliding-window/batch_workflow.py rename to batch_sliding_window/batch_workflow.py index effffb22..bdfb7e9d 100644 --- a/batch-sliding-window/batch_workflow.py +++ b/batch_sliding_window/batch_workflow.py @@ -6,8 +6,8 @@ from temporalio.common import WorkflowIDReusePolicy from temporalio.exceptions import ApplicationError -from .record_loader_activity import RecordLoader -from .sliding_window_workflow import SlidingWindowWorkflow, SlidingWindowWorkflowInput +from batch_sliding_window.record_loader_activity import RecordLoader +from batch_sliding_window.sliding_window_workflow import SlidingWindowWorkflow, SlidingWindowWorkflowInput @dataclass diff --git a/batch-sliding-window/record_loader_activity.py b/batch_sliding_window/record_loader_activity.py similarity index 100% rename from batch-sliding-window/record_loader_activity.py rename to batch_sliding_window/record_loader_activity.py diff --git a/batch-sliding-window/record_processor_workflow.py b/batch_sliding_window/record_processor_workflow.py similarity index 94% rename from batch-sliding-window/record_processor_workflow.py rename to batch_sliding_window/record_processor_workflow.py index 39c3b015..e1391d4e 100644 --- a/batch-sliding-window/record_processor_workflow.py +++ b/batch_sliding_window/record_processor_workflow.py @@ -3,7 +3,7 @@ from temporalio import workflow -from .record_loader_activity import SingleRecord +from batch_sliding_window.record_loader_activity import SingleRecord @workflow.defn diff --git a/batch-sliding-window/run_starter.py b/batch_sliding_window/run_starter.py similarity index 77% rename from batch-sliding-window/run_starter.py rename to batch_sliding_window/run_starter.py index 101f5495..476a3be7 100644 --- a/batch-sliding-window/run_starter.py +++ b/batch_sliding_window/run_starter.py @@ -1,13 +1,8 @@ #!/usr/bin/env python3 -"""Standalone starter for the batch sliding window sample.""" +"""Starter for the batch sliding window sample.""" import asyncio import logging -import sys -from pathlib import Path - -# Add the python-samples directory to the path to enable imports -sys.path.insert(0, str(Path(__file__).parent.parent)) from temporalio.client import Client @@ -35,8 +30,8 @@ async def main(): handle = await client.start_workflow( ProcessBatchWorkflow.run, workflow_input, - id="batch-sliding-window-example", - task_queue="batch-sliding-window", + id="batch_sliding_window_example", + task_queue="batch_sliding_window", ) print(f"Started workflow: {handle.id}") diff --git a/batch-sliding-window/run_worker.py b/batch_sliding_window/run_worker.py similarity index 82% rename from batch-sliding-window/run_worker.py rename to batch_sliding_window/run_worker.py index 3fa75bc8..1741c9ba 100644 --- a/batch-sliding-window/run_worker.py +++ b/batch_sliding_window/run_worker.py @@ -1,13 +1,8 @@ #!/usr/bin/env python3 -"""Standalone worker runner for the batch sliding window sample.""" +"""Worker for the batch sliding window sample.""" import asyncio import logging -import sys -from pathlib import Path - -# Add the python-samples directory to the path to enable imports -sys.path.insert(0, str(Path(__file__).parent.parent)) from temporalio import worker from temporalio.client import Client @@ -32,7 +27,7 @@ async def main(): # Create worker temporal_worker = worker.Worker( client, - task_queue="batch-sliding-window", + task_queue="batch_sliding_window", workflows=[ ProcessBatchWorkflow, SlidingWindowWorkflow, diff --git a/batch-sliding-window/sliding_window_workflow.py b/batch_sliding_window/sliding_window_workflow.py similarity index 96% rename from batch-sliding-window/sliding_window_workflow.py rename to batch_sliding_window/sliding_window_workflow.py index a5f26209..986446d4 100644 --- a/batch-sliding-window/sliding_window_workflow.py +++ b/batch_sliding_window/sliding_window_workflow.py @@ -5,8 +5,8 @@ from temporalio import workflow from temporalio.common import WorkflowIDReusePolicy -from .record_loader_activity import RecordLoader, GetRecordsInput, SingleRecord -from .record_processor_workflow import RecordProcessorWorkflow +from batch_sliding_window.record_loader_activity import RecordLoader, GetRecordsInput, SingleRecord +from batch_sliding_window.record_processor_workflow import RecordProcessorWorkflow @dataclass From 01f3ded8ba26660ff3284421c7c2c19c5c89645f Mon Sep 17 00:00:00 2001 From: Bill Richards Date: Thu, 17 Jul 2025 12:30:33 -0700 Subject: [PATCH 3/8] Cleaned things up and got everything functioning --- batch_sliding_window/README.md | 4 +- .../record_processor_workflow.py | 5 +- batch_sliding_window/run_starter.py | 46 --------------- .../sliding_window_workflow.py | 7 +-- batch_sliding_window/starter.py | 57 +++++++++++++++++++ .../{run_worker.py => worker.py} | 2 +- 6 files changed, 65 insertions(+), 56 deletions(-) delete mode 100644 batch_sliding_window/run_starter.py create mode 100644 batch_sliding_window/starter.py rename batch_sliding_window/{run_worker.py => worker.py} (95%) diff --git a/batch_sliding_window/README.md b/batch_sliding_window/README.md index 8256af3b..b196613a 100644 --- a/batch_sliding_window/README.md +++ b/batch_sliding_window/README.md @@ -12,10 +12,10 @@ A single instance of `SlidingWindowWorkflow` has limited window size and through To run, first see [README.md](../README.md) for prerequisites. Then, run the following from this directory to start the worker: - uv run run_worker.py + uv run worker.py This will start the worker. Then, in another terminal, run the following to execute the workflow: - uv run run_starter.py + uv run starter.py The workflow will process 90 records using a sliding window of 10 parallel workers across 3 partitions, with a page size of 5 records per continue-as-new iteration. \ No newline at end of file diff --git a/batch_sliding_window/record_processor_workflow.py b/batch_sliding_window/record_processor_workflow.py index e1391d4e..aa01ee29 100644 --- a/batch_sliding_window/record_processor_workflow.py +++ b/batch_sliding_window/record_processor_workflow.py @@ -21,9 +21,8 @@ async def run(self, record: SingleRecord) -> None: # But for unit testing it might be useful to skip the notification if there is none. if parent: # Don't specify run_id as parent calls continue-as-new - await workflow.external_workflow_handle(parent.workflow_id).signal( - "report_completion", record.id - ) + handle = workflow.get_external_workflow_handle(parent.workflow_id) + await handle.signal("report_completion", record.id) async def _process_record(self, record: SingleRecord) -> None: """Simulate application specific record processing.""" diff --git a/batch_sliding_window/run_starter.py b/batch_sliding_window/run_starter.py deleted file mode 100644 index 476a3be7..00000000 --- a/batch_sliding_window/run_starter.py +++ /dev/null @@ -1,46 +0,0 @@ -#!/usr/bin/env python3 -"""Starter for the batch sliding window sample.""" - -import asyncio -import logging - -from temporalio.client import Client - -from batch_sliding_window.batch_workflow import ProcessBatchWorkflow, ProcessBatchWorkflowInput - - -async def main(): - """Start the ProcessBatchWorkflow.""" - # Set up logging - logging.basicConfig(level=logging.INFO) - - # Create client - client = await Client.connect("localhost:7233") - - # Create workflow input - workflow_input = ProcessBatchWorkflowInput( - page_size=5, - sliding_window_size=10, - partitions=3, - ) - - print(f"Starting workflow with input: {workflow_input}") - - # Start workflow - handle = await client.start_workflow( - ProcessBatchWorkflow.run, - workflow_input, - id="batch_sliding_window_example", - task_queue="batch_sliding_window", - ) - - print(f"Started workflow: {handle.id}") - - # Wait for workflow completion - # This is rarely needed in real use cases as batch workflows are usually long-running - result = await handle.result() - print(f"Workflow completed. Total records processed: {result}") - - -if __name__ == "__main__": - asyncio.run(main()) \ No newline at end of file diff --git a/batch_sliding_window/sliding_window_workflow.py b/batch_sliding_window/sliding_window_workflow.py index 986446d4..7f845f3c 100644 --- a/batch_sliding_window/sliding_window_workflow.py +++ b/batch_sliding_window/sliding_window_workflow.py @@ -39,7 +39,7 @@ class SlidingWindowWorkflow: def __init__(self): self.current_records: Set[int] = set() - self.children_started_by_this_run: List[workflow.ChildWorkflowHandle] = [] + self.children_started_by_this_run = [] self.offset = 0 self.progress = 0 self._completion_signals_received = 0 @@ -117,9 +117,8 @@ async def _continue_as_new_or_complete(self, input: SlidingWindowWorkflowInput) new_offset = input.offset + len(self.children_started_by_this_run) if new_offset < input.maximum_offset: - # Wait for all children started by this run to begin execution - for child in self.children_started_by_this_run: - await child.get_workflow_execution() + # In Python, await start_child_workflow() already waits until + # the start has been accepted by the server, so no additional wait needed # Continue-as-new with updated state new_input = SlidingWindowWorkflowInput( diff --git a/batch_sliding_window/starter.py b/batch_sliding_window/starter.py new file mode 100644 index 00000000..caa0a936 --- /dev/null +++ b/batch_sliding_window/starter.py @@ -0,0 +1,57 @@ +#!/usr/bin/env python3 +"""Starter for the batch sliding window sample.""" + +import asyncio +import datetime +import logging + +from temporalio.client import Client + +from batch_sliding_window.batch_workflow import ProcessBatchWorkflow, ProcessBatchWorkflowInput + + +async def main(): + """Start the ProcessBatchWorkflow.""" + # Set up logging + logging.basicConfig(level=logging.INFO) + + # Create client + client = await Client.connect("localhost:7233") + + # Create unique workflow ID with timestamp + timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + workflow_id = f"batch_sliding_window_example_{timestamp}" + + # Define workflow input + workflow_input = ProcessBatchWorkflowInput( + page_size=5, + sliding_window_size=10, + partitions=3, + ) + + print(f"Starting workflow with ID: {workflow_id}") + print(f"Input: {workflow_input}") + + # Start workflow + handle = await client.start_workflow( + ProcessBatchWorkflow.run, + workflow_input, + id=workflow_id, + task_queue="batch_sliding_window_task_queue", + ) + + print(f"Workflow started with ID: {handle.id}") + print(f"Waiting for workflow to complete...") + + # Wait for result + try: + result = await handle.result() + print(f"Workflow completed successfully!") + print(f"Total records processed: {result}") + except Exception as e: + print(f"Workflow failed with error: {e}") + raise + + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/batch_sliding_window/run_worker.py b/batch_sliding_window/worker.py similarity index 95% rename from batch_sliding_window/run_worker.py rename to batch_sliding_window/worker.py index 1741c9ba..4e10f82f 100644 --- a/batch_sliding_window/run_worker.py +++ b/batch_sliding_window/worker.py @@ -27,7 +27,7 @@ async def main(): # Create worker temporal_worker = worker.Worker( client, - task_queue="batch_sliding_window", + task_queue="batch_sliding_window_task_queue", workflows=[ ProcessBatchWorkflow, SlidingWindowWorkflow, From c32fa47a7ce381604ff000a7430449f360ebe7a0 Mon Sep 17 00:00:00 2001 From: Bill Richards Date: Thu, 17 Jul 2025 13:12:47 -0700 Subject: [PATCH 4/8] Run linter --- batch_sliding_window/__init__.py | 7 +++-- batch_sliding_window/batch_workflow.py | 30 +++++++++++-------- .../record_loader_activity.py | 15 ++++++---- .../record_processor_workflow.py | 8 ++--- .../sliding_window_workflow.py | 24 ++++++++++----- batch_sliding_window/starter.py | 19 +++++++----- batch_sliding_window/worker.py | 2 +- 7 files changed, 65 insertions(+), 40 deletions(-) diff --git a/batch_sliding_window/__init__.py b/batch_sliding_window/__init__.py index 9e473f22..ffe2e124 100644 --- a/batch_sliding_window/__init__.py +++ b/batch_sliding_window/__init__.py @@ -9,7 +9,10 @@ - RecordLoader: Activity for loading records from external sources """ -from batch_sliding_window.batch_workflow import ProcessBatchWorkflow, ProcessBatchWorkflowInput +from batch_sliding_window.batch_workflow import ( + ProcessBatchWorkflow, + ProcessBatchWorkflowInput, +) from batch_sliding_window.sliding_window_workflow import ( SlidingWindowWorkflow, SlidingWindowWorkflowInput, @@ -34,4 +37,4 @@ "GetRecordsInput", "GetRecordsOutput", "SingleRecord", -] \ No newline at end of file +] diff --git a/batch_sliding_window/batch_workflow.py b/batch_sliding_window/batch_workflow.py index bdfb7e9d..856bf792 100644 --- a/batch_sliding_window/batch_workflow.py +++ b/batch_sliding_window/batch_workflow.py @@ -7,16 +7,20 @@ from temporalio.exceptions import ApplicationError from batch_sliding_window.record_loader_activity import RecordLoader -from batch_sliding_window.sliding_window_workflow import SlidingWindowWorkflow, SlidingWindowWorkflowInput +from batch_sliding_window.sliding_window_workflow import ( + SlidingWindowWorkflow, + SlidingWindowWorkflowInput, +) @dataclass class ProcessBatchWorkflowInput: """Input for the ProcessBatchWorkflow. - - A single input structure is preferred to multiple workflow arguments + + A single input structure is preferred to multiple workflow arguments to simplify backward compatible API changes. """ + page_size: int # Number of children started by a single sliding window workflow run sliding_window_size: int # Maximum number of children to run in parallel partitions: int # How many sliding windows to run in parallel @@ -25,8 +29,8 @@ class ProcessBatchWorkflowInput: @workflow.defn class ProcessBatchWorkflow: """Sample workflow that partitions the data set into continuous ranges. - - A real application can choose any other way to divide the records + + A real application can choose any other way to divide the records into multiple collections. """ @@ -44,7 +48,9 @@ async def run(self, input: ProcessBatchWorkflowInput) -> int: ) partitions = self._divide_into_partitions(record_count, input.partitions) - window_sizes = self._divide_into_partitions(input.sliding_window_size, input.partitions) + window_sizes = self._divide_into_partitions( + input.sliding_window_size, input.partitions + ) workflow.logger.info( f"ProcessBatchWorkflow started", @@ -53,22 +59,22 @@ async def run(self, input: ProcessBatchWorkflowInput) -> int: "record_count": record_count, "partitions": partitions, "window_sizes": window_sizes, - } + }, ) # Start child workflows for each partition tasks = [] offset = 0 - + for i in range(input.partitions): # Make child id more user-friendly child_id = f"{workflow.info().workflow_id}/{i}" - + # Define partition boundaries maximum_partition_offset = offset + partitions[i] if maximum_partition_offset > record_count: maximum_partition_offset = record_count - + child_input = SlidingWindowWorkflowInput( page_size=input.page_size, sliding_window_size=window_sizes[i], @@ -77,7 +83,7 @@ async def run(self, input: ProcessBatchWorkflowInput) -> int: progress=0, current_records=None, ) - + task = workflow.execute_child_workflow( SlidingWindowWorkflow.run, child_input, @@ -100,4 +106,4 @@ def _divide_into_partitions(self, number: int, n: int) -> List[int]: for i in range(remainder): partitions[i] += 1 - return partitions \ No newline at end of file + return partitions diff --git a/batch_sliding_window/record_loader_activity.py b/batch_sliding_window/record_loader_activity.py index ced72551..26ae14b1 100644 --- a/batch_sliding_window/record_loader_activity.py +++ b/batch_sliding_window/record_loader_activity.py @@ -7,6 +7,7 @@ @dataclass class GetRecordsInput: """Input for the GetRecords activity.""" + page_size: int offset: int max_offset: int @@ -15,12 +16,14 @@ class GetRecordsInput: @dataclass class SingleRecord: """Represents a single record to be processed.""" + id: int @dataclass class GetRecordsOutput: """Output from the GetRecords activity.""" + records: List[SingleRecord] @@ -33,7 +36,7 @@ def __init__(self, record_count: int): @activity.defn async def get_record_count(self) -> int: """Get the total record count. - + Used to partition processing across parallel sliding windows. The sample implementation just returns a fake value passed during worker initialization. """ @@ -42,13 +45,15 @@ async def get_record_count(self) -> int: @activity.defn async def get_records(self, input: GetRecordsInput) -> GetRecordsOutput: """Get records loaded from an external data source. - + The sample returns fake records. """ if input.max_offset > self.record_count: - raise ValueError(f"max_offset({input.max_offset}) > record_count({self.record_count})") + raise ValueError( + f"max_offset({input.max_offset}) > record_count({self.record_count})" + ) limit = min(input.offset + input.page_size, input.max_offset) records = [SingleRecord(id=i) for i in range(input.offset, limit)] - - return GetRecordsOutput(records=records) \ No newline at end of file + + return GetRecordsOutput(records=records) diff --git a/batch_sliding_window/record_processor_workflow.py b/batch_sliding_window/record_processor_workflow.py index aa01ee29..8921a808 100644 --- a/batch_sliding_window/record_processor_workflow.py +++ b/batch_sliding_window/record_processor_workflow.py @@ -13,10 +13,10 @@ class RecordProcessorWorkflow: @workflow.run async def run(self, record: SingleRecord) -> None: await self._process_record(record) - + # Notify parent about completion via signal parent = workflow.info().parent - + # This workflow is always expected to have a parent. # But for unit testing it might be useful to skip the notification if there is none. if parent: @@ -29,5 +29,5 @@ async def _process_record(self, record: SingleRecord) -> None: # Use workflow.random() to get a random number to ensure workflow determinism sleep_duration = workflow.random().randint(1, 10) await workflow.sleep(sleep_duration) - - workflow.logger.info(f"Processed record {record}") \ No newline at end of file + + workflow.logger.info(f"Processed record {record}") diff --git a/batch_sliding_window/sliding_window_workflow.py b/batch_sliding_window/sliding_window_workflow.py index 7f845f3c..c50bf8d2 100644 --- a/batch_sliding_window/sliding_window_workflow.py +++ b/batch_sliding_window/sliding_window_workflow.py @@ -5,13 +5,18 @@ from temporalio import workflow from temporalio.common import WorkflowIDReusePolicy -from batch_sliding_window.record_loader_activity import RecordLoader, GetRecordsInput, SingleRecord +from batch_sliding_window.record_loader_activity import ( + RecordLoader, + GetRecordsInput, + SingleRecord, +) from batch_sliding_window.record_processor_workflow import RecordProcessorWorkflow @dataclass class SlidingWindowWorkflowInput: """Contains SlidingWindowWorkflow arguments.""" + page_size: int sliding_window_size: int offset: int # inclusive @@ -24,6 +29,7 @@ class SlidingWindowWorkflowInput: @dataclass class SlidingWindowState: """Used as a 'state' query result.""" + current_records: List[int] # record ids currently being processed children_started_by_this_run: int offset: int @@ -33,7 +39,7 @@ class SlidingWindowState: @workflow.defn class SlidingWindowWorkflow: """Workflow processes a range of records using a requested number of child workflows. - + As soon as a child workflow completes a new one is started. """ @@ -54,7 +60,7 @@ async def run(self, input: SlidingWindowWorkflowInput) -> int: "offset": input.offset, "maximum_offset": input.maximum_offset, "progress": input.progress, - } + }, ) # Initialize state from input @@ -111,13 +117,15 @@ async def _execute(self, input: SlidingWindowWorkflowInput) -> int: return await self._continue_as_new_or_complete(input) - async def _continue_as_new_or_complete(self, input: SlidingWindowWorkflowInput) -> int: + async def _continue_as_new_or_complete( + self, input: SlidingWindowWorkflowInput + ) -> int: """Continue-as-new after starting page_size children or complete if done.""" # Update offset based on children started in this run new_offset = input.offset + len(self.children_started_by_this_run) - + if new_offset < input.maximum_offset: - # In Python, await start_child_workflow() already waits until + # In Python, await start_child_workflow() already waits until # the start has been accepted by the server, so no additional wait needed # Continue-as-new with updated state @@ -129,7 +137,7 @@ async def _continue_as_new_or_complete(self, input: SlidingWindowWorkflowInput) progress=self.progress, current_records=self.current_records, ) - + workflow.continue_as_new(new_input) # Last run in the continue-as-new chain @@ -152,4 +160,4 @@ def _handle_state_query(self) -> SlidingWindowState: children_started_by_this_run=len(self.children_started_by_this_run), offset=self.offset, progress=self.progress, - ) \ No newline at end of file + ) diff --git a/batch_sliding_window/starter.py b/batch_sliding_window/starter.py index caa0a936..d9a24971 100644 --- a/batch_sliding_window/starter.py +++ b/batch_sliding_window/starter.py @@ -7,7 +7,10 @@ from temporalio.client import Client -from batch_sliding_window.batch_workflow import ProcessBatchWorkflow, ProcessBatchWorkflowInput +from batch_sliding_window.batch_workflow import ( + ProcessBatchWorkflow, + ProcessBatchWorkflowInput, +) async def main(): @@ -17,21 +20,21 @@ async def main(): # Create client client = await Client.connect("localhost:7233") - + # Create unique workflow ID with timestamp timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") workflow_id = f"batch_sliding_window_example_{timestamp}" - + # Define workflow input workflow_input = ProcessBatchWorkflowInput( page_size=5, sliding_window_size=10, partitions=3, ) - + print(f"Starting workflow with ID: {workflow_id}") print(f"Input: {workflow_input}") - + # Start workflow handle = await client.start_workflow( ProcessBatchWorkflow.run, @@ -39,10 +42,10 @@ async def main(): id=workflow_id, task_queue="batch_sliding_window_task_queue", ) - + print(f"Workflow started with ID: {handle.id}") print(f"Waiting for workflow to complete...") - + # Wait for result try: result = await handle.result() @@ -54,4 +57,4 @@ async def main(): if __name__ == "__main__": - asyncio.run(main()) \ No newline at end of file + asyncio.run(main()) diff --git a/batch_sliding_window/worker.py b/batch_sliding_window/worker.py index 4e10f82f..09b425b3 100644 --- a/batch_sliding_window/worker.py +++ b/batch_sliding_window/worker.py @@ -45,4 +45,4 @@ async def main(): if __name__ == "__main__": - asyncio.run(main()) \ No newline at end of file + asyncio.run(main()) From e06db853f93e4d2e7ea1a4a79585a6ce3e3131f6 Mon Sep 17 00:00:00 2001 From: Bill Richards Date: Thu, 17 Jul 2025 13:15:03 -0700 Subject: [PATCH 5/8] sort imports --- batch_sliding_window/__init__.py | 14 +++++++------- batch_sliding_window/batch_workflow.py | 2 +- batch_sliding_window/sliding_window_workflow.py | 2 +- batch_sliding_window/worker.py | 4 ++-- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/batch_sliding_window/__init__.py b/batch_sliding_window/__init__.py index ffe2e124..959ab031 100644 --- a/batch_sliding_window/__init__.py +++ b/batch_sliding_window/__init__.py @@ -13,18 +13,18 @@ ProcessBatchWorkflow, ProcessBatchWorkflowInput, ) -from batch_sliding_window.sliding_window_workflow import ( - SlidingWindowWorkflow, - SlidingWindowWorkflowInput, - SlidingWindowState, -) -from batch_sliding_window.record_processor_workflow import RecordProcessorWorkflow from batch_sliding_window.record_loader_activity import ( - RecordLoader, GetRecordsInput, GetRecordsOutput, + RecordLoader, SingleRecord, ) +from batch_sliding_window.record_processor_workflow import RecordProcessorWorkflow +from batch_sliding_window.sliding_window_workflow import ( + SlidingWindowState, + SlidingWindowWorkflow, + SlidingWindowWorkflowInput, +) __all__ = [ "ProcessBatchWorkflow", diff --git a/batch_sliding_window/batch_workflow.py b/batch_sliding_window/batch_workflow.py index 856bf792..061c94ab 100644 --- a/batch_sliding_window/batch_workflow.py +++ b/batch_sliding_window/batch_workflow.py @@ -1,6 +1,6 @@ +import asyncio from dataclasses import dataclass from typing import List -import asyncio from temporalio import workflow from temporalio.common import WorkflowIDReusePolicy diff --git a/batch_sliding_window/sliding_window_workflow.py b/batch_sliding_window/sliding_window_workflow.py index c50bf8d2..b8dbbb3a 100644 --- a/batch_sliding_window/sliding_window_workflow.py +++ b/batch_sliding_window/sliding_window_workflow.py @@ -6,8 +6,8 @@ from temporalio.common import WorkflowIDReusePolicy from batch_sliding_window.record_loader_activity import ( - RecordLoader, GetRecordsInput, + RecordLoader, SingleRecord, ) from batch_sliding_window.record_processor_workflow import RecordProcessorWorkflow diff --git a/batch_sliding_window/worker.py b/batch_sliding_window/worker.py index 09b425b3..c0968bc3 100644 --- a/batch_sliding_window/worker.py +++ b/batch_sliding_window/worker.py @@ -8,9 +8,9 @@ from temporalio.client import Client from batch_sliding_window.batch_workflow import ProcessBatchWorkflow -from batch_sliding_window.sliding_window_workflow import SlidingWindowWorkflow -from batch_sliding_window.record_processor_workflow import RecordProcessorWorkflow from batch_sliding_window.record_loader_activity import RecordLoader +from batch_sliding_window.record_processor_workflow import RecordProcessorWorkflow +from batch_sliding_window.sliding_window_workflow import SlidingWindowWorkflow async def main(): From 23edf7cf635c9f1c053d184419a5b0675ee56cfa Mon Sep 17 00:00:00 2001 From: Bill Richards Date: Thu, 17 Jul 2025 14:24:29 -0700 Subject: [PATCH 6/8] fixed undeclared types --- batch_sliding_window/batch_workflow.py | 5 +++-- batch_sliding_window/sliding_window_workflow.py | 12 ++++++++---- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/batch_sliding_window/batch_workflow.py b/batch_sliding_window/batch_workflow.py index 061c94ab..a8d1d488 100644 --- a/batch_sliding_window/batch_workflow.py +++ b/batch_sliding_window/batch_workflow.py @@ -1,5 +1,6 @@ import asyncio from dataclasses import dataclass +from datetime import timedelta from typing import List from temporalio import workflow @@ -37,9 +38,9 @@ class ProcessBatchWorkflow: @workflow.run async def run(self, input: ProcessBatchWorkflowInput) -> int: # Get total record count - record_count = await workflow.execute_activity( + record_count: int = await workflow.execute_activity_method( RecordLoader.get_record_count, - start_to_close_timeout=workflow.timedelta(seconds=5), + start_to_close_timeout=timedelta(seconds=5), ) if input.sliding_window_size < input.partitions: diff --git a/batch_sliding_window/sliding_window_workflow.py b/batch_sliding_window/sliding_window_workflow.py index b8dbbb3a..87e4110d 100644 --- a/batch_sliding_window/sliding_window_workflow.py +++ b/batch_sliding_window/sliding_window_workflow.py @@ -1,5 +1,6 @@ import asyncio from dataclasses import dataclass +from datetime import timedelta from typing import Dict, List, Optional, Set from temporalio import workflow @@ -7,6 +8,7 @@ from batch_sliding_window.record_loader_activity import ( GetRecordsInput, + GetRecordsOutput, RecordLoader, SingleRecord, ) @@ -86,10 +88,12 @@ async def _execute(self, input: SlidingWindowWorkflowInput) -> int: offset=self.offset, max_offset=input.maximum_offset, ) - get_records_output = await workflow.execute_activity( - RecordLoader.get_records, - get_records_input, - start_to_close_timeout=workflow.timedelta(seconds=5), + get_records_output: GetRecordsOutput = ( + await workflow.execute_activity_method( + RecordLoader.get_records, + get_records_input, + start_to_close_timeout=timedelta(seconds=5), + ) ) records = get_records_output.records From a5f41ef4a9e55f4dc6c9f10c865b0454418616a8 Mon Sep 17 00:00:00 2001 From: Bill Richards Date: Fri, 18 Jul 2025 09:36:53 -0700 Subject: [PATCH 7/8] Change Readme to instruct running from the root directory --- batch_sliding_window/README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/batch_sliding_window/README.md b/batch_sliding_window/README.md index b196613a..a43745db 100644 --- a/batch_sliding_window/README.md +++ b/batch_sliding_window/README.md @@ -10,12 +10,12 @@ A single instance of `SlidingWindowWorkflow` has limited window size and through ### Running This Sample -To run, first see [README.md](../README.md) for prerequisites. Then, run the following from this directory to start the worker: +To run, first see [README.md](../README.md) for prerequisites. Then, run the following from root directory to start the worker: - uv run worker.py + uv run batch_sliding_window/worker.py This will start the worker. Then, in another terminal, run the following to execute the workflow: - uv run starter.py + uv run batch_sliding_window/starter.py The workflow will process 90 records using a sliding window of 10 parallel workers across 3 partitions, with a page size of 5 records per continue-as-new iteration. \ No newline at end of file From 0d1256c7e4100ca4112f05f170c33e8b03955c03 Mon Sep 17 00:00:00 2001 From: Bill Richards Date: Fri, 18 Jul 2025 17:24:38 -0600 Subject: [PATCH 8/8] force commit to try to fix required actions --- batch_sliding_window/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/batch_sliding_window/README.md b/batch_sliding_window/README.md index a43745db..8d573ca3 100644 --- a/batch_sliding_window/README.md +++ b/batch_sliding_window/README.md @@ -18,4 +18,4 @@ This will start the worker. Then, in another terminal, run the following to exec uv run batch_sliding_window/starter.py -The workflow will process 90 records using a sliding window of 10 parallel workers across 3 partitions, with a page size of 5 records per continue-as-new iteration. \ No newline at end of file +The workflow will process 90 records using a sliding window of 10 parallel workers across 3 partitions, with a page size of 5 records per continue-as-new iteration.