From 07315a63c6a038b7597e64ca2c6c82abc19e0e06 Mon Sep 17 00:00:00 2001 From: megascatterbomb Date: Sat, 8 Mar 2025 15:45:39 +1300 Subject: [PATCH 01/10] revision, cleanup on boot --- masterbase/lib.py | 53 ++++++++++++++++++ masterbase/registers.py | 18 ++++++- migrations/versions/eba5782c5979_pruning.py | 59 +++++++++++++++++++++ 3 files changed, 128 insertions(+), 2 deletions(-) create mode 100644 migrations/versions/eba5782c5979_pruning.py diff --git a/masterbase/lib.py b/masterbase/lib.py index 4995b3c..e588e32 100644 --- a/masterbase/lib.py +++ b/masterbase/lib.py @@ -873,3 +873,56 @@ def get_broadcasts(engine: Engine) -> list[dict[str, str]]: for row in rows: row["post_date"] = row.pop("created_at") return rows + +# This function is only meant to run on boot! +def cleanup_hung_sessions(engine: Engine) -> None: + """Remove any sessions that were left open/active after shutdown.""" + logger.info(f"Checking for hanging sessions.") + with engine.connect() as conn: + conn.execute( + sa.text( + """ + DELETE FROM demo_sessions + WHERE active = true + OR open = true; + """ + ) + ) + conn.commit() + +# This function is only meant to run on boot! +def cleanup_orphaned_demos(engine: Engine, minio_client: Minio) -> None: + """Remove any orphaned blobs in MinIO that we don't have a session for.""" + logger.info(f"Checking for orphaned demos.") + with engine.connect() as conn: + result = conn.execute( + sa.text( + """ + SELECT session_id FROM demo_sessions; + """ + ) + ) + ids_in_db = [row[0] for row in result.all()] + minio_demoblobs_dict = {blob.object_name: blob for blob in minio_client.list_objects("demoblobs")} + minio_jsonblobs_dict = {blob.object_name: blob for blob in minio_client.list_objects("jsonblobs")} + + for session_id in ids_in_db: + demo_name = f"{session_id}.dem" + demo_json = f"{session_id}.json" + if minio_demoblobs_dict.get(demo_name) is not None: + minio_demoblobs_dict.pop(demo_name) + if minio_jsonblobs_dict.get(demo_json) is not None: + minio_jsonblobs_dict.pop(demo_json) + + # dicts now contain only orphaned blobs + for blob in minio_demoblobs_dict.values(): + logger.info(f"Removing orphaned demo {blob.object_name}") + minio_client.remove_object("demoblobs", blob.object_name) + for blob in minio_jsonblobs_dict.values(): + logger.info(f"Removing orphaned json {blob.object_name}") + minio_client.remove_object("jsonblobs", blob.object_name) + +# This function is only meant to run on boot! +def prune_if_necessary(engine: Engine) -> None: + """Prune the database so the specificed amount of free space is available.""" + pass \ No newline at end of file diff --git a/masterbase/registers.py b/masterbase/registers.py index dfe6fc5..b1ee27e 100644 --- a/masterbase/registers.py +++ b/masterbase/registers.py @@ -7,7 +7,13 @@ from sqlalchemy import Engine, create_engine from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine -from masterbase.lib import make_db_uri, make_minio_client +from masterbase.lib import ( + make_db_uri, + make_minio_client, + cleanup_hung_sessions, + cleanup_orphaned_demos, + prune_if_necessary +) def get_minio_connection(app: Litestar) -> Minio: @@ -54,6 +60,14 @@ async def close_async_db_connection(app: Litestar) -> None: if getattr(app.state, "async_engine", None): await cast("AsyncEngine", app.state.async_engine).dispose() +def boot_cleanup(app: Litestar) -> None: + """Cleanup the database on boot.""" + engine = app.state.engine + minio_client = app.state.minio_client -startup_registers = (get_db_connection, get_async_db_connection, get_minio_connection) + cleanup_hung_sessions(engine) + cleanup_orphaned_demos(engine, minio_client) + prune_if_necessary(engine) + +startup_registers = (get_db_connection, get_async_db_connection, get_minio_connection, boot_cleanup) shutdown_registers = (close_db_connection, close_async_db_connection) diff --git a/migrations/versions/eba5782c5979_pruning.py b/migrations/versions/eba5782c5979_pruning.py new file mode 100644 index 0000000..b6b877b --- /dev/null +++ b/migrations/versions/eba5782c5979_pruning.py @@ -0,0 +1,59 @@ +"""pruning + +Revision ID: eba5782c5979 +Revises: f51cab87d3fd +Create Date: 2025-03-08 13:46:16.132860 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = 'eba5782c5979' +down_revision: Union[str, None] = 'f51cab87d3fd' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Add pruned column to demo_sessions and default to false""" + op.execute( + """ + ALTER TABLE demo_sessions + ADD COLUMN pruned boolean; + + UPDATE demo_sessions + SET pruned = false; + + ALTER TABLE demo_sessions + ALTER COLUMN pruned SET DEFAULT false; + + ALTER TABLE demo_sessions + ALTER COLUMN pruned SET NOT NULL; + """ + ) + + op.execute( + """ + CREATE TABLE prune_config ( + min_free_space_gb integer + ); + + INSERT INTO prune_config (min_free_space_gb) + VALUES (50) + """ + ) + +def downgrade() -> None: + """Remove pruned column from demo_sessions""" + op.execute( + """ + ALTER TABLE demo_sessions + DROP COLUMN pruned; + + DROP TABLE prune_config; + """ + ) From 1c00e92c57234794a7089f28bece438a90726890 Mon Sep 17 00:00:00 2001 From: megascatterbomb Date: Sun, 9 Mar 2025 12:51:24 +1300 Subject: [PATCH 02/10] WIP pruning logic --- masterbase/app.py | 1 + masterbase/lib.py | 119 ++++++++++++++++---- masterbase/registers.py | 6 +- migrations/versions/eba5782c5979_pruning.py | 18 +-- 4 files changed, 114 insertions(+), 30 deletions(-) diff --git a/masterbase/app.py b/masterbase/app.py index 2ef6dc6..f46d7ed 100644 --- a/masterbase/app.py +++ b/masterbase/app.py @@ -46,6 +46,7 @@ close_session_helper, db_export_chunks, demo_blob_name, + json_blob_name, generate_api_key, generate_uuid4_int, get_broadcasts, diff --git a/masterbase/lib.py b/masterbase/lib.py index e588e32..15ecaa7 100644 --- a/masterbase/lib.py +++ b/masterbase/lib.py @@ -319,7 +319,6 @@ def get_uningested_demos(engine: Engine, limit: int) -> list[str]: AND open = false AND ingested = false AND demo_size > 0 - AND blob_name IS NOT NULL ORDER BY created_at ASC LIMIT :limit; @@ -340,7 +339,7 @@ def get_uningested_demos(engine: Engine, limit: int) -> list[str]: def ingest_demo(minio_client: Minio, engine: Engine, session_id: str): """Ingest a demo analysis from an analysis client.""" - blob_name = f"{session_id}.json" + blob_name = json_blob_name(session_id) try: raw_data = minio_client.get_object("jsonblobs", blob_name).read() decoded_data = raw_data.decode("utf-8") @@ -552,7 +551,6 @@ def _close_session_with_demo( demo_size = :demo_size, markov_score = :markov_score, updated_at = :updated_at, - blob_name = :blob_name WHERE steam_id = :steam_id AND session_id = :session_id @@ -566,7 +564,6 @@ def _close_session_with_demo( "updated_at": current_time.isoformat(), "demo_size": size, "markov_score": markov_score, - "blob_name": demo_blob_name(session_id), }, ).scalar_one() if late_bytes is not None: @@ -654,6 +651,9 @@ def demo_blob_name(session_id: str) -> str: """Format the object name for a demo blob.""" return f"{session_id}.dem" +def json_blob_name(session_id: str) -> str: + """Format the object name for a json blob.""" + return f"{session_id}.json" def demo_sink_path(session_id: str) -> str: """Format the media path for a demo blob.""" @@ -891,14 +891,75 @@ def cleanup_hung_sessions(engine: Engine) -> None: conn.commit() # This function is only meant to run on boot! -def cleanup_orphaned_demos(engine: Engine, minio_client: Minio) -> None: - """Remove any orphaned blobs in MinIO that we don't have a session for.""" - logger.info(f"Checking for orphaned demos.") +def prune_if_necessary(engine: Engine, minio_client: Minio) -> bool: + """Mark sessions as pruned so the specificed amount of free space is available.""" + + current_size = get_total_storage_usage(minio_client) + + with engine.connect() as conn: + max_result = conn.execute( + sa.text( + """ + SELECT max_storage_gb FROM prune_config; + """ + ) + ) + max_size = max_result.scalar_one() * (1024 ** 3) + total_bytes_to_remove = current_size - max_size + if total_bytes_to_remove <= 0: + return False + + # time to prune + + # get the oldest demos that don't have any detections + prunable_demos_oldest_first = conn.execute( + sa.text( + """ + SELECT (session_id, demo_size) FROM demo_sessions + WHERE active = false + AND open = false + AND NOT IN (SELECT session_id FROM analysis) + ORDER BY created_at ASC + """ + ) + ).all() + + session_ids_to_remove = {} + + # prune just enough so we're in our space budget + for row in prunable_demos_oldest_first: + session_id, demo_size = row + if demo_size is None: # this should never happen (TODO: handle by prior cleanup) + continue + session_ids_to_remove[session_id] = demo_size + + if sum(session_ids_to_remove.values()) >= total_bytes_to_remove: + break + + # mark as pruned + conn.execute( + sa.text( + """ + UPDATE demo_sessions + SET pruned = true + WHERE session_id IN :session_ids_to_remove; + """ + ), + {"session_ids_to_remove": session_ids_to_remove} + ) + conn.commit() + # pruned demo blobs will be deleted by cleanup_orphaned_demos, which runs after this on boot + return True + +# This function is only meant to run on boot! +def cleanup_pruned_demos(engine: Engine, minio_client: Minio) -> None: + """Remove blobs for pruned or deleted sessions.""" + logger.info("Checking for orphaned demos.") with engine.connect() as conn: result = conn.execute( sa.text( """ - SELECT session_id FROM demo_sessions; + SELECT session_id FROM demo_sessions WHERE pruned = false; """ ) ) @@ -907,22 +968,40 @@ def cleanup_orphaned_demos(engine: Engine, minio_client: Minio) -> None: minio_jsonblobs_dict = {blob.object_name: blob for blob in minio_client.list_objects("jsonblobs")} for session_id in ids_in_db: - demo_name = f"{session_id}.dem" - demo_json = f"{session_id}.json" - if minio_demoblobs_dict.get(demo_name) is not None: - minio_demoblobs_dict.pop(demo_name) - if minio_jsonblobs_dict.get(demo_json) is not None: - minio_jsonblobs_dict.pop(demo_json) + demo_blob = demo_blob_name(session_id) + json_blob = json_blob_name(session_id) + if minio_demoblobs_dict.get(demo_blob) is not None: + minio_demoblobs_dict.pop(demo_blob) + if minio_jsonblobs_dict.get(json_blob) is not None: + minio_jsonblobs_dict.pop(json_blob) # dicts now contain only orphaned blobs + + # If we're gonna wipe more than this % of the blobs, something is probably very wrong. + max_cleanup_ratio = 0.05 + if len(minio_demoblobs_dict) > ids_in_db * max_cleanup_ratio: + logger.warning("Too many orphaned demo blobs found, refusing to clean up because something probably broke.") + return + for blob in minio_demoblobs_dict.values(): - logger.info(f"Removing orphaned demo {blob.object_name}") + logger.info("Removing orphaned demo %s", blob.object_name) minio_client.remove_object("demoblobs", blob.object_name) for blob in minio_jsonblobs_dict.values(): - logger.info(f"Removing orphaned json {blob.object_name}") + logger.info("Removing orphaned json %s", blob.object_name) minio_client.remove_object("jsonblobs", blob.object_name) -# This function is only meant to run on boot! -def prune_if_necessary(engine: Engine) -> None: - """Prune the database so the specificed amount of free space is available.""" - pass \ No newline at end of file +def get_total_storage_usage(minio_client: Minio) -> int: + """Get the total storage used by all buckets in bytes.""" + try: + buckets = minio_client.list_buckets() + total_size = 0 + + for bucket in buckets: + objects = minio_client.list_objects(bucket.name, recursive=True) + bucket_size = sum(obj.size for obj in objects) + total_size += bucket_size + + return total_size + except S3Error as exc: + print("Error occurred:", exc) + return -1 diff --git a/masterbase/registers.py b/masterbase/registers.py index b1ee27e..54fa061 100644 --- a/masterbase/registers.py +++ b/masterbase/registers.py @@ -11,7 +11,7 @@ make_db_uri, make_minio_client, cleanup_hung_sessions, - cleanup_orphaned_demos, + cleanup_pruned_demos, prune_if_necessary ) @@ -66,8 +66,8 @@ def boot_cleanup(app: Litestar) -> None: minio_client = app.state.minio_client cleanup_hung_sessions(engine) - cleanup_orphaned_demos(engine, minio_client) - prune_if_necessary(engine) + prune_if_necessary(engine, minio_client) + cleanup_pruned_demos(engine, minio_client) startup_registers = (get_db_connection, get_async_db_connection, get_minio_connection, boot_cleanup) shutdown_registers = (close_db_connection, close_async_db_connection) diff --git a/migrations/versions/eba5782c5979_pruning.py b/migrations/versions/eba5782c5979_pruning.py index b6b877b..c91190f 100644 --- a/migrations/versions/eba5782c5979_pruning.py +++ b/migrations/versions/eba5782c5979_pruning.py @@ -19,11 +19,12 @@ def upgrade() -> None: - """Add pruned column to demo_sessions and default to false""" + """Add pruned column to demo_sessions and default to false. Drop blob_name column.""" op.execute( """ ALTER TABLE demo_sessions - ADD COLUMN pruned boolean; + ADD COLUMN pruned boolean, + DROP COLUMN blob_name; UPDATE demo_sessions SET pruned = false; @@ -39,21 +40,24 @@ def upgrade() -> None: op.execute( """ CREATE TABLE prune_config ( - min_free_space_gb integer + max_storage_gb integer ); - INSERT INTO prune_config (min_free_space_gb) - VALUES (50) + INSERT INTO prune_config (max_storage_gb) + VALUES (null); """ ) def downgrade() -> None: - """Remove pruned column from demo_sessions""" + """Remove pruned column from demo_sessions. Restore blob_name column.""" op.execute( """ ALTER TABLE demo_sessions - DROP COLUMN pruned; + DROP COLUMN pruned, + ADD COLUMN blob_name text; DROP TABLE prune_config; """ ) + # Not guaranteed to work, but should account for normal use cases. + op.execute("UPDATE demo_sessions SET blob_name = session_id || '.dem';") From 78c314addb94a3147fa3cbebb4bfe14d783f32ec Mon Sep 17 00:00:00 2001 From: megascatterbomb Date: Tue, 11 Mar 2025 13:28:38 +1300 Subject: [PATCH 03/10] fix code written by inexperienced python dev (me) --- masterbase/lib.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/masterbase/lib.py b/masterbase/lib.py index 15ecaa7..88a6abb 100644 --- a/masterbase/lib.py +++ b/masterbase/lib.py @@ -904,9 +904,12 @@ def prune_if_necessary(engine: Engine, minio_client: Minio) -> bool: """ ) ) - max_size = max_result.scalar_one() * (1024 ** 3) + max_size_gb = max_result.scalar_one() + if max_size_gb is None: + return False + max_size = max_size_gb * (1024 ** 3) total_bytes_to_remove = current_size - max_size - if total_bytes_to_remove <= 0: + if max_size == 0 or total_bytes_to_remove <= 0: return False # time to prune @@ -979,7 +982,7 @@ def cleanup_pruned_demos(engine: Engine, minio_client: Minio) -> None: # If we're gonna wipe more than this % of the blobs, something is probably very wrong. max_cleanup_ratio = 0.05 - if len(minio_demoblobs_dict) > ids_in_db * max_cleanup_ratio: + if len(minio_demoblobs_dict) > len(ids_in_db) * max_cleanup_ratio: logger.warning("Too many orphaned demo blobs found, refusing to clean up because something probably broke.") return From 21a506a3f1bbfb78b4c602d87d895081c919aac1 Mon Sep 17 00:00:00 2001 From: megascatterbomb Date: Tue, 11 Mar 2025 15:02:01 +1300 Subject: [PATCH 04/10] don't try to prune already pruned demos --- masterbase/lib.py | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/masterbase/lib.py b/masterbase/lib.py index 88a6abb..6c003d4 100644 --- a/masterbase/lib.py +++ b/masterbase/lib.py @@ -877,23 +877,26 @@ def get_broadcasts(engine: Engine) -> list[dict[str, str]]: # This function is only meant to run on boot! def cleanup_hung_sessions(engine: Engine) -> None: """Remove any sessions that were left open/active after shutdown.""" - logger.info(f"Checking for hanging sessions.") + logger.info(f"Checking for hanging sessions...") with engine.connect() as conn: - conn.execute( + result = conn.execute( sa.text( """ DELETE FROM demo_sessions WHERE active = true - OR open = true; + OR open = true + OR demo_size IS NULL; """ ) ) + deleted_rows = result.rowcount conn.commit() + logger.info("Deleted %d hanging sessions.", deleted_rows) # This function is only meant to run on boot! def prune_if_necessary(engine: Engine, minio_client: Minio) -> bool: """Mark sessions as pruned so the specificed amount of free space is available.""" - + logger.info(f"Checking if we need to prune demos...") current_size = get_total_storage_usage(minio_client) with engine.connect() as conn: @@ -905,14 +908,17 @@ def prune_if_necessary(engine: Engine, minio_client: Minio) -> bool: ) ) max_size_gb = max_result.scalar_one() - if max_size_gb is None: + if max_size_gb is None or max_size_gb <= 0: + logger.warning("No storage limit set, enjoy filling your disk!") return False max_size = max_size_gb * (1024 ** 3) total_bytes_to_remove = current_size - max_size - if max_size == 0 or total_bytes_to_remove <= 0: + logger.info("Current size: %d; Max size: %d; Bytes to remove: %d", current_size, max_size, total_bytes_to_remove) + if total_bytes_to_remove <= 0: + logger.info("No need to prune.") return False - # time to prune + logger.info("Going to prune.") # get the oldest demos that don't have any detections prunable_demos_oldest_first = conn.execute( @@ -921,6 +927,7 @@ def prune_if_necessary(engine: Engine, minio_client: Minio) -> bool: SELECT (session_id, demo_size) FROM demo_sessions WHERE active = false AND open = false + AND pruned = false AND NOT IN (SELECT session_id FROM analysis) ORDER BY created_at ASC """ @@ -951,6 +958,7 @@ def prune_if_necessary(engine: Engine, minio_client: Minio) -> bool: {"session_ids_to_remove": session_ids_to_remove} ) conn.commit() + logger.info("Marking %d demos for pruning.", len(session_ids_to_remove)) # pruned demo blobs will be deleted by cleanup_orphaned_demos, which runs after this on boot return True From 3eb5f5abb4724b4fb75b073dfccb791c5bf4e918 Mon Sep 17 00:00:00 2001 From: megascatterbomb Date: Tue, 11 Mar 2025 20:32:23 +1300 Subject: [PATCH 05/10] fix marking prunes --- masterbase/lib.py | 16 +++++++++------- migrations/versions/eba5782c5979_pruning.py | 4 ++-- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/masterbase/lib.py b/masterbase/lib.py index 6c003d4..22cad4b 100644 --- a/masterbase/lib.py +++ b/masterbase/lib.py @@ -550,7 +550,7 @@ def _close_session_with_demo( end_time = :end_time, demo_size = :demo_size, markov_score = :markov_score, - updated_at = :updated_at, + updated_at = :updated_at WHERE steam_id = :steam_id AND session_id = :session_id @@ -913,22 +913,22 @@ def prune_if_necessary(engine: Engine, minio_client: Minio) -> bool: return False max_size = max_size_gb * (1024 ** 3) total_bytes_to_remove = current_size - max_size - logger.info("Current size: %d; Max size: %d; Bytes to remove: %d", current_size, max_size, total_bytes_to_remove) + logger.info("Current size: %d MB; Max size: %d MB", current_size / (1024 ** 2), max_size / (1024 ** 2)) if total_bytes_to_remove <= 0: logger.info("No need to prune.") return False - logger.info("Going to prune.") + logger.info("Attempting to prune %d MB", max(0, total_bytes_to_remove / (1024 ** 2))) # get the oldest demos that don't have any detections prunable_demos_oldest_first = conn.execute( sa.text( """ - SELECT (session_id, demo_size) FROM demo_sessions + SELECT session_id, demo_size FROM demo_sessions WHERE active = false AND open = false AND pruned = false - AND NOT IN (SELECT session_id FROM analysis) + AND session_id NOT IN (SELECT session_id FROM analysis) ORDER BY created_at ASC """ ) @@ -946,6 +946,8 @@ def prune_if_necessary(engine: Engine, minio_client: Minio) -> bool: if sum(session_ids_to_remove.values()) >= total_bytes_to_remove: break + session_ids_to_remove = list(session_ids_to_remove.keys()) + # mark as pruned conn.execute( sa.text( @@ -955,10 +957,10 @@ def prune_if_necessary(engine: Engine, minio_client: Minio) -> bool: WHERE session_id IN :session_ids_to_remove; """ ), - {"session_ids_to_remove": session_ids_to_remove} + {"session_ids_to_remove": tuple(session_ids_to_remove)} ) conn.commit() - logger.info("Marking %d demos for pruning.", len(session_ids_to_remove)) + logger.info("Marked %d demos for pruning.", len(session_ids_to_remove)) # pruned demo blobs will be deleted by cleanup_orphaned_demos, which runs after this on boot return True diff --git a/migrations/versions/eba5782c5979_pruning.py b/migrations/versions/eba5782c5979_pruning.py index c91190f..e39c4ca 100644 --- a/migrations/versions/eba5782c5979_pruning.py +++ b/migrations/versions/eba5782c5979_pruning.py @@ -40,11 +40,11 @@ def upgrade() -> None: op.execute( """ CREATE TABLE prune_config ( - max_storage_gb integer + max_storage_gb integer PRIMARY KEY DEFAULT 0 ); INSERT INTO prune_config (max_storage_gb) - VALUES (null); + VALUES (DEFAULT); """ ) From b1448605e7782fcf6dad0c82cbd2fb4005101b80 Mon Sep 17 00:00:00 2001 From: megascatterbomb Date: Tue, 11 Mar 2025 21:44:11 +1300 Subject: [PATCH 06/10] pruning works, negative ratio to ignore ratio once --- masterbase/lib.py | 68 ++++++++++++++++----- migrations/versions/eba5782c5979_pruning.py | 1 + 2 files changed, 53 insertions(+), 16 deletions(-) diff --git a/masterbase/lib.py b/masterbase/lib.py index 22cad4b..91498e0 100644 --- a/masterbase/lib.py +++ b/masterbase/lib.py @@ -4,6 +4,7 @@ import io import json import logging +import math import os import secrets import socket @@ -921,32 +922,40 @@ def prune_if_necessary(engine: Engine, minio_client: Minio) -> bool: logger.info("Attempting to prune %d MB", max(0, total_bytes_to_remove / (1024 ** 2))) # get the oldest demos that don't have any detections - prunable_demos_oldest_first = conn.execute( + # we allow demos that have already been pruned in case we somehow end up in a state + # where a demo is marked as pruned but its blob remains. + result = conn.execute( sa.text( """ - SELECT session_id, demo_size FROM demo_sessions + SELECT session_id FROM demo_sessions WHERE active = false AND open = false - AND pruned = false AND session_id NOT IN (SELECT session_id FROM analysis) ORDER BY created_at ASC """ ) - ).all() + ) - session_ids_to_remove = {} + prunable_demos_oldest_first = [row[0] for row in result.all()] + + minio_demoblobs_dict = {blob.object_name: blob for blob in minio_client.list_objects("demoblobs")} + session_ids_to_remove = [] + bytes_saved = 0 # prune just enough so we're in our space budget - for row in prunable_demos_oldest_first: - session_id, demo_size = row - if demo_size is None: # this should never happen (TODO: handle by prior cleanup) + for session_id in prunable_demos_oldest_first: + blob = minio_demoblobs_dict.get(demo_blob_name(session_id)) + if blob is None: + # already pruned, do not count continue - session_ids_to_remove[session_id] = demo_size - - if sum(session_ids_to_remove.values()) >= total_bytes_to_remove: + session_ids_to_remove.append(session_id) + bytes_saved += blob.size + if bytes_saved >= total_bytes_to_remove: break - session_ids_to_remove = list(session_ids_to_remove.keys()) + if len(session_ids_to_remove) == 0: + logger.warning("No demos to prune, but we're over the limit!") + return False # mark as pruned conn.execute( @@ -990,12 +999,39 @@ def cleanup_pruned_demos(engine: Engine, minio_client: Minio) -> None: # dicts now contain only orphaned blobs - # If we're gonna wipe more than this % of the blobs, something is probably very wrong. - max_cleanup_ratio = 0.05 - if len(minio_demoblobs_dict) > len(ids_in_db) * max_cleanup_ratio: - logger.warning("Too many orphaned demo blobs found, refusing to clean up because something probably broke.") + ratio_result = conn.execute( + sa.text( + """ + SELECT max_prune_ratio FROM prune_config; + """ + ) + ) + # If we're gonna wipe more than max_prune_ratio (default 0.05) of the blobs, something is probably very wrong. + # Setting this to negative will perform a one-time prune regardless of ratio. + max_prune_ratio = ratio_result.scalar_one() + if len(minio_demoblobs_dict) > len(ids_in_db) * max_prune_ratio and max_prune_ratio >= 0: + logger.warning("Too many orphaned demo blobs: %d (%f%%) found, but limit set to %d (%f%%). Refusing to clean up because something probably broke.", + len(minio_demoblobs_dict), + len(minio_demoblobs_dict) / len(ids_in_db) * 100, + math.floor(len(ids_in_db) * max_prune_ratio), + max_prune_ratio * 100 + ) return + if max_prune_ratio < 0: + max_prune_ratio = abs(max_prune_ratio) + logger.info("Orphaned demo cleanup forced by config. Setting back to %f", max_prune_ratio) + conn.execute( + sa.text( + """ + UPDATE prune_config + SET max_prune_ratio = :max_prune_ratio; + """ + ), + {"max_prune_ratio": max_prune_ratio} + ) + conn.commit() + for blob in minio_demoblobs_dict.values(): logger.info("Removing orphaned demo %s", blob.object_name) minio_client.remove_object("demoblobs", blob.object_name) diff --git a/migrations/versions/eba5782c5979_pruning.py b/migrations/versions/eba5782c5979_pruning.py index e39c4ca..ce272da 100644 --- a/migrations/versions/eba5782c5979_pruning.py +++ b/migrations/versions/eba5782c5979_pruning.py @@ -41,6 +41,7 @@ def upgrade() -> None: """ CREATE TABLE prune_config ( max_storage_gb integer PRIMARY KEY DEFAULT 0 + max_prune_ratio double precision DEFAULT 0.05 ); INSERT INTO prune_config (max_storage_gb) From 1cdf94ebfc5c1cee8e8ab34c49deb71e17fb4573 Mon Sep 17 00:00:00 2001 From: megascatterbomb Date: Wed, 12 Mar 2025 11:36:07 +1300 Subject: [PATCH 07/10] protect demodata & jobs endpoints from pruned demos --- masterbase/app.py | 8 ++++++-- masterbase/lib.py | 1 + 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/masterbase/app.py b/masterbase/app.py index f46d7ed..090e46a 100644 --- a/masterbase/app.py +++ b/masterbase/app.py @@ -200,8 +200,12 @@ async def demodata(request: Request, api_key: str, session_id: str) -> Stream: """Return the demo.""" minio_client = request.app.state.minio_client blob_name = demo_blob_name(session_id) - file = minio_client.get_object("demoblobs", blob_name) - stat = minio_client.stat_object("demoblobs", blob_name) + + try: + file = minio_client.get_object("demoblobs", blob_name) + stat = minio_client.stat_object("demoblobs", blob_name) + except Exception as exc: + raise HTTPException(detail="Demo not found!", status_code=404) from exc headers = { "Content-Disposition": f'attachment; filename="{blob_name}"', diff --git a/masterbase/lib.py b/masterbase/lib.py index 91498e0..49b25d2 100644 --- a/masterbase/lib.py +++ b/masterbase/lib.py @@ -319,6 +319,7 @@ def get_uningested_demos(engine: Engine, limit: int) -> list[str]: active = false AND open = false AND ingested = false + AND pruned = false AND demo_size > 0 ORDER BY created_at ASC From 509892b7a8163b113ec1c7874a1f59df8408e60b Mon Sep 17 00:00:00 2001 From: megascatterbomb Date: Wed, 12 Mar 2025 16:18:27 +1300 Subject: [PATCH 08/10] respect the "reports REFERENCES demo_sessions" when dropping hanging sessions --- masterbase/lib.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/masterbase/lib.py b/masterbase/lib.py index 49b25d2..d45d198 100644 --- a/masterbase/lib.py +++ b/masterbase/lib.py @@ -882,8 +882,15 @@ def cleanup_hung_sessions(engine: Engine) -> None: logger.info(f"Checking for hanging sessions...") with engine.connect() as conn: result = conn.execute( - sa.text( + sa.text( # We have to delete reports first because of the REFERENCES constraint """ + DELETE FROM reports WHERE session_id IN ( + SELECT session_id FROM demo_sessions + WHERE active = true + OR open = true + OR demo_size IS NULL + ); + DELETE FROM demo_sessions WHERE active = true OR open = true From e1da942c2e161a1040b9ff64f0608140c59f456d Mon Sep 17 00:00:00 2001 From: megascatterbomb Date: Wed, 12 Mar 2025 16:30:54 +1300 Subject: [PATCH 09/10] fix missing comma --- migrations/versions/eba5782c5979_pruning.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/migrations/versions/eba5782c5979_pruning.py b/migrations/versions/eba5782c5979_pruning.py index ce272da..0ce7f1d 100644 --- a/migrations/versions/eba5782c5979_pruning.py +++ b/migrations/versions/eba5782c5979_pruning.py @@ -40,7 +40,7 @@ def upgrade() -> None: op.execute( """ CREATE TABLE prune_config ( - max_storage_gb integer PRIMARY KEY DEFAULT 0 + max_storage_gb integer PRIMARY KEY DEFAULT 0, max_prune_ratio double precision DEFAULT 0.05 ); From 64d3b6e34891fb871a53c192ef6b3f5b7dbdee9c Mon Sep 17 00:00:00 2001 From: megascatterbomb Date: Wed, 9 Apr 2025 12:13:58 +1200 Subject: [PATCH 10/10] ruff linting --- masterbase/app.py | 9 +++++++-- masterbase/lib.py | 42 +++++++++++++++++++++++------------------ masterbase/registers.py | 8 +++++--- 3 files changed, 36 insertions(+), 23 deletions(-) diff --git a/masterbase/app.py b/masterbase/app.py index 090e46a..ae00d6a 100644 --- a/masterbase/app.py +++ b/masterbase/app.py @@ -23,6 +23,7 @@ sys.path.append(os.path.dirname(CURRENT_DIR)) # ruff: noqa: E402 +# ruff: noqa: I001 from masterbase.anomaly import DetectionState from masterbase.guards import ( analyst_guard, @@ -46,7 +47,6 @@ close_session_helper, db_export_chunks, demo_blob_name, - json_blob_name, generate_api_key, generate_uuid4_int, get_broadcasts, @@ -66,6 +66,8 @@ from masterbase.registers import shutdown_registers, startup_registers from masterbase.steam import account_exists, is_limited_account +# ruff: enable + logger = logging.getLogger(__name__) @@ -128,6 +130,7 @@ def close_session(request: Request, api_key: str) -> dict[str, bool]: return {"closed_successfully": True} + @post("/close_session", guards=[valid_key_guard, user_in_session_guard], sync_to_thread=False) def close_with_late_bytes(request: Request, api_key: str, data: LateBytesBody) -> dict[str, bool]: """Close a session out. Will find the latest open session for a user. @@ -262,12 +265,14 @@ async def report_player(request: Request, api_key: str, data: ReportBody) -> dic except IntegrityError: raise HTTPException(detail=f"Unknown session ID {data.session_id}", status_code=402) + @get("/broadcasts", sync_to_thread=False) def broadcasts(request: Request) -> list[dict[str, str]]: """Return a list of broadcasts.""" engine = request.app.state.engine return get_broadcasts(engine) + class DemoHandler(WebsocketListener): """Custom Websocket Class.""" @@ -311,7 +316,7 @@ async def on_disconnect(self, socket: WebSocket) -> None: # type: ignore """Close handle on disconnect.""" if socket in streaming_sessions: session_manager = streaming_sessions[socket] - else : + else: logger.warning("Attempting to disconnect from already disconnected socket!") return logger.info(f"Received socket disconnect from session ID: {session_manager.session_id}") diff --git a/masterbase/lib.py b/masterbase/lib.py index d45d198..c5d4416 100644 --- a/masterbase/lib.py +++ b/masterbase/lib.py @@ -589,9 +589,8 @@ def close_session_helper( minio_client: Minio, engine: Engine, steam_id: str, - streaming_sessions: - SocketManagerMapType, - late_bytes: bytes | None + streaming_sessions: SocketManagerMapType, + late_bytes: bytes | None, ) -> str: """Properly close a session and return a summary message. @@ -653,10 +652,12 @@ def demo_blob_name(session_id: str) -> str: """Format the object name for a demo blob.""" return f"{session_id}.dem" + def json_blob_name(session_id: str) -> str: """Format the object name for a json blob.""" return f"{session_id}.json" + def demo_sink_path(session_id: str) -> str: """Format the media path for a demo blob.""" return os.path.join(DEMOS_PATH, demo_blob_name(session_id)) @@ -865,24 +866,24 @@ def check_is_loser(engine: Engine, steam_id: str) -> bool: return bool(result) + def get_broadcasts(engine: Engine) -> list[dict[str, str]]: """Get the list of broadcasts.""" with engine.connect() as conn: - result = conn.execute( - sa.text("SELECT * FROM broadcasts") - ) + result = conn.execute(sa.text("SELECT * FROM broadcasts")) rows = [row._asdict() for row in result.all()] for row in rows: row["post_date"] = row.pop("created_at") return rows + # This function is only meant to run on boot! def cleanup_hung_sessions(engine: Engine) -> None: """Remove any sessions that were left open/active after shutdown.""" - logger.info(f"Checking for hanging sessions...") + logger.info("Checking for hanging sessions...") with engine.connect() as conn: result = conn.execute( - sa.text( # We have to delete reports first because of the REFERENCES constraint + sa.text( # We have to delete reports first because of the REFERENCES constraint """ DELETE FROM reports WHERE session_id IN ( SELECT session_id FROM demo_sessions @@ -890,7 +891,7 @@ def cleanup_hung_sessions(engine: Engine) -> None: OR open = true OR demo_size IS NULL ); - + DELETE FROM demo_sessions WHERE active = true OR open = true @@ -902,10 +903,11 @@ def cleanup_hung_sessions(engine: Engine) -> None: conn.commit() logger.info("Deleted %d hanging sessions.", deleted_rows) + # This function is only meant to run on boot! def prune_if_necessary(engine: Engine, minio_client: Minio) -> bool: """Mark sessions as pruned so the specificed amount of free space is available.""" - logger.info(f"Checking if we need to prune demos...") + logger.info("Checking if we need to prune demos...") current_size = get_total_storage_usage(minio_client) with engine.connect() as conn: @@ -920,14 +922,14 @@ def prune_if_necessary(engine: Engine, minio_client: Minio) -> bool: if max_size_gb is None or max_size_gb <= 0: logger.warning("No storage limit set, enjoy filling your disk!") return False - max_size = max_size_gb * (1024 ** 3) + max_size = max_size_gb * (1024**3) total_bytes_to_remove = current_size - max_size - logger.info("Current size: %d MB; Max size: %d MB", current_size / (1024 ** 2), max_size / (1024 ** 2)) + logger.info("Current size: %d MB; Max size: %d MB", current_size / (1024**2), max_size / (1024**2)) if total_bytes_to_remove <= 0: logger.info("No need to prune.") return False - logger.info("Attempting to prune %d MB", max(0, total_bytes_to_remove / (1024 ** 2))) + logger.info("Attempting to prune %d MB", max(0, total_bytes_to_remove / (1024**2))) # get the oldest demos that don't have any detections # we allow demos that have already been pruned in case we somehow end up in a state @@ -936,7 +938,7 @@ def prune_if_necessary(engine: Engine, minio_client: Minio) -> bool: sa.text( """ SELECT session_id FROM demo_sessions - WHERE active = false + WHERE active = false AND open = false AND session_id NOT IN (SELECT session_id FROM analysis) ORDER BY created_at ASC @@ -974,13 +976,14 @@ def prune_if_necessary(engine: Engine, minio_client: Minio) -> bool: WHERE session_id IN :session_ids_to_remove; """ ), - {"session_ids_to_remove": tuple(session_ids_to_remove)} + {"session_ids_to_remove": tuple(session_ids_to_remove)}, ) conn.commit() logger.info("Marked %d demos for pruning.", len(session_ids_to_remove)) # pruned demo blobs will be deleted by cleanup_orphaned_demos, which runs after this on boot return True + # This function is only meant to run on boot! def cleanup_pruned_demos(engine: Engine, minio_client: Minio) -> None: """Remove blobs for pruned or deleted sessions.""" @@ -1018,11 +1021,13 @@ def cleanup_pruned_demos(engine: Engine, minio_client: Minio) -> None: # Setting this to negative will perform a one-time prune regardless of ratio. max_prune_ratio = ratio_result.scalar_one() if len(minio_demoblobs_dict) > len(ids_in_db) * max_prune_ratio and max_prune_ratio >= 0: - logger.warning("Too many orphaned demo blobs: %d (%f%%) found, but limit set to %d (%f%%). Refusing to clean up because something probably broke.", + logger.warning( + "Too many orphaned demo blobs: %d (%f%%) found, but limit set to %d (%f%%). " + "Refusing to clean up because something probably broke.", len(minio_demoblobs_dict), len(minio_demoblobs_dict) / len(ids_in_db) * 100, math.floor(len(ids_in_db) * max_prune_ratio), - max_prune_ratio * 100 + max_prune_ratio * 100, ) return @@ -1036,7 +1041,7 @@ def cleanup_pruned_demos(engine: Engine, minio_client: Minio) -> None: SET max_prune_ratio = :max_prune_ratio; """ ), - {"max_prune_ratio": max_prune_ratio} + {"max_prune_ratio": max_prune_ratio}, ) conn.commit() @@ -1047,6 +1052,7 @@ def cleanup_pruned_demos(engine: Engine, minio_client: Minio) -> None: logger.info("Removing orphaned json %s", blob.object_name) minio_client.remove_object("jsonblobs", blob.object_name) + def get_total_storage_usage(minio_client: Minio) -> int: """Get the total storage used by all buckets in bytes.""" try: diff --git a/masterbase/registers.py b/masterbase/registers.py index 54fa061..1e8e600 100644 --- a/masterbase/registers.py +++ b/masterbase/registers.py @@ -8,11 +8,11 @@ from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine from masterbase.lib import ( - make_db_uri, - make_minio_client, cleanup_hung_sessions, cleanup_pruned_demos, - prune_if_necessary + make_db_uri, + make_minio_client, + prune_if_necessary, ) @@ -60,6 +60,7 @@ async def close_async_db_connection(app: Litestar) -> None: if getattr(app.state, "async_engine", None): await cast("AsyncEngine", app.state.async_engine).dispose() + def boot_cleanup(app: Litestar) -> None: """Cleanup the database on boot.""" engine = app.state.engine @@ -69,5 +70,6 @@ def boot_cleanup(app: Litestar) -> None: prune_if_necessary(engine, minio_client) cleanup_pruned_demos(engine, minio_client) + startup_registers = (get_db_connection, get_async_db_connection, get_minio_connection, boot_cleanup) shutdown_registers = (close_db_connection, close_async_db_connection)