From 9df1805b824c2dbf9b796f45b5577111467a3b71 Mon Sep 17 00:00:00 2001 From: megascatterbomb Date: Tue, 8 Oct 2024 14:45:47 +1300 Subject: [PATCH 01/17] schema revision --- migrations/versions/b941ebee3091_analysis.py | 55 ++++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 100644 migrations/versions/b941ebee3091_analysis.py diff --git a/migrations/versions/b941ebee3091_analysis.py b/migrations/versions/b941ebee3091_analysis.py new file mode 100644 index 0000000..f1e9746 --- /dev/null +++ b/migrations/versions/b941ebee3091_analysis.py @@ -0,0 +1,55 @@ +"""analysis + +Revision ID: b941ebee3091 +Revises: 53d7f00c595e +Create Date: 2024-10-08 13:56:46.796256 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = 'b941ebee3091' +down_revision: Union[str, None] = '53d7f00c595e' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Add analysis and review tables.""" + op.execute( + """ + CREATE TABLE analysis ( + session_id varchar REFERENCES demo_sessions, + target_steam_id varchar, + created_at timestamptz, + detection_count int, + PRIMARY KEY (session_id, target_steam_id) + ); + """ + ) + op.execute( + """ + CREATE TABLE reviews ( + session_id varchar REFERENCES demo_sessions, + target_steam_id varchar, + reviewer_steam_id varchar, + created_at timestamptz, + verdict varchar, + PRIMARY KEY (session_id, target_steam_id, reviewer_steam_id) + ); + """ + ) + + +def downgrade() -> None: + """Remove analysis and review tables.""" + op.execute( + """ + DROP TABLE analysis; + DROP TABLE reviews; + """ + ) From 8b65a50de62a851e4edc9861f46e0280ae4c62e0 Mon Sep 17 00:00:00 2001 From: megascatterbomb Date: Tue, 8 Oct 2024 16:10:40 +1300 Subject: [PATCH 02/17] (WIP) endpoints for analysis client --- .github/workflows/integration.yaml | 1 + masterbase/app.py | 22 +++++++++++- masterbase/lib.py | 58 ++++++++++++++++++++++++++++++ masterbase/models.py | 11 ++++++ 4 files changed, 91 insertions(+), 1 deletion(-) diff --git a/.github/workflows/integration.yaml b/.github/workflows/integration.yaml index e77ac82..6a0f879 100644 --- a/.github/workflows/integration.yaml +++ b/.github/workflows/integration.yaml @@ -49,6 +49,7 @@ jobs: run: ./mc alias set blobs http://127.0.0.1:9000 MEGASCATTERBOMB masterbase - name: more minio bs run: ./mc mb -p blobs/demoblobs + run: ./mc mb -p blobs/analysisblobs - name: Remove mc client run: rm -v ./mc diff --git a/masterbase/app.py b/masterbase/app.py index 1959952..a0735b5 100644 --- a/masterbase/app.py +++ b/masterbase/app.py @@ -51,7 +51,7 @@ steam_id_from_api_key, update_api_key, ) -from masterbase.models import ExportTable, LateBytesBody, ReportBody +from masterbase.models import ExportTable, LateBytesBody, ReportBody, IngestBody from masterbase.registers import shutdown_registers, startup_registers from masterbase.steam import account_exists, is_limited_account @@ -196,6 +196,26 @@ def db_export(request: Request, api_key: str, table: ExportTable) -> Stream: }, ) +@get("/jobs", guards=[valid_key_guard, analyst_guard], sync_to_thread=False) +def jobs(request: Request, api_key: str, limit: int = 1) -> list[dict[str, str]]: + """Return a list of demos that need analysis.""" + engine = request.app.state.engine + demos = get_uningested_demos(engine, limit) + + return demos + +@post("/ingest", guards=[valid_key_guard, analyst_guard], sync_to_thread=False) +def ingest(request: Request, api_key: str, session_id: str, data: IngestBody) -> dict[str, bool]: + """Upload completed analysis.""" + try: + data = IngestBody(**data) + except pydantic.ValidationError as e: + raise HTTPException(status_code=400, detail="Malformed analysis data.") + + ingest_demo(request.app.state.engine, session_id, data) + + return {"ingest_successful": True} + @post("/report", guards=[valid_key_guard]) async def report_player(request: Request, api_key: str, data: ReportBody) -> dict[str, bool]: diff --git a/masterbase/lib.py b/masterbase/lib.py index 9aec222..27b84aa 100644 --- a/masterbase/lib.py +++ b/masterbase/lib.py @@ -21,6 +21,7 @@ from sqlalchemy.ext.asyncio import AsyncEngine from masterbase.anomaly import DetectionState +from masterbase.models import IngestBody logger = logging.getLogger(__name__) @@ -304,6 +305,63 @@ async def check_analyst(engine: AsyncEngine, steam_id: str) -> bool: return analyst +def get_uningested_demos(engine: Engine, limit: int) -> list[str]: + """Get a list of uningested demos.""" + sql = "SELECT session_id FROM demo_sessions WHERE ingested = false ORDER BY created_at ASC LIMIT :limit;" + params = {"limit": limit} + + with engine.connect() as conn: + result = conn.execute( + sa.text(sql), + params, + ) + + data = result.all() + uningested_demos = [row["session_id"] for row in data] + + return uningested_demos + + +def ingest_demo(engine: Engine, session_id: str, data: IngestBody): + """Ingest a demo analysis from an analysis client.""" + + # ensure the demo session is not already ingested + is_ingested_sql = "SELECT ingested FROM demo_sessions WHERE session_id = :session_id;" + + # Wipe existing analysis data (we want to be able to reingest a demo if necessary by manually setting ingested = false) + analysis_sql = "DELETE FROM analysis WHERE session_id = :session_id;" + reviews_sql = "DELETE FROM reviews WHERE session_id = :session_id;" + + # Mark the demo as ingested + mark_ingested_sql = "UPDATE demo_sessions SET ingested = true WHERE session_id = :session_id;" + + with engine.connect() as conn: + result = conn.execute( + sa.text(is_ingested_sql), + {"session_id": session_id}, + ) + + data = result.one_or_none() + if data is None: + return "demo not found" + if data["ingested"]: + return "demo already ingested" + + conn.execute( + sa.text(analysis_sql), + {"session_id": session_id}, + ) + conn.execute( + sa.text(reviews_sql), + {"session_id": session_id}, + ) + conn.execute( + sa.text(mark_ingested_sql), + {"session_id": session_id}, + ) + conn.commit() + + async def session_closed(engine: AsyncEngine, session_id: str) -> bool: """Determine if a session is active.""" sql = "SELECT active FROM demo_sessions WHERE session_id = :session_id;" diff --git a/masterbase/models.py b/masterbase/models.py index 2ff131c..dc33459 100644 --- a/masterbase/models.py +++ b/masterbase/models.py @@ -20,6 +20,17 @@ class ReportBody(BaseModel): reason: ReportReason +class Detection(BaseModel): + """A single detection from the analysis client.""" + tick: int + algorithm: str + player: int + data: dict + +class IngestBody(BaseModel): + """The body of the POST /demos endpoint.""" + detections: list[Detection] + class ExportTable(str, Enum): """Tables to be allowed in database exports.""" From ecdd9ac2420b1169efc0da200944282c9fca5089 Mon Sep 17 00:00:00 2001 From: megascatterbomb Date: Tue, 8 Oct 2024 20:43:05 +1300 Subject: [PATCH 03/17] "it works on my machine" --- masterbase/app.py | 4 ++++ services/api/Dockerfile | 5 ++++- services/minio/Dockerfile | 2 ++ 3 files changed, 10 insertions(+), 1 deletion(-) diff --git a/masterbase/app.py b/masterbase/app.py index a0735b5..e269953 100644 --- a/masterbase/app.py +++ b/masterbase/app.py @@ -16,6 +16,10 @@ from litestar.status_codes import HTTP_500_INTERNAL_SERVER_ERROR from sqlalchemy.exc import IntegrityError +import sys +CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) +sys.path.append(os.path.dirname(CURRENT_DIR)) + from masterbase.anomaly import DetectionState from masterbase.guards import ( analyst_guard, diff --git a/services/api/Dockerfile b/services/api/Dockerfile index 1a84fd6..b667cb9 100644 --- a/services/api/Dockerfile +++ b/services/api/Dockerfile @@ -11,7 +11,8 @@ COPY . . RUN apt-get update && apt-get install -y --no-install-recommends \ apt-utils \ - postgresql-client + postgresql-client \ + dos2unix RUN pdm sync --prod --no-editable @@ -22,4 +23,6 @@ RUN touch /first_run COPY services/api/start.sh /usr/local/bin/start.sh RUN chmod +x /usr/local/bin/start.sh +RUN dos2unix /usr/local/bin/start.sh + ENTRYPOINT /usr/local/bin/start.sh diff --git a/services/minio/Dockerfile b/services/minio/Dockerfile index f7893a6..989be77 100644 --- a/services/minio/Dockerfile +++ b/services/minio/Dockerfile @@ -12,6 +12,8 @@ EXPOSE 9001 COPY services/minio/start.sh /usr/local/bin/start.sh +RUN tr -d '\r' < /usr/local/bin/start.sh > /usr/local/bin/start_unix.sh && mv /usr/local/bin/start_unix.sh /usr/local/bin/start.sh + RUN chmod +x /usr/local/bin/start.sh ENTRYPOINT /usr/local/bin/start.sh \ No newline at end of file From f1cb03a2215acceaffa6d67e84270a5e8060e7fe Mon Sep 17 00:00:00 2001 From: megascatterbomb Date: Tue, 8 Oct 2024 21:13:04 +1300 Subject: [PATCH 04/17] fix WIP errors, add vars.ps1 --- masterbase/app.py | 5 ++++- vars.ps1 | 11 +++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) create mode 100644 vars.ps1 diff --git a/masterbase/app.py b/masterbase/app.py index e269953..5cb1dff 100644 --- a/masterbase/app.py +++ b/masterbase/app.py @@ -15,6 +15,7 @@ from litestar.response import Redirect, Response, Stream from litestar.status_codes import HTTP_500_INTERNAL_SERVER_ERROR from sqlalchemy.exc import IntegrityError +from pydantic import ValidationError import sys CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) @@ -45,6 +46,8 @@ demo_blob_name, generate_api_key, generate_uuid4_int, + get_uningested_demos, + ingest_demo, late_bytes_helper, list_demos_helper, provision_api_key, @@ -213,7 +216,7 @@ def ingest(request: Request, api_key: str, session_id: str, data: IngestBody) -> """Upload completed analysis.""" try: data = IngestBody(**data) - except pydantic.ValidationError as e: + except ValidationError: raise HTTPException(status_code=400, detail="Malformed analysis data.") ingest_demo(request.app.state.engine, session_id, data) diff --git a/vars.ps1 b/vars.ps1 new file mode 100644 index 0000000..99e7c54 --- /dev/null +++ b/vars.ps1 @@ -0,0 +1,11 @@ +$env:DEVELOPMENT = $true +$env:DEBUG_WAIT_FOR_ATTACH = $true +$env:STEAM_API_KEY = "foo" +$env:POSTGRES_USER = "MEGASCATTERBOMB" +$env:POSTGRES_PASSWORD = "masterbase" +$env:POSTGRES_HOST = "localhost" +$env:POSTGRES_PORT = 8050 +$env:MINIO_HOST = "localhost" +$env:MINIO_PORT = 9000 +$env:MINIO_ACCESS_KEY = "MEGASCATTERBOMB" +$env:MINIO_SECRET_KEY = "masterbase" From e9f95e911cfd97d10f79121094e95235cfa53a7c Mon Sep 17 00:00:00 2001 From: megascatterbomb Date: Tue, 8 Oct 2024 21:52:37 +1300 Subject: [PATCH 05/17] /jobs --- masterbase/app.py | 2 ++ masterbase/lib.py | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/masterbase/app.py b/masterbase/app.py index 5cb1dff..a83025d 100644 --- a/masterbase/app.py +++ b/masterbase/app.py @@ -448,6 +448,8 @@ def plain_text_exception_handler(_: Request, exception: Exception) -> Response: analyst_list_demos, report_player, db_export, + jobs, + ingest ], exception_handlers={Exception: plain_text_exception_handler}, on_shutdown=shutdown_registers, diff --git a/masterbase/lib.py b/masterbase/lib.py index 27b84aa..8b68383 100644 --- a/masterbase/lib.py +++ b/masterbase/lib.py @@ -317,7 +317,7 @@ def get_uningested_demos(engine: Engine, limit: int) -> list[str]: ) data = result.all() - uningested_demos = [row["session_id"] for row in data] + uningested_demos = [row[0] for row in data] return uningested_demos From 52d2fc454cae04d976301970a8326f16ab428092 Mon Sep 17 00:00:00 2001 From: megascatterbomb Date: Tue, 8 Oct 2024 21:58:28 +1300 Subject: [PATCH 06/17] /jobs doesn't return active sessions --- masterbase/lib.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/masterbase/lib.py b/masterbase/lib.py index 8b68383..505e81d 100644 --- a/masterbase/lib.py +++ b/masterbase/lib.py @@ -307,7 +307,19 @@ async def check_analyst(engine: AsyncEngine, steam_id: str) -> bool: def get_uningested_demos(engine: Engine, limit: int) -> list[str]: """Get a list of uningested demos.""" - sql = "SELECT session_id FROM demo_sessions WHERE ingested = false ORDER BY created_at ASC LIMIT :limit;" + sql = """ + SELECT + session_id + FROM + demo_sessions + WHERE + active = false + AND open = false + AND ingested = false + ORDER BY + created_at ASC + LIMIT :limit; + """ params = {"limit": limit} with engine.connect() as conn: From 973c671591793508cf509b30a408833c182fad03 Mon Sep 17 00:00:00 2001 From: megascatterbomb Date: Thu, 10 Oct 2024 13:52:38 +1300 Subject: [PATCH 07/17] (WIP) analysis ingestion --- masterbase/lib.py | 78 ++++++++++++++------ migrations/versions/b941ebee3091_analysis.py | 13 +++- 2 files changed, 64 insertions(+), 27 deletions(-) diff --git a/masterbase/lib.py b/masterbase/lib.py index 505e81d..c2a6ff4 100644 --- a/masterbase/lib.py +++ b/masterbase/lib.py @@ -341,38 +341,68 @@ def ingest_demo(engine: Engine, session_id: str, data: IngestBody): is_ingested_sql = "SELECT ingested FROM demo_sessions WHERE session_id = :session_id;" # Wipe existing analysis data (we want to be able to reingest a demo if necessary by manually setting ingested = false) - analysis_sql = "DELETE FROM analysis WHERE session_id = :session_id;" - reviews_sql = "DELETE FROM reviews WHERE session_id = :session_id;" + wipe_analysis_sql = "DELETE FROM analysis WHERE session_id = :session_id;" + wipe_reviews_sql = "DELETE FROM reviews WHERE session_id = :session_id;" + + # Insert the analysis data + detections = data.detections + algorithm_counts = {} + for detection in detections: + key = (detection.player, detection.algorithm) + if key not in algorithm_counts: + algorithm_counts[key] = 0 + algorithm_counts[key] += 1 + + insert_sql = """\ + INSERT INTO analysis ( + session_id, target_steam_id, algorithm_type, detection_count + ) VALUES ( + :session_id, :target_steam_id, :algorithm, :count + ); + """ # Mark the demo as ingested mark_ingested_sql = "UPDATE demo_sessions SET ingested = true WHERE session_id = :session_id;" with engine.connect() as conn: - result = conn.execute( - sa.text(is_ingested_sql), - {"session_id": session_id}, - ) + with conn.begin(): + result = conn.execute( + sa.text(is_ingested_sql), + {"session_id": session_id}, + ) - data = result.one_or_none() - if data is None: - return "demo not found" - if data["ingested"]: - return "demo already ingested" + data = result.one_or_none() + if data is None: + conn.rollback() + return "demo not found" + if data["ingested"]: + conn.rollback() + return "demo already ingested" + + conn.execute( + sa.text(wipe_analysis_sql), + {"session_id": session_id}, + ) + conn.execute( + sa.text(wipe_reviews_sql), + {"session_id": session_id}, + ) - conn.execute( - sa.text(analysis_sql), - {"session_id": session_id}, - ) - conn.execute( - sa.text(reviews_sql), - {"session_id": session_id}, - ) - conn.execute( - sa.text(mark_ingested_sql), - {"session_id": session_id}, - ) - conn.commit() + for key, count in algorithm_counts.items(): + conn.execute( + sa.text(insert_sql), + { + "session_id": session_id, + "target_steam_id": key[0], + "algorithm": key[1], + "count": count, + }, + ) + conn.execute( + sa.text(mark_ingested_sql), + {"session_id": session_id}, + ) async def session_closed(engine: AsyncEngine, session_id: str) -> bool: """Determine if a session is active.""" diff --git a/migrations/versions/b941ebee3091_analysis.py b/migrations/versions/b941ebee3091_analysis.py index f1e9746..bcece0a 100644 --- a/migrations/versions/b941ebee3091_analysis.py +++ b/migrations/versions/b941ebee3091_analysis.py @@ -20,14 +20,20 @@ def upgrade() -> None: """Add analysis and review tables.""" + op.execute( + """ + CREATE TYPE verdict AS ENUM ('none', 'benign', 'inconclusive', 'confirmed', 'error'); + """ + ) op.execute( """ CREATE TABLE analysis ( session_id varchar REFERENCES demo_sessions, target_steam_id varchar, - created_at timestamptz, + algorithm_type varchar, detection_count int, - PRIMARY KEY (session_id, target_steam_id) + created_at timestamptz, + PRIMARY KEY (session_id, target_steam_id, algorithm_type) ); """ ) @@ -37,8 +43,8 @@ def upgrade() -> None: session_id varchar REFERENCES demo_sessions, target_steam_id varchar, reviewer_steam_id varchar, + verdict verdict, created_at timestamptz, - verdict varchar, PRIMARY KEY (session_id, target_steam_id, reviewer_steam_id) ); """ @@ -53,3 +59,4 @@ def downgrade() -> None: DROP TABLE reviews; """ ) + op.execute("DROP TYPE verdict;") From 95eefbfec19fa73baf41860c2111bf94d30a76c5 Mon Sep 17 00:00:00 2001 From: megascatterbomb Date: Sat, 12 Oct 2024 15:49:04 +1300 Subject: [PATCH 08/17] don't pass data through /ingest (we'll get it from minio) --- masterbase/app.py | 14 +++++--------- masterbase/lib.py | 17 +++++++++++++++-- masterbase/models.py | 2 +- 3 files changed, 21 insertions(+), 12 deletions(-) diff --git a/masterbase/app.py b/masterbase/app.py index a83025d..56f42d6 100644 --- a/masterbase/app.py +++ b/masterbase/app.py @@ -58,7 +58,7 @@ steam_id_from_api_key, update_api_key, ) -from masterbase.models import ExportTable, LateBytesBody, ReportBody, IngestBody +from masterbase.models import ExportTable, LateBytesBody, ReportBody from masterbase.registers import shutdown_registers, startup_registers from masterbase.steam import account_exists, is_limited_account @@ -212,14 +212,10 @@ def jobs(request: Request, api_key: str, limit: int = 1) -> list[dict[str, str]] return demos @post("/ingest", guards=[valid_key_guard, analyst_guard], sync_to_thread=False) -def ingest(request: Request, api_key: str, session_id: str, data: IngestBody) -> dict[str, bool]: - """Upload completed analysis.""" - try: - data = IngestBody(**data) - except ValidationError: - raise HTTPException(status_code=400, detail="Malformed analysis data.") - - ingest_demo(request.app.state.engine, session_id, data) +def ingest(request: Request, api_key: str, session_id: str) -> dict[str, bool]: + """Report analysis as completed, ingest into database.""" + minio_client = request.app.state.minio_client + ingest_demo(minio_client, request.app.state.engine, session_id) return {"ingest_successful": True} diff --git a/masterbase/lib.py b/masterbase/lib.py index c2a6ff4..bd838af 100644 --- a/masterbase/lib.py +++ b/masterbase/lib.py @@ -16,12 +16,13 @@ from litestar import WebSocket from minio import Minio, S3Error from minio.datatypes import Object as BlobStat +from pydantic import ValidationError from sqlalchemy import Engine from sqlalchemy.exc import NoResultFound from sqlalchemy.ext.asyncio import AsyncEngine from masterbase.anomaly import DetectionState -from masterbase.models import IngestBody +from masterbase.models import Analysis logger = logging.getLogger(__name__) @@ -334,9 +335,21 @@ def get_uningested_demos(engine: Engine, limit: int) -> list[str]: return uningested_demos -def ingest_demo(engine: Engine, session_id: str, data: IngestBody): +def ingest_demo(minio_client: Minio, engine: Engine, session_id: str): """Ingest a demo analysis from an analysis client.""" + blob_name = f"jsonblobs/{session_id}.json" + try: + data = minio_client.get_object("jsonblobs", blob_name).read().decode("utf-8") + data = Analysis.parse_raw(data) + except S3Error as err: + if err.code == "NoSuchKey": + return "no analysis data found." + else: + return "unknown error while looking up analysis data." + except ValidationError: + return "malformed analysis data." + # ensure the demo session is not already ingested is_ingested_sql = "SELECT ingested FROM demo_sessions WHERE session_id = :session_id;" diff --git a/masterbase/models.py b/masterbase/models.py index dc33459..36740b5 100644 --- a/masterbase/models.py +++ b/masterbase/models.py @@ -27,7 +27,7 @@ class Detection(BaseModel): player: int data: dict -class IngestBody(BaseModel): +class Analysis(BaseModel): """The body of the POST /demos endpoint.""" detections: list[Detection] From de5aa8d57a26ffec3f2777a425e7c797b02c0278 Mon Sep 17 00:00:00 2001 From: megascatterbomb Date: Fri, 18 Oct 2024 12:22:52 +1300 Subject: [PATCH 09/17] (WIP) load json on ingest --- masterbase/app.py | 6 ++++-- masterbase/lib.py | 22 +++++++++++++++++----- masterbase/models.py | 4 ++++ masterbase/registers.py | 2 ++ 4 files changed, 27 insertions(+), 7 deletions(-) diff --git a/masterbase/app.py b/masterbase/app.py index 56f42d6..dd5ff46 100644 --- a/masterbase/app.py +++ b/masterbase/app.py @@ -215,9 +215,11 @@ def jobs(request: Request, api_key: str, limit: int = 1) -> list[dict[str, str]] def ingest(request: Request, api_key: str, session_id: str) -> dict[str, bool]: """Report analysis as completed, ingest into database.""" minio_client = request.app.state.minio_client - ingest_demo(minio_client, request.app.state.engine, session_id) + err = ingest_demo(minio_client, request.app.state.engine, session_id) - return {"ingest_successful": True} + if err is None: + return {"ingest_successful": True} + return {"ingest_successful": False, "error": err} @post("/report", guards=[valid_key_guard]) diff --git a/masterbase/lib.py b/masterbase/lib.py index bd838af..ee85906 100644 --- a/masterbase/lib.py +++ b/masterbase/lib.py @@ -2,6 +2,7 @@ import hashlib import io +import json import logging import os import secrets @@ -17,6 +18,7 @@ from minio import Minio, S3Error from minio.datatypes import Object as BlobStat from pydantic import ValidationError +from pydantic_core import from_json from sqlalchemy import Engine from sqlalchemy.exc import NoResultFound from sqlalchemy.ext.asyncio import AsyncEngine @@ -338,16 +340,25 @@ def get_uningested_demos(engine: Engine, limit: int) -> list[str]: def ingest_demo(minio_client: Minio, engine: Engine, session_id: str): """Ingest a demo analysis from an analysis client.""" - blob_name = f"jsonblobs/{session_id}.json" + blob_name = f"{session_id}.json" try: - data = minio_client.get_object("jsonblobs", blob_name).read().decode("utf-8") - data = Analysis.parse_raw(data) + # Skip 2 bytes to avoid utf-16 decoding issues + data = minio_client.get_object("jsonblobs", blob_name, 2).read(decode_content=True) + print("bytes") + print(data) + data = data.decode("utf-16-le") + print("string") + print(data) + data = json.JSONDecoder().decode(data) + print("json") + print(data) except S3Error as err: if err.code == "NoSuchKey": return "no analysis data found." else: - return "unknown error while looking up analysis data." - except ValidationError: + return "unknown S3 error while looking up analysis data." + except ValidationError as err: + print(err) return "malformed analysis data." # ensure the demo session is not already ingested @@ -416,6 +427,7 @@ def ingest_demo(minio_client: Minio, engine: Engine, session_id: str): sa.text(mark_ingested_sql), {"session_id": session_id}, ) + return None async def session_closed(engine: AsyncEngine, session_id: str) -> bool: """Determine if a session is active.""" diff --git a/masterbase/models.py b/masterbase/models.py index 36740b5..b0c31d7 100644 --- a/masterbase/models.py +++ b/masterbase/models.py @@ -29,7 +29,11 @@ class Detection(BaseModel): class Analysis(BaseModel): """The body of the POST /demos endpoint.""" + author: str detections: list[Detection] + duration: int + map: str + server_ip: str class ExportTable(str, Enum): """Tables to be allowed in database exports.""" diff --git a/masterbase/registers.py b/masterbase/registers.py index 4006783..dfe6fc5 100644 --- a/masterbase/registers.py +++ b/masterbase/registers.py @@ -16,6 +16,8 @@ def get_minio_connection(app: Litestar) -> Minio: minio_client = make_minio_client() if not minio_client.bucket_exists("demoblobs"): minio_client.make_bucket("demoblobs", "us-east-1") + if not minio_client.bucket_exists("jsonblobs"): + minio_client.make_bucket("jsonblobs", "us-east-1") app.state.minio_client = minio_client return cast(Minio, app.state.minio_client) From 272d0bc12d295973ae6c06ae1061001eb1e131eb Mon Sep 17 00:00:00 2001 From: megascatterbomb Date: Fri, 18 Oct 2024 12:51:12 +1300 Subject: [PATCH 10/17] ingestion works! --- masterbase/app.py | 4 ++-- masterbase/lib.py | 32 ++++++++++++++------------------ masterbase/models.py | 3 ++- 3 files changed, 18 insertions(+), 21 deletions(-) diff --git a/masterbase/app.py b/masterbase/app.py index dd5ff46..03c0297 100644 --- a/masterbase/app.py +++ b/masterbase/app.py @@ -218,8 +218,8 @@ def ingest(request: Request, api_key: str, session_id: str) -> dict[str, bool]: err = ingest_demo(minio_client, request.app.state.engine, session_id) if err is None: - return {"ingest_successful": True} - return {"ingest_successful": False, "error": err} + return {"ingested": True} + raise HTTPException(detail=f"Internal Error Occured: {err}", status_code=500) @post("/report", guards=[valid_key_guard]) diff --git a/masterbase/lib.py b/masterbase/lib.py index ee85906..b175ee3 100644 --- a/masterbase/lib.py +++ b/masterbase/lib.py @@ -344,14 +344,9 @@ def ingest_demo(minio_client: Minio, engine: Engine, session_id: str): try: # Skip 2 bytes to avoid utf-16 decoding issues data = minio_client.get_object("jsonblobs", blob_name, 2).read(decode_content=True) - print("bytes") - print(data) data = data.decode("utf-16-le") - print("string") - print(data) data = json.JSONDecoder().decode(data) - print("json") - print(data) + data = Analysis.parse_obj(data) except S3Error as err: if err.code == "NoSuchKey": return "no analysis data found." @@ -361,6 +356,15 @@ def ingest_demo(minio_client: Minio, engine: Engine, session_id: str): print(err) return "malformed analysis data." + # Data preprocessing + print(data) + algorithm_counts = {} + for detection in data.detections: + key = (detection.player, detection.algorithm) + if key not in algorithm_counts: + algorithm_counts[key] = 0 + algorithm_counts[key] += 1 + # ensure the demo session is not already ingested is_ingested_sql = "SELECT ingested FROM demo_sessions WHERE session_id = :session_id;" @@ -369,14 +373,6 @@ def ingest_demo(minio_client: Minio, engine: Engine, session_id: str): wipe_reviews_sql = "DELETE FROM reviews WHERE session_id = :session_id;" # Insert the analysis data - detections = data.detections - algorithm_counts = {} - for detection in detections: - key = (detection.player, detection.algorithm) - if key not in algorithm_counts: - algorithm_counts[key] = 0 - algorithm_counts[key] += 1 - insert_sql = """\ INSERT INTO analysis ( session_id, target_steam_id, algorithm_type, detection_count @@ -395,11 +391,11 @@ def ingest_demo(minio_client: Minio, engine: Engine, session_id: str): {"session_id": session_id}, ) - data = result.one_or_none() - if data is None: + result = result.one_or_none() + if result is None: conn.rollback() return "demo not found" - if data["ingested"]: + if result.ingested is True: conn.rollback() return "demo already ingested" @@ -414,7 +410,7 @@ def ingest_demo(minio_client: Minio, engine: Engine, session_id: str): for key, count in algorithm_counts.items(): conn.execute( - sa.text(insert_sql), + sa.text(insert_sql), { "session_id": session_id, "target_steam_id": key[0], diff --git a/masterbase/models.py b/masterbase/models.py index b0c31d7..48d1996 100644 --- a/masterbase/models.py +++ b/masterbase/models.py @@ -1,6 +1,7 @@ """Module of pydantic models.""" from enum import Enum +from typing import Any from pydantic import BaseModel @@ -25,7 +26,7 @@ class Detection(BaseModel): tick: int algorithm: str player: int - data: dict + data: Any class Analysis(BaseModel): """The body of the POST /demos endpoint.""" From 41a465cff11e7f902fca7cf0d0f26d8e42efee79 Mon Sep 17 00:00:00 2001 From: megascatterbomb Date: Fri, 18 Oct 2024 13:46:36 +1300 Subject: [PATCH 11/17] never ingest active session --- masterbase/lib.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/masterbase/lib.py b/masterbase/lib.py index b175ee3..1e94e03 100644 --- a/masterbase/lib.py +++ b/masterbase/lib.py @@ -398,6 +398,12 @@ def ingest_demo(minio_client: Minio, engine: Engine, session_id: str): if result.ingested is True: conn.rollback() return "demo already ingested" + if result.active is True: + conn.rollback() + return "session is still active" + if result.open is True: + conn.rollback() + return "session is still open" conn.execute( sa.text(wipe_analysis_sql), From 14b3a02ad22923259c4015d8ce139fa2c196d928 Mon Sep 17 00:00:00 2001 From: megascatterbomb Date: Sun, 20 Oct 2024 16:10:50 +1300 Subject: [PATCH 12/17] use utf-8, don't job null demo_blobs --- masterbase/app.py | 1 - masterbase/lib.py | 11 +++++------ 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/masterbase/app.py b/masterbase/app.py index 03c0297..e9719b3 100644 --- a/masterbase/app.py +++ b/masterbase/app.py @@ -216,7 +216,6 @@ def ingest(request: Request, api_key: str, session_id: str) -> dict[str, bool]: """Report analysis as completed, ingest into database.""" minio_client = request.app.state.minio_client err = ingest_demo(minio_client, request.app.state.engine, session_id) - if err is None: return {"ingested": True} raise HTTPException(detail=f"Internal Error Occured: {err}", status_code=500) diff --git a/masterbase/lib.py b/masterbase/lib.py index 1e94e03..01c5a7a 100644 --- a/masterbase/lib.py +++ b/masterbase/lib.py @@ -319,6 +319,8 @@ def get_uningested_demos(engine: Engine, limit: int) -> list[str]: active = false AND open = false AND ingested = false + AND demo_size > 0 + AND blob_name IS NOT NULL ORDER BY created_at ASC LIMIT :limit; @@ -342,9 +344,8 @@ def ingest_demo(minio_client: Minio, engine: Engine, session_id: str): blob_name = f"{session_id}.json" try: - # Skip 2 bytes to avoid utf-16 decoding issues - data = minio_client.get_object("jsonblobs", blob_name, 2).read(decode_content=True) - data = data.decode("utf-16-le") + data = minio_client.get_object("jsonblobs", blob_name).read() + data = data.decode("utf-8") data = json.JSONDecoder().decode(data) data = Analysis.parse_obj(data) except S3Error as err: @@ -353,11 +354,9 @@ def ingest_demo(minio_client: Minio, engine: Engine, session_id: str): else: return "unknown S3 error while looking up analysis data." except ValidationError as err: - print(err) return "malformed analysis data." # Data preprocessing - print(data) algorithm_counts = {} for detection in data.detections: key = (detection.player, detection.algorithm) @@ -366,7 +365,7 @@ def ingest_demo(minio_client: Minio, engine: Engine, session_id: str): algorithm_counts[key] += 1 # ensure the demo session is not already ingested - is_ingested_sql = "SELECT ingested FROM demo_sessions WHERE session_id = :session_id;" + is_ingested_sql = "SELECT ingested, active, open FROM demo_sessions WHERE session_id = :session_id;" # Wipe existing analysis data (we want to be able to reingest a demo if necessary by manually setting ingested = false) wipe_analysis_sql = "DELETE FROM analysis WHERE session_id = :session_id;" From 8a6849535745cab3fe0b4f7eab44d3963b887373 Mon Sep 17 00:00:00 2001 From: megascatterbomb Date: Wed, 30 Oct 2024 13:18:57 +1300 Subject: [PATCH 13/17] populate created_at --- masterbase/lib.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/masterbase/lib.py b/masterbase/lib.py index 01c5a7a..b045233 100644 --- a/masterbase/lib.py +++ b/masterbase/lib.py @@ -374,14 +374,15 @@ def ingest_demo(minio_client: Minio, engine: Engine, session_id: str): # Insert the analysis data insert_sql = """\ INSERT INTO analysis ( - session_id, target_steam_id, algorithm_type, detection_count + session_id, target_steam_id, algorithm_type, detection_count, created_at ) VALUES ( - :session_id, :target_steam_id, :algorithm, :count + :session_id, :target_steam_id, :algorithm, :count, :created_at ); """ # Mark the demo as ingested mark_ingested_sql = "UPDATE demo_sessions SET ingested = true WHERE session_id = :session_id;" + created_at = datetime.now().astimezone(timezone.utc).isoformat() with engine.connect() as conn: with conn.begin(): @@ -421,6 +422,7 @@ def ingest_demo(minio_client: Minio, engine: Engine, session_id: str): "target_steam_id": key[0], "algorithm": key[1], "count": count, + "created_at": created_at, }, ) From 5ff8e2e53951f8c28f18ba9b918e1262ddb989ac Mon Sep 17 00:00:00 2001 From: megascatterbomb Date: Mon, 27 Jan 2025 13:13:59 +1300 Subject: [PATCH 14/17] linting --- masterbase/app.py | 11 +++++++---- masterbase/lib.py | 1 + masterbase/models.py | 4 ++++ 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/masterbase/app.py b/masterbase/app.py index e9719b3..5bc9449 100644 --- a/masterbase/app.py +++ b/masterbase/app.py @@ -18,6 +18,7 @@ from pydantic import ValidationError import sys + CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) sys.path.append(os.path.dirname(CURRENT_DIR)) @@ -203,14 +204,16 @@ def db_export(request: Request, api_key: str, table: ExportTable) -> Stream: }, ) + @get("/jobs", guards=[valid_key_guard, analyst_guard], sync_to_thread=False) def jobs(request: Request, api_key: str, limit: int = 1) -> list[dict[str, str]]: """Return a list of demos that need analysis.""" engine = request.app.state.engine demos = get_uningested_demos(engine, limit) - + return demos + @post("/ingest", guards=[valid_key_guard, analyst_guard], sync_to_thread=False) def ingest(request: Request, api_key: str, session_id: str) -> dict[str, bool]: """Report analysis as completed, ingest into database.""" @@ -303,8 +306,8 @@ def provision(request: Request) -> Redirect: """ # enforce https on base_url base_url = str(request.base_url) - dev_mode = os.getenv('DEVELOPMENT', 'false') - proto = "http://" if dev_mode.lower() == 'true' else "https://" + dev_mode = os.getenv("DEVELOPMENT", "false") + proto = "http://" if dev_mode.lower() == "true" else "https://" base_url = proto + base_url.split("//")[-1] auth_params = { @@ -446,7 +449,7 @@ def plain_text_exception_handler(_: Request, exception: Exception) -> Response: report_player, db_export, jobs, - ingest + ingest, ], exception_handlers={Exception: plain_text_exception_handler}, on_shutdown=shutdown_registers, diff --git a/masterbase/lib.py b/masterbase/lib.py index b045233..703af9d 100644 --- a/masterbase/lib.py +++ b/masterbase/lib.py @@ -432,6 +432,7 @@ def ingest_demo(minio_client: Minio, engine: Engine, session_id: str): ) return None + async def session_closed(engine: AsyncEngine, session_id: str) -> bool: """Determine if a session is active.""" sql = "SELECT active FROM demo_sessions WHERE session_id = :session_id;" diff --git a/masterbase/models.py b/masterbase/models.py index 48d1996..4f5939a 100644 --- a/masterbase/models.py +++ b/masterbase/models.py @@ -23,19 +23,23 @@ class ReportBody(BaseModel): class Detection(BaseModel): """A single detection from the analysis client.""" + tick: int algorithm: str player: int data: Any + class Analysis(BaseModel): """The body of the POST /demos endpoint.""" + author: str detections: list[Detection] duration: int map: str server_ip: str + class ExportTable(str, Enum): """Tables to be allowed in database exports.""" From 8243852a4569d8b4a5545637a2f873621d2dcf8a Mon Sep 17 00:00:00 2001 From: megascatterbomb Date: Mon, 27 Jan 2025 13:17:00 +1300 Subject: [PATCH 15/17] fix duplicate run in gh actions --- .github/workflows/integration.yaml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.github/workflows/integration.yaml b/.github/workflows/integration.yaml index 6a0f879..df3d76c 100644 --- a/.github/workflows/integration.yaml +++ b/.github/workflows/integration.yaml @@ -48,8 +48,7 @@ jobs: - name: Create the bucket run: ./mc alias set blobs http://127.0.0.1:9000 MEGASCATTERBOMB masterbase - name: more minio bs - run: ./mc mb -p blobs/demoblobs - run: ./mc mb -p blobs/analysisblobs + run: ./mc mb -p blobs/demoblobs && ./mc mb -p blobs/analysisblobs - name: Remove mc client run: rm -v ./mc From eed26e4737c44bda362c322f17487ac9b1850bea Mon Sep 17 00:00:00 2001 From: megascatterbomb Date: Mon, 27 Jan 2025 13:19:45 +1300 Subject: [PATCH 16/17] linting 2 electric boogaloo --- masterbase/app.py | 10 ++++------ masterbase/lib.py | 7 +++---- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/masterbase/app.py b/masterbase/app.py index 5bc9449..899c70f 100644 --- a/masterbase/app.py +++ b/masterbase/app.py @@ -2,6 +2,7 @@ import logging import os +import sys import time from datetime import datetime, timezone from hmac import compare_digest @@ -15,12 +16,6 @@ from litestar.response import Redirect, Response, Stream from litestar.status_codes import HTTP_500_INTERNAL_SERVER_ERROR from sqlalchemy.exc import IntegrityError -from pydantic import ValidationError - -import sys - -CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) -sys.path.append(os.path.dirname(CURRENT_DIR)) from masterbase.anomaly import DetectionState from masterbase.guards import ( @@ -63,6 +58,9 @@ from masterbase.registers import shutdown_registers, startup_registers from masterbase.steam import account_exists, is_limited_account +CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) +sys.path.append(os.path.dirname(CURRENT_DIR)) + logger = logging.getLogger(__name__) diff --git a/masterbase/lib.py b/masterbase/lib.py index 703af9d..6de7a22 100644 --- a/masterbase/lib.py +++ b/masterbase/lib.py @@ -18,7 +18,6 @@ from minio import Minio, S3Error from minio.datatypes import Object as BlobStat from pydantic import ValidationError -from pydantic_core import from_json from sqlalchemy import Engine from sqlalchemy.exc import NoResultFound from sqlalchemy.ext.asyncio import AsyncEngine @@ -341,7 +340,6 @@ def get_uningested_demos(engine: Engine, limit: int) -> list[str]: def ingest_demo(minio_client: Minio, engine: Engine, session_id: str): """Ingest a demo analysis from an analysis client.""" - blob_name = f"{session_id}.json" try: data = minio_client.get_object("jsonblobs", blob_name).read() @@ -353,7 +351,7 @@ def ingest_demo(minio_client: Minio, engine: Engine, session_id: str): return "no analysis data found." else: return "unknown S3 error while looking up analysis data." - except ValidationError as err: + except ValidationError: return "malformed analysis data." # Data preprocessing @@ -367,7 +365,8 @@ def ingest_demo(minio_client: Minio, engine: Engine, session_id: str): # ensure the demo session is not already ingested is_ingested_sql = "SELECT ingested, active, open FROM demo_sessions WHERE session_id = :session_id;" - # Wipe existing analysis data (we want to be able to reingest a demo if necessary by manually setting ingested = false) + # Wipe existing analysis data + # (we want to be able to reingest a demo if necessary by manually setting ingested = false) wipe_analysis_sql = "DELETE FROM analysis WHERE session_id = :session_id;" wipe_reviews_sql = "DELETE FROM reviews WHERE session_id = :session_id;" From 98ac34f18ed19096b6856b5e9e9b1b9569a65565 Mon Sep 17 00:00:00 2001 From: megascatterbomb Date: Mon, 27 Jan 2025 13:35:03 +1300 Subject: [PATCH 17/17] don't abuse the dynamic characteristics of a strong dynamic typed language --- masterbase/app.py | 2 +- masterbase/lib.py | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/masterbase/app.py b/masterbase/app.py index 899c70f..5187148 100644 --- a/masterbase/app.py +++ b/masterbase/app.py @@ -204,7 +204,7 @@ def db_export(request: Request, api_key: str, table: ExportTable) -> Stream: @get("/jobs", guards=[valid_key_guard, analyst_guard], sync_to_thread=False) -def jobs(request: Request, api_key: str, limit: int = 1) -> list[dict[str, str]]: +def jobs(request: Request, api_key: str, limit: int = 1) -> list[str]: """Return a list of demos that need analysis.""" engine = request.app.state.engine demos = get_uningested_demos(engine, limit) diff --git a/masterbase/lib.py b/masterbase/lib.py index 6de7a22..386fe65 100644 --- a/masterbase/lib.py +++ b/masterbase/lib.py @@ -342,10 +342,10 @@ def ingest_demo(minio_client: Minio, engine: Engine, session_id: str): """Ingest a demo analysis from an analysis client.""" blob_name = f"{session_id}.json" try: - data = minio_client.get_object("jsonblobs", blob_name).read() - data = data.decode("utf-8") - data = json.JSONDecoder().decode(data) - data = Analysis.parse_obj(data) + raw_data = minio_client.get_object("jsonblobs", blob_name).read() + decoded_data = raw_data.decode("utf-8") + json_data = json.JSONDecoder().decode(decoded_data) + data = Analysis.parse_obj(json_data) except S3Error as err: if err.code == "NoSuchKey": return "no analysis data found." @@ -385,12 +385,12 @@ def ingest_demo(minio_client: Minio, engine: Engine, session_id: str): with engine.connect() as conn: with conn.begin(): - result = conn.execute( + command = conn.execute( sa.text(is_ingested_sql), {"session_id": session_id}, ) - result = result.one_or_none() + result = command.one_or_none() if result is None: conn.rollback() return "demo not found"