diff --git a/masterbase/app.py b/masterbase/app.py
index 2ef6dc6..ae00d6a 100644
--- a/masterbase/app.py
+++ b/masterbase/app.py
@@ -23,6 +23,7 @@
 sys.path.append(os.path.dirname(CURRENT_DIR))
 
 # ruff: noqa: E402
+# ruff: noqa: I001
 from masterbase.anomaly import DetectionState
 from masterbase.guards import (
     analyst_guard,
@@ -65,6 +66,8 @@
 from masterbase.registers import shutdown_registers, startup_registers
 from masterbase.steam import account_exists, is_limited_account
 
+# ruff: enable
+
 logger = logging.getLogger(__name__)
 
@@ -127,6 +130,7 @@ def close_session(request: Request, api_key: str) -> dict[str, bool]:
     return {"closed_successfully": True}
 
+
 @post("/close_session", guards=[valid_key_guard, user_in_session_guard], sync_to_thread=False)
 def close_with_late_bytes(request: Request, api_key: str, data: LateBytesBody) -> dict[str, bool]:
     """Close a session out. Will find the latest open session for a user.
@@ -199,8 +203,12 @@ async def demodata(request: Request, api_key: str, session_id: str) -> Stream:
     """Return the demo."""
     minio_client = request.app.state.minio_client
     blob_name = demo_blob_name(session_id)
-    file = minio_client.get_object("demoblobs", blob_name)
-    stat = minio_client.stat_object("demoblobs", blob_name)
+
+    try:
+        file = minio_client.get_object("demoblobs", blob_name)
+        stat = minio_client.stat_object("demoblobs", blob_name)
+    except Exception as exc:
+        raise HTTPException(detail="Demo not found!", status_code=404) from exc
 
     headers = {
         "Content-Disposition": f'attachment; filename="{blob_name}"',
@@ -257,12 +265,14 @@ async def report_player(request: Request, api_key: str, data: ReportBody) -> dic
     except IntegrityError:
         raise HTTPException(detail=f"Unknown session ID {data.session_id}", status_code=402)
 
+
 @get("/broadcasts", sync_to_thread=False)
 def broadcasts(request: Request) -> list[dict[str, str]]:
     """Return a list of broadcasts."""
     engine = request.app.state.engine
     return get_broadcasts(engine)
 
+
 class DemoHandler(WebsocketListener):
     """Custom Websocket Class."""
 
@@ -306,7 +316,7 @@ async def on_disconnect(self, socket: WebSocket) -> None:  # type: ignore
         """Close handle on disconnect."""
         if socket in streaming_sessions:
             session_manager = streaming_sessions[socket]
-        else :
+        else:
             logger.warning("Attempting to disconnect from already disconnected socket!")
             return
         logger.info(f"Received socket disconnect from session ID: {session_manager.session_id}")
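With the try/except above, any failure while fetching the blob (missing object, storage error) now surfaces to clients as a 404 instead of a 500, which matters once pruning can legitimately delete demo blobs. A minimal client-side sketch of the new behavior, assuming api_key and session_id are query parameters and using a placeholder host:

import requests  # any HTTP client works; requests is just for illustration

resp = requests.get(
    "https://masterbase.example/demodata",  # placeholder host
    params={"api_key": "<key>", "session_id": "<id>"},
)
if resp.status_code == 404:
    print("Demo not found (possibly pruned or never uploaded).")
else:
    resp.raise_for_status()
    with open("demo.dem", "wb") as fh:
        fh.write(resp.content)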
diff --git a/masterbase/lib.py b/masterbase/lib.py
index 4995b3c..c5d4416 100644
--- a/masterbase/lib.py
+++ b/masterbase/lib.py
@@ -4,6 +4,7 @@
 import io
 import json
 import logging
+import math
 import os
 import secrets
 import socket
@@ -318,8 +319,8 @@ def get_uningested_demos(engine: Engine, limit: int) -> list[str]:
             active = false
             AND open = false
             AND ingested = false
+            AND pruned = false
             AND demo_size > 0
-            AND blob_name IS NOT NULL
             ORDER BY created_at ASC
             LIMIT :limit;
@@ -340,7 +341,7 @@ def get_uningested_demos(engine: Engine, limit: int) -> list[str]:
 
 def ingest_demo(minio_client: Minio, engine: Engine, session_id: str):
     """Ingest a demo analysis from an analysis client."""
-    blob_name = f"{session_id}.json"
+    blob_name = json_blob_name(session_id)
     try:
         raw_data = minio_client.get_object("jsonblobs", blob_name).read()
         decoded_data = raw_data.decode("utf-8")
@@ -551,8 +552,7 @@ def _close_session_with_demo(
                 end_time = :end_time,
                 demo_size = :demo_size,
                 markov_score = :markov_score,
-                updated_at = :updated_at,
-                blob_name = :blob_name
+                updated_at = :updated_at
             WHERE steam_id = :steam_id AND session_id = :session_id
@@ -566,7 +566,6 @@ def _close_session_with_demo(
                 "updated_at": current_time.isoformat(),
                 "demo_size": size,
                 "markov_score": markov_score,
-                "blob_name": demo_blob_name(session_id),
             },
         ).scalar_one()
     if late_bytes is not None:
@@ -590,9 +589,8 @@ def close_session_helper(
     minio_client: Minio,
     engine: Engine,
     steam_id: str,
-    streaming_sessions:
-    SocketManagerMapType,
-    late_bytes: bytes | None
+    streaming_sessions: SocketManagerMapType,
+    late_bytes: bytes | None,
 ) -> str:
     """Properly close a session and return a summary message.
@@ -655,6 +653,11 @@ def demo_blob_name(session_id: str) -> str:
     return f"{session_id}.dem"
 
 
+def json_blob_name(session_id: str) -> str:
+    """Format the object name for a json blob."""
+    return f"{session_id}.json"
+
+
 def demo_sink_path(session_id: str) -> str:
     """Format the media path for a demo blob."""
     return os.path.join(DEMOS_PATH, demo_blob_name(session_id))
@@ -863,13 +866,205 @@ def check_is_loser(engine: Engine, steam_id: str) -> bool:
     return bool(result)
 
 
+
 def get_broadcasts(engine: Engine) -> list[dict[str, str]]:
     """Get the list of broadcasts."""
     with engine.connect() as conn:
-        result = conn.execute(
-            sa.text("SELECT * FROM broadcasts")
-        )
+        result = conn.execute(sa.text("SELECT * FROM broadcasts"))
         rows = [row._asdict() for row in result.all()]
         for row in rows:
             row["post_date"] = row.pop("created_at")
         return rows
+
+
+# This function is only meant to run on boot!
+def cleanup_hung_sessions(engine: Engine) -> None:
+    """Remove any sessions that were left open/active after shutdown."""
+    logger.info("Checking for hanging sessions...")
+    with engine.connect() as conn:
+        result = conn.execute(
+            sa.text(  # We have to delete reports first because of the REFERENCES constraint
+                """
+                DELETE FROM reports WHERE session_id IN (
+                    SELECT session_id FROM demo_sessions
+                    WHERE active = true
+                    OR open = true
+                    OR demo_size IS NULL
+                );
+
+                DELETE FROM demo_sessions
+                WHERE active = true
+                OR open = true
+                OR demo_size IS NULL;
+                """
+            )
+        )
+        deleted_rows = result.rowcount
+        conn.commit()
+        logger.info("Deleted %d hanging sessions.", deleted_rows)
+
+
+# This function is only meant to run on boot!
+def prune_if_necessary(engine: Engine, minio_client: Minio) -> bool:
+    """Mark sessions as pruned so the specified amount of free space is available."""
+    logger.info("Checking if we need to prune demos...")
+    current_size = get_total_storage_usage(minio_client)
+
+    with engine.connect() as conn:
+        max_result = conn.execute(
+            sa.text(
+                """
+                SELECT max_storage_gb FROM prune_config;
+                """
+            )
+        )
+        max_size_gb = max_result.scalar_one()
+        if max_size_gb is None or max_size_gb <= 0:
+            logger.warning("No storage limit set, enjoy filling your disk!")
+            return False
+        max_size = max_size_gb * (1024**3)
+        total_bytes_to_remove = current_size - max_size
+        logger.info("Current size: %d MB; Max size: %d MB", current_size / (1024**2), max_size / (1024**2))
+        if total_bytes_to_remove <= 0:
+            logger.info("No need to prune.")
+            return False
+
+        logger.info("Attempting to prune %d MB", max(0, total_bytes_to_remove / (1024**2)))
+
+        # get the oldest demos that don't have any detections
+        # we allow demos that have already been pruned in case we somehow end up in a state
+        # where a demo is marked as pruned but its blob remains.
+        result = conn.execute(
+            sa.text(
+                """
+                SELECT session_id FROM demo_sessions
+                WHERE active = false
+                AND open = false
+                AND session_id NOT IN (SELECT session_id FROM analysis)
+                ORDER BY created_at ASC
+                """
+            )
+        )
+
+        prunable_demos_oldest_first = [row[0] for row in result.all()]
+
+        minio_demoblobs_dict = {blob.object_name: blob for blob in minio_client.list_objects("demoblobs")}
+        session_ids_to_remove = []
+        bytes_saved = 0
+
+        # prune just enough so we're in our space budget
+        for session_id in prunable_demos_oldest_first:
+            blob = minio_demoblobs_dict.get(demo_blob_name(session_id))
+            if blob is None:
+                # already pruned, do not count
+                continue
+            session_ids_to_remove.append(session_id)
+            bytes_saved += blob.size
+            if bytes_saved >= total_bytes_to_remove:
+                break
+
+        if len(session_ids_to_remove) == 0:
+            logger.warning("No demos to prune, but we're over the limit!")
+            return False
+
+        # mark as pruned
+        conn.execute(
+            sa.text(
+                """
+                UPDATE demo_sessions
+                SET pruned = true
+                WHERE session_id IN :session_ids_to_remove;
+                """
+            ),
+            {"session_ids_to_remove": tuple(session_ids_to_remove)},
+        )
+        conn.commit()
+        logger.info("Marked %d demos for pruning.", len(session_ids_to_remove))
+        # pruned demo blobs will be deleted by cleanup_pruned_demos, which runs after this on boot
+        return True
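The selection loop above is a greedy pass: walk the prunable sessions oldest-first, skip any whose blob is already gone, and stop as soon as the accumulated blob sizes cover the deficit. The same logic in isolation, with hypothetical (session_id, size) pairs standing in for the MinIO listing:

def pick_sessions_to_prune(oldest_first: list[tuple[str, int | None]], bytes_to_remove: int) -> list[str]:
    """Greedily pick the oldest sessions until the byte budget is met."""
    picked: list[str] = []
    saved = 0
    for session_id, size in oldest_first:
        if size is None:  # blob already gone; marking this session saves nothing
            continue
        picked.append(session_id)
        saved += size
        if saved >= bytes_to_remove:
            break
    return picked

# With a 150-byte deficit, only "a" and "c" are picked; "b" has no blob left:
assert pick_sessions_to_prune([("a", 100), ("b", None), ("c", 80), ("d", 50)], 150) == ["a", "c"]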
+
+
+# This function is only meant to run on boot!
+def cleanup_pruned_demos(engine: Engine, minio_client: Minio) -> None:
+    """Remove blobs for pruned or deleted sessions."""
+    logger.info("Checking for orphaned demos.")
+    with engine.connect() as conn:
+        result = conn.execute(
+            sa.text(
+                """
+                SELECT session_id FROM demo_sessions WHERE pruned = false;
+                """
+            )
+        )
+        ids_in_db = [row[0] for row in result.all()]
+        minio_demoblobs_dict = {blob.object_name: blob for blob in minio_client.list_objects("demoblobs")}
+        minio_jsonblobs_dict = {blob.object_name: blob for blob in minio_client.list_objects("jsonblobs")}
+
+        for session_id in ids_in_db:
+            demo_blob = demo_blob_name(session_id)
+            json_blob = json_blob_name(session_id)
+            if minio_demoblobs_dict.get(demo_blob) is not None:
+                minio_demoblobs_dict.pop(demo_blob)
+            if minio_jsonblobs_dict.get(json_blob) is not None:
+                minio_jsonblobs_dict.pop(json_blob)
+
+        # dicts now contain only orphaned blobs
+
+        ratio_result = conn.execute(
+            sa.text(
+                """
+                SELECT max_prune_ratio FROM prune_config;
+                """
+            )
+        )
+        # If we're gonna wipe more than max_prune_ratio (default 0.05) of the blobs, something is probably very wrong.
+        # Setting this to negative will perform a one-time prune regardless of ratio.
+        max_prune_ratio = ratio_result.scalar_one()
+        if len(minio_demoblobs_dict) > len(ids_in_db) * max_prune_ratio and max_prune_ratio >= 0:
+            logger.warning(
+                "Too many orphaned demo blobs: %d (%f%%) found, but limit set to %d (%f%%). "
+                "Refusing to clean up because something probably broke.",
+                len(minio_demoblobs_dict),
+                len(minio_demoblobs_dict) / max(len(ids_in_db), 1) * 100,  # max() avoids division by zero on an empty table
+                math.floor(len(ids_in_db) * max_prune_ratio),
+                max_prune_ratio * 100,
+            )
+            return
+
+        if max_prune_ratio < 0:
+            max_prune_ratio = abs(max_prune_ratio)
+            logger.info("Orphaned demo cleanup forced by config. Setting back to %f", max_prune_ratio)
+            conn.execute(
+                sa.text(
+                    """
+                    UPDATE prune_config
+                    SET max_prune_ratio = :max_prune_ratio;
+                    """
+                ),
+                {"max_prune_ratio": max_prune_ratio},
+            )
+            conn.commit()
+
+        for blob in minio_demoblobs_dict.values():
+            logger.info("Removing orphaned demo %s", blob.object_name)
+            minio_client.remove_object("demoblobs", blob.object_name)
+        for blob in minio_jsonblobs_dict.values():
+            logger.info("Removing orphaned json %s", blob.object_name)
+            minio_client.remove_object("jsonblobs", blob.object_name)
+
+
+def get_total_storage_usage(minio_client: Minio) -> int:
+    """Get the total storage used by all buckets in bytes."""
+    try:
+        buckets = minio_client.list_buckets()
+        total_size = 0
+
+        for bucket in buckets:
+            objects = minio_client.list_objects(bucket.name, recursive=True)
+            bucket_size = sum(obj.size for obj in objects)
+            total_size += bucket_size
+
+        return total_size
+    except S3Error as exc:
+        logger.error("Error occurred: %s", exc)
+        return -1
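cleanup_pruned_demos boils down to a set difference: every blob belonging to a live session (pruned = false) is popped from the bucket listing, and whatever remains in the dicts is an orphan, i.e. a blob whose session was pruned or deleted outright. The core idea in isolation, with made-up IDs:

live_ids = {"s1", "s2"}                      # sessions still marked pruned = false
demo_blobs = {"s1.dem", "s2.dem", "s3.dem"}  # objects listed from the demoblobs bucket

orphans = demo_blobs - {f"{sid}.dem" for sid in live_ids}
assert orphans == {"s3.dem"}  # s3 was pruned (or deleted), so its blob gets removed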
diff --git a/masterbase/registers.py b/masterbase/registers.py
index dfe6fc5..1e8e600 100644
--- a/masterbase/registers.py
+++ b/masterbase/registers.py
@@ -7,7 +7,13 @@
 from sqlalchemy import Engine, create_engine
 from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
 
-from masterbase.lib import make_db_uri, make_minio_client
+from masterbase.lib import (
+    cleanup_hung_sessions,
+    cleanup_pruned_demos,
+    make_db_uri,
+    make_minio_client,
+    prune_if_necessary,
+)
 
 
 def get_minio_connection(app: Litestar) -> Minio:
@@ -55,5 +61,15 @@ async def close_async_db_connection(app: Litestar) -> None:
     await cast("AsyncEngine", app.state.async_engine).dispose()
 
 
-startup_registers = (get_db_connection, get_async_db_connection, get_minio_connection)
+def boot_cleanup(app: Litestar) -> None:
+    """Clean up the database on boot."""
+    engine = app.state.engine
+    minio_client = app.state.minio_client
+
+    cleanup_hung_sessions(engine)
+    prune_if_necessary(engine, minio_client)
+    cleanup_pruned_demos(engine, minio_client)
+
+
+startup_registers = (get_db_connection, get_async_db_connection, get_minio_connection, boot_cleanup)
 shutdown_registers = (close_db_connection, close_async_db_connection)
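Because startup_registers is an ordered tuple, boot_cleanup runs only after the sync engine and the MinIO client have been attached to app.state, and its three steps run in a deliberate order: drop hung sessions, mark demos as pruned, then delete the now-orphaned blobs. A sketch of how the tuple is presumably consumed, assuming app.py wires it into Litestar's on_startup hook roughly like this:

from litestar import Litestar

from masterbase.registers import shutdown_registers, startup_registers

# Each register is a callable taking the app instance; Litestar invokes them in order.
app = Litestar(
    route_handlers=[],  # plus the actual handlers from masterbase.app
    on_startup=list(startup_registers),
    on_shutdown=list(shutdown_registers),
)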
diff --git a/migrations/versions/eba5782c5979_pruning.py b/migrations/versions/eba5782c5979_pruning.py
new file mode 100644
index 0000000..0ce7f1d
--- /dev/null
+++ b/migrations/versions/eba5782c5979_pruning.py
@@ -0,0 +1,65 @@
+"""pruning
+
+Revision ID: eba5782c5979
+Revises: f51cab87d3fd
+Create Date: 2025-03-08 13:46:16.132860
+
+"""
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision: str = 'eba5782c5979'
+down_revision: Union[str, None] = 'f51cab87d3fd'
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+    """Add pruned column to demo_sessions and default to false. Drop blob_name column."""
+    op.execute(
+        """
+        ALTER TABLE demo_sessions
+        ADD COLUMN pruned boolean,
+        DROP COLUMN blob_name;
+
+        UPDATE demo_sessions
+        SET pruned = false;
+
+        ALTER TABLE demo_sessions
+        ALTER COLUMN pruned SET DEFAULT false;
+
+        ALTER TABLE demo_sessions
+        ALTER COLUMN pruned SET NOT NULL;
+        """
+    )
+
+    op.execute(
+        """
+        CREATE TABLE prune_config (
+            max_storage_gb integer PRIMARY KEY DEFAULT 0,
+            max_prune_ratio double precision DEFAULT 0.05
+        );
+
+        INSERT INTO prune_config (max_storage_gb)
+        VALUES (DEFAULT);
+        """
+    )
+
+
+def downgrade() -> None:
+    """Remove pruned column from demo_sessions. Restore blob_name column."""
+    op.execute(
+        """
+        ALTER TABLE demo_sessions
+        DROP COLUMN pruned,
+        ADD COLUMN blob_name text;
+
+        DROP TABLE prune_config;
+        """
+    )
+    # Not guaranteed to work, but should account for normal use cases.
+    op.execute("UPDATE demo_sessions SET blob_name = session_id || '.dem';")
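Note that prune_config is seeded with max_storage_gb = 0, and prune_if_necessary treats a non-positive limit as "pruning disabled", so pruning stays off until someone sets a limit. One way to opt in after running the migration, sketched with a placeholder DSN and an example 100 GB cap:

import sqlalchemy as sa

engine = sa.create_engine("postgresql://user:pass@localhost/masterbase")  # placeholder DSN
with engine.connect() as conn:
    conn.execute(sa.text("UPDATE prune_config SET max_storage_gb = :gb"), {"gb": 100})
    conn.commit()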