From 704423dfb6296f621b220f1174e776f03f459379 Mon Sep 17 00:00:00 2001
From: extreme4all <40169115+extreme4all@users.noreply.github.com>
Date: Thu, 5 Feb 2026 19:40:55 +0100
Subject: [PATCH 1/8] Refactor scrape task producer to event queue

---
 .../bot_detector/scrape_task_producer/core.py | 134 +++++++++---------
 1 file changed, 65 insertions(+), 69 deletions(-)

diff --git a/bases/bot_detector/scrape_task_producer/core.py b/bases/bot_detector/scrape_task_producer/core.py
index 0c9f64b..3f8db36 100644
--- a/bases/bot_detector/scrape_task_producer/core.py
+++ b/bases/bot_detector/scrape_task_producer/core.py
@@ -6,13 +6,12 @@
 from bot_detector.database import Settings as DBSettings
 from bot_detector.database import get_session_factory
 from bot_detector.database.player import PlayerRepo
-from bot_detector.kafka import (
-    PlayersToScrapeConsumer,
-    PlayersToScrapeProducer,
-    ToScrapeStruct,
-)
 from bot_detector.kafka import Settings as KafkaSettings
 from bot_detector.structs import MetaData, PlayerStruct
+from bot_detector.event_queue.adapters.kafka import KafkaConfig, KafkaProducerConfig
+from bot_detector.event_queue.core import QueueProducer
+from bot_detector.event_queue.factory import QueueFactory
+from bot_detector.kafka import ToScrapeStruct
 from pydantic_settings import BaseSettings
 from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
 from typing_extensions import Literal
@@ -37,39 +36,59 @@ class FetchParams:
     done: bool = False
 
     def __post_init__(self):
-        self.first_date = date.today() - timedelta(days=365)
-        self.last_date = date.today() - timedelta(days=self.days - 1)
+        self._update_step_flags()
+        self.update_date(days=self.days, infinity=False)
 
-    def update_date(self, days, infinity: bool = False):
+    def update_date(self, days: int, infinity: bool = False) -> None:
         self.days = days
-
         delta = timedelta(days=365) if infinity else timedelta(days=self.days)
         self.first_date = date.today() - delta
         self.last_date = date.today() - timedelta(days=self.days - 1)
         assert self.first_date < self.last_date
 
+    def _update_step_flags(self) -> None:
+        match self.step:
+            case "normal":
+                self.possible_ban = False
+                self.confirmed_ban = False
+            case "possible_ban":
+                self.possible_ban = True
+                self.confirmed_ban = False
+            case "confirmed_ban":
+                self.possible_ban = True
+                self.confirmed_ban = True
+            case _:
+                raise ValueError(f"Invalid step: {self.step}")
+
+    def set_step(self, step: Literal["normal", "possible_ban", "confirmed_ban"]) -> None:
+        self.step = step
+        self._update_step_flags()
+
+    def reset_for_new_day(self, max_days: int) -> None:
+        self.set_step("normal")
+        self.update_date(days=max_days, infinity=True)
+        self.player_id = 0
+
 
 async def produce_players(
     players: list[PlayerStruct],
-    player_producer: PlayersToScrapeProducer,
+    player_producer: QueueProducer[ToScrapeStruct],
 ):
+    if not players:
+        return
+
     logger.info(f"Putting {len(players)} players in queue")
+    metadata = MetaData(version=1, source="scrape_task_producer")
     player_structs = [
-        ToScrapeStruct(
-            metadata=MetaData(version=1, source="scrape_task_producer"),
-            player_data=player,
-        )
+        ToScrapeStruct(metadata=metadata, player_data=player)
         for player in players
         if len(player.name) <= 13
    ]
-
-    for player in player_structs:
-        await player_producer.produce_one(
-            ToScrapeStruct(
-                metadata=MetaData(version=1, source="scrape_task_producer"),
-                player_data=player.player_data,
-            )
-        )
+    if not player_structs:
+        return
+    error = await player_producer.put(player_structs)
+    if isinstance(error, Exception):
+        raise error
 
 
 def _reduce_days(fetch_params: FetchParams) -> FetchParams:
@@ -80,24 +99,6 @@ def _reduce_days(fetch_params: FetchParams) -> FetchParams:
     return fetch_params
 
 
-def _update_ban_flags(fetch_params: FetchParams) -> FetchParams:
-    match fetch_params.step:
-        case "normal":
-            fetch_params.possible_ban = False
-            fetch_params.confirmed_ban = False
-            return fetch_params
-        case "possible_ban":
-            fetch_params.possible_ban = True
-            fetch_params.confirmed_ban = False
-            return fetch_params
-        case "confirmed_ban":
-            fetch_params.possible_ban = True
-            fetch_params.confirmed_ban = True
-            return fetch_params
-        case _:
-            raise ValueError(f"Invalid step: {fetch_params.step}")
-
-
 def determine_fetch_params(
     fetch_params: FetchParams,
     players: list[PlayerStruct] | None,
@@ -108,45 +109,40 @@
     if players is None:
         return fetch_params
 
+    fetch_params._update_step_flags()
+
     if len(players) >= fetch_params.limit:
-        fetch_params = _update_ban_flags(fetch_params)
         fetch_params.player_id = players[-1].id
         return fetch_params
 
     match fetch_params.step:
         case "normal":
-            fetch_params = _update_ban_flags(fetch_params)
-
             if fetch_params.days > 1:
                 return _reduce_days(fetch_params)
 
             assert fetch_params.days <= 1
             logger.info("All normal scraped, going to step: possible bans")
-            fetch_params.step = "possible_ban"
+            fetch_params.set_step("possible_ban")
             fetch_params.update_date(days=max_days, infinity=True)
             return fetch_params
         case "possible_ban":
-            fetch_params = _update_ban_flags(fetch_params)
-
             if fetch_params.days > max_possible_ban_days:
                 return _reduce_days(fetch_params)
 
             assert fetch_params.days <= max_possible_ban_days
             logger.info("All possible bans scraped, going to step: confirmed bans")
-            fetch_params.step = "confirmed_ban"
+            fetch_params.set_step("confirmed_ban")
             fetch_params.update_date(days=max_days, infinity=True)
             return fetch_params
        case "confirmed_ban":
-            fetch_params = _update_ban_flags(fetch_params)
-
             if fetch_params.days > max_confirmed_ban_days:
                 return _reduce_days(fetch_params)
 
             assert fetch_params.days <= max_confirmed_ban_days
             logger.info("All confirmed bans scraped, going to step: normal")
-            fetch_params.step = "normal"
+            fetch_params.set_step("normal")
             fetch_params.update_date(days=max_days, infinity=True)
             fetch_params.done = True
             return fetch_params
@@ -155,8 +151,7 @@
 async def process_players(
     async_session: async_sessionmaker[AsyncSession],
     player_repo: PlayerRepo,
-    player_producer: PlayersToScrapeProducer,
-    player_consumer: PlayersToScrapeConsumer,
+    player_producer: QueueProducer[ToScrapeStruct],
     limit: int = 10,
 ):
     max_days = 20
@@ -170,20 +165,10 @@
     last_day = date.today()
 
     while True:
-        lag = await player_consumer.get_lag()
-
         if last_day != date.today():
             logger.info("New day detected, resetting days and confirmed_ban")
             last_day = date.today()
-            fp.days = max_days
-            fp.confirmed_ban = False
-            fp.possible_ban = False
-            fp.player_id = 0
-
-        if lag >= 100_000:
-            logger.info(f"{lag=} to high, sleeping(10)")
-            await asyncio.sleep(10)
-            continue
+            fp.reset_for_new_day(max_days)
 
         logger.info(f"{asdict(fp)}")
 
@@ -223,26 +208,37 @@ async def main():
     async_session, async_engine = get_session_factory(SETTINGS=DBSettings())
     bootstrap_servers = KafkaSettings().KAFKA_BOOTSTRAP_SERVERS
 
-    player_producer = PlayersToScrapeProducer(bootstrap_servers=bootstrap_servers)
-    player_consumer = PlayersToScrapeConsumer(
-        bootstrap_servers=bootstrap_servers, group_id="scraper"
+    queue = QueueFactory.create_queue(
+        model=ToScrapeStruct,
+        queue_type="producer",
+        backend_type="kafka",
+        config=KafkaConfig(
+            topic="players.to_scrape",
+            bootstrap_servers=bootstrap_servers,
+            producer=True,
+            consumer=False,
+            producer_config=KafkaProducerConfig(
+                partition_key_fn=lambda: "scrape_task_producer"
+            ),
+            consumer_config=None,
+        ),
     )
+    if isinstance(queue, Exception):
+        raise queue
+    player_producer = queue
 
     await player_producer.start()
-    await player_consumer.start()
 
     try:
         await process_players(
             async_session=async_session,
             player_repo=PlayerRepo(),
             player_producer=player_producer,
-            player_consumer=player_consumer,
             limit=Settings().LIMIT,
         )
     finally:
         await async_engine.dispose()
         await player_producer.stop()
-        await player_consumer.stop()
 
 
 async def run_async():

From dd7f41368bb103d520b15aae6cfde5c1f3de51f7 Mon Sep 17 00:00:00 2001
From: extreme4all <40169115+extreme4all@users.noreply.github.com>
Date: Thu, 5 Feb 2026 19:52:32 +0100
Subject: [PATCH 2/8] Add lag support to event queue consumers

---
 .../event_queue/adapters/kafka/adapter.py  | 25 ++++++++++++++++++-
 .../event_queue/adapters/memory/adapter.py |  6 +++++
 .../event_queue/core/event_queue.py        |  3 +++
 .../event_queue/core/interface.py          |  2 ++
 4 files changed, 35 insertions(+), 1 deletion(-)

diff --git a/components/bot_detector/event_queue/adapters/kafka/adapter.py b/components/bot_detector/event_queue/adapters/kafka/adapter.py
index 751fce5..99b7f47 100644
--- a/components/bot_detector/event_queue/adapters/kafka/adapter.py
+++ b/components/bot_detector/event_queue/adapters/kafka/adapter.py
@@ -3,7 +3,12 @@
 from typing import Generic, Optional, TypeVar
 
 import orjson
-from aiokafka import AIOKafkaConsumer, AIOKafkaProducer, ConsumerRecord
+from aiokafka import (
+    AIOKafkaConsumer,
+    AIOKafkaProducer,
+    ConsumerRecord,
+    TopicPartition,
+)
 from aiokafka.errors import KafkaTimeoutError
 from bot_detector.event_queue.core.batcher import Batcher
 from bot_detector.event_queue.core.errors import (
@@ -197,6 +202,21 @@ async def commit(self) -> Optional[Exception]:
         )
         await self.consumer.commit()
 
+    async def lag(self) -> int:
+        if self.consumer is None:
+            return 0
+        total_lag = 0
+        partitions = self.consumer.partitions_for_topic(self.config.topic)
+        if partitions is None:
+            return 0
+
+        for partition in partitions:
+            tp = TopicPartition(self.config.topic, partition)
+            committed = await self.consumer.committed(tp) or 0
+            end_offset = await self.consumer.end_offsets([tp])
+            total_lag += end_offset[tp] - committed
+        return total_lag
+
 
 class AIOKafkaAdapter(QueueBackendProtocol[T]):
     """
@@ -232,3 +252,6 @@ async def get_many(self, count: int) -> list[T] | Exception:
 
     async def commit(self) -> Optional[Exception]:
         await self.consumer.commit()
+
+    async def lag(self) -> int:
+        return await self.consumer.lag()
diff --git a/components/bot_detector/event_queue/adapters/memory/adapter.py b/components/bot_detector/event_queue/adapters/memory/adapter.py
index 10f77c3..fd8e541 100644
--- a/components/bot_detector/event_queue/adapters/memory/adapter.py
+++ b/components/bot_detector/event_queue/adapters/memory/adapter.py
@@ -60,6 +60,9 @@ async def get_many(self, count: int) -> list[T]:
     async def commit(self) -> Optional[Exception]:
         self._queue.task_done()
 
+    async def lag(self) -> int:
+        return self._queue.qsize()
+
 
 class InMemoryProducerAdapter(
     _InMemoryBase[T],
@@ -103,3 +106,6 @@ async def get_many(self, count: int) -> list[T]:
 
     async def commit(self) -> Optional[Exception]:
         return await self.consumer.commit()
+
+    async def lag(self) -> int:
+        return await self.consumer.lag()
diff --git a/components/bot_detector/event_queue/core/event_queue.py b/components/bot_detector/event_queue/core/event_queue.py
index d43ed42..8fd674b 100644
--- a/components/bot_detector/event_queue/core/event_queue.py
+++ b/components/bot_detector/event_queue/core/event_queue.py
@@ -64,6 +64,9 @@ async def get_one(self) -> Optional[T] | Exception:
     async def get_many(self, count: int) -> list[T] | Exception:
         return await self._backend.get_many(count)
 
+    async def lag(self) -> int:
+        return await self._backend.lag()
+
 
 class Queue(QueueConsumer[T], QueueProducer[T]):
     """
diff --git a/components/bot_detector/event_queue/core/interface.py b/components/bot_detector/event_queue/core/interface.py
index c211a14..f5d63e4 100644
--- a/components/bot_detector/event_queue/core/interface.py
+++ b/components/bot_detector/event_queue/core/interface.py
@@ -36,6 +36,8 @@ async def get_many(self, count: int) -> list[T] | Exception: ...
 
     async def commit(self) -> Optional[Exception]: ...
 
+    async def lag(self) -> int: ...
+
 
 @runtime_checkable
 class QueueBackendProtocol(

From f114d1ff3e87184f970ff17ffdbd0ebb1229a779 Mon Sep 17 00:00:00 2001
From: extreme4all <40169115+extreme4all@users.noreply.github.com>
Date: Thu, 5 Feb 2026 20:00:44 +0100
Subject: [PATCH 3/8] Restore lag check in scrape task producer

---
 .../bot_detector/scrape_task_producer/core.py | 43 ++++++++++++++++---
 1 file changed, 37 insertions(+), 6 deletions(-)

diff --git a/bases/bot_detector/scrape_task_producer/core.py b/bases/bot_detector/scrape_task_producer/core.py
index 3f8db36..8560f41 100644
--- a/bases/bot_detector/scrape_task_producer/core.py
+++ b/bases/bot_detector/scrape_task_producer/core.py
@@ -8,8 +8,12 @@
 from bot_detector.database.player import PlayerRepo
 from bot_detector.kafka import Settings as KafkaSettings
 from bot_detector.structs import MetaData, PlayerStruct
-from bot_detector.event_queue.adapters.kafka import KafkaConfig, KafkaProducerConfig
-from bot_detector.event_queue.core import QueueProducer
+from bot_detector.event_queue.adapters.kafka import (
+    KafkaConfig,
+    KafkaConsumerConfig,
+    KafkaProducerConfig,
+)
+from bot_detector.event_queue.core import QueueConsumer, QueueProducer
 from bot_detector.event_queue.factory import QueueFactory
 from bot_detector.kafka import ToScrapeStruct
 from pydantic_settings import BaseSettings
@@ -152,6 +156,7 @@ async def process_players(
     async_session: async_sessionmaker[AsyncSession],
     player_repo: PlayerRepo,
     player_producer: QueueProducer[ToScrapeStruct],
+    player_consumer: QueueConsumer[ToScrapeStruct],
     limit: int = 10,
 ):
     max_days = 20
@@ -165,11 +170,17 @@
     last_day = date.today()
 
     while True:
+        lag = await player_consumer.lag()
         if last_day != date.today():
             logger.info("New day detected, resetting days and confirmed_ban")
             last_day = date.today()
             fp.reset_for_new_day(max_days)
 
+        if lag >= 100_000:
+            logger.info(f"{lag=} too high, sleeping(10)")
+            await asyncio.sleep(10)
+            continue
+
         logger.info(f"{asdict(fp)}")
 
         async with async_session() as session:
@@ -208,7 +219,7 @@ async def main():
     async_session, async_engine = get_session_factory(SETTINGS=DBSettings())
     bootstrap_servers = KafkaSettings().KAFKA_BOOTSTRAP_SERVERS
 
-    queue = QueueFactory.create_queue(
+    queue_producer = QueueFactory.create_queue(
         model=ToScrapeStruct,
         queue_type="producer",
         backend_type="kafka",
@@ -223,22 +234,42 @@
             consumer_config=None,
         ),
     )
-    if isinstance(queue, Exception):
-        raise queue
-    player_producer = queue
+    if isinstance(queue_producer, Exception):
+        raise queue_producer
+    player_producer = queue_producer
+
+    queue_consumer = QueueFactory.create_queue(
+        model=ToScrapeStruct,
+        queue_type="consumer",
+        backend_type="kafka",
+        config=KafkaConfig(
+            topic="players.to_scrape",
+            bootstrap_servers=bootstrap_servers,
+            producer=False,
+            consumer=True,
+            producer_config=None,
+            consumer_config=KafkaConsumerConfig(group_id="scraper"),
+        ),
+    )
+    if isinstance(queue_consumer, Exception):
+        raise queue_consumer
+    player_consumer = queue_consumer
 
     await player_producer.start()
+    await player_consumer.start()
 
     try:
         await process_players(
             async_session=async_session,
             player_repo=PlayerRepo(),
             player_producer=player_producer,
+            player_consumer=player_consumer,
             limit=Settings().LIMIT,
         )
     finally:
         await async_engine.dispose()
         await player_producer.stop()
+        await player_consumer.stop()


async def run_async():

From adfccd155f3bad013875a47a537341d1fba855fb Mon Sep 17 00:00:00 2001
From: extreme4all <40169115+extreme4all@users.noreply.github.com>
Date: Thu, 5 Feb 2026 20:08:34 +0100
Subject: [PATCH 4/8] Use queue backend and per-message partition keys

---
 .../bot_detector/scrape_task_producer/core.py | 53 ++++++-------------
 .../event_queue/adapters/kafka/adapter.py     |  2 +-
 .../event_queue/adapters/kafka/config.py      |  2 +-
 .../adapter_kafka/test_factory_kafka.py       |  2 +-
 .../test_producer_adapter_kafka.py            | 18 +++----
 .../adapter_kafka/test_queue_adapter_kafka.py |  2 +-
 6 files changed, 29 insertions(+), 50 deletions(-)

diff --git a/bases/bot_detector/scrape_task_producer/core.py b/bases/bot_detector/scrape_task_producer/core.py
index 8560f41..a4772c9 100644
--- a/bases/bot_detector/scrape_task_producer/core.py
+++ b/bases/bot_detector/scrape_task_producer/core.py
@@ -13,7 +13,7 @@
     KafkaConsumerConfig,
     KafkaProducerConfig,
 )
-from bot_detector.event_queue.core import QueueConsumer, QueueProducer
+from bot_detector.event_queue.core import Queue
 from bot_detector.event_queue.factory import QueueFactory
 from bot_detector.kafka import ToScrapeStruct
 from pydantic_settings import BaseSettings
@@ -76,7 +76,7 @@
 
 async def produce_players(
     players: list[PlayerStruct],
-    player_producer: QueueProducer[ToScrapeStruct],
+    player_queue: Queue[ToScrapeStruct],
 ):
     if not players:
         return
@@ -90,7 +90,7 @@
     ]
     if not player_structs:
         return
-    error = await player_producer.put(player_structs)
+    error = await player_queue.put(player_structs)
     if isinstance(error, Exception):
         raise error
@@ -155,8 +155,7 @@
 async def process_players(
     async_session: async_sessionmaker[AsyncSession],
     player_repo: PlayerRepo,
-    player_producer: QueueProducer[ToScrapeStruct],
-    player_consumer: QueueConsumer[ToScrapeStruct],
+    player_queue: Queue[ToScrapeStruct],
     limit: int = 10,
 ):
     max_days = 20
@@ -170,7 +169,7 @@
     last_day = date.today()
 
     while True:
-        lag = await player_consumer.lag()
+        lag = await player_queue.lag()
         if last_day != date.today():
             logger.info("New day detected, resetting days and confirmed_ban")
             last_day = date.today()
@@ -195,7 +194,7 @@
                 limit=fp.limit,
             )
 
-        await produce_players(players=players, player_producer=player_producer)
+        await produce_players(players=players, player_queue=player_queue)
 
         fp = determine_fetch_params(
             fetch_params=fp,
@@ -219,57 +218,37 @@ async def main():
     async_session, async_engine = get_session_factory(SETTINGS=DBSettings())
     bootstrap_servers = KafkaSettings().KAFKA_BOOTSTRAP_SERVERS
 
-    queue_producer = QueueFactory.create_queue(
+    queue = QueueFactory.create_queue(
         model=ToScrapeStruct,
-        queue_type="producer",
+        queue_type="queue",
         backend_type="kafka",
         config=KafkaConfig(
             topic="players.to_scrape",
             bootstrap_servers=bootstrap_servers,
             producer=True,
-            consumer=False,
+            consumer=True,
             producer_config=KafkaProducerConfig(
-                partition_key_fn=lambda: "scrape_task_producer"
+                partition_key_fn=lambda message: str(message.player_data.id % 10)
             ),
-            consumer_config=None,
+            consumer_config=KafkaConsumerConfig(group_id="scraper"),
         ),
     )
-    if isinstance(queue_producer, Exception):
-        raise queue_producer
-    player_producer = queue_producer
-
-    queue_consumer = QueueFactory.create_queue(
-        model=ToScrapeStruct,
-        queue_type="consumer",
-        backend_type="kafka",
-        config=KafkaConfig(
-            topic="players.to_scrape",
-            bootstrap_servers=bootstrap_servers,
-            producer=False,
-            consumer=True,
-            producer_config=None,
-            consumer_config=KafkaConsumerConfig(group_id="scraper"),
-        ),
-    )
-    if isinstance(queue_consumer, Exception):
-        raise queue_consumer
-    player_consumer = queue_consumer
+    if isinstance(queue, Exception):
+        raise queue
+    player_queue = queue
 
-    await player_producer.start()
-    await player_consumer.start()
+    await player_queue.start()
 
     try:
         await process_players(
             async_session=async_session,
             player_repo=PlayerRepo(),
-            player_producer=player_producer,
-            player_consumer=player_consumer,
+            player_queue=player_queue,
             limit=Settings().LIMIT,
         )
     finally:
         await async_engine.dispose()
-        await player_producer.stop()
-        await player_consumer.stop()
+        await player_queue.stop()


async def run_async():
diff --git a/components/bot_detector/event_queue/adapters/kafka/adapter.py b/components/bot_detector/event_queue/adapters/kafka/adapter.py
index 99b7f47..282a06e 100644
--- a/components/bot_detector/event_queue/adapters/kafka/adapter.py
+++ b/components/bot_detector/event_queue/adapters/kafka/adapter.py
@@ -84,7 +84,7 @@ async def put(self, messages: list[T]) -> Optional[Exception]:
         _config = self.config.producer_config
 
         for message in messages:
-            raw_key = _config.partition_key_fn()
+            raw_key = _config.partition_key_fn(message)
             if isinstance(raw_key, bytes):
                 key = raw_key
             elif isinstance(raw_key, str):
diff --git a/components/bot_detector/event_queue/adapters/kafka/config.py b/components/bot_detector/event_queue/adapters/kafka/config.py
index 5d2e36d..85b9421 100644
--- a/components/bot_detector/event_queue/adapters/kafka/config.py
+++ b/components/bot_detector/event_queue/adapters/kafka/config.py
@@ -13,7 +13,7 @@ class KafkaConsumerConfig(BaseModel):
 
 
 class KafkaProducerConfig(BaseModel):
-    partition_key_fn: Callable[[], bytes | str]
+    partition_key_fn: Callable[[T], bytes | str]
 
     MAX_PRODUCE_RETRIES: int = 3
     MAX_PRODUCE_RETRY_BACKOFF: int = 60
diff --git a/test/components/bot_detector/event_queue/adapter_kafka/test_factory_kafka.py b/test/components/bot_detector/event_queue/adapter_kafka/test_factory_kafka.py
index d25f004..b4199e3 100644
--- a/test/components/bot_detector/event_queue/adapter_kafka/test_factory_kafka.py
+++ b/test/components/bot_detector/event_queue/adapter_kafka/test_factory_kafka.py
@@ -26,7 +26,7 @@ async def test_queue_factory_creates_kafka_queue():
         consumer=True,
         producer=True,
         consumer_config=KafkaConsumerConfig(group_id="group"),
-        producer_config=KafkaProducerConfig(partition_key_fn=lambda: "1"),
+        producer_config=KafkaProducerConfig(partition_key_fn=lambda _: "1"),
     )
 
     queue = QueueFactory.create_queue(
diff --git a/test/components/bot_detector/event_queue/adapter_kafka/test_producer_adapter_kafka.py b/test/components/bot_detector/event_queue/adapter_kafka/test_producer_adapter_kafka.py
index ee952b2..33b93ca 100644
--- a/test/components/bot_detector/event_queue/adapter_kafka/test_producer_adapter_kafka.py
+++ b/test/components/bot_detector/event_queue/adapter_kafka/test_producer_adapter_kafka.py
@@ -29,7 +29,7 @@ async def test_producer_start_stop():
         consumer=False,
         producer=True,
         consumer_config=None,
-        producer_config=KafkaProducerConfig(partition_key_fn=lambda: "1"),
+        producer_config=KafkaProducerConfig(partition_key_fn=lambda _: "1"),
     )
 
     adapter = AIOKafkaProducerAdapter(PlayerScraped, config)
@@ -55,7 +55,7 @@ async def test_producer_custom_partition_key():
         consumer=False,
         producer=True,
         consumer_config=None,
-        producer_config=KafkaProducerConfig(partition_key_fn=lambda: "1"),
+        producer_config=KafkaProducerConfig(partition_key_fn=lambda _: "1"),
     )
 
     adapter = AIOKafkaProducerAdapter(PlayerScraped, config)
@@ -86,7 +86,7 @@ async def test_producer_partition_key_bytes():
         consumer=False,
         producer=True,
         consumer_config=None,
-        producer_config=KafkaProducerConfig(partition_key_fn=lambda: b"bytes"),
+        producer_config=KafkaProducerConfig(partition_key_fn=lambda _: b"bytes"),
     )
 
     adapter = AIOKafkaProducerAdapter(PlayerScraped, config)
@@ -110,7 +110,7 @@ async def test_producer_partition_key_invalid_type():
         consumer=False,
         producer=True,
         consumer_config=None,
-        producer_config=KafkaProducerConfig(partition_key_fn=lambda: 123),
+        producer_config=KafkaProducerConfig(partition_key_fn=lambda _: 123),
     )
     adapter = AIOKafkaProducerAdapter(PlayerScraped, config)
     adapter.producer = AsyncMock()
@@ -128,7 +128,7 @@ async def test_producer_serialization():
         consumer=False,
         producer=True,
         consumer_config=None,
-        producer_config=KafkaProducerConfig(partition_key_fn=lambda: "1"),
+        producer_config=KafkaProducerConfig(partition_key_fn=lambda _: "1"),
     )
 
     adapter = AIOKafkaProducerAdapter(PlayerScraped, config)
@@ -153,7 +153,7 @@ async def test_producer_start_noop_when_existing():
         consumer=False,
         producer=True,
         consumer_config=None,
-        producer_config=KafkaProducerConfig(partition_key_fn=lambda: "1"),
+        producer_config=KafkaProducerConfig(partition_key_fn=lambda _: "1"),
     )
     adapter = AIOKafkaProducerAdapter(PlayerScraped, config)
     adapter.producer = AsyncMock()
@@ -174,7 +174,7 @@ async def test_producer_stop_noop_when_missing():
         consumer=False,
         producer=True,
         consumer_config=None,
-        producer_config=KafkaProducerConfig(partition_key_fn=lambda: "1"),
+        producer_config=KafkaProducerConfig(partition_key_fn=lambda _: "1"),
     )
 
     adapter = AIOKafkaProducerAdapter(PlayerScraped, config)
@@ -190,7 +190,7 @@ async def test_producer_send_failure():
         producer=True,
         consumer_config=None,
         producer_config=KafkaProducerConfig(
-            partition_key_fn=lambda: "1",
+            partition_key_fn=lambda _: "1",
             MAX_PRODUCE_RETRIES=1,
         ),
     )
@@ -224,7 +224,7 @@ async def test_producer_put_without_start():
         consumer=False,
         producer=True,
         consumer_config=None,
-        producer_config=KafkaProducerConfig(partition_key_fn=lambda: "1"),
+        producer_config=KafkaProducerConfig(partition_key_fn=lambda _: "1"),
     )
 
     adapter = AIOKafkaProducerAdapter(PlayerScraped, config)
diff --git a/test/components/bot_detector/event_queue/adapter_kafka/test_queue_adapter_kafka.py b/test/components/bot_detector/event_queue/adapter_kafka/test_queue_adapter_kafka.py
index dbc165f..0902faa 100644
--- a/test/components/bot_detector/event_queue/adapter_kafka/test_queue_adapter_kafka.py
+++ b/test/components/bot_detector/event_queue/adapter_kafka/test_queue_adapter_kafka.py
@@ -24,7 +24,7 @@ async def test_queue_adapter_wiring():
         consumer=True,
         producer=True,
         consumer_config=KafkaConsumerConfig(group_id="group"),
-        producer_config=KafkaProducerConfig(partition_key_fn=lambda: "1"),
+        producer_config=KafkaProducerConfig(partition_key_fn=lambda _: "1"),
     )
     adapter = AIOKafkaAdapter(PlayerScraped, config)
     adapter.producer.start = AsyncMock()

From edb625010e3c3986f24ea4088b4096cf8ff1b880 Mon Sep 17 00:00:00 2001
From: extreme4all <40169115+extreme4all@users.noreply.github.com>
Date: Thu, 5 Feb 2026 20:32:46 +0100
Subject: [PATCH 5/8] Add event_queue brick to scrape task producer

---
 projects/scrape_task_producer/pyproject.toml | 1 +
 1 file changed, 1 insertion(+)

diff --git a/projects/scrape_task_producer/pyproject.toml b/projects/scrape_task_producer/pyproject.toml
index 0095718..e4c7b9d 100644
--- a/projects/scrape_task_producer/pyproject.toml
+++ b/projects/scrape_task_producer/pyproject.toml
@@ -30,5 +30,6 @@ packages = ["bot_detector"]
 "../../bases/bot_detector/scrape_task_producer" = "bot_detector/scrape_task_producer"
 "../../components/bot_detector/database" = "bot_detector/database"
 "../../components/bot_detector/kafka" = "bot_detector/kafka"
+"../../components/bot_detector/event_queue" = "bot_detector/event_queue"
 "../../components/bot_detector/structs" = "bot_detector/structs"
 "../../components/bot_detector/logfmt" = "bot_detector/logfmt"

From 83d1ded5cdbc0d5e9de22e73705c8d68b6cb08c7 Mon Sep 17 00:00:00 2001
From: extreme4all <40169115+extreme4all@users.noreply.github.com>
Date: Thu, 5 Feb 2026 20:40:32 +0100
Subject: [PATCH 6/8] Add wide_event brick to api_public

---
 projects/api_public/pyproject.toml | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/projects/api_public/pyproject.toml b/projects/api_public/pyproject.toml
index 3d04ed3..1eb4a85 100644
--- a/projects/api_public/pyproject.toml
+++ b/projects/api_public/pyproject.toml
@@ -28,5 +28,6 @@ packages = ["bot_detector"]
 "../../bases/bot_detector/api_public" = "bot_detector/api_public"
 "../../components/bot_detector/database" = "bot_detector/database"
 "../../components/bot_detector/kafka" = "bot_detector/kafka"
+"../../components/bot_detector/wide_event" = "bot_detector/wide_event"
 "../../components/bot_detector/structs" = "bot_detector/structs"
-"../../components/bot_detector/logfmt" = "bot_detector/logfmt"
\ No newline at end of file
+"../../components/bot_detector/logfmt" = "bot_detector/logfmt"

From c26457c642119ad1d33a290f5d6a89a4fadc991c Mon Sep 17 00:00:00 2001
From: extreme4all <40169115+extreme4all@users.noreply.github.com>
Date: Thu, 5 Feb 2026 20:54:58 +0100
Subject: [PATCH 7/8] Add wide event logging for scrape task producer

---
 .../bot_detector/scrape_task_producer/core.py | 138 ++++++++++++------
 projects/scrape_task_producer/pyproject.toml  |   1 +
 2 files changed, 93 insertions(+), 46 deletions(-)

diff --git a/bases/bot_detector/scrape_task_producer/core.py b/bases/bot_detector/scrape_task_producer/core.py
index a4772c9..a6cc7f9 100644
--- a/bases/bot_detector/scrape_task_producer/core.py
+++ b/bases/bot_detector/scrape_task_producer/core.py
@@ -8,6 +8,7 @@
 from bot_detector.database.player import PlayerRepo
 from bot_detector.kafka import Settings as KafkaSettings
 from bot_detector.structs import MetaData, PlayerStruct
+from bot_detector.wide_event import WideEventLogger
 from bot_detector.event_queue.adapters.kafka import (
     KafkaConfig,
     KafkaConsumerConfig,
     KafkaProducerConfig,
@@ -21,6 +22,20 @@
 from typing_extensions import Literal
 
 logger = logging.getLogger(__name__)
+wide_event = WideEventLogger(sample_ratio=0.1)
+
+
+def _log_event(
+    data: dict[str, object],
+    *,
+    force: bool = False,
+    error: Exception | None = None,
+) -> None:
+    if error is not None:
+        logger.error({**data, "error": str(error)})
+        return
+    if force or wide_event.sample():
+        logger.info(data)
 
 
 class Settings(BaseSettings):
@@ -81,7 +96,7 @@
     if not players:
         return
 
-    logger.info(f"Putting {len(players)} players in queue")
+    _log_event({"event": "queue_players", "count": len(players)})
     metadata = MetaData(version=1, source="scrape_task_producer")
     player_structs = [
         ToScrapeStruct(metadata=metadata, player_data=player)
@@ -96,7 +111,7 @@
 
 
 def _reduce_days(fetch_params: FetchParams) -> FetchParams:
-    logger.info(f"Reducing days for {asdict(fetch_params)}")
+    _log_event({"event": "reduce_days", "fetch_params": asdict(fetch_params)})
     _days = fetch_params.days - 1 if fetch_params.days > 1 else 1
     fetch_params.update_date(days=_days)
     fetch_params.player_id = 0
@@ -125,7 +140,14 @@
                 return _reduce_days(fetch_params)
 
             assert fetch_params.days <= 1
-            logger.info("All normal scraped, going to step: possible bans")
+            _log_event(
+                {
+                    "event": "set_step",
+                    "from": fetch_params.step,
+                    "to": "possible_ban",
+                },
+                force=True,
+            )
             fetch_params.set_step("possible_ban")
             fetch_params.update_date(days=max_days, infinity=True)
             return fetch_params
@@ -135,7 +157,14 @@
                 return _reduce_days(fetch_params)
 
             assert fetch_params.days <= max_possible_ban_days
-            logger.info("All possible bans scraped, going to step: confirmed bans")
+            _log_event(
+                {
+                    "event": "set_step",
+                    "from": fetch_params.step,
+                    "to": "confirmed_ban",
+                },
+                force=True,
+            )
             fetch_params.set_step("confirmed_ban")
             fetch_params.update_date(days=max_days, infinity=True)
             return fetch_params
@@ -145,7 +174,14 @@
                 return _reduce_days(fetch_params)
 
             assert fetch_params.days <= max_confirmed_ban_days
-            logger.info("All confirmed bans scraped, going to step: normal")
+            _log_event(
+                {
+                    "event": "set_step",
+                    "from": fetch_params.step,
+                    "to": "normal",
+                },
+                force=True,
+            )
             fetch_params.set_step("normal")
             fetch_params.update_date(days=max_days, infinity=True)
             fetch_params.done = True
@@ -169,49 +205,59 @@
     last_day = date.today()
 
     while True:
-        lag = await player_queue.lag()
-        if last_day != date.today():
-            logger.info("New day detected, resetting days and confirmed_ban")
-            last_day = date.today()
-            fp.reset_for_new_day(max_days)
-
-        if lag >= 100_000:
-            logger.info(f"{lag=} too high, sleeping(10)")
-            await asyncio.sleep(10)
-            continue
-
-        logger.info(f"{asdict(fp)}")
-
-        async with async_session() as session:
-            players = await player_repo.select_player(
-                async_session=session,
-                player_id=fp.player_id,
-                possible_ban=fp.possible_ban,
-                confirmed_ban=fp.confirmed_ban,
-                or_none=fp.step == "normal",
-                first_date=fp.first_date,
-                last_date=fp.last_date,
-                limit=fp.limit,
+        try:
+            lag = await player_queue.lag()
+            if last_day != date.today():
+                _log_event({"event": "new_day_reset"}, force=True)
+                last_day = date.today()
+                fp.reset_for_new_day(max_days)
+
+            if lag >= 100_000:
+                _log_event({"event": "lag_throttle", "lag": lag})
+                await asyncio.sleep(10)
+                continue
+
+            _log_event({"event": "fetch_params", "data": asdict(fp)})
+
+            async with async_session() as session:
+                players = await player_repo.select_player(
+                    async_session=session,
+                    player_id=fp.player_id,
+                    possible_ban=fp.possible_ban,
+                    confirmed_ban=fp.confirmed_ban,
+                    or_none=fp.step == "normal",
+                    first_date=fp.first_date,
+                    last_date=fp.last_date,
+                    limit=fp.limit,
+                )
+
+            await produce_players(players=players, player_queue=player_queue)
+
+            fp = determine_fetch_params(
+                fetch_params=fp,
+                players=players,
+                max_days=max_days,
             )
 
-        await produce_players(players=players, player_queue=player_queue)
-
-        fp = determine_fetch_params(
-            fetch_params=fp,
-            players=players,
-            max_days=max_days,
-        )
-
-        if fp.done:
-            fp.done = False
-            now = datetime.now()
-            end_of_today = datetime.combine(now.date(), time.max)
-
-            time_remaining = end_of_today - now
-            sleep_time = int(time_remaining.total_seconds())
-            sleep_time = max(sleep_time, 1)  # Ensure at least 1 second sleep
-            logger.info(f"Sleeping for {sleep_time} seconds until end of day")
-            await asyncio.sleep(sleep_time)
+            if fp.done:
+                fp.done = False
+                now = datetime.now()
+                end_of_today = datetime.combine(now.date(), time.max)
+
+                time_remaining = end_of_today - now
+                sleep_time = int(time_remaining.total_seconds())
+                sleep_time = max(sleep_time, 1)  # Ensure at least 1 second sleep
+                _log_event(
+                    {
+                        "event": "done_for_day",
+                        "sleep_seconds": sleep_time,
+                    },
+                    force=True,
+                )
+                await asyncio.sleep(sleep_time)
+        except Exception as exc:
+            _log_event({"event": "scrape_task_producer_error"}, error=exc)
+            raise
 
 
 async def main():
diff --git a/projects/scrape_task_producer/pyproject.toml b/projects/scrape_task_producer/pyproject.toml
index e4c7b9d..2cc943c 100644
--- a/projects/scrape_task_producer/pyproject.toml
+++ b/projects/scrape_task_producer/pyproject.toml
@@ -31,5 +31,6 @@ packages = ["bot_detector"]
 "../../components/bot_detector/database" = "bot_detector/database"
 "../../components/bot_detector/kafka" = "bot_detector/kafka"
 "../../components/bot_detector/event_queue" = "bot_detector/event_queue"
+"../../components/bot_detector/wide_event" = "bot_detector/wide_event"
 "../../components/bot_detector/structs" = "bot_detector/structs"
 "../../components/bot_detector/logfmt" = "bot_detector/logfmt"

From b498217d3c290c4acc1258ca6991dab78288f555 Mon Sep 17 00:00:00 2001
From: extreme4all <40169115+extreme4all@users.noreply.github.com>
Date: Thu, 5 Feb 2026 21:16:48 +0100
Subject: [PATCH 8/8] Refine scrape task producer wide event logging

---
 .../bot_detector/scrape_task_producer/core.py | 89 ++++++++++---------
 1 file changed, 46 insertions(+), 43 deletions(-)

diff --git a/bases/bot_detector/scrape_task_producer/core.py b/bases/bot_detector/scrape_task_producer/core.py
index a6cc7f9..eb98487 100644
--- a/bases/bot_detector/scrape_task_producer/core.py
+++ b/bases/bot_detector/scrape_task_producer/core.py
@@ -22,20 +22,15 @@
 from typing_extensions import Literal
 
 logger = logging.getLogger(__name__)
-wide_event = WideEventLogger(sample_ratio=0.1)
+wide_event = WideEventLogger()
 
 
-def _log_event(
-    data: dict[str, object],
-    *,
-    force: bool = False,
-    error: Exception | None = None,
-) -> None:
-    if error is not None:
-        logger.error({**data, "error": str(error)})
-        return
-    if force or wide_event.sample():
-        logger.info(data)
+def _add_event(data: dict[str, object]) -> None:
+    wide_event.add(data)
+
+
+def _force_log() -> None:
+    wide_event.add({"force_log": True})
 
 
 class Settings(BaseSettings):
@@ -96,7 +91,7 @@
     if not players:
         return
 
-    _log_event({"event": "queue_players", "count": len(players)})
+    _add_event({"queue_players": {"count": len(players)}})
     metadata = MetaData(version=1, source="scrape_task_producer")
     player_structs = [
         ToScrapeStruct(metadata=metadata, player_data=player)
@@ -111,7 +106,7 @@
 
 
 def _reduce_days(fetch_params: FetchParams) -> FetchParams:
-    _log_event({"event": "reduce_days", "fetch_params": asdict(fetch_params)})
+    _add_event({"reduce_days": {"fetch_params": asdict(fetch_params)}})
     _days = fetch_params.days - 1 if fetch_params.days > 1 else 1
     fetch_params.update_date(days=_days)
     fetch_params.player_id = 0
@@ -140,14 +135,15 @@
                 return _reduce_days(fetch_params)
 
             assert fetch_params.days <= 1
-            _log_event(
-                {
-                    "event": "set_step",
-                    "from": fetch_params.step,
-                    "to": "possible_ban",
-                },
-                force=True,
-            )
+            _add_event(
+                {
+                    "set_step": {
+                        "from": fetch_params.step,
+                        "to": "possible_ban",
+                    }
+                }
+            )
+            _force_log()
             fetch_params.set_step("possible_ban")
             fetch_params.update_date(days=max_days, infinity=True)
             return fetch_params
@@ -157,14 +153,15 @@
                 return _reduce_days(fetch_params)
 
             assert fetch_params.days <= max_possible_ban_days
-            _log_event(
-                {
-                    "event": "set_step",
-                    "from": fetch_params.step,
-                    "to": "confirmed_ban",
-                },
-                force=True,
-            )
+            _add_event(
+                {
+                    "set_step": {
+                        "from": fetch_params.step,
+                        "to": "confirmed_ban",
+                    }
+                }
+            )
+            _force_log()
             fetch_params.set_step("confirmed_ban")
             fetch_params.update_date(days=max_days, infinity=True)
             return fetch_params
@@ -174,14 +171,15 @@
                 return _reduce_days(fetch_params)
 
             assert fetch_params.days <= max_confirmed_ban_days
-            _log_event(
-                {
-                    "event": "set_step",
-                    "from": fetch_params.step,
-                    "to": "normal",
-                },
-                force=True,
-            )
+            _add_event(
+                {
+                    "set_step": {
+                        "from": fetch_params.step,
+                        "to": "normal",
+                    }
+                }
+            )
+            _force_log()
             fetch_params.set_step("normal")
             fetch_params.update_date(days=max_days, infinity=True)
             fetch_params.done = True
@@ -203,19 +201,21 @@
     last_day = date.today()
 
     while True:
+        token = wide_event.set({})
         try:
             lag = await player_queue.lag()
             if last_day != date.today():
-                _log_event({"event": "new_day_reset"}, force=True)
+                _add_event({"new_day_reset": True})
+                _force_log()
                 last_day = date.today()
                 fp.reset_for_new_day(max_days)
 
             if lag >= 100_000:
-                _log_event({"event": "lag_throttle", "lag": lag})
+                _add_event({"lag_throttle": {"lag": lag}})
                 await asyncio.sleep(10)
                 continue
 
-            _log_event({"event": "fetch_params", "data": asdict(fp)})
+            _add_event({"fetch_params": asdict(fp)})
 
             async with async_session() as session:
                 players = await player_repo.select_player(
@@ -247,17 +247,20 @@
                 time_remaining = end_of_today - now
                 sleep_time = int(time_remaining.total_seconds())
                 sleep_time = max(sleep_time, 1)  # Ensure at least 1 second sleep
-                _log_event(
-                    {
-                        "event": "done_for_day",
-                        "sleep_seconds": sleep_time,
-                    },
-                    force=True,
-                )
+                _add_event({"done_for_day": {"sleep_seconds": sleep_time}})
+                _force_log()
                 await asyncio.sleep(sleep_time)
         except Exception as exc:
-            _log_event({"event": "scrape_task_producer_error"}, error=exc)
+            _add_event({"error": {"message": str(exc)}})
             raise
+        finally:
+            final_ctx = wide_event.get()
+            force_log = bool(final_ctx.pop("force_log", False))
+            if "error" in final_ctx:
+                logger.error(final_ctx)
+            elif force_log or wide_event.sample():
+                logger.info(final_ctx)
+            wide_event.reset(token)
 
 
 async def main():