Skip to content

Commit b732c01

Browse files
committed
feat(fetch): afetch now using stream instead of batch
1 parent 50dc993 commit b732c01

File tree

2 files changed

+63
-31
lines changed

2 files changed

+63
-31
lines changed

bin/cppref

Lines changed: 11 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -73,22 +73,21 @@ class CppRef:
7373
records = list(filter(lambda r: not html.joinpath(f"{source}{r.id}.html").exists(), records)) # fmt: off
7474
records = list(records)
7575
records = list(records)
76-
if len(records) == 0:
76+
if (length := len(records)) == 0:
7777
return print("Nothing to fetch.", file=sys.stderr)
7878

79-
# Process
80-
async def _fetch(records: list[Record]):
81-
index, total = -1, len(records)
82-
async for resp in Utils.afetch(*records, timeout=timeout, limit=limit):
83-
index += 1
84-
if isinstance(resp, BaseException):
85-
print(f"Error={type(resp).__name__}({str(resp)}), record={records[index]}", file=sys.stderr) # fmt: off
86-
continue
87-
Utils.write_file(html.joinpath(f"{source}{records[index].id}.html"), resp) # fmt: off
88-
print(f"{index}/{total}", file=sys.stdout)
79+
pbar = tqdm(total=length)
80+
81+
def on_success(record: Record, resp: str):
    """Store the fetched page for ``record`` and advance the progress bar."""
    target = html.joinpath(f"{source}{record.id}.html")
    Utils.write_file(target, resp)
    pbar.update()
84+
85+
def on_failed(record: Record, exec: Exception):
    """Report a failed ``record`` on stderr and advance the progress bar."""
    message = f"Error={type(exec).__name__}({exec}): {record}"
    print(message, file=sys.stderr)
    pbar.update()
8988

9089
html.mkdir(parents=True, exist_ok=True)
91-
asyncio.run(_fetch(records))
90+
asyncio.run(Utils.afetch(*records, timeout=timeout, limit=limit, on_success=on_success, on_failed=on_failed)) # fmt: off
9291

9392
def parse(self, force: bool = False, interact: bool = False):
9493
source = ConfContext.read_source()

src/cppref/utils.py

Lines changed: 52 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -3,10 +3,11 @@
33
import asyncio
44
import gzip
55
import sqlite3
6+
from asyncio import Queue
67
from pathlib import Path
7-
from typing import Callable, Sequence
8+
from typing import Callable
89

9-
from playwright.async_api import async_playwright
10+
from playwright.async_api import Page, async_playwright
1011
from playwright.sync_api import sync_playwright
1112

1213
from cppref.typing_ import Record, Source
@@ -37,27 +38,59 @@ def fetch(record: Record, timeout: float) -> str:
3738
return content
3839

3940
@staticmethod
40-
async def afetch(*records: Record, timeout: float, limit: int):
41-
def batch_iter[T](data: Sequence[T]):
42-
length = len(data)
43-
for i in range(0, length, limit):
44-
yield data[i : i + limit]
41+
async def afetch(
42+
*records: Record,
43+
timeout: float,
44+
limit: int,
45+
on_success: Callable[[Record, str], None],
46+
on_failed: Callable[[Record, Exception], None],
47+
):
48+
_records = Queue[Record]()
49+
for recrod in records:
50+
_records.put_nowait(recrod)
51+
52+
_results = Queue[tuple[Record, Exception | str]]()
53+
54+
async def producer(page: Page):
    """Drain the shared record queue through a single browser page.

    Takes records from ``_records`` until it is empty, navigates ``page`` to
    each record's URL and queues exactly one entry on ``_results`` per record:
    ``(record, html)`` on success or ``(record, exception)`` on failure.

    Args:
        page: The Playwright page this worker navigates with.
    """
    while not _records.empty():
        # There is no await between empty() and get_nowait(), so no other
        # producer can take the item in between.
        record = _records.get_nowait()
        try:
            resp = await page.goto(record.url, timeout=timeout, wait_until="networkidle")  # fmt: off
            # Raise explicitly instead of using `assert`: assert statements
            # are stripped under `python -O`, which would let failed
            # responses through as if they were valid pages.
            if resp is None:
                raise AssertionError(f"Timeout: {record}")
            if not resp.ok:
                raise AssertionError(f"Request failed: {record}, status={resp.status_text}")  # fmt: off
            # Read the page inside the try block so that a failure here is
            # reported through the result queue instead of killing this
            # worker and losing the record.
            content = await page.content()
        except Exception as e:
            _results.put_nowait((record, e))
        else:
            _results.put_nowait((record, content))
        finally:
            _records.task_done()
67+
68+
async def customer():
    """Consume fetch results and dispatch them to the callbacks.

    Loops until cancelled. Each ``(record, resp)`` taken from ``_results`` is
    passed to ``on_success`` when ``resp`` is a string and to ``on_failed``
    otherwise; an exception raised by ``on_success`` is forwarded to
    ``on_failed`` for the same record.

    An exception raised by ``on_failed`` itself is handed to the running
    loop's exception handler rather than propagated, so this worker stays
    alive, and ``task_done()`` is always called so that ``_results.join()``
    cannot block on an item that was already taken from the queue.
    """
    while True:
        record, resp = await _results.get()
        try:
            if isinstance(resp, str):
                try:
                    on_success(record, resp)
                except Exception as e:
                    on_failed(record, e)
            else:
                on_failed(record, resp)
        except Exception as e:
            # Only on_failed can get here; report it without ending the loop.
            asyncio.get_running_loop().call_exception_handler(
                {
                    "message": f"on_failed callback raised for {record}",
                    "exception": e,
                }
            )
        finally:
            _results.task_done()
4579

4680
async with async_playwright() as p:
4781
browser = await p.chromium.launch(headless=True)
4882
pages = [await browser.new_page() for _ in range(limit)]
49-
50-
async def _fetch(index: int, record: Record) -> str:
51-
resp = await pages[index].goto(record.url, timeout=timeout, wait_until="networkidle") # fmt: off
52-
assert resp is not None, f"Timeout: {record}"
53-
assert resp.ok, f"Request failed: status={resp.status_text}, {record}"
54-
return await pages[index].content()
55-
56-
for batch in batch_iter(records):
57-
tasks = map(lambda t: _fetch(t[0], t[1]), enumerate(batch))
58-
htmls = await asyncio.gather(*tasks, return_exceptions=True)
59-
for html in htmls:
60-
yield html
83+
producers = [asyncio.create_task(producer(pages[i])) for i in range(limit)]
84+
customers = [asyncio.create_task(customer()) for _ in range(limit)]
85+
await _records.join()
86+
for p in producers:
87+
p.cancel()
88+
await _results.join()
89+
for c in customers:
90+
c.cancel()
91+
92+
await asyncio.gather(*producers, return_exceptions=True)
93+
await asyncio.gather(*customers, return_exceptions=True)
6194

6295
for page in pages:
6396
await page.close()

0 commit comments

Comments
 (0)