Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
.venv
.idea
__pycache__
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ Some examples require extra dependencies. See each sample's directory for specif
* [schedules](schedules) - Demonstrates a Workflow Execution that occurs according to a schedule.
* [sentry](sentry) - Report errors to Sentry.
* [trio_async](trio_async) - Use asyncio Temporal in Trio-based environments.
* [updatable_timer](updatable_timer) - A timer that can be updated while sleeping.
* [worker_specific_task_queues](worker_specific_task_queues) - Use unique task queues to ensure activities run on specific workers.
* [worker_versioning](worker_versioning) - Use the Worker Versioning feature to more easily version your workflows & other code.

Expand Down
Empty file.
33 changes: 33 additions & 0 deletions tests/updatable_timer/updatable_timer_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import datetime
import logging
import math
import uuid

from temporalio.client import Client, WorkflowExecutionStatus
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

from updatable_timer.workflow import Workflow


async def test_updatable_timer_workflow(client: Client):
logging.basicConfig(level=logging.DEBUG)

task_queue_name = str(uuid.uuid4())
async with await WorkflowEnvironment.start_time_skipping() as env:
async with Worker(env.client, task_queue=task_queue_name, workflows=[Workflow]):
in_a_day = float(
(datetime.datetime.now() + datetime.timedelta(days=1)).timestamp()
)
in_an_hour = float(
(datetime.datetime.now() + datetime.timedelta(hours=1)).timestamp()
)
handle = await env.client.start_workflow(
Workflow.run, in_a_day, id=str(uuid.uuid4()), task_queue=task_queue_name
)
wake_up_time1 = await handle.query(Workflow.get_wake_up_time)
assert math.isclose(wake_up_time1, in_a_day)
await handle.signal(Workflow.update_wake_up_time, in_an_hour)
wake_up_time2 = await handle.query(Workflow.get_wake_up_time)
assert math.isclose(wake_up_time2, in_an_hour)
await handle.result()
42 changes: 42 additions & 0 deletions updatable_timer/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Updatable Timer Sample

Demonstrates a helper class which relies on `workflow.wait_condition` to implement a blocking sleep that can be updated at any moment.

The sample is composed of the three executables:

* `worker.py` hosts the Workflow Executions.
* `starter.py` starts Workflow Executions.
* `wake_up_timer_updater.py` Signals the Workflow Execution with the new time to wake up.

First start the Worker:

```bash
poetry run python worker.py
```
Check the output of the Worker window. The expected output is:

```
Worker started, ctrl+c to exit
```

Then in a different terminal window start the Workflow Execution:

```bash
poetry run python starter.py
```
Check the output of the Worker window. The expected output is:
```
Workflow started: run_id=...
```

Then run the updater as many times as you want to change timer to 10 seconds from now:

```bash
poetry run python wake_up_time_updater.py
```

Check the output of the worker window. The expected output is:

```
Updated wake up time to 10 seconds from now
```
1 change: 1 addition & 0 deletions updatable_timer/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
TASK_QUEUE = "updatable-timer"
32 changes: 32 additions & 0 deletions updatable_timer/starter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import asyncio
import logging
from datetime import datetime, timedelta
from typing import Optional

from temporalio import exceptions
from temporalio.client import Client

from updatable_timer import TASK_QUEUE
from updatable_timer.workflow import Workflow


async def main(client: Optional[Client] = None):
logging.basicConfig(level=logging.INFO)

client = client or await Client.connect("localhost:7233")
try:
handle = await client.start_workflow(
Workflow.run,
(datetime.now() + timedelta(days=1)).timestamp(),
id=f"updatable-timer-workflow",
task_queue=TASK_QUEUE,
)
logging.info(f"Workflow started: run_id={handle.result_run_id}")
except exceptions.WorkflowAlreadyStartedError as e:
logging.info(
f"Workflow already running: workflow_id={e.workflow_id}, run_id={e.run_id}"
)


if __name__ == "__main__":
asyncio.run(main())
39 changes: 39 additions & 0 deletions updatable_timer/updatable_timer_lib.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import asyncio
from datetime import datetime, timedelta

from temporalio import workflow


class UpdatableTimer:
def __init__(self, wake_up_time: datetime) -> None:
self.wake_up_time = wake_up_time
self.wake_up_time_updated = False

async def sleep(self) -> None:
workflow.logger.info(f"sleep_until: {self.wake_up_time}")
while True:
now = workflow.now()

sleep_interval = self.wake_up_time - now
if sleep_interval <= timedelta(0):
break
workflow.logger.info(f"Going to sleep for {sleep_interval}")

try:
self.wake_up_time_updated = False
await workflow.wait_condition(
lambda: self.wake_up_time_updated,
timeout=sleep_interval,
)
except asyncio.TimeoutError:
# checks condition at the beginning of the loop
continue
workflow.logger.info(f"sleep_until completed")

def update_wake_up_time(self, wake_up_time: datetime) -> None:
workflow.logger.info(f"update_wake_up_time: {wake_up_time}")
self.wake_up_time = wake_up_time
self.wake_up_time_updated = True

def get_wake_up_time(self) -> datetime:
return self.wake_up_time
26 changes: 26 additions & 0 deletions updatable_timer/wake_up_time_updater.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import asyncio
import logging
from datetime import datetime, timedelta
from typing import Optional

from temporalio.client import Client

from updatable_timer.workflow import Workflow


async def main(client: Optional[Client] = None):
logging.basicConfig(level=logging.INFO)

client = client or await Client.connect("localhost:7233")
handle = client.get_workflow_handle(workflow_id="updatable-timer-workflow")
# signal workflow about the wake up time change
await handle.signal(
Workflow.update_wake_up_time,
(datetime.now() + timedelta(seconds=10)).timestamp(),
)

logging.info("Updated wake up time to 10 seconds from now")


if __name__ == "__main__":
asyncio.run(main())
35 changes: 35 additions & 0 deletions updatable_timer/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import asyncio
import logging

from temporalio.client import Client
from temporalio.worker import Worker

from updatable_timer import TASK_QUEUE
from updatable_timer.workflow import Workflow

interrupt_event = asyncio.Event()


async def main():
logging.basicConfig(level=logging.INFO)

client = await Client.connect("localhost:7233")
async with Worker(
client,
task_queue=TASK_QUEUE,
workflows=[Workflow],
):
logging.info("Worker started, ctrl+c to exit")
# Wait until interrupted
await interrupt_event.wait()
logging.info("Interrupt received, shutting down...")


if __name__ == "__main__":
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
interrupt_event.set()
loop.run_until_complete(loop.shutdown_asyncgens())
32 changes: 32 additions & 0 deletions updatable_timer/workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from datetime import datetime, timezone
from typing import Optional

from temporalio import workflow

from updatable_timer.updatable_timer_lib import UpdatableTimer


@workflow.defn
class Workflow:
@workflow.init
def __init__(self, wake_up_time: float) -> None:
self.timer = UpdatableTimer(
datetime.fromtimestamp(wake_up_time, tz=timezone.utc)
)

@workflow.run
async def run(self, wake_up_time: float):
await self.timer.sleep()

@workflow.signal
async def update_wake_up_time(self, wake_up_time: float) -> None:
workflow.logger.info(f"update_wake_up_time: {wake_up_time}")

self.timer.update_wake_up_time(
datetime.fromtimestamp(wake_up_time, tz=timezone.utc)
)

@workflow.query
def get_wake_up_time(self) -> float:
workflow.logger.info(f"get_wake_up_time")
return float(self.timer.get_wake_up_time().timestamp())