diff --git a/.gitignore b/.gitignore index 033df5fb..5c5ffffd 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ .venv +.idea __pycache__ diff --git a/README.md b/README.md index 03f2a6cf..50cadd0d 100644 --- a/README.md +++ b/README.md @@ -73,6 +73,7 @@ Some examples require extra dependencies. See each sample's directory for specif * [schedules](schedules) - Demonstrates a Workflow Execution that occurs according to a schedule. * [sentry](sentry) - Report errors to Sentry. * [trio_async](trio_async) - Use asyncio Temporal in Trio-based environments. +* [updatable_timer](updatable_timer) - A timer that can be updated while sleeping. * [worker_specific_task_queues](worker_specific_task_queues) - Use unique task queues to ensure activities run on specific workers. * [worker_versioning](worker_versioning) - Use the Worker Versioning feature to more easily version your workflows & other code. diff --git a/tests/updatable_timer/__init__.py b/tests/updatable_timer/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/updatable_timer/updatable_timer_test.py b/tests/updatable_timer/updatable_timer_test.py new file mode 100644 index 00000000..f1e38245 --- /dev/null +++ b/tests/updatable_timer/updatable_timer_test.py @@ -0,0 +1,33 @@ +import datetime +import logging +import math +import uuid + +from temporalio.client import Client, WorkflowExecutionStatus +from temporalio.testing import WorkflowEnvironment +from temporalio.worker import Worker + +from updatable_timer.workflow import Workflow + + +async def test_updatable_timer_workflow(client: Client): + logging.basicConfig(level=logging.DEBUG) + + task_queue_name = str(uuid.uuid4()) + async with await WorkflowEnvironment.start_time_skipping() as env: + async with Worker(env.client, task_queue=task_queue_name, workflows=[Workflow]): + in_a_day = float( + (datetime.datetime.now() + datetime.timedelta(days=1)).timestamp() + ) + in_an_hour = float( + (datetime.datetime.now() + datetime.timedelta(hours=1)).timestamp() + ) + handle = await env.client.start_workflow( + Workflow.run, in_a_day, id=str(uuid.uuid4()), task_queue=task_queue_name + ) + wake_up_time1 = await handle.query(Workflow.get_wake_up_time) + assert math.isclose(wake_up_time1, in_a_day) + await handle.signal(Workflow.update_wake_up_time, in_an_hour) + wake_up_time2 = await handle.query(Workflow.get_wake_up_time) + assert math.isclose(wake_up_time2, in_an_hour) + await handle.result() diff --git a/updatable_timer/README.md b/updatable_timer/README.md new file mode 100644 index 00000000..477b36c0 --- /dev/null +++ b/updatable_timer/README.md @@ -0,0 +1,42 @@ +# Updatable Timer Sample + +Demonstrates a helper class which relies on `workflow.wait_condition` to implement a blocking sleep that can be updated at any moment. + +The sample is composed of the three executables: + +* `worker.py` hosts the Workflow Executions. +* `starter.py` starts Workflow Executions. +* `wake_up_timer_updater.py` Signals the Workflow Execution with the new time to wake up. + +First start the Worker: + +```bash +poetry run python worker.py +``` +Check the output of the Worker window. The expected output is: + +``` +Worker started, ctrl+c to exit +``` + +Then in a different terminal window start the Workflow Execution: + +```bash +poetry run python starter.py +``` +Check the output of the Worker window. The expected output is: +``` +Workflow started: run_id=... +``` + +Then run the updater as many times as you want to change timer to 10 seconds from now: + +```bash +poetry run python wake_up_time_updater.py +``` + +Check the output of the worker window. The expected output is: + +``` +Updated wake up time to 10 seconds from now +``` \ No newline at end of file diff --git a/updatable_timer/__init__.py b/updatable_timer/__init__.py new file mode 100644 index 00000000..a5ee5055 --- /dev/null +++ b/updatable_timer/__init__.py @@ -0,0 +1 @@ +TASK_QUEUE = "updatable-timer" diff --git a/updatable_timer/starter.py b/updatable_timer/starter.py new file mode 100644 index 00000000..88b4d0d4 --- /dev/null +++ b/updatable_timer/starter.py @@ -0,0 +1,32 @@ +import asyncio +import logging +from datetime import datetime, timedelta +from typing import Optional + +from temporalio import exceptions +from temporalio.client import Client + +from updatable_timer import TASK_QUEUE +from updatable_timer.workflow import Workflow + + +async def main(client: Optional[Client] = None): + logging.basicConfig(level=logging.INFO) + + client = client or await Client.connect("localhost:7233") + try: + handle = await client.start_workflow( + Workflow.run, + (datetime.now() + timedelta(days=1)).timestamp(), + id=f"updatable-timer-workflow", + task_queue=TASK_QUEUE, + ) + logging.info(f"Workflow started: run_id={handle.result_run_id}") + except exceptions.WorkflowAlreadyStartedError as e: + logging.info( + f"Workflow already running: workflow_id={e.workflow_id}, run_id={e.run_id}" + ) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/updatable_timer/updatable_timer_lib.py b/updatable_timer/updatable_timer_lib.py new file mode 100644 index 00000000..b90ae868 --- /dev/null +++ b/updatable_timer/updatable_timer_lib.py @@ -0,0 +1,39 @@ +import asyncio +from datetime import datetime, timedelta + +from temporalio import workflow + + +class UpdatableTimer: + def __init__(self, wake_up_time: datetime) -> None: + self.wake_up_time = wake_up_time + self.wake_up_time_updated = False + + async def sleep(self) -> None: + workflow.logger.info(f"sleep_until: {self.wake_up_time}") + while True: + now = workflow.now() + + sleep_interval = self.wake_up_time - now + if sleep_interval <= timedelta(0): + break + workflow.logger.info(f"Going to sleep for {sleep_interval}") + + try: + self.wake_up_time_updated = False + await workflow.wait_condition( + lambda: self.wake_up_time_updated, + timeout=sleep_interval, + ) + except asyncio.TimeoutError: + # checks condition at the beginning of the loop + continue + workflow.logger.info(f"sleep_until completed") + + def update_wake_up_time(self, wake_up_time: datetime) -> None: + workflow.logger.info(f"update_wake_up_time: {wake_up_time}") + self.wake_up_time = wake_up_time + self.wake_up_time_updated = True + + def get_wake_up_time(self) -> datetime: + return self.wake_up_time diff --git a/updatable_timer/wake_up_time_updater.py b/updatable_timer/wake_up_time_updater.py new file mode 100644 index 00000000..f406c186 --- /dev/null +++ b/updatable_timer/wake_up_time_updater.py @@ -0,0 +1,26 @@ +import asyncio +import logging +from datetime import datetime, timedelta +from typing import Optional + +from temporalio.client import Client + +from updatable_timer.workflow import Workflow + + +async def main(client: Optional[Client] = None): + logging.basicConfig(level=logging.INFO) + + client = client or await Client.connect("localhost:7233") + handle = client.get_workflow_handle(workflow_id="updatable-timer-workflow") + # signal workflow about the wake up time change + await handle.signal( + Workflow.update_wake_up_time, + (datetime.now() + timedelta(seconds=10)).timestamp(), + ) + + logging.info("Updated wake up time to 10 seconds from now") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/updatable_timer/worker.py b/updatable_timer/worker.py new file mode 100644 index 00000000..096fa1ff --- /dev/null +++ b/updatable_timer/worker.py @@ -0,0 +1,35 @@ +import asyncio +import logging + +from temporalio.client import Client +from temporalio.worker import Worker + +from updatable_timer import TASK_QUEUE +from updatable_timer.workflow import Workflow + +interrupt_event = asyncio.Event() + + +async def main(): + logging.basicConfig(level=logging.INFO) + + client = await Client.connect("localhost:7233") + async with Worker( + client, + task_queue=TASK_QUEUE, + workflows=[Workflow], + ): + logging.info("Worker started, ctrl+c to exit") + # Wait until interrupted + await interrupt_event.wait() + logging.info("Interrupt received, shutting down...") + + +if __name__ == "__main__": + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + loop.run_until_complete(main()) + except KeyboardInterrupt: + interrupt_event.set() + loop.run_until_complete(loop.shutdown_asyncgens()) diff --git a/updatable_timer/workflow.py b/updatable_timer/workflow.py new file mode 100644 index 00000000..749df6fc --- /dev/null +++ b/updatable_timer/workflow.py @@ -0,0 +1,32 @@ +from datetime import datetime, timezone +from typing import Optional + +from temporalio import workflow + +from updatable_timer.updatable_timer_lib import UpdatableTimer + + +@workflow.defn +class Workflow: + @workflow.init + def __init__(self, wake_up_time: float) -> None: + self.timer = UpdatableTimer( + datetime.fromtimestamp(wake_up_time, tz=timezone.utc) + ) + + @workflow.run + async def run(self, wake_up_time: float): + await self.timer.sleep() + + @workflow.signal + async def update_wake_up_time(self, wake_up_time: float) -> None: + workflow.logger.info(f"update_wake_up_time: {wake_up_time}") + + self.timer.update_wake_up_time( + datetime.fromtimestamp(wake_up_time, tz=timezone.utc) + ) + + @workflow.query + def get_wake_up_time(self) -> float: + workflow.logger.info(f"get_wake_up_time") + return float(self.timer.get_wake_up_time().timestamp())