Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
277 changes: 166 additions & 111 deletions bases/bot_detector/scrape_task_producer/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,31 @@
from bot_detector.database import Settings as DBSettings
from bot_detector.database import get_session_factory
from bot_detector.database.player import PlayerRepo
from bot_detector.kafka import (
PlayersToScrapeConsumer,
PlayersToScrapeProducer,
ToScrapeStruct,
)
from bot_detector.kafka import Settings as KafkaSettings
from bot_detector.structs import MetaData, PlayerStruct
from bot_detector.wide_event import WideEventLogger
from bot_detector.event_queue.adapters.kafka import (
KafkaConfig,
KafkaConsumerConfig,
KafkaProducerConfig,
)
from bot_detector.event_queue.core import Queue
from bot_detector.event_queue.factory import QueueFactory
from bot_detector.kafka import ToScrapeStruct
from pydantic_settings import BaseSettings
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from typing_extensions import Literal

logger = logging.getLogger(__name__)
wide_event = WideEventLogger()


def _add_event(data: dict[str, object]) -> None:
    """Merge *data* into the current wide-event logging context."""
    wide_event.add(data)


def _force_log() -> None:
    """Flag the current wide-event context so it is always emitted,
    bypassing the sampling decision made when the context is logged."""
    wide_event.add({"force_log": True})


class Settings(BaseSettings):
Expand All @@ -37,67 +50,69 @@ class FetchParams:
done: bool = False

def __post_init__(self):
self.first_date = date.today() - timedelta(days=365)
self.last_date = date.today() - timedelta(days=self.days - 1)
self._update_step_flags()
self.update_date(days=self.days, infinity=False)

def update_date(self, days, infinity: bool = False):
def update_date(self, days: int, infinity: bool = False) -> None:
    """Recompute the scrape date window for *days*.

    With ``infinity=True`` the window start is pinned a full year back
    (365 days) regardless of *days*; otherwise it starts *days* ago.
    The window end is always ``days - 1`` days ago.
    """
    self.days = days

    delta = timedelta(days=365) if infinity else timedelta(days=self.days)
    self.first_date = date.today() - delta
    self.last_date = date.today() - timedelta(days=self.days - 1)
    # NOTE(review): assert is stripped under -O, and it would fail for
    # days >= 366 with infinity=True — confirm callers never pass that.
    assert self.first_date < self.last_date

def _update_step_flags(self) -> None:
    """Derive ``possible_ban``/``confirmed_ban`` from the current ``step``.

    Raises:
        ValueError: if ``step`` is not one of the known step names.
    """
    flags_by_step = {
        "normal": (False, False),
        "possible_ban": (True, False),
        "confirmed_ban": (True, True),
    }
    if self.step not in flags_by_step:
        raise ValueError(f"Invalid step: {self.step}")
    self.possible_ban, self.confirmed_ban = flags_by_step[self.step]

def set_step(self, step: Literal["normal", "possible_ban", "confirmed_ban"]) -> None:
    """Switch to *step* and resync the ban flags to match it."""
    self.step = step
    self._update_step_flags()

def reset_for_new_day(self, max_days: int) -> None:
    """Restart the scan for a fresh day: back to the "normal" step,
    a year-long date window, and pagination from the first player."""
    self.set_step("normal")
    self.update_date(days=max_days, infinity=True)
    self.player_id = 0


async def produce_players(
players: list[PlayerStruct],
player_producer: PlayersToScrapeProducer,
player_queue: Queue[ToScrapeStruct],
):
logger.info(f"Putting {len(players)} players in queue")
if not players:
return

_add_event({"queue_players": {"count": len(players)}})
metadata = MetaData(version=1, source="scrape_task_producer")
player_structs = [
ToScrapeStruct(
metadata=MetaData(version=1, source="scrape_task_producer"),
player_data=player,
)
ToScrapeStruct(metadata=metadata, player_data=player)
for player in players
if len(player.name) <= 13
]

for player in player_structs:
await player_producer.produce_one(
ToScrapeStruct(
metadata=MetaData(version=1, source="scrape_task_producer"),
player_data=player.player_data,
)
)
if not player_structs:
return
error = await player_queue.put(player_structs)
if isinstance(error, Exception):
raise error


def _reduce_days(fetch_params: FetchParams) -> FetchParams:
    """Shrink the scrape window by one day (floored at 1) and restart
    pagination from the first player.

    Mutates and returns the same *fetch_params* instance.
    """
    logger.info(f"Reducing days for {asdict(fetch_params)}")
    _add_event({"reduce_days": {"fetch_params": asdict(fetch_params)}})
    # Never shrink below a one-day window.
    _days = fetch_params.days - 1 if fetch_params.days > 1 else 1
    fetch_params.update_date(days=_days)
    fetch_params.player_id = 0
    return fetch_params


def _update_ban_flags(fetch_params: FetchParams) -> FetchParams:
    """Set ``possible_ban``/``confirmed_ban`` on *fetch_params* to match
    its current ``step`` and return the same (mutated) instance.

    Raises:
        ValueError: if ``step`` is not a recognised step name.
    """
    step = fetch_params.step
    if step == "normal":
        fetch_params.possible_ban = False
        fetch_params.confirmed_ban = False
    elif step == "possible_ban":
        fetch_params.possible_ban = True
        fetch_params.confirmed_ban = False
    elif step == "confirmed_ban":
        fetch_params.possible_ban = True
        fetch_params.confirmed_ban = True
    else:
        raise ValueError(f"Invalid step: {step}")
    return fetch_params


def determine_fetch_params(
fetch_params: FetchParams,
players: list[PlayerStruct] | None,
Expand All @@ -108,45 +123,64 @@ def determine_fetch_params(
if players is None:
return fetch_params

fetch_params._update_step_flags()

if len(players) >= fetch_params.limit:
fetch_params = _update_ban_flags(fetch_params)
fetch_params.player_id = players[-1].id
return fetch_params

match fetch_params.step:
case "normal":
fetch_params = _update_ban_flags(fetch_params)

if fetch_params.days > 1:
return _reduce_days(fetch_params)

assert fetch_params.days <= 1
logger.info("All normal scraped, going to step: possible bans")
fetch_params.step = "possible_ban"
_add_event(
{
"set_step": {
"from": fetch_params.step,
"to": "possible_ban",
}
}
)
_force_log()
fetch_params.set_step("possible_ban")
fetch_params.update_date(days=max_days, infinity=True)
return fetch_params

case "possible_ban":
fetch_params = _update_ban_flags(fetch_params)

if fetch_params.days > max_possible_ban_days:
return _reduce_days(fetch_params)

assert fetch_params.days <= max_possible_ban_days
logger.info("All possible bans scraped, going to step: confirmed bans")
fetch_params.step = "confirmed_ban"
_add_event(
{
"set_step": {
"from": fetch_params.step,
"to": "confirmed_ban",
}
}
)
_force_log()
fetch_params.set_step("confirmed_ban")
fetch_params.update_date(days=max_days, infinity=True)
return fetch_params

case "confirmed_ban":
fetch_params = _update_ban_flags(fetch_params)

if fetch_params.days > max_confirmed_ban_days:
return _reduce_days(fetch_params)

assert fetch_params.days <= max_confirmed_ban_days
logger.info("All confirmed bans scraped, going to step: normal")
fetch_params.step = "normal"
_add_event(
{
"set_step": {
"from": fetch_params.step,
"to": "normal",
}
}
)
_force_log()
fetch_params.set_step("normal")
fetch_params.update_date(days=max_days, infinity=True)
fetch_params.done = True
return fetch_params
Expand All @@ -155,8 +189,7 @@ def determine_fetch_params(
async def process_players(
async_session: async_sessionmaker[AsyncSession],
player_repo: PlayerRepo,
player_producer: PlayersToScrapeProducer,
player_consumer: PlayersToScrapeConsumer,
player_queue: Queue[ToScrapeStruct],
limit: int = 10,
):
max_days = 20
Expand All @@ -170,79 +203,101 @@ async def process_players(
last_day = date.today()

while True:
lag = await player_consumer.get_lag()

if last_day != date.today():
logger.info("New day detected, resetting days and confirmed_ban")
last_day = date.today()
fp.days = max_days
fp.confirmed_ban = False
fp.possible_ban = False
fp.player_id = 0

if lag >= 100_000:
logger.info(f"{lag=} to high, sleeping(10)")
await asyncio.sleep(10)
continue

logger.info(f"{asdict(fp)}")

async with async_session() as session:
players = await player_repo.select_player(
async_session=session,
player_id=fp.player_id,
possible_ban=fp.possible_ban,
confirmed_ban=fp.confirmed_ban,
or_none=fp.step == "normal",
first_date=fp.first_date,
last_date=fp.last_date,
limit=fp.limit,
token = wide_event.set({})
try:
lag = await player_queue.lag()
if last_day != date.today():
_add_event({"new_day_reset": True})
_force_log()
last_day = date.today()
fp.reset_for_new_day(max_days)

if lag >= 100_000:
_add_event({"lag_throttle": {"lag": lag}})
await asyncio.sleep(10)
continue

_add_event({"fetch_params": asdict(fp)})

async with async_session() as session:
players = await player_repo.select_player(
async_session=session,
player_id=fp.player_id,
possible_ban=fp.possible_ban,
confirmed_ban=fp.confirmed_ban,
or_none=fp.step == "normal",
first_date=fp.first_date,
last_date=fp.last_date,
limit=fp.limit,
)

await produce_players(players=players, player_queue=player_queue)

fp = determine_fetch_params(
fetch_params=fp,
players=players,
max_days=max_days,
)

await produce_players(players=players, player_producer=player_producer)

fp = determine_fetch_params(
fetch_params=fp,
players=players,
max_days=max_days,
)

if fp.done:
fp.done = False
now = datetime.now()
end_of_today = datetime.combine(now.date(), time.max)

time_remaining = end_of_today - now
sleep_time = int(time_remaining.total_seconds())
sleep_time = max(sleep_time, 1) # Ensure at least 1 second sleep
logger.info(f"Sleeping for {sleep_time} seconds until end of day")
await asyncio.sleep(sleep_time)
if fp.done:
fp.done = False
now = datetime.now()
end_of_today = datetime.combine(now.date(), time.max)

time_remaining = end_of_today - now
sleep_time = int(time_remaining.total_seconds())
sleep_time = max(sleep_time, 1) # Ensure at least 1 second sleep
_add_event({"done_for_day": {"sleep_seconds": sleep_time}})
_force_log()
await asyncio.sleep(sleep_time)
except Exception as exc:
_add_event({"error": {"message": str(exc)}})
raise
finally:
final_ctx = wide_event.get()
force_log = bool(final_ctx.pop("force_log", False))
if "error" in final_ctx:
logger.error(final_ctx)
elif force_log or wide_event.sample():
logger.info(final_ctx)
wide_event.reset(token)


async def main():
async_session, async_engine = get_session_factory(SETTINGS=DBSettings())

bootstrap_servers = KafkaSettings().KAFKA_BOOTSTRAP_SERVERS
player_producer = PlayersToScrapeProducer(bootstrap_servers=bootstrap_servers)
player_consumer = PlayersToScrapeConsumer(
bootstrap_servers=bootstrap_servers, group_id="scraper"
queue = QueueFactory.create_queue(
model=ToScrapeStruct,
queue_type="queue",
backend_type="kafka",
config=KafkaConfig(
topic="players.to_scrape",
bootstrap_servers=bootstrap_servers,
producer=True,
consumer=True,
producer_config=KafkaProducerConfig(
partition_key_fn=lambda message: str(message.player_data.id % 10)
),
consumer_config=KafkaConsumerConfig(group_id="scraper"),
),
)
if isinstance(queue, Exception):
raise queue
player_queue = queue

await player_producer.start()
await player_consumer.start()
await player_queue.start()

try:
await process_players(
async_session=async_session,
player_repo=PlayerRepo(),
player_producer=player_producer,
player_consumer=player_consumer,
player_queue=player_queue,
limit=Settings().LIMIT,
)
finally:
await async_engine.dispose()
await player_producer.stop()
await player_consumer.stop()
await player_queue.stop()


async def run_async():
Expand Down
Loading
Loading