7 changes: 5 additions & 2 deletions components/bot_detector/event_queue/core/event_queue.py
@@ -32,8 +32,8 @@ async def start(self):
     async def stop(self):
         await self._backend.stop()
 
-    async def put(self, message: list[T]):
-        await self._backend.put(message)
+    async def put(self, message: list[T]) -> Optional[Exception]:
+        return await self._backend.put(message)
 
 
 class QueueConsumer(Generic[T]):
@@ -64,6 +64,9 @@ async def get_one(self) -> Optional[T] | Exception:
     async def get_many(self, count: int) -> list[T] | Exception:
         return await self._backend.get_many(count)
 
+    async def commit(self) -> Optional[Exception]:
+        return await self._backend.commit()
+
 
 class Queue(QueueConsumer[T], QueueProducer[T]):
     """
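To make the new return-type contract concrete: backends are expected to hand errors back as values instead of raising. Below is a minimal sketch of an in-memory backend following that convention; InMemoryBackend and its internals are illustrative assumptions, not this repo's actual backend.

```python
# Hypothetical sketch (not part of this PR): a minimal in-memory backend
# showing the errors-as-values contract that put()/commit() now expose.
import asyncio
from typing import Generic, Optional, TypeVar

T = TypeVar("T")


class InMemoryBackend(Generic[T]):
    """Illustrative backend: failures are returned, never raised."""

    def __init__(self, maxsize: int = 0) -> None:
        self._queue: asyncio.Queue[T] = asyncio.Queue(maxsize=maxsize)

    async def put(self, messages: list[T]) -> Optional[Exception]:
        try:
            for message in messages:
                self._queue.put_nowait(message)  # fails fast when full
            return None
        except asyncio.QueueFull as e:
            return e

    async def commit(self) -> Optional[Exception]:
        try:
            self._queue.task_done()  # ack one previously fetched message
            return None
        except ValueError as e:  # task_done() without a matching get()
            return e
```

A Kafka-backed implementation would presumably do the same: catch its client's exceptions and return them to the caller.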
17 changes: 14 additions & 3 deletions components/bot_detector/worker/__init__.py
@@ -1,4 +1,15 @@
-from .core import BaseWorker
-from .interface import WorkerInterface
+from .core import BaseWorker, ConsumerWorker, ProducerWorker
+from .interface import (
+    ConsumerWorkerInterface,
+    ProducerWorkerInterface,
+    WorkerInterface,
+)
 
-__all__ = ["BaseWorker", "WorkerInterface"]
+__all__ = [
+    "BaseWorker",
+    "ConsumerWorker",
+    "ConsumerWorkerInterface",
+    "ProducerWorker",
+    "ProducerWorkerInterface",
+    "WorkerInterface",
+]
240 changes: 139 additions & 101 deletions components/bot_detector/worker/core.py
@@ -2,138 +2,147 @@
 import logging
 from typing import Any, Generic, TypeVar
 
-from bot_detector.kafka import ConsumerInterface, ProducerInterface
+from bot_detector.event_queue.core import Queue, QueueProducer
 from bot_detector.wide_event import EventLoggerInterface, WideEventLogger
 from pydantic import BaseModel
 
-from .interface import WorkerInterface
+from .interface import ConsumerWorkerInterface, ProducerWorkerInterface
 
 T = TypeVar("T", bound=BaseModel)
 
 
-class BaseWorker(Generic[T], WorkerInterface[T]):
+class BaseWorker(Generic[T]):
     """Generic worker with minimal boilerplate and integrated logging."""
 
     EMPTY_MESSAGE_SLEEP = 10
-    PRODUCE_RETRY_DELAY = 5
-    PRODUCE_MAX_RETRY = 3
 
     def __init__(
         self,
-        consumer: ConsumerInterface[T],
-        producer: ProducerInterface[T],
-        max_messages: int = 10_000,
-        max_interval_ms: int = 5_000,
-        batch_processing: bool = False,
         wide_event: EventLoggerInterface = WideEventLogger(sample_ratio=0.1),
         logger_name: str | None = None,
     ) -> None:
-        self._consumer = consumer
-        self._producer = producer
-        self._max_messages = max_messages
-        self._max_interval_ms = max_interval_ms
-        self._batch_processing = batch_processing
        self._wide_event = wide_event
        self._logger = logging.getLogger(logger_name or self.__class__.__name__)
        self._stop_event = asyncio.Event()
 
-    async def on_message(self, message: T) -> bool:
-        """Override with single message logic. Return True for success."""
-        return True
-
-    async def on_message_batch(self, messages: list[T]) -> bool:
-        """Override with batch message logic. Return True if all succeed."""
-        for msg in messages:
-            if not await self.on_message(msg):
-                return False
-        return True
-
-    async def start(self) -> None:
-        await self._consumer.start()
-        await self._producer.start()
-        await self._run()
-
-    async def stop(self) -> None:
-        self._stop_event.set()
-        await self._consumer.stop()
-        await self._producer.stop()
-
     # ------------------------
     # Hooks
     # ------------------------
-    async def _consume_error_hook(self, errors: list[str]):
-        self._add_context({"error": {"consumer_errors": errors[:5]}})
+    async def _consume_error_hook(self, errors: list[Exception]):
+        error_messages = [str(error) for error in errors[:5]]
+        self._add_context({"error": {"consumer_errors": error_messages}})
 
     async def _empty_message_hook(self):
         self._add_context({"_run": {"status": "empty"}})
         await asyncio.sleep(self.EMPTY_MESSAGE_SLEEP)
 
-    async def _failed_on_message_hook(self):
-        self._add_context({"_run": {"status": "failed"}})
+    async def _failed_on_message_hook(self, error: Exception):
+        self._add_context({"_run": {"status": "failed", "error": str(error)}})
 
     async def _success_on_message_hook(self):
         self._add_context({"_run": {"status": "success"}})
 
     # ------------------------
-    # Retry logic
+    # WideEvent logging
     # ------------------------
+    def _set_context(self, data: dict) -> Any:
+        return self._wide_event.set(data)
+
+    def _add_context(self, data: dict) -> None:
+        self._wide_event.add(data)
+
+    def _get_context(self) -> dict:
+        return self._wide_event.get()
+
+    def _reset_context(self, token: Any) -> None:
+        self._wide_event.reset(token)
+
+    def _log(self) -> None:
+        context = self._get_context()
+        if "error" in context:
+            self._logger.error(context)
+        elif self._wide_event.sample():
+            self._logger.info(context)
+
+
+class ConsumerWorker(BaseWorker[T], ConsumerWorkerInterface[T]):
+    """Worker that consumes messages from a queue and processes them."""
+
+    MAX_MESSAGES = 10_000
+
+    def __init__(
+        self,
+        queue: Queue[T],
+        batch_processing: bool = False,
+        wide_event: EventLoggerInterface = WideEventLogger(sample_ratio=0.1),
+        logger_name: str | None = None,
+    ) -> None:
+        super().__init__(wide_event=wide_event, logger_name=logger_name)
+        self._queue = queue
+        self._batch_processing = batch_processing
+
+    async def on_message(self, message: T) -> Exception | None:
+        """Override with single message logic. Return Exception on failure."""
+        return None
+
+    async def on_message_batch(self, messages: list[T]) -> Exception | None:
+        """Override with batch message logic. Return Exception on failure."""
+        for msg in messages:
+            error = await self.on_message(msg)
+            if error:
+                return error
+        return None
+
+    async def start(self) -> None:
+        await self._queue.start()
+        await self._run()
+
+    async def stop(self) -> None:
+        self._stop_event.set()
+        await self._queue.stop()
+
     async def _produce_failed_messages(self, batch: list[T]) -> None:
-        errors: list[str] = []
-
-        for message in batch:
-            retry_count = 0
-            while retry_count < self.PRODUCE_MAX_RETRY:
-                try:
-                    await self._producer.produce_one(message=message)
-                    break
-                except Exception as e:
-                    retry_count += 1
-                    if retry_count >= self.PRODUCE_MAX_RETRY:
-                        errors.append(str(e))
-                        break
-                    await asyncio.sleep(self.PRODUCE_RETRY_DELAY)
-
-        if errors:
-            self._add_context({"error": {"produce_failed_messages": errors[:5]}})
+        error = await self._queue.put(batch)
+        if error:
+            self._add_context({"error": {"produce_failed_messages": [str(error)]}})
 
     # ------------------------
     # Core processing loops
     # ------------------------
     async def _run_one(self) -> list[T]:
         failed: list[T] = []
-        message, consume_error = await self._consumer.consume_one()
-        if consume_error:
-            await self._consume_error_hook([consume_error])
-        if message is None:
+        result = await self._queue.get_one()
+        if isinstance(result, Exception):
+            await self._consume_error_hook([result])
             await self._empty_message_hook()
             return failed
-        if await self.on_message(message):
+        if result is None:
+            await self._empty_message_hook()
+            return failed
+        message = result
+        error = await self.on_message(message)
+        if error is None:
             await self._success_on_message_hook()
         else:
             failed.append(message)
-            await self._failed_on_message_hook()
+            await self._failed_on_message_hook(error)
         return failed
 
     async def _run_many(self) -> list[T]:
         failed_messages: list[T] = []
-        batch, errors = await self._consumer.consume_many(
-            max_records=self._max_messages, timeout_ms=self._max_interval_ms
-        )
-
-        if errors:
-            await self._consume_error_hook(errors)
-
-        if not batch:
+        result = await self._queue.get_many(self.MAX_MESSAGES)
+        if isinstance(result, Exception):
+            await self._consume_error_hook([result])
             await self._empty_message_hook()
-            return failed_messages
+            return []
+        if not result:
+            await self._empty_message_hook()
+            return failed_messages
+        batch = result
         self._add_context({"_run": {"batch_size": len(batch)}})
 
-        if await self.on_message_batch(batch):
+        error = await self.on_message_batch(batch)
+        if error is None:
             await self._success_on_message_hook()
         else:
             failed_messages.extend(batch)
-            await self._failed_on_message_hook()
+            await self._failed_on_message_hook(error)
 
         return failed_messages
 
@@ -146,32 +155,61 @@ async def _run(self) -> None:
             else:
                 failed_messages = await self._run_one()
             await self._produce_failed_messages(failed_messages)
-            await self._consumer.commit()
+            commit_error = await self._queue.commit()
+            if commit_error:
+                await self._consume_error_hook([commit_error])
Comment on lines 155 to +159

P2: Skip commit when no messages were consumed

ConsumerWorker now calls queue.commit() on every loop iteration, even after _run_one/_run_many returns early for an empty queue (or a fetch error). That is harmless for Kafka, but the in-memory backend’s commit() calls asyncio.Queue.task_done() without checking whether a get_nowait() actually happened. When get_one returns None or get_many returns an empty list, task_done() raises ValueError, and the worker will hit the _run_exception path and sleep 15s on every loop. This is a regression for the in-memory queue: commit should be a no-op unless at least one message was consumed (or the in-memory commit should guard against the empty case).
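A minimal sketch of the suggested guard, building on the illustrative InMemoryBackend above; the _uncommitted counter and get_one body are assumptions, not the repo's actual implementation.

```python
# Hypothetical fix for the reviewer's point: make commit() a no-op unless
# at least one message was actually consumed since the last commit.
class GuardedInMemoryBackend(InMemoryBackend[T]):
    def __init__(self, maxsize: int = 0) -> None:
        super().__init__(maxsize=maxsize)
        self._uncommitted = 0  # messages fetched since the last commit

    async def get_one(self) -> Optional[T]:
        try:
            message = self._queue.get_nowait()
        except asyncio.QueueEmpty:
            return None  # an empty queue is not an error
        self._uncommitted += 1
        return message

    async def commit(self) -> Optional[Exception]:
        # Call task_done() only for messages actually handed out, so an
        # empty loop iteration can no longer raise ValueError.
        while self._uncommitted > 0:
            self._queue.task_done()
            self._uncommitted -= 1
        return None
```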
         except Exception as e:
             self._add_context({"error": {"_run_exception": str(e)}})
             await asyncio.sleep(15)
         finally:
             self._log()
             self._reset_context(token)

-    # ------------------------
-    # WideEvent logging
-    # ------------------------
-    def _set_context(self, data: dict) -> Any:
-        return self._wide_event.set(data)
-
-    def _add_context(self, data: dict) -> None:
-        self._wide_event.add(data)
-
-    def _get_context(self) -> dict:
-        return self._wide_event.get()
-
-    def _reset_context(self, token: Any) -> None:
-        self._wide_event.reset(token)
-
-    def _log(self) -> None:
-        context = self._get_context()
-        if "error" in context:
-            self._logger.error(context)
-        elif self._wide_event.sample():
-            self._logger.info(context)
+
+class ProducerWorker(BaseWorker[T], ProducerWorkerInterface[T]):
+    """Worker that builds and produces messages to a queue."""
+
+    def __init__(
+        self,
+        queue: QueueProducer[T] | Queue[T],
+        wide_event: EventLoggerInterface = WideEventLogger(sample_ratio=0.1),
+        logger_name: str | None = None,
+    ) -> None:
+        super().__init__(wide_event=wide_event, logger_name=logger_name)
+        self._queue = queue
+
+    async def build_messages(self) -> list[T] | Exception | None:
+        """Override to return a batch of messages to produce."""
+        return None
+
+    async def start(self) -> None:
+        await self._queue.start()
+        await self._run()
+
+    async def stop(self) -> None:
+        self._stop_event.set()
+        await self._queue.stop()
+
+    async def _run(self) -> None:
+        while not self._stop_event.is_set():
+            token = self._set_context(data={})
+            try:
+                result = await self.build_messages()
+                if isinstance(result, Exception):
+                    await self._failed_on_message_hook(result)
+                    await self._empty_message_hook()
+                    continue
+                if not result:
+                    await self._empty_message_hook()
+                    continue
+                error = await self._queue.put(result)
+                if error:
+                    self._add_context(
+                        {"error": {"produce_failed_messages": [str(error)]}}
+                    )
+            except Exception as e:
+                self._add_context({"error": {"_run_exception": str(e)}})
+                await asyncio.sleep(15)
+            finally:
+                self._log()
+                self._reset_context(token)
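For a sense of the resulting API, a hedged usage sketch: ClickEvent, ClickScorer, and ClickEmitter are invented for illustration, while the hook semantics noted in the comments come from the diff above.

```python
# Hypothetical subclasses of the new workers; only ConsumerWorker,
# ProducerWorker, and their error-as-value contract come from this PR.
from pydantic import BaseModel

from bot_detector.worker import ConsumerWorker, ProducerWorker


class ClickEvent(BaseModel):
    user_id: int


class ClickScorer(ConsumerWorker[ClickEvent]):
    async def on_message(self, message: ClickEvent) -> Exception | None:
        # Failures are returned, not raised: the base loop re-queues the
        # message via _produce_failed_messages and records the failure hook.
        if message.user_id < 0:
            return ValueError(f"invalid user_id: {message.user_id}")
        return None


class ClickEmitter(ProducerWorker[ClickEvent]):
    async def build_messages(self) -> list[ClickEvent] | Exception | None:
        # None or an empty list triggers _empty_message_hook; an Exception
        # triggers _failed_on_message_hook; a non-empty list is put().
        return [ClickEvent(user_id=1)]
```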
12 changes: 9 additions & 3 deletions components/bot_detector/worker/interface.py
@@ -5,9 +5,15 @@
 T = TypeVar("T", bound=BaseModel)
 
 
-class WorkerInterface(Protocol, Generic[T]): # pragma: no cover
+class WorkerInterface(Protocol): # pragma: no cover
     async def start(self) -> None: ...
     async def stop(self) -> None: ...
 
-    async def on_message(self, message: T) -> bool: ...
-    async def on_message_batch(self, messages: list[T]) -> bool: ...
+
+class ConsumerWorkerInterface(WorkerInterface, Generic[T]): # pragma: no cover
+    async def on_message(self, message: T) -> Exception | None: ...
+    async def on_message_batch(self, messages: list[T]) -> Exception | None: ...
+
+
+class ProducerWorkerInterface(WorkerInterface, Generic[T]): # pragma: no cover
+    async def build_messages(self) -> list[T] | Exception | None: ...
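One consequence of the split: with start/stop isolated in the base WorkerInterface protocol, orchestration code can accept consumers and producers interchangeably. A small illustrative helper (run_workers is hypothetical, not part of this PR):

```python
# Hypothetical supervisor: runs any mix of workers concurrently; only the
# start/stop lifecycle from WorkerInterface is required, so consumer and
# producer workers can be mixed freely.
import asyncio

from bot_detector.worker import WorkerInterface


async def run_workers(workers: list[WorkerInterface]) -> None:
    try:
        # Each start() blocks in that worker's _run loop, so gather()
        # effectively runs all workers until cancellation or failure.
        await asyncio.gather(*(worker.start() for worker in workers))
    finally:
        await asyncio.gather(*(worker.stop() for worker in workers))
```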