From 2a4090c9340a779062ba686859c6ed0f789ca48b Mon Sep 17 00:00:00 2001 From: megascatterbomb Date: Sat, 1 Feb 2025 17:16:41 +1300 Subject: [PATCH 1/3] late_bytes can be handled by POST /close_session --- masterbase/app.py | 32 +++++++++++++++++++++++++++----- masterbase/guards.py | 6 +++--- masterbase/lib.py | 6 +++++- 3 files changed, 35 insertions(+), 9 deletions(-) diff --git a/masterbase/app.py b/masterbase/app.py index afc8222..f292ebd 100644 --- a/masterbase/app.py +++ b/masterbase/app.py @@ -77,7 +77,7 @@ def landing() -> Redirect: return Redirect(path="https://github.com/MegaAntiCheat/client-backend") -@get("/session_id", guards=[valid_key_guard, user_in_session_guard, valid_session_guard], sync_to_thread=False) +@get("/session_id", guards=[valid_key_guard, user_not_in_session_guard, valid_session_guard], sync_to_thread=False) def session_id( request: Request, api_key: str, @@ -110,7 +110,7 @@ def session_id( return {"session_id": _session_id} -@get("/close_session", guards=[valid_key_guard, user_not_in_session_guard], sync_to_thread=False) +@get("/close_session", guards=[valid_key_guard, user_in_session_guard], sync_to_thread=False) def close_session(request: Request, api_key: str) -> dict[str, bool]: """Close a session out. Will find the latest open session for a user. @@ -119,15 +119,32 @@ def close_session(request: Request, api_key: str) -> dict[str, bool]: """ minio_client = request.app.state.minio_client engine = request.app.state.engine + steam_id = steam_id_from_api_key(engine, api_key) + + msg = close_session_helper(minio_client, engine, steam_id, streaming_sessions, None) + logger.info(msg) + + return {"closed_successfully": True} + +@post("/close_session", guards=[valid_key_guard, user_in_session_guard], sync_to_thread=False) +def close_with_late_bytes(request: Request, api_key: str, data: LateBytesBody) -> dict[str, bool]: + """Close a session out. Will find the latest open session for a user. + Returns: + {"closed_successfully": True} + """ + minio_client = request.app.state.minio_client + engine = request.app.state.engine + late_bytes = bytes.fromhex(data.late_bytes) steam_id = steam_id_from_api_key(engine, api_key) - msg = close_session_helper(minio_client, engine, steam_id, streaming_sessions) + + msg = close_session_helper(minio_client, engine, steam_id, streaming_sessions, late_bytes) logger.info(msg) return {"closed_successfully": True} -@post("/late_bytes", guards=[valid_key_guard, user_not_in_session_guard], sync_to_thread=False) +@post("/late_bytes", guards=[valid_key_guard, user_in_session_guard], sync_to_thread=False) def late_bytes(request: Request, api_key: str, data: LateBytesBody) -> dict[str, bool]: """Add late bytes to a closed demo session. @@ -281,7 +298,11 @@ async def on_accept(self, socket: WebSocket, api_key: str, session_id: str) -> N async def on_disconnect(self, socket: WebSocket) -> None: # type: ignore """Close handle on disconnect.""" - session_manager = streaming_sessions[socket] + if socket in streaming_sessions: + session_manager = streaming_sessions[socket] + else : + logger.warning("Attempting to disconnect from already disconnected socket!") + return logger.info(f"Received socket disconnect from session ID: {session_manager.session_id}") session_manager.disconnect() await set_open_false(socket.app.state.async_engine, session_manager.session_id) @@ -440,6 +461,7 @@ def plain_text_exception_handler(_: Request, exception: Exception) -> Response: landing, session_id, close_session, + close_with_late_bytes, DemoHandler, provision, provision_handler, diff --git a/masterbase/guards.py b/masterbase/guards.py index af83129..6783d5d 100644 --- a/masterbase/guards.py +++ b/masterbase/guards.py @@ -44,7 +44,7 @@ async def analyst_guard(connection: ASGIConnection, _: BaseRouteHandler) -> None raise NotAuthorizedException() -async def user_in_session_guard(connection: ASGIConnection, _: BaseRouteHandler) -> None: +async def user_not_in_session_guard(connection: ASGIConnection, _: BaseRouteHandler) -> None: """Assert that the user is not currently in a session.""" async_engine = connection.app.state.async_engine @@ -59,8 +59,8 @@ async def user_in_session_guard(connection: ASGIConnection, _: BaseRouteHandler) ) -async def user_not_in_session_guard(connection: ASGIConnection, _: BaseRouteHandler) -> None: - """Assert that the user is not currently in a session.""" +async def user_in_session_guard(connection: ASGIConnection, _: BaseRouteHandler) -> None: + """Assert that the user is currently in a session.""" async_engine = connection.app.state.async_engine api_key = connection.query_params["api_key"] diff --git a/masterbase/lib.py b/masterbase/lib.py index 386fe65..5b4e37d 100644 --- a/masterbase/lib.py +++ b/masterbase/lib.py @@ -587,7 +587,7 @@ def _close_session_with_demo( def close_session_helper( - minio_client: Minio, engine: Engine, steam_id: str, streaming_sessions: SocketManagerMapType + minio_client: Minio, engine: Engine, steam_id: str, streaming_sessions: SocketManagerMapType, late_bytes: bytes | None ) -> str: """Properly close a session and return a summary message. @@ -618,6 +618,10 @@ def close_session_helper( msg = "No active session found, closing anyway." else: if os.path.exists(session_manager.demo_path): + if late_bytes is not None: + late_bytes_msg = late_bytes_helper(engine, steam_id, late_bytes, current_time) + if late_bytes_msg is not None: + return late_bytes_msg _close_session_with_demo( minio_client, engine, From 4109519eeb8416abf96934036bedb7780ab786b7 Mon Sep 17 00:00:00 2001 From: megascatterbomb Date: Tue, 11 Feb 2025 20:36:47 +1300 Subject: [PATCH 2/3] broadcast support --- masterbase/app.py | 7 ++++ masterbase/lib.py | 11 +++++ .../versions/f51cab87d3fd_broadcasts.py | 41 +++++++++++++++++++ 3 files changed, 59 insertions(+) create mode 100644 migrations/versions/f51cab87d3fd_broadcasts.py diff --git a/masterbase/app.py b/masterbase/app.py index f292ebd..2ef6dc6 100644 --- a/masterbase/app.py +++ b/masterbase/app.py @@ -48,6 +48,7 @@ demo_blob_name, generate_api_key, generate_uuid4_int, + get_broadcasts, get_uningested_demos, ingest_demo, late_bytes_helper, @@ -256,6 +257,11 @@ async def report_player(request: Request, api_key: str, data: ReportBody) -> dic except IntegrityError: raise HTTPException(detail=f"Unknown session ID {data.session_id}", status_code=402) +@get("/broadcasts", sync_to_thread=False) +def broadcasts(request: Request) -> list[dict[str, str]]: + """Return a list of broadcasts.""" + engine = request.app.state.engine + return get_broadcasts(engine) class DemoHandler(WebsocketListener): """Custom Websocket Class.""" @@ -472,6 +478,7 @@ def plain_text_exception_handler(_: Request, exception: Exception) -> Response: report_player, db_export, jobs, + broadcasts, ingest, ], exception_handlers={Exception: plain_text_exception_handler}, diff --git a/masterbase/lib.py b/masterbase/lib.py index 5b4e37d..48d77c8 100644 --- a/masterbase/lib.py +++ b/masterbase/lib.py @@ -857,3 +857,14 @@ def check_is_loser(engine: Engine, steam_id: str) -> bool: ).scalar_one_or_none() return bool(result) + +def get_broadcasts(engine: Engine) -> list[dict[str, str]]: + """Get the list of broadcasts""" + with engine.connect() as conn: + result = conn.execute( + sa.text("SELECT * FROM broadcasts") + ) + rows = [row._asdict() for row in result.all()] + for row in rows: + row["post_date"] = row.pop("created_at") + return rows \ No newline at end of file diff --git a/migrations/versions/f51cab87d3fd_broadcasts.py b/migrations/versions/f51cab87d3fd_broadcasts.py new file mode 100644 index 0000000..1f5e611 --- /dev/null +++ b/migrations/versions/f51cab87d3fd_broadcasts.py @@ -0,0 +1,41 @@ +"""broadcasts + +Revision ID: f51cab87d3fd +Revises: b941ebee3091 +Create Date: 2025-02-10 17:03:28.325372 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = 'f51cab87d3fd' +down_revision: Union[str, None] = 'b941ebee3091' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Add Broadcasts table""" + op.execute( + """ + CREATE TABLE broadcasts ( + message varchar, + importance varchar, + created_at timestamptz, + PRIMARY KEY (message) + ); + """ + ) + + +def downgrade() -> None: + """Delete broadcasts table""" + op.execute( + """ + DROP TABLE broadcasts; + """ + ) \ No newline at end of file From 70ab864eb3a20df8ece01ead35f0f081f5529a68 Mon Sep 17 00:00:00 2001 From: megascatterbomb Date: Wed, 12 Feb 2025 14:55:21 +1300 Subject: [PATCH 3/3] lint --- masterbase/lib.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/masterbase/lib.py b/masterbase/lib.py index 48d77c8..4995b3c 100644 --- a/masterbase/lib.py +++ b/masterbase/lib.py @@ -587,7 +587,12 @@ def _close_session_with_demo( def close_session_helper( - minio_client: Minio, engine: Engine, steam_id: str, streaming_sessions: SocketManagerMapType, late_bytes: bytes | None + minio_client: Minio, + engine: Engine, + steam_id: str, + streaming_sessions: + SocketManagerMapType, + late_bytes: bytes | None ) -> str: """Properly close a session and return a summary message. @@ -859,7 +864,7 @@ def check_is_loser(engine: Engine, steam_id: str) -> bool: return bool(result) def get_broadcasts(engine: Engine) -> list[dict[str, str]]: - """Get the list of broadcasts""" + """Get the list of broadcasts.""" with engine.connect() as conn: result = conn.execute( sa.text("SELECT * FROM broadcasts") @@ -867,4 +872,4 @@ def get_broadcasts(engine: Engine) -> list[dict[str, str]]: rows = [row._asdict() for row in result.all()] for row in rows: row["post_date"] = row.pop("created_at") - return rows \ No newline at end of file + return rows