From b0d92382e70aaf12cfb153f1afe4023992fbb4b2 Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Sat, 29 Mar 2025 13:01:40 -0700 Subject: [PATCH 1/4] Added updatable_timer sample --- .gitignore | 1 + README.md | 1 + updatable_timer/README.md | 42 +++++++++++++++++++++++++ updatable_timer/starter.py | 29 +++++++++++++++++ updatable_timer/updatable_timer.py | 40 +++++++++++++++++++++++ updatable_timer/wake_up_time_updater.py | 25 +++++++++++++++ updatable_timer/worker.py | 33 +++++++++++++++++++ updatable_timer/workflow.py | 29 +++++++++++++++++ 8 files changed, 200 insertions(+) create mode 100644 updatable_timer/README.md create mode 100644 updatable_timer/starter.py create mode 100644 updatable_timer/updatable_timer.py create mode 100644 updatable_timer/wake_up_time_updater.py create mode 100644 updatable_timer/worker.py create mode 100644 updatable_timer/workflow.py diff --git a/.gitignore b/.gitignore index 033df5fb..5c5ffffd 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ .venv +.idea __pycache__ diff --git a/README.md b/README.md index 03f2a6cf..50cadd0d 100644 --- a/README.md +++ b/README.md @@ -73,6 +73,7 @@ Some examples require extra dependencies. See each sample's directory for specif * [schedules](schedules) - Demonstrates a Workflow Execution that occurs according to a schedule. * [sentry](sentry) - Report errors to Sentry. * [trio_async](trio_async) - Use asyncio Temporal in Trio-based environments. +* [updatable_timer](updatable_timer) - A timer that can be updated while sleeping. * [worker_specific_task_queues](worker_specific_task_queues) - Use unique task queues to ensure activities run on specific workers. * [worker_versioning](worker_versioning) - Use the Worker Versioning feature to more easily version your workflows & other code. diff --git a/updatable_timer/README.md b/updatable_timer/README.md new file mode 100644 index 00000000..477b36c0 --- /dev/null +++ b/updatable_timer/README.md @@ -0,0 +1,42 @@ +# Updatable Timer Sample + +Demonstrates a helper class which relies on `workflow.wait_condition` to implement a blocking sleep that can be updated at any moment. + +The sample is composed of the three executables: + +* `worker.py` hosts the Workflow Executions. +* `starter.py` starts Workflow Executions. +* `wake_up_timer_updater.py` Signals the Workflow Execution with the new time to wake up. + +First start the Worker: + +```bash +poetry run python worker.py +``` +Check the output of the Worker window. The expected output is: + +``` +Worker started, ctrl+c to exit +``` + +Then in a different terminal window start the Workflow Execution: + +```bash +poetry run python starter.py +``` +Check the output of the Worker window. The expected output is: +``` +Workflow started: run_id=... +``` + +Then run the updater as many times as you want to change timer to 10 seconds from now: + +```bash +poetry run python wake_up_time_updater.py +``` + +Check the output of the worker window. The expected output is: + +``` +Updated wake up time to 10 seconds from now +``` \ No newline at end of file diff --git a/updatable_timer/starter.py b/updatable_timer/starter.py new file mode 100644 index 00000000..4229ada7 --- /dev/null +++ b/updatable_timer/starter.py @@ -0,0 +1,29 @@ +import asyncio +import logging +from datetime import datetime, timedelta +from typing import Optional + +from temporalio import exceptions +from temporalio.client import Client + +from workflow import Workflow + + +async def main(client: Optional[Client] = None): + logging.basicConfig(level=logging.INFO) + + client = client or await Client.connect("localhost:7233") + try: + handle = await client.start_workflow( + Workflow.run, + (datetime.now() + timedelta(days=1)).timestamp(), + id=f"updatable-timer-workflow", + task_queue="updatable-timer", + ) + logging.info(f"Workflow started: run_id={handle.result_run_id}") + except exceptions.WorkflowAlreadyStartedError as e: + logging.info(f"Workflow already running: workflow_id={e.workflow_id}, run_id={e.run_id}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/updatable_timer/updatable_timer.py b/updatable_timer/updatable_timer.py new file mode 100644 index 00000000..afec4de4 --- /dev/null +++ b/updatable_timer/updatable_timer.py @@ -0,0 +1,40 @@ +import asyncio +from datetime import datetime, timedelta + +from temporalio import workflow + + +class UpdatableTimer: + + def __init__(self, wake_up_time: datetime) -> None: + self.wake_up_time = wake_up_time + self.wake_up_time_updated = False + + async def sleep(self) -> None: + workflow.logger.info(f"sleep_until: {self.wake_up_time}") + while True: + now = workflow.now() + + sleep_interval = self.wake_up_time - now + if sleep_interval <= timedelta(0): + break + workflow.logger.info(f"Going to sleep for {sleep_interval}") + + try: + self.wake_up_time_updated = False + await workflow.wait_condition( + lambda: self.wake_up_time_updated, + timeout=sleep_interval, + ) + except asyncio.TimeoutError: + # checks condition at the beginning of the loop + continue + workflow.logger.info(f"sleep_until completed") + + def update_wake_up_time(self, wake_up_time: datetime) -> None: + workflow.logger.info(f"update_wake_up_time: {wake_up_time}") + self.wake_up_time = wake_up_time + self.wake_up_time_updated = True + + def get_wake_up_time(self) -> datetime: + return self.wake_up_time diff --git a/updatable_timer/wake_up_time_updater.py b/updatable_timer/wake_up_time_updater.py new file mode 100644 index 00000000..03db5d04 --- /dev/null +++ b/updatable_timer/wake_up_time_updater.py @@ -0,0 +1,25 @@ +import asyncio +import logging +from datetime import timedelta, datetime +from typing import Optional + +from temporalio.client import Client + +from workflow import Workflow + + +async def main(client: Optional[Client] = None): + logging.basicConfig(level=logging.INFO) + + client = client or await Client.connect("localhost:7233") + handle = client.get_workflow_handle( + workflow_id="updatable-timer-workflow" + ) + # signal workflow about the wake up time change + await handle.signal(Workflow.update_wake_up_time, (datetime.now() + timedelta(seconds=10)).timestamp()) + + logging.info("Updated wake up time to 10 seconds from now") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/updatable_timer/worker.py b/updatable_timer/worker.py new file mode 100644 index 00000000..cdafe87d --- /dev/null +++ b/updatable_timer/worker.py @@ -0,0 +1,33 @@ +import asyncio +import logging + +from temporalio.client import Client +from temporalio.worker import Worker +from workflow import Workflow + +interrupt_event = asyncio.Event() + + +async def main(): + logging.basicConfig(level=logging.INFO) + + client = await Client.connect("localhost:7233") + async with Worker( + client, + task_queue="updatable-timer", + workflows=[Workflow], + ): + logging.info("Worker started, ctrl+c to exit") + # Wait until interrupted + await interrupt_event.wait() + logging.info("Interrupt received, shutting down...") + + +if __name__ == "__main__": + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + loop.run_until_complete(main()) + except KeyboardInterrupt: + interrupt_event.set() + loop.run_until_complete(loop.shutdown_asyncgens()) diff --git a/updatable_timer/workflow.py b/updatable_timer/workflow.py new file mode 100644 index 00000000..315843db --- /dev/null +++ b/updatable_timer/workflow.py @@ -0,0 +1,29 @@ +from datetime import datetime, timezone + +from temporalio import workflow + +from updatable_timer import UpdatableTimer + + +@workflow.defn +class Workflow: + + def __init__(self): + self.timer = None + + @workflow.run + async def run(self, wake_up_time: float): + self.timer = UpdatableTimer(datetime.fromtimestamp(wake_up_time, tz=timezone.utc)) + await self.timer.sleep() + + @workflow.signal + async def update_wake_up_time(self, wake_up_time: float): + # Deals with situation when signal method is called before run method. + # This happens when workflow task is executed after the signal is received + # or when workflow is started using signal with start. + await workflow.wait_condition(lambda: self.timer is not None) + self.timer.update_wake_up_time(datetime.fromtimestamp(wake_up_time, tz=timezone.utc)) + + @workflow.query + def get_wake_up_time(self): + return self.timer.get_wake_up_time() From b53c87cc78bec45025975ecd4e8310962fe73009 Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Sat, 29 Mar 2025 13:30:06 -0700 Subject: [PATCH 2/4] Converted updatable_timer to a module --- updatable_timer/__init__.py | 1 + updatable_timer/starter.py | 5 +++-- .../{updatable_timer.py => updatable_timer_lib.py} | 0 updatable_timer/wake_up_time_updater.py | 2 +- updatable_timer/worker.py | 6 ++++-- updatable_timer/workflow.py | 8 ++++---- 6 files changed, 13 insertions(+), 9 deletions(-) create mode 100644 updatable_timer/__init__.py rename updatable_timer/{updatable_timer.py => updatable_timer_lib.py} (100%) diff --git a/updatable_timer/__init__.py b/updatable_timer/__init__.py new file mode 100644 index 00000000..a5ee5055 --- /dev/null +++ b/updatable_timer/__init__.py @@ -0,0 +1 @@ +TASK_QUEUE = "updatable-timer" diff --git a/updatable_timer/starter.py b/updatable_timer/starter.py index 4229ada7..ad5ad44b 100644 --- a/updatable_timer/starter.py +++ b/updatable_timer/starter.py @@ -6,7 +6,8 @@ from temporalio import exceptions from temporalio.client import Client -from workflow import Workflow +from updatable_timer import TASK_QUEUE +from updatable_timer.workflow import Workflow async def main(client: Optional[Client] = None): @@ -18,7 +19,7 @@ async def main(client: Optional[Client] = None): Workflow.run, (datetime.now() + timedelta(days=1)).timestamp(), id=f"updatable-timer-workflow", - task_queue="updatable-timer", + task_queue=TASK_QUEUE, ) logging.info(f"Workflow started: run_id={handle.result_run_id}") except exceptions.WorkflowAlreadyStartedError as e: diff --git a/updatable_timer/updatable_timer.py b/updatable_timer/updatable_timer_lib.py similarity index 100% rename from updatable_timer/updatable_timer.py rename to updatable_timer/updatable_timer_lib.py diff --git a/updatable_timer/wake_up_time_updater.py b/updatable_timer/wake_up_time_updater.py index 03db5d04..d7732e6b 100644 --- a/updatable_timer/wake_up_time_updater.py +++ b/updatable_timer/wake_up_time_updater.py @@ -5,7 +5,7 @@ from temporalio.client import Client -from workflow import Workflow +from updatable_timer.workflow import Workflow async def main(client: Optional[Client] = None): diff --git a/updatable_timer/worker.py b/updatable_timer/worker.py index cdafe87d..6c1f3d7a 100644 --- a/updatable_timer/worker.py +++ b/updatable_timer/worker.py @@ -3,7 +3,9 @@ from temporalio.client import Client from temporalio.worker import Worker -from workflow import Workflow + +from updatable_timer import TASK_QUEUE +from updatable_timer.workflow import Workflow interrupt_event = asyncio.Event() @@ -14,7 +16,7 @@ async def main(): client = await Client.connect("localhost:7233") async with Worker( client, - task_queue="updatable-timer", + task_queue=TASK_QUEUE, workflows=[Workflow], ): logging.info("Worker started, ctrl+c to exit") diff --git a/updatable_timer/workflow.py b/updatable_timer/workflow.py index 315843db..1b11f5de 100644 --- a/updatable_timer/workflow.py +++ b/updatable_timer/workflow.py @@ -2,7 +2,7 @@ from temporalio import workflow -from updatable_timer import UpdatableTimer +from updatable_timer.updatable_timer_lib import UpdatableTimer @workflow.defn @@ -18,9 +18,9 @@ async def run(self, wake_up_time: float): @workflow.signal async def update_wake_up_time(self, wake_up_time: float): - # Deals with situation when signal method is called before run method. - # This happens when workflow task is executed after the signal is received - # or when workflow is started using signal with start. + # Deals with situation when the signal method is called before the run method. + # This happens when a workflow task is executed after a signal is received + # or when a workflow is started using the signal-with-start. await workflow.wait_condition(lambda: self.timer is not None) self.timer.update_wake_up_time(datetime.fromtimestamp(wake_up_time, tz=timezone.utc)) From c3d90334620bbc62899b485f9a688cee1f0dd4d9 Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Sat, 29 Mar 2025 15:40:47 -0700 Subject: [PATCH 3/4] Fixed lint errors --- updatable_timer/starter.py | 4 +++- updatable_timer/updatable_timer_lib.py | 1 - updatable_timer/wake_up_time_updater.py | 11 ++++++----- updatable_timer/worker.py | 6 +++--- updatable_timer/workflow.py | 14 ++++++++++---- 5 files changed, 22 insertions(+), 14 deletions(-) diff --git a/updatable_timer/starter.py b/updatable_timer/starter.py index ad5ad44b..88b4d0d4 100644 --- a/updatable_timer/starter.py +++ b/updatable_timer/starter.py @@ -23,7 +23,9 @@ async def main(client: Optional[Client] = None): ) logging.info(f"Workflow started: run_id={handle.result_run_id}") except exceptions.WorkflowAlreadyStartedError as e: - logging.info(f"Workflow already running: workflow_id={e.workflow_id}, run_id={e.run_id}") + logging.info( + f"Workflow already running: workflow_id={e.workflow_id}, run_id={e.run_id}" + ) if __name__ == "__main__": diff --git a/updatable_timer/updatable_timer_lib.py b/updatable_timer/updatable_timer_lib.py index afec4de4..b90ae868 100644 --- a/updatable_timer/updatable_timer_lib.py +++ b/updatable_timer/updatable_timer_lib.py @@ -5,7 +5,6 @@ class UpdatableTimer: - def __init__(self, wake_up_time: datetime) -> None: self.wake_up_time = wake_up_time self.wake_up_time_updated = False diff --git a/updatable_timer/wake_up_time_updater.py b/updatable_timer/wake_up_time_updater.py index d7732e6b..f406c186 100644 --- a/updatable_timer/wake_up_time_updater.py +++ b/updatable_timer/wake_up_time_updater.py @@ -1,6 +1,6 @@ import asyncio import logging -from datetime import timedelta, datetime +from datetime import datetime, timedelta from typing import Optional from temporalio.client import Client @@ -12,11 +12,12 @@ async def main(client: Optional[Client] = None): logging.basicConfig(level=logging.INFO) client = client or await Client.connect("localhost:7233") - handle = client.get_workflow_handle( - workflow_id="updatable-timer-workflow" - ) + handle = client.get_workflow_handle(workflow_id="updatable-timer-workflow") # signal workflow about the wake up time change - await handle.signal(Workflow.update_wake_up_time, (datetime.now() + timedelta(seconds=10)).timestamp()) + await handle.signal( + Workflow.update_wake_up_time, + (datetime.now() + timedelta(seconds=10)).timestamp(), + ) logging.info("Updated wake up time to 10 seconds from now") diff --git a/updatable_timer/worker.py b/updatable_timer/worker.py index 6c1f3d7a..096fa1ff 100644 --- a/updatable_timer/worker.py +++ b/updatable_timer/worker.py @@ -15,9 +15,9 @@ async def main(): client = await Client.connect("localhost:7233") async with Worker( - client, - task_queue=TASK_QUEUE, - workflows=[Workflow], + client, + task_queue=TASK_QUEUE, + workflows=[Workflow], ): logging.info("Worker started, ctrl+c to exit") # Wait until interrupted diff --git a/updatable_timer/workflow.py b/updatable_timer/workflow.py index 1b11f5de..fff0fa25 100644 --- a/updatable_timer/workflow.py +++ b/updatable_timer/workflow.py @@ -1,4 +1,5 @@ from datetime import datetime, timezone +from typing import Optional from temporalio import workflow @@ -7,13 +8,14 @@ @workflow.defn class Workflow: - def __init__(self): - self.timer = None + self.timer: Optional[UpdatableTimer] = None @workflow.run async def run(self, wake_up_time: float): - self.timer = UpdatableTimer(datetime.fromtimestamp(wake_up_time, tz=timezone.utc)) + self.timer = UpdatableTimer( + datetime.fromtimestamp(wake_up_time, tz=timezone.utc) + ) await self.timer.sleep() @workflow.signal @@ -22,8 +24,12 @@ async def update_wake_up_time(self, wake_up_time: float): # This happens when a workflow task is executed after a signal is received # or when a workflow is started using the signal-with-start. await workflow.wait_condition(lambda: self.timer is not None) - self.timer.update_wake_up_time(datetime.fromtimestamp(wake_up_time, tz=timezone.utc)) + assert self.timer is not None # for mypy + self.timer.update_wake_up_time( + datetime.fromtimestamp(wake_up_time, tz=timezone.utc) + ) @workflow.query def get_wake_up_time(self): + assert self.timer is not None # for mypy return self.timer.get_wake_up_time() From 88e3d6e60ba7dce85573a4f8f3697290177f18dc Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Tue, 1 Apr 2025 10:24:48 -0700 Subject: [PATCH 4/4] Fixes per PR review feedback. Refactored workflow to use workflow.init. Added unit test. --- tests/updatable_timer/__init__.py | 0 tests/updatable_timer/updatable_timer_test.py | 33 +++++++++++++++++++ updatable_timer/workflow.py | 25 +++++++------- 3 files changed, 44 insertions(+), 14 deletions(-) create mode 100644 tests/updatable_timer/__init__.py create mode 100644 tests/updatable_timer/updatable_timer_test.py diff --git a/tests/updatable_timer/__init__.py b/tests/updatable_timer/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/updatable_timer/updatable_timer_test.py b/tests/updatable_timer/updatable_timer_test.py new file mode 100644 index 00000000..f1e38245 --- /dev/null +++ b/tests/updatable_timer/updatable_timer_test.py @@ -0,0 +1,33 @@ +import datetime +import logging +import math +import uuid + +from temporalio.client import Client, WorkflowExecutionStatus +from temporalio.testing import WorkflowEnvironment +from temporalio.worker import Worker + +from updatable_timer.workflow import Workflow + + +async def test_updatable_timer_workflow(client: Client): + logging.basicConfig(level=logging.DEBUG) + + task_queue_name = str(uuid.uuid4()) + async with await WorkflowEnvironment.start_time_skipping() as env: + async with Worker(env.client, task_queue=task_queue_name, workflows=[Workflow]): + in_a_day = float( + (datetime.datetime.now() + datetime.timedelta(days=1)).timestamp() + ) + in_an_hour = float( + (datetime.datetime.now() + datetime.timedelta(hours=1)).timestamp() + ) + handle = await env.client.start_workflow( + Workflow.run, in_a_day, id=str(uuid.uuid4()), task_queue=task_queue_name + ) + wake_up_time1 = await handle.query(Workflow.get_wake_up_time) + assert math.isclose(wake_up_time1, in_a_day) + await handle.signal(Workflow.update_wake_up_time, in_an_hour) + wake_up_time2 = await handle.query(Workflow.get_wake_up_time) + assert math.isclose(wake_up_time2, in_an_hour) + await handle.result() diff --git a/updatable_timer/workflow.py b/updatable_timer/workflow.py index fff0fa25..749df6fc 100644 --- a/updatable_timer/workflow.py +++ b/updatable_timer/workflow.py @@ -8,28 +8,25 @@ @workflow.defn class Workflow: - def __init__(self): - self.timer: Optional[UpdatableTimer] = None - - @workflow.run - async def run(self, wake_up_time: float): + @workflow.init + def __init__(self, wake_up_time: float) -> None: self.timer = UpdatableTimer( datetime.fromtimestamp(wake_up_time, tz=timezone.utc) ) + + @workflow.run + async def run(self, wake_up_time: float): await self.timer.sleep() @workflow.signal - async def update_wake_up_time(self, wake_up_time: float): - # Deals with situation when the signal method is called before the run method. - # This happens when a workflow task is executed after a signal is received - # or when a workflow is started using the signal-with-start. - await workflow.wait_condition(lambda: self.timer is not None) - assert self.timer is not None # for mypy + async def update_wake_up_time(self, wake_up_time: float) -> None: + workflow.logger.info(f"update_wake_up_time: {wake_up_time}") + self.timer.update_wake_up_time( datetime.fromtimestamp(wake_up_time, tz=timezone.utc) ) @workflow.query - def get_wake_up_time(self): - assert self.timer is not None # for mypy - return self.timer.get_wake_up_time() + def get_wake_up_time(self) -> float: + workflow.logger.info(f"get_wake_up_time") + return float(self.timer.get_wake_up_time().timestamp())