Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 35 additions & 34 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,60 +25,61 @@ pip install calendar-queue

## Usage

An example usage of `CalendarQueue`
### CalendarQueue

`CalendarQueue` is a low-level, efficient queue for scheduling events at specific times:

```python
import asyncio
from datetime import datetime
from random import randrange
from secrets import token_hex

from datetime import datetime, timedelta
from calendar_queue import CalendarQueue

# (optional) define the item type
CustomItem = tuple[str]

# use the low level calendar queue
cq: CalendarQueue[CustomItem] = CalendarQueue()

async def put_random():
cq = CalendarQueue()

print("Putting new items in the calendar queue. Hit CTRL + C to stop.")
async def schedule_events():
for i in range(3):
scheduled_time = (datetime.now() + timedelta(seconds=i+1)).timestamp()
cq.put_nowait((scheduled_time, f"Event {i+1}"))

while True:
async def process_events():
for _ in range(3):
ts, event = await cq.get()
print(f"{datetime.fromtimestamp(ts).isoformat()}: {event}")

# sleep for a while, just to release the task
await asyncio.sleep(1)

scheduled_ts = datetime.now().timestamp() + randrange(1, 5)

s = token_hex(8)
async def main():
await asyncio.gather(schedule_events(), process_events())

current_item: CustomItem = (s)
if __name__ == "__main__":
asyncio.run(main())
```

print(f"{datetime.now().isoformat()}: putting {current_item} scheduled for {datetime.fromtimestamp(scheduled_ts).isoformat()}")
### Calendar

cq.put_nowait((scheduled_ts, current_item))
`Calendar` is a higher-level abstraction that simplifies working with `datetime` objects and provides an async iterator:

```python
import asyncio
from datetime import datetime, timedelta
from calendar_queue import Calendar

async def get_from_queue():
calendar = Calendar()

while True:
ts, el = await cq.get()
async def schedule_events():
for i in range(3):
scheduled_time = datetime.now() + timedelta(seconds=i+1)
calendar.schedule(f"Event {i+1}", when=scheduled_time)

print(f"{datetime.now().isoformat()}: getting {el} scheduled for {datetime.fromtimestamp(ts).isoformat()}")
async def process_events():
async for ts, event in calendar:
print(f"{datetime.fromtimestamp(ts).isoformat()}: {event}")
if int(ts) == int((datetime.now() + timedelta(seconds=3)).timestamp()):
calendar.stop()

async def main():

await asyncio.gather(
asyncio.create_task(put_random()),
asyncio.create_task(get_from_queue()),
)

await asyncio.gather(schedule_events(), process_events())

if __name__ == "__main__":
asyncio.run(main())

```

## Development
Expand Down