diff --git a/bases/bot_detector/scrape_task_producer/core.py b/bases/bot_detector/scrape_task_producer/core.py
index 0c9f64b..eb98487 100644
--- a/bases/bot_detector/scrape_task_producer/core.py
+++ b/bases/bot_detector/scrape_task_producer/core.py
@@ -6,18 +6,31 @@
 from bot_detector.database import Settings as DBSettings
 from bot_detector.database import get_session_factory
 from bot_detector.database.player import PlayerRepo
-from bot_detector.kafka import (
-    PlayersToScrapeConsumer,
-    PlayersToScrapeProducer,
-    ToScrapeStruct,
-)
 from bot_detector.kafka import Settings as KafkaSettings
 from bot_detector.structs import MetaData, PlayerStruct
+from bot_detector.wide_event import WideEventLogger
+from bot_detector.event_queue.adapters.kafka import (
+    KafkaConfig,
+    KafkaConsumerConfig,
+    KafkaProducerConfig,
+)
+from bot_detector.event_queue.core import Queue
+from bot_detector.event_queue.factory import QueueFactory
+from bot_detector.kafka import ToScrapeStruct
 from pydantic_settings import BaseSettings
 from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
 from typing_extensions import Literal
 
 logger = logging.getLogger(__name__)
+wide_event = WideEventLogger()
+
+
+def _add_event(data: dict[str, object]) -> None:
+    wide_event.add(data)
+
+
+def _force_log() -> None:
+    wide_event.add({"force_log": True})
 
 
 class Settings(BaseSettings):
@@ -37,67 +50,69 @@ class FetchParams:
     done: bool = False
 
     def __post_init__(self):
-        self.first_date = date.today() - timedelta(days=365)
-        self.last_date = date.today() - timedelta(days=self.days - 1)
+        self._update_step_flags()
+        self.update_date(days=self.days, infinity=False)
 
-    def update_date(self, days, infinity: bool = False):
+    def update_date(self, days: int, infinity: bool = False) -> None:
         self.days = days
-
         delta = timedelta(days=365) if infinity else timedelta(days=self.days)
         self.first_date = date.today() - delta
         self.last_date = date.today() - timedelta(days=self.days - 1)
         assert self.first_date < self.last_date
 
+    def _update_step_flags(self) -> None:
+        match self.step:
+            case "normal":
+                self.possible_ban = False
+                self.confirmed_ban = False
+            case "possible_ban":
+                self.possible_ban = True
+                self.confirmed_ban = False
+            case "confirmed_ban":
+                self.possible_ban = True
+                self.confirmed_ban = True
+            case _:
+                raise ValueError(f"Invalid step: {self.step}")
+
+    def set_step(self, step: Literal["normal", "possible_ban", "confirmed_ban"]) -> None:
+        self.step = step
+        self._update_step_flags()
+
+    def reset_for_new_day(self, max_days: int) -> None:
+        self.set_step("normal")
+        self.update_date(days=max_days, infinity=True)
+        self.player_id = 0
+
 
 async def produce_players(
     players: list[PlayerStruct],
-    player_producer: PlayersToScrapeProducer,
+    player_queue: Queue[ToScrapeStruct],
 ):
-    logger.info(f"Putting {len(players)} players in queue")
+    if not players:
+        return
+
+    _add_event({"queue_players": {"count": len(players)}})
+
+    metadata = MetaData(version=1, source="scrape_task_producer")
     player_structs = [
-        ToScrapeStruct(
-            metadata=MetaData(version=1, source="scrape_task_producer"),
-            player_data=player,
-        )
+        ToScrapeStruct(metadata=metadata, player_data=player)
        for player in players
        if len(player.name) <= 13
    ]
-
-    for player in player_structs:
-        await player_producer.produce_one(
-            ToScrapeStruct(
-                metadata=MetaData(version=1, source="scrape_task_producer"),
-                player_data=player.player_data,
-            )
-        )
+    if not player_structs:
+        return
+    error = await player_queue.put(player_structs)
+    if isinstance(error, Exception):
+        raise error
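Note that `Queue.put` reports failure by returning the exception rather than raising it, so `produce_players` must check and re-raise explicitly. A minimal sketch of that errors-as-values calling convention (the `put_or_raise` helper is illustrative, not part of this diff):

```python
from typing import Optional

async def put_or_raise(queue, items) -> None:
    # put() hands the exception back as a value instead of raising it,
    # so a failed produce is only surfaced if the caller checks the result.
    error: Optional[Exception] = await queue.put(items)
    if isinstance(error, Exception):
        raise error
```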
 
 
 def _reduce_days(fetch_params: FetchParams) -> FetchParams:
-    logger.info(f"Reducing days for {asdict(fetch_params)}")
+    _add_event({"reduce_days": {"fetch_params": asdict(fetch_params)}})
     _days = fetch_params.days - 1 if fetch_params.days > 1 else 1
     fetch_params.update_date(days=_days)
     fetch_params.player_id = 0
     return fetch_params
 
 
-def _update_ban_flags(fetch_params: FetchParams) -> FetchParams:
-    match fetch_params.step:
-        case "normal":
-            fetch_params.possible_ban = False
-            fetch_params.confirmed_ban = False
-            return fetch_params
-        case "possible_ban":
-            fetch_params.possible_ban = True
-            fetch_params.confirmed_ban = False
-            return fetch_params
-        case "confirmed_ban":
-            fetch_params.possible_ban = True
-            fetch_params.confirmed_ban = True
-            return fetch_params
-        case _:
-            raise ValueError(f"Invalid step: {fetch_params.step}")
-
-
 def determine_fetch_params(
     fetch_params: FetchParams,
     players: list[PlayerStruct] | None,
@@ -108,45 +123,64 @@
     if players is None:
         return fetch_params
 
+    fetch_params._update_step_flags()
+
     if len(players) >= fetch_params.limit:
-        fetch_params = _update_ban_flags(fetch_params)
         fetch_params.player_id = players[-1].id
         return fetch_params
 
     match fetch_params.step:
         case "normal":
-            fetch_params = _update_ban_flags(fetch_params)
-
             if fetch_params.days > 1:
                 return _reduce_days(fetch_params)
 
             assert fetch_params.days <= 1
-            logger.info("All normal scraped, going to step: possible bans")
-            fetch_params.step = "possible_ban"
+            _add_event(
+                {
+                    "set_step": {
+                        "from": fetch_params.step,
+                        "to": "possible_ban",
+                    }
+                }
+            )
+            _force_log()
+            fetch_params.set_step("possible_ban")
             fetch_params.update_date(days=max_days, infinity=True)
             return fetch_params
         case "possible_ban":
-            fetch_params = _update_ban_flags(fetch_params)
-
             if fetch_params.days > max_possible_ban_days:
                 return _reduce_days(fetch_params)
 
             assert fetch_params.days <= max_possible_ban_days
-            logger.info("All possible bans scraped, going to step: confirmed bans")
-            fetch_params.step = "confirmed_ban"
+            _add_event(
+                {
+                    "set_step": {
+                        "from": fetch_params.step,
+                        "to": "confirmed_ban",
+                    }
+                }
+            )
+            _force_log()
+            fetch_params.set_step("confirmed_ban")
             fetch_params.update_date(days=max_days, infinity=True)
             return fetch_params
         case "confirmed_ban":
-            fetch_params = _update_ban_flags(fetch_params)
-
             if fetch_params.days > max_confirmed_ban_days:
                 return _reduce_days(fetch_params)
 
             assert fetch_params.days <= max_confirmed_ban_days
-            logger.info("All confirmed bans scraped, going to step: normal")
-            fetch_params.step = "normal"
+            _add_event(
+                {
+                    "set_step": {
+                        "from": fetch_params.step,
+                        "to": "normal",
+                    }
+                }
+            )
+            _force_log()
+            fetch_params.set_step("normal")
             fetch_params.update_date(days=max_days, infinity=True)
             fetch_params.done = True
             return fetch_params
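The ban-flag logic that used to live in the free function `_update_ban_flags` is now owned by `FetchParams` itself. For orientation, the step-to-flag mapping and the cycle that `determine_fetch_params` walks (the dicts below are an illustrative summary, not code from the diff):

```python
# (possible_ban, confirmed_ban) as set by FetchParams._update_step_flags()
STEP_FLAGS: dict[str, tuple[bool, bool]] = {
    "normal": (False, False),
    "possible_ban": (True, False),
    "confirmed_ban": (True, True),
}

# Once a step's date window is exhausted, determine_fetch_params advances to
# the next step; wrapping from confirmed_ban back to normal also sets
# done=True, which makes the main loop sleep until the next day.
NEXT_STEP: dict[str, str] = {
    "normal": "possible_ban",
    "possible_ban": "confirmed_ban",
    "confirmed_ban": "normal",
}
```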
@@ -155,8 +189,7 @@
 async def process_players(
     async_session: async_sessionmaker[AsyncSession],
     player_repo: PlayerRepo,
-    player_producer: PlayersToScrapeProducer,
-    player_consumer: PlayersToScrapeConsumer,
+    player_queue: Queue[ToScrapeStruct],
     limit: int = 10,
 ):
     max_days = 20
@@ -170,79 +203,101 @@
     last_day = date.today()
 
     while True:
-        lag = await player_consumer.get_lag()
-
-        if last_day != date.today():
-            logger.info("New day detected, resetting days and confirmed_ban")
-            last_day = date.today()
-            fp.days = max_days
-            fp.confirmed_ban = False
-            fp.possible_ban = False
-            fp.player_id = 0
-
-        if lag >= 100_000:
-            logger.info(f"{lag=} to high, sleeping(10)")
-            await asyncio.sleep(10)
-            continue
-
-        logger.info(f"{asdict(fp)}")
-
-        async with async_session() as session:
-            players = await player_repo.select_player(
-                async_session=session,
-                player_id=fp.player_id,
-                possible_ban=fp.possible_ban,
-                confirmed_ban=fp.confirmed_ban,
-                or_none=fp.step == "normal",
-                first_date=fp.first_date,
-                last_date=fp.last_date,
-                limit=fp.limit,
+        token = wide_event.set({})
+        try:
+            lag = await player_queue.lag()
+            if last_day != date.today():
+                _add_event({"new_day_reset": True})
+                _force_log()
+                last_day = date.today()
+                fp.reset_for_new_day(max_days)
+
+            if lag >= 100_000:
+                _add_event({"lag_throttle": {"lag": lag}})
+                await asyncio.sleep(10)
+                continue
+
+            _add_event({"fetch_params": asdict(fp)})
+
+            async with async_session() as session:
+                players = await player_repo.select_player(
+                    async_session=session,
+                    player_id=fp.player_id,
+                    possible_ban=fp.possible_ban,
+                    confirmed_ban=fp.confirmed_ban,
+                    or_none=fp.step == "normal",
+                    first_date=fp.first_date,
+                    last_date=fp.last_date,
+                    limit=fp.limit,
+                )
+
+            await produce_players(players=players, player_queue=player_queue)
+
+            fp = determine_fetch_params(
+                fetch_params=fp,
+                players=players,
+                max_days=max_days,
             )
 
-        await produce_players(players=players, player_producer=player_producer)
-
-        fp = determine_fetch_params(
-            fetch_params=fp,
-            players=players,
-            max_days=max_days,
-        )
-
-        if fp.done:
-            fp.done = False
-            now = datetime.now()
-            end_of_today = datetime.combine(now.date(), time.max)
-
-            time_remaining = end_of_today - now
-            sleep_time = int(time_remaining.total_seconds())
-            sleep_time = max(sleep_time, 1)  # Ensure at least 1 second sleep
-            logger.info(f"Sleeping for {sleep_time} seconds until end of day")
-            await asyncio.sleep(sleep_time)
+            if fp.done:
+                fp.done = False
+                now = datetime.now()
+                end_of_today = datetime.combine(now.date(), time.max)
+
+                time_remaining = end_of_today - now
+                sleep_time = int(time_remaining.total_seconds())
+                sleep_time = max(sleep_time, 1)  # Ensure at least 1 second sleep
+                _add_event({"done_for_day": {"sleep_seconds": sleep_time}})
+                _force_log()
+                await asyncio.sleep(sleep_time)
+        except Exception as exc:
+            _add_event({"error": {"message": str(exc)}})
+            raise
+        finally:
+            final_ctx = wide_event.get()
+            force_log = bool(final_ctx.pop("force_log", False))
+            if "error" in final_ctx:
+                logger.error(final_ctx)
+            elif force_log or wide_event.sample():
+                logger.info(final_ctx)
+            wide_event.reset(token)
 
 
 async def main():
     async_session, async_engine = get_session_factory(SETTINGS=DBSettings())
     bootstrap_servers = KafkaSettings().KAFKA_BOOTSTRAP_SERVERS
-    player_producer = PlayersToScrapeProducer(bootstrap_servers=bootstrap_servers)
-    player_consumer = PlayersToScrapeConsumer(
-        bootstrap_servers=bootstrap_servers, group_id="scraper"
+    queue = QueueFactory.create_queue(
+        model=ToScrapeStruct,
+        queue_type="queue",
+        backend_type="kafka",
+        config=KafkaConfig(
+            topic="players.to_scrape",
+            bootstrap_servers=bootstrap_servers,
+            producer=True,
+            consumer=True,
+            producer_config=KafkaProducerConfig(
+                partition_key_fn=lambda message: str(message.player_data.id % 10)
+            ),
+            consumer_config=KafkaConsumerConfig(group_id="scraper"),
+        ),
     )
+    if isinstance(queue, Exception):
+        raise queue
+    player_queue = queue
 
-    await player_producer.start()
-    await player_consumer.start()
+    await player_queue.start()
 
     try:
         await process_players(
             async_session=async_session,
             player_repo=PlayerRepo(),
-            player_producer=player_producer,
-            player_consumer=player_consumer,
+            player_queue=player_queue,
             limit=Settings().LIMIT,
         )
     finally:
         await async_engine.dispose()
-        await player_producer.stop()
-        await player_consumer.stop()
+        await player_queue.stop()
 
 
 async def run_async():
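`WideEventLogger` itself is not part of this diff, so the loop above is the only view of its contract: `set({})` opens a per-iteration context, `add` accumulates fields, and the `finally` block emits exactly one wide event per iteration (always on error or `force_log`, otherwise sampled). A plausible contextvars-based sketch of that contract, purely for orientation — the real component may differ:

```python
import contextvars
import random


class WideEventLogger:
    """Hypothetical implementation inferred from the call sites above."""

    def __init__(self, sample_rate: float = 0.01) -> None:
        self._ctx: contextvars.ContextVar[dict] = contextvars.ContextVar("wide_event")
        self._sample_rate = sample_rate

    def set(self, initial: dict) -> contextvars.Token:
        # Open a fresh event context; the token restores the previous one.
        return self._ctx.set(initial)

    def add(self, data: dict) -> None:
        # Merge fields into the current iteration's event.
        self._ctx.get().update(data)

    def get(self) -> dict:
        return self._ctx.get()

    def sample(self) -> bool:
        # Keep roughly one unforced event per 1/sample_rate iterations.
        return random.random() < self._sample_rate

    def reset(self, token: contextvars.Token) -> None:
        self._ctx.reset(token)
```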
diff --git a/components/bot_detector/event_queue/adapters/kafka/adapter.py b/components/bot_detector/event_queue/adapters/kafka/adapter.py
index 751fce5..282a06e 100644
--- a/components/bot_detector/event_queue/adapters/kafka/adapter.py
+++ b/components/bot_detector/event_queue/adapters/kafka/adapter.py
@@ -3,7 +3,12 @@
 from typing import Generic, Optional, TypeVar
 
 import orjson
-from aiokafka import AIOKafkaConsumer, AIOKafkaProducer, ConsumerRecord
+from aiokafka import (
+    AIOKafkaConsumer,
+    AIOKafkaProducer,
+    ConsumerRecord,
+    TopicPartition,
+)
 from aiokafka.errors import KafkaTimeoutError
 from bot_detector.event_queue.core.batcher import Batcher
 from bot_detector.event_queue.core.errors import (
@@ -79,7 +84,7 @@ async def put(self, messages: list[T]) -> Optional[Exception]:
         _config = self.config.producer_config
 
         for message in messages:
-            raw_key = _config.partition_key_fn()
+            raw_key = _config.partition_key_fn(message)
             if isinstance(raw_key, bytes):
                 key = raw_key
             elif isinstance(raw_key, str):
@@ -197,6 +202,21 @@
             )
             await self.consumer.commit()
 
+    async def lag(self) -> int:
+        if self.consumer is None:
+            return 0
+        total_lag = 0
+        partitions = self.consumer.partitions_for_topic(self.config.topic)
+        if partitions is None:
+            return 0
+
+        for partition in partitions:
+            tp = TopicPartition(self.config.topic, partition)
+            committed = await self.consumer.committed(tp) or 0
+            end_offset = await self.consumer.end_offsets([tp])
+            total_lag += end_offset[tp] - committed
+        return total_lag
+
 
 class AIOKafkaAdapter(QueueBackendProtocol[T]):
     """
@@ -232,3 +252,6 @@ async def get_many(self, count: int) -> list[T] | Exception:
 
     async def commit(self) -> Optional[Exception]:
         await self.consumer.commit()
+
+    async def lag(self) -> int:
+        return await self.consumer.lag()
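Per partition, lag here is the broker's end offset minus the consumer group's committed offset, with a never-committed partition (`committed()` returning `None`) counted from offset 0. A worked example with made-up numbers:

```python
# Broker end offsets and consumer-group commits for three partitions.
end_offsets = {0: 1_500, 1: 2_000, 2: 750}
committed = {0: 1_400, 1: 2_000, 2: None}  # partition 2: never committed

total_lag = sum(
    end - (committed[p] or 0)  # None -> 0, mirroring `committed(tp) or 0`
    for p, end in end_offsets.items()
)
assert total_lag == 100 + 0 + 750  # 850 messages behind
```

One nuance: `or 0` also maps a legitimately committed offset of 0 to 0, which is harmless here since both mean "start of the partition".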
diff --git a/components/bot_detector/event_queue/adapters/kafka/config.py b/components/bot_detector/event_queue/adapters/kafka/config.py
index 5d2e36d..85b9421 100644
--- a/components/bot_detector/event_queue/adapters/kafka/config.py
+++ b/components/bot_detector/event_queue/adapters/kafka/config.py
@@ -13,7 +13,7 @@ class KafkaConsumerConfig(BaseModel):
 
 
 class KafkaProducerConfig(BaseModel):
-    partition_key_fn: Callable[[], bytes | str]
+    partition_key_fn: Callable[[T], bytes | str]
 
     MAX_PRODUCE_RETRIES: int = 3
     MAX_PRODUCE_RETRY_BACKOFF: int = 60
diff --git a/components/bot_detector/event_queue/adapters/memory/adapter.py b/components/bot_detector/event_queue/adapters/memory/adapter.py
index 10f77c3..fd8e541 100644
--- a/components/bot_detector/event_queue/adapters/memory/adapter.py
+++ b/components/bot_detector/event_queue/adapters/memory/adapter.py
@@ -60,6 +60,9 @@ async def get_many(self, count: int) -> list[T]:
 
     async def commit(self) -> Optional[Exception]:
         self._queue.task_done()
+
+    async def lag(self) -> int:
+        return self._queue.qsize()
 
 class InMemoryProducerAdapter(
     _InMemoryBase[T],
@@ -103,3 +106,6 @@ async def get_many(self, count: int) -> list[T]:
 
     async def commit(self) -> Optional[Exception]:
         return await self.consumer.commit()
+
+    async def lag(self) -> int:
+        return await self.consumer.lag()
diff --git a/components/bot_detector/event_queue/core/event_queue.py b/components/bot_detector/event_queue/core/event_queue.py
index d43ed42..8fd674b 100644
--- a/components/bot_detector/event_queue/core/event_queue.py
+++ b/components/bot_detector/event_queue/core/event_queue.py
@@ -64,6 +64,9 @@ async def get_one(self) -> Optional[T] | Exception:
 
     async def get_many(self, count: int) -> list[T] | Exception:
         return await self._backend.get_many(count)
+
+    async def lag(self) -> int:
+        return await self._backend.lag()
 
 class Queue(QueueConsumer[T], QueueProducer[T]):
     """
diff --git a/components/bot_detector/event_queue/core/interface.py b/components/bot_detector/event_queue/core/interface.py
index c211a14..f5d63e4 100644
--- a/components/bot_detector/event_queue/core/interface.py
+++ b/components/bot_detector/event_queue/core/interface.py
@@ -36,6 +36,8 @@ async def get_many(self, count: int) -> list[T] | Exception: ...
 
     async def commit(self) -> Optional[Exception]: ...
 
+    async def lag(self) -> int: ...
+
 
 @runtime_checkable
 class QueueBackendProtocol(
diff --git a/projects/api_public/pyproject.toml b/projects/api_public/pyproject.toml
index 3d04ed3..1eb4a85 100644
--- a/projects/api_public/pyproject.toml
+++ b/projects/api_public/pyproject.toml
@@ -28,5 +28,6 @@ packages = ["bot_detector"]
 "../../bases/bot_detector/api_public" = "bot_detector/api_public"
 "../../components/bot_detector/database" = "bot_detector/database"
 "../../components/bot_detector/kafka" = "bot_detector/kafka"
+"../../components/bot_detector/wide_event" = "bot_detector/wide_event"
 "../../components/bot_detector/structs" = "bot_detector/structs"
-"../../components/bot_detector/logfmt" = "bot_detector/logfmt"
\ No newline at end of file
+"../../components/bot_detector/logfmt" = "bot_detector/logfmt"
diff --git a/projects/scrape_task_producer/pyproject.toml b/projects/scrape_task_producer/pyproject.toml
index 0095718..2cc943c 100644
--- a/projects/scrape_task_producer/pyproject.toml
+++ b/projects/scrape_task_producer/pyproject.toml
@@ -30,5 +30,7 @@ packages = ["bot_detector"]
 "../../bases/bot_detector/scrape_task_producer" = "bot_detector/scrape_task_producer"
 "../../components/bot_detector/database" = "bot_detector/database"
 "../../components/bot_detector/kafka" = "bot_detector/kafka"
+"../../components/bot_detector/event_queue" = "bot_detector/event_queue"
+"../../components/bot_detector/wide_event" = "bot_detector/wide_event"
 "../../components/bot_detector/structs" = "bot_detector/structs"
 "../../components/bot_detector/logfmt" = "bot_detector/logfmt"
diff --git a/test/components/bot_detector/event_queue/adapter_kafka/test_factory_kafka.py b/test/components/bot_detector/event_queue/adapter_kafka/test_factory_kafka.py
index d25f004..b4199e3 100644
--- a/test/components/bot_detector/event_queue/adapter_kafka/test_factory_kafka.py
+++ b/test/components/bot_detector/event_queue/adapter_kafka/test_factory_kafka.py
@@ -26,7 +26,7 @@ async def test_queue_factory_creates_kafka_queue():
         consumer=True,
         producer=True,
         consumer_config=KafkaConsumerConfig(group_id="group"),
-        producer_config=KafkaProducerConfig(partition_key_fn=lambda: "1"),
+        producer_config=KafkaProducerConfig(partition_key_fn=lambda _: "1"),
     )
 
     queue = QueueFactory.create_queue(
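With `lag()` on the protocol, the call chain is uniform across backends: `Queue.lag()` delegates to its backend, the Kafka consumer adapter sums per-partition offsets, and the in-memory adapter simply reports `qsize()`. That makes lag-based throttling, as used in `process_players`, testable without a broker. A small sketch of the idea (class and threshold follow the diff; the stub is ours):

```python
import asyncio


class StubBackend:
    """Minimal test double exposing only the protocol's lag()."""

    def __init__(self, lag: int) -> None:
        self._lag = lag

    async def lag(self) -> int:
        return self._lag


async def should_throttle(backend) -> bool:
    # Mirrors the `lag >= 100_000` guard in process_players.
    return await backend.lag() >= 100_000


assert asyncio.run(should_throttle(StubBackend(150_000))) is True
assert asyncio.run(should_throttle(StubBackend(10))) is False
```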
diff --git a/test/components/bot_detector/event_queue/adapter_kafka/test_producer_adapter_kafka.py b/test/components/bot_detector/event_queue/adapter_kafka/test_producer_adapter_kafka.py
index ee952b2..33b93ca 100644
--- a/test/components/bot_detector/event_queue/adapter_kafka/test_producer_adapter_kafka.py
+++ b/test/components/bot_detector/event_queue/adapter_kafka/test_producer_adapter_kafka.py
@@ -29,7 +29,7 @@ async def test_producer_start_stop():
         consumer=False,
         producer=True,
         consumer_config=None,
-        producer_config=KafkaProducerConfig(partition_key_fn=lambda: "1"),
+        producer_config=KafkaProducerConfig(partition_key_fn=lambda _: "1"),
     )
 
     adapter = AIOKafkaProducerAdapter(PlayerScraped, config)
@@ -55,7 +55,7 @@ async def test_producer_custom_partition_key():
         consumer=False,
         producer=True,
         consumer_config=None,
-        producer_config=KafkaProducerConfig(partition_key_fn=lambda: "1"),
+        producer_config=KafkaProducerConfig(partition_key_fn=lambda _: "1"),
     )
 
     adapter = AIOKafkaProducerAdapter(PlayerScraped, config)
@@ -86,7 +86,7 @@ async def test_producer_partition_key_bytes():
         consumer=False,
         producer=True,
         consumer_config=None,
-        producer_config=KafkaProducerConfig(partition_key_fn=lambda: b"bytes"),
+        producer_config=KafkaProducerConfig(partition_key_fn=lambda _: b"bytes"),
     )
 
     adapter = AIOKafkaProducerAdapter(PlayerScraped, config)
@@ -110,7 +110,7 @@ async def test_producer_partition_key_invalid_type():
         consumer=False,
         producer=True,
         consumer_config=None,
-        producer_config=KafkaProducerConfig(partition_key_fn=lambda: 123),
+        producer_config=KafkaProducerConfig(partition_key_fn=lambda _: 123),
     )
     adapter = AIOKafkaProducerAdapter(PlayerScraped, config)
     adapter.producer = AsyncMock()
@@ -128,7 +128,7 @@ async def test_producer_serialization():
         consumer=False,
         producer=True,
         consumer_config=None,
-        producer_config=KafkaProducerConfig(partition_key_fn=lambda: "1"),
+        producer_config=KafkaProducerConfig(partition_key_fn=lambda _: "1"),
     )
 
     adapter = AIOKafkaProducerAdapter(PlayerScraped, config)
@@ -153,7 +153,7 @@ async def test_producer_start_noop_when_existing():
         consumer=False,
         producer=True,
         consumer_config=None,
-        producer_config=KafkaProducerConfig(partition_key_fn=lambda: "1"),
+        producer_config=KafkaProducerConfig(partition_key_fn=lambda _: "1"),
     )
     adapter = AIOKafkaProducerAdapter(PlayerScraped, config)
     adapter.producer = AsyncMock()
@@ -174,7 +174,7 @@ async def test_producer_stop_noop_when_missing():
         consumer=False,
         producer=True,
         consumer_config=None,
-        producer_config=KafkaProducerConfig(partition_key_fn=lambda: "1"),
+        producer_config=KafkaProducerConfig(partition_key_fn=lambda _: "1"),
     )
 
     adapter = AIOKafkaProducerAdapter(PlayerScraped, config)
@@ -190,7 +190,7 @@ async def test_producer_send_failure():
         producer=True,
         consumer_config=None,
         producer_config=KafkaProducerConfig(
-            partition_key_fn=lambda: "1",
+            partition_key_fn=lambda _: "1",
             MAX_PRODUCE_RETRIES=1,
         ),
     )
@@ -224,7 +224,7 @@ async def test_producer_put_without_start():
         consumer=False,
         producer=True,
         consumer_config=None,
-        producer_config=KafkaProducerConfig(partition_key_fn=lambda: "1"),
+        producer_config=KafkaProducerConfig(partition_key_fn=lambda _: "1"),
     )
 
     adapter = AIOKafkaProducerAdapter(PlayerScraped, config)
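The bytes/str/invalid-type trio above exercises the key-normalization branch in `put()`, of which only the first lines appear in this diff. A hedged reconstruction of the idea, with a plain `ValueError` standing in for whatever error type the adapter actually raises:

```python
def normalize_key(raw_key: object) -> bytes:
    # bytes pass through untouched; str is encoded; anything else is rejected
    # (cf. test_producer_partition_key_invalid_type's `lambda _: 123`).
    if isinstance(raw_key, bytes):
        return raw_key
    if isinstance(raw_key, str):
        return raw_key.encode("utf-8")
    raise ValueError(
        f"partition key must be bytes or str, got {type(raw_key).__name__}"
    )
```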
diff --git a/test/components/bot_detector/event_queue/adapter_kafka/test_queue_adapter_kafka.py b/test/components/bot_detector/event_queue/adapter_kafka/test_queue_adapter_kafka.py
index dbc165f..0902faa 100644
--- a/test/components/bot_detector/event_queue/adapter_kafka/test_queue_adapter_kafka.py
+++ b/test/components/bot_detector/event_queue/adapter_kafka/test_queue_adapter_kafka.py
@@ -24,7 +24,7 @@ async def test_queue_adapter_wiring():
         consumer=True,
         producer=True,
         consumer_config=KafkaConsumerConfig(group_id="group"),
-        producer_config=KafkaProducerConfig(partition_key_fn=lambda: "1"),
+        producer_config=KafkaProducerConfig(partition_key_fn=lambda _: "1"),
     )
 
     adapter = AIOKafkaAdapter(PlayerScraped, config)
     adapter.producer.start = AsyncMock()
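Taken together, the test edits are mechanical (`lambda: ...` becomes `lambda _: ...`, ignoring the message), while the production call site gains real keying. A before/after sketch of the call-site difference (the bucketing line is lifted from `main()` above):

```python
# Before: a constant key, independent of the message being produced.
KafkaProducerConfig(partition_key_fn=lambda: "1")  # old zero-arg signature

# After: the key is derived from the message, so all records for a player
# land on the same partition (10 stable buckets by player id).
KafkaProducerConfig(
    partition_key_fn=lambda message: str(message.player_data.id % 10)
)
```

Keying by player id preserves per-player ordering within the topic while still spreading load across partitions.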