From af5161d999106260ee59bbbd46c24f79ac7d2e74 Mon Sep 17 00:00:00 2001 From: John Ewart Date: Wed, 3 Dec 2025 12:11:43 -0800 Subject: [PATCH 01/13] Add HTTP health check to celery workers; default is port 9001 but configurable --- docker-compose.yml | 51 +++++------------------------ requirements.txt | 3 +- src/fides/api/tasks/__init__.py | 4 +++ src/fides/config/celery_settings.py | 6 ++++ 4 files changed, 21 insertions(+), 43 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index bbb3718fecb..8190217ef46 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -135,6 +135,9 @@ services: FIDES__CONFIG_PATH: ${FIDES__CONFIG_PATH:-/fides/.fides/fides.toml} FIDES__LOGGING__COLORIZE: "True" FIDES__USER__ANALYTICS_OPT_OUT: "True" + FIDES__CELERY__HEALTHCHECK_PORT: "9001" + expose: + - 9001 volumes: - type: bind source: ./ @@ -143,52 +146,16 @@ services: - /fides/src/fides.egg-info worker-privacy-preferences: - image: ethyca/fides:local + extends: + service: worker-other command: fides worker --queues=fides.privacy_preferences,fides.privacy_request_exports,fides.privacy_request_ingestion - depends_on: - redis: - condition: service_started - restart: always - healthcheck: - test: ["CMD", "celery", "-A", "fides.api.tasks", "inspect", "ping"] - start_period: 60s - interval: 20s - timeout: 5s - retries: 10 - environment: - FIDES__CONFIG_PATH: ${FIDES__CONFIG_PATH:-/fides/.fides/fides.toml} - FIDES__LOGGING__COLORIZE: "True" - FIDES__USER__ANALYTICS_OPT_OUT: "True" - volumes: - - type: bind - source: ./ - target: /fides - read_only: False - - /fides/src/fides.egg-info worker-dsr: - image: ethyca/fides:local - command: fides worker --queues=fides.dsr - depends_on: - redis: - condition: service_started - restart: always + extends: + service: worker-other healthcheck: - test: ["CMD", "celery", "-A", "fides.api.tasks", "inspect", "ping"] - start_period: 60s - interval: 20s - timeout: 5s - retries: 10 - environment: - FIDES__CONFIG_PATH: ${FIDES__CONFIG_PATH:-/fides/.fides/fides.toml} - FIDES__LOGGING__COLORIZE: "True" - FIDES__USER__ANALYTICS_OPT_OUT: "True" - volumes: - - type: bind - source: ./ - target: /fides - read_only: False - - /fides/src/fides.egg-info + test: [ "CMD", "curl", "-f", "http://localhost:9001/"] + command: fides worker --queues=fides.dsr redis: image: "redis:8.0-alpine" diff --git a/requirements.txt b/requirements.txt index 9a634e40946..9f1e878dd62 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,6 +7,7 @@ asyncpg==0.27.0 boto3==1.26.1 certifi==2024.8.30 celery[pytest]==5.5.3 +celery-healthcheck==0.2.0 click==8.1.8 click_default_group==1.2.2 cloud-sql-python-connector==1.9.2 @@ -18,7 +19,7 @@ objgraph==3.6.0 defusedxml==0.7.1 types-defusedxml==0.7.0.20240218 expandvars==0.9.0 -fastapi[all]==0.115.2 +fastapi[all]==0.115.12 fastapi-pagination[sqlalchemy]==0.12.25 fideslog==1.2.14 firebase-admin==5.3.0 diff --git a/src/fides/api/tasks/__init__.py b/src/fides/api/tasks/__init__.py index 63b992673da..bbf4293d47e 100644 --- a/src/fides/api/tasks/__init__.py +++ b/src/fides/api/tasks/__init__.py @@ -1,5 +1,6 @@ from typing import Any, ContextManager, Dict, List, Optional +import celery_healthcheck from celery import Celery, Task from celery.signals import setup_logging as celery_setup_logging from loguru import logger @@ -102,6 +103,7 @@ def _create_celery(config: FidesConfig = CONFIG) -> Celery: ) app = Celery(__name__) + celery_healthcheck.register(app) celery_config: Dict[str, Any] = { # Defaults for the celery config @@ -112,6 +114,8 @@ def _create_celery(config: FidesConfig = CONFIG) -> Celery: # Ops requires this to route emails to separate queues "task_create_missing_queues": True, "task_default_queue": "fides", + "healthcheck_port": config.celery.healthcheck_port, + "healthcheck_ping_timeout": config.celery.healthcheck_ping_timeout, } celery_config.update(config.celery) diff --git a/src/fides/config/celery_settings.py b/src/fides/config/celery_settings.py index ea185e862a4..841ee60dbf2 100644 --- a/src/fides/config/celery_settings.py +++ b/src/fides/config/celery_settings.py @@ -27,6 +27,12 @@ class CelerySettings(FidesSettings): description="If true, tasks are executed locally instead of being sent to the queue. " "If False, tasks are sent to the queue.", ) + healthcheck_port: int = Field( + default=9000, description="The port to use for the health check endpoint" + ) + healthcheck_ping_timeout: float = Field( + default=2.0, description="The timeout in seconds for the health check ping" + ) model_config = SettingsConfigDict(env_prefix=ENV_PREFIX) From 3f84e92d40928512c99847caf9560f520cebf02e Mon Sep 17 00:00:00 2001 From: John Ewart Date: Wed, 3 Dec 2025 17:04:24 -0800 Subject: [PATCH 02/13] Add celery-healthcheck to the codebase because upgrading the required dependencies is causing an issue and will have to be done to move to python 3.13, so we can remove it later --- requirements.txt | 3 +- src/fides/api/tasks/__init__.py | 2 +- .../api/tasks/celery_healthcheck/README.md | 4 ++ .../api/tasks/celery_healthcheck/__init__.py | 5 ++ .../api/tasks/celery_healthcheck/server.py | 68 +++++++++++++++++++ 5 files changed, 79 insertions(+), 3 deletions(-) create mode 100644 src/fides/api/tasks/celery_healthcheck/README.md create mode 100644 src/fides/api/tasks/celery_healthcheck/__init__.py create mode 100644 src/fides/api/tasks/celery_healthcheck/server.py diff --git a/requirements.txt b/requirements.txt index 9f1e878dd62..9a634e40946 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,7 +7,6 @@ asyncpg==0.27.0 boto3==1.26.1 certifi==2024.8.30 celery[pytest]==5.5.3 -celery-healthcheck==0.2.0 click==8.1.8 click_default_group==1.2.2 cloud-sql-python-connector==1.9.2 @@ -19,7 +18,7 @@ objgraph==3.6.0 defusedxml==0.7.1 types-defusedxml==0.7.0.20240218 expandvars==0.9.0 -fastapi[all]==0.115.12 +fastapi[all]==0.115.2 fastapi-pagination[sqlalchemy]==0.12.25 fideslog==1.2.14 firebase-admin==5.3.0 diff --git a/src/fides/api/tasks/__init__.py b/src/fides/api/tasks/__init__.py index bbf4293d47e..792967da302 100644 --- a/src/fides/api/tasks/__init__.py +++ b/src/fides/api/tasks/__init__.py @@ -1,6 +1,6 @@ from typing import Any, ContextManager, Dict, List, Optional -import celery_healthcheck +from fides.api.tasks import celery_healthcheck from celery import Celery, Task from celery.signals import setup_logging as celery_setup_logging from loguru import logger diff --git a/src/fides/api/tasks/celery_healthcheck/README.md b/src/fides/api/tasks/celery_healthcheck/README.md new file mode 100644 index 00000000000..659ae7421b9 --- /dev/null +++ b/src/fides/api/tasks/celery_healthcheck/README.md @@ -0,0 +1,4 @@ +This is a copy of celery-healthcheck - it's added here manually because our current +pinned dependencies are not compatible with the package's pinned dependencies (but +they do work with the code). Once we upgrade fastapi and a few other dependencies, +we can remove this (if we want - it's MIT licensed so this is reasonable). \ No newline at end of file diff --git a/src/fides/api/tasks/celery_healthcheck/__init__.py b/src/fides/api/tasks/celery_healthcheck/__init__.py new file mode 100644 index 00000000000..551381db1d6 --- /dev/null +++ b/src/fides/api/tasks/celery_healthcheck/__init__.py @@ -0,0 +1,5 @@ +from .server import HealthCheckServer + + +def register(celery_app): + celery_app.steps["worker"].add(HealthCheckServer) \ No newline at end of file diff --git a/src/fides/api/tasks/celery_healthcheck/server.py b/src/fides/api/tasks/celery_healthcheck/server.py new file mode 100644 index 00000000000..631bbe5c077 --- /dev/null +++ b/src/fides/api/tasks/celery_healthcheck/server.py @@ -0,0 +1,68 @@ +import logging +import threading + +import uvicorn +from celery import bootsteps +from celery.worker import WorkController +from fastapi import FastAPI, status +from fastapi.responses import JSONResponse + +app = FastAPI() +logger = logging.getLogger("celery.ext.healthcheck") + +HEALTHCHECK_DEFAULT_PORT = 9000 +HEALTHCHECK_DEFAULT_PING_TIMEOUT = 2.0 + + +class HealthCheckServer(bootsteps.StartStopStep): + def __init__(self, parent: WorkController, **kwargs): + self.thread = None + + # facilitates testing + self.app = app + + # config + self.healthcheck_port = int( + getattr(parent.app.conf, "healthcheck_port", HEALTHCHECK_DEFAULT_PORT) + ) + self.healthcheck_ping_timeout = float( + getattr( + parent.app.conf, + "healthcheck_ping_timeout", + HEALTHCHECK_DEFAULT_PING_TIMEOUT, + ) + ) + + def start(self, parent: WorkController): + @self.app.get("/") + async def celery_ping(): + insp = parent.app.control.inspect( + destination=[parent.hostname], timeout=self.healthcheck_ping_timeout + ) + result = insp.ping() + + if result: + return JSONResponse( + content={"status": "ok", "result": result}, + status_code=status.HTTP_200_OK, + ) + else: + return JSONResponse( + content={"status": "error", "result": result}, + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + ) + + def run_server(): + uvicorn.run( + self.app, + host="0.0.0.0", + port=self.healthcheck_port, + ) + + self.thread = threading.Thread(target=run_server, daemon=True) + self.thread.start() + + logger.info(f"Health check server started on port {self.healthcheck_port}") + + def stop(self, parent: WorkController): + pass \ No newline at end of file From 8da58301ff2d34d76a1a02cce1ee0a5b6f2a7f86 Mon Sep 17 00:00:00 2001 From: John Ewart Date: Wed, 3 Dec 2025 17:15:34 -0800 Subject: [PATCH 03/13] Sorry mypy, pylint and black --- src/fides/api/tasks/celery_healthcheck/__init__.py | 6 +++++- src/fides/api/tasks/celery_healthcheck/server.py | 6 +++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/fides/api/tasks/celery_healthcheck/__init__.py b/src/fides/api/tasks/celery_healthcheck/__init__.py index 551381db1d6..cdb1c0a4bdd 100644 --- a/src/fides/api/tasks/celery_healthcheck/__init__.py +++ b/src/fides/api/tasks/celery_healthcheck/__init__.py @@ -1,5 +1,9 @@ +# fmt: off +# type: ignore +# pylint: skip-file + from .server import HealthCheckServer def register(celery_app): - celery_app.steps["worker"].add(HealthCheckServer) \ No newline at end of file + celery_app.steps["worker"].add(HealthCheckServer) diff --git a/src/fides/api/tasks/celery_healthcheck/server.py b/src/fides/api/tasks/celery_healthcheck/server.py index 631bbe5c077..75c0e65d11f 100644 --- a/src/fides/api/tasks/celery_healthcheck/server.py +++ b/src/fides/api/tasks/celery_healthcheck/server.py @@ -1,3 +1,7 @@ +# fmt: off +# type: ignore +# pylint: skip-file + import logging import threading @@ -65,4 +69,4 @@ def run_server(): logger.info(f"Health check server started on port {self.healthcheck_port}") def stop(self, parent: WorkController): - pass \ No newline at end of file + pass From 765b06f30a2a380019d722910325957b5448c83f Mon Sep 17 00:00:00 2001 From: John Ewart Date: Wed, 3 Dec 2025 17:21:27 -0800 Subject: [PATCH 04/13] fix / disable isort --- src/fides/api/tasks/__init__.py | 4 ++-- src/fides/api/tasks/celery_healthcheck/__init__.py | 2 ++ src/fides/api/tasks/celery_healthcheck/server.py | 1 + 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/fides/api/tasks/__init__.py b/src/fides/api/tasks/__init__.py index 792967da302..07de8407a8f 100644 --- a/src/fides/api/tasks/__init__.py +++ b/src/fides/api/tasks/__init__.py @@ -1,6 +1,5 @@ from typing import Any, ContextManager, Dict, List, Optional -from fides.api.tasks import celery_healthcheck from celery import Celery, Task from celery.signals import setup_logging as celery_setup_logging from loguru import logger @@ -15,6 +14,7 @@ ) from fides.api.db.session import get_db_engine, get_db_session +from fides.api.tasks import celery_healthcheck from fides.api.util.logger import setup as setup_logging from fides.config import CONFIG, FidesConfig @@ -103,7 +103,7 @@ def _create_celery(config: FidesConfig = CONFIG) -> Celery: ) app = Celery(__name__) - celery_healthcheck.register(app) + celery_healthcheck.register(app) # type: ignore celery_config: Dict[str, Any] = { # Defaults for the celery config diff --git a/src/fides/api/tasks/celery_healthcheck/__init__.py b/src/fides/api/tasks/celery_healthcheck/__init__.py index cdb1c0a4bdd..b2c85f80985 100644 --- a/src/fides/api/tasks/celery_healthcheck/__init__.py +++ b/src/fides/api/tasks/celery_healthcheck/__init__.py @@ -1,6 +1,8 @@ # fmt: off # type: ignore # pylint: skip-file +# isort:off + from .server import HealthCheckServer diff --git a/src/fides/api/tasks/celery_healthcheck/server.py b/src/fides/api/tasks/celery_healthcheck/server.py index 75c0e65d11f..f1692e03634 100644 --- a/src/fides/api/tasks/celery_healthcheck/server.py +++ b/src/fides/api/tasks/celery_healthcheck/server.py @@ -1,6 +1,7 @@ # fmt: off # type: ignore # pylint: skip-file +# isort:off import logging import threading From bb8edb45344896b42c2f5633afa66689be528f16 Mon Sep 17 00:00:00 2001 From: John Ewart Date: Wed, 3 Dec 2025 17:25:16 -0800 Subject: [PATCH 05/13] fix black error --- src/fides/api/tasks/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/fides/api/tasks/__init__.py b/src/fides/api/tasks/__init__.py index 07de8407a8f..08b0fcf4cd9 100644 --- a/src/fides/api/tasks/__init__.py +++ b/src/fides/api/tasks/__init__.py @@ -103,7 +103,7 @@ def _create_celery(config: FidesConfig = CONFIG) -> Celery: ) app = Celery(__name__) - celery_healthcheck.register(app) # type: ignore + celery_healthcheck.register(app) # type: ignore celery_config: Dict[str, Any] = { # Defaults for the celery config From 17068543c563bd04133953b4a278fbeabd7f47f8 Mon Sep 17 00:00:00 2001 From: John Ewart Date: Mon, 8 Dec 2025 18:02:09 -0800 Subject: [PATCH 06/13] Remove uvicorn in favor of plain-old Python HTTP server, add some tests, create celery worker fixture that has a try/except to handle errors when shutting down. (This last one may not be needed with the HTTP server change) --- .../api/tasks/celery_healthcheck/README.md | 4 - .../api/tasks/celery_healthcheck/server.py | 107 +++++++++++------- tests/conftest.py | 51 ++++++++- tests/task/test_healthcheck_server.py | 36 ++++++ 4 files changed, 148 insertions(+), 50 deletions(-) delete mode 100644 src/fides/api/tasks/celery_healthcheck/README.md create mode 100644 tests/task/test_healthcheck_server.py diff --git a/src/fides/api/tasks/celery_healthcheck/README.md b/src/fides/api/tasks/celery_healthcheck/README.md deleted file mode 100644 index 659ae7421b9..00000000000 --- a/src/fides/api/tasks/celery_healthcheck/README.md +++ /dev/null @@ -1,4 +0,0 @@ -This is a copy of celery-healthcheck - it's added here manually because our current -pinned dependencies are not compatible with the package's pinned dependencies (but -they do work with the code). Once we upgrade fastapi and a few other dependencies, -we can remove this (if we want - it's MIT licensed so this is reasonable). \ No newline at end of file diff --git a/src/fides/api/tasks/celery_healthcheck/server.py b/src/fides/api/tasks/celery_healthcheck/server.py index f1692e03634..7640d261b22 100644 --- a/src/fides/api/tasks/celery_healthcheck/server.py +++ b/src/fides/api/tasks/celery_healthcheck/server.py @@ -1,31 +1,58 @@ -# fmt: off -# type: ignore -# pylint: skip-file -# isort:off - -import logging +import json import threading +from http.server import HTTPServer, SimpleHTTPRequestHandler -import uvicorn from celery import bootsteps from celery.worker import WorkController -from fastapi import FastAPI, status -from fastapi.responses import JSONResponse - -app = FastAPI() -logger = logging.getLogger("celery.ext.healthcheck") +from loguru import logger HEALTHCHECK_DEFAULT_PORT = 9000 HEALTHCHECK_DEFAULT_PING_TIMEOUT = 2.0 +DEFAULT_SHUTDOWN_TIMEOUT = 2.0 + + +class HealthcheckHandler(SimpleHTTPRequestHandler): + """HTTP request handler with additional properties and functions""" + + def __init__(self, parent: WorkController, healthcheck_ping_timeout: float, *args): + self.parent = parent + self.healthcheck_ping_timeout = healthcheck_ping_timeout + super().__init__(*args) + + def do_GET(self): + """Handle GET requests""" + # Do something + try: + try: + parent = self.parent + insp = parent.app.control.inspect( + destination=[parent.hostname], timeout=self.healthcheck_ping_timeout + ) + result = insp.ping() + + data = json.dumps({"status": "ok", "data": result}) + logger.info(f"Healthcheck ping result: {data}") + + self.send_response(200) + self.send_header("Content-type", "application/json") + self.end_headers() + self.wfile.write(bytes(data, "utf-8")) + except Exception as e: + logger.warning(f"Healthcheck ping exception: {e}") + response = {"status": "error", "data": str(e)} + self.send_response(503) + self.send_header("Content-type", "application/json") + self.end_headers() + self.wfile.write(bytes(json.dumps(response), "utf-8")) + except Exception as ex: + logger.exception("HealthcheckHandler exception", exc_info=ex) + self.send_response(500) class HealthCheckServer(bootsteps.StartStopStep): def __init__(self, parent: WorkController, **kwargs): self.thread = None - - # facilitates testing - self.app = app - + self.parent = parent # config self.healthcheck_port = int( getattr(parent.app.conf, "healthcheck_port", HEALTHCHECK_DEFAULT_PORT) @@ -37,37 +64,31 @@ def __init__(self, parent: WorkController, **kwargs): HEALTHCHECK_DEFAULT_PING_TIMEOUT, ) ) - - def start(self, parent: WorkController): - @self.app.get("/") - async def celery_ping(): - insp = parent.app.control.inspect( - destination=[parent.hostname], timeout=self.healthcheck_ping_timeout + self.shutdown_timeout = float( + getattr( + parent.app.conf, + "shutdown_timeout", + DEFAULT_SHUTDOWN_TIMEOUT, ) - result = insp.ping() + ) - if result: - return JSONResponse( - content={"status": "ok", "result": result}, - status_code=status.HTTP_200_OK, - ) - else: - return JSONResponse( - content={"status": "error", "result": result}, - status_code=status.HTTP_503_SERVICE_UNAVAILABLE, - ) + def http_handler(self, *args): + HealthcheckHandler(self.parent, self.healthcheck_ping_timeout, *args) - def run_server(): - uvicorn.run( - self.app, - host="0.0.0.0", - port=self.healthcheck_port, - ) + def start(self, parent: WorkController): + self.http_server = HTTPServer( + ("0.0.0.0", self.healthcheck_port), self.http_handler + ) - self.thread = threading.Thread(target=run_server, daemon=True) + self.thread = threading.Thread( + target=self.http_server.serve_forever, daemon=True + ) self.thread.start() - logger.info(f"Health check server started on port {self.healthcheck_port}") - def stop(self, parent: WorkController): - pass + logger.info( + f"Stopping health check server with a timeout of {self.shutdown_timeout} seconds" + ) + self.http_server.shutdown() + self.thread.join(self.shutdown_timeout) + logger.info(f"Health check server stopped on port {self.healthcheck_port}") diff --git a/tests/conftest.py b/tests/conftest.py index af74a23d167..92659a63967 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -59,7 +59,7 @@ from fides.api.schemas.messaging.messaging import MessagingServiceType from fides.api.schemas.privacy_request import PrivacyRequestStatus from fides.api.task.graph_runners import access_runner, consent_runner, erasure_runner -from fides.api.tasks import celery_app +from fides.api.tasks import celery_app, celery_healthcheck from fides.api.tasks.scheduled.scheduler import async_scheduler, scheduler from fides.api.util.cache import get_cache from fides.api.util.collection_util import Row @@ -765,6 +765,49 @@ def celery_enable_logging(): return True +# Register health check for workers +@pytest.fixture(scope="session") +def celery_session_app(celery_session_app): + celery_healthcheck.register(celery_session_app) + return celery_session_app + +# This is here because the test suite occasionally fails to teardown the +# Celery worker if it takes too long to terminate the worker thread. This +# will prevent that and, instead, log a warning +@pytest.fixture(scope="session") +def celery_session_worker( + request, + celery_session_app, + celery_includes, + celery_class_tasks, + celery_worker_pool, + celery_worker_parameters, +): + from celery.contrib.testing import worker + + for module in celery_includes: + celery_session_app.loader.import_task_module(module) + for class_task in celery_class_tasks: + celery_session_app.register_task(class_task) + + try: + + logger.info("Starting safe celery session worker...") + with worker.start_worker( + celery_session_app, + pool=celery_worker_pool, + shutdown_timeout=2.0, + **celery_worker_parameters, + ) as w: + try: + yield w + logger.info("Done with celery worker, trying to dispose of it..") + except RuntimeError: + logger.warning("Failed to dispose of the celery worker.") + except RuntimeError as re: + logger.warning("Failed to stop the celery worker: " + str(re)) + + @pytest.fixture(autouse=True, scope="session") def celery_use_virtual_worker(celery_session_worker): """ @@ -869,7 +912,8 @@ def access_runner_tester( connection_configs, identity, session, - privacy_request_proceed=False, # This allows the DSR 3.0 Access Runner to be tested in isolation, to just test running the access graph without queuing the privacy request + privacy_request_proceed=False, + # This allows the DSR 3.0 Access Runner to be tested in isolation, to just test running the access graph without queuing the privacy request ) except PrivacyRequestExit: # DSR 3.0 intentionally raises a PrivacyRequestExit status while it waits for @@ -927,7 +971,8 @@ def consent_runner_tester( connection_configs, identity, session, - privacy_request_proceed=False, # This allows the DSR 3.0 Consent Runner to be tested in isolation, to just test running the consent graph without queuing the privacy request + privacy_request_proceed=False, + # This allows the DSR 3.0 Consent Runner to be tested in isolation, to just test running the consent graph without queuing the privacy request ) except PrivacyRequestExit: # DSR 3.0 intentionally raises a PrivacyRequestExit status while it waits for diff --git a/tests/task/test_healthcheck_server.py b/tests/task/test_healthcheck_server.py new file mode 100644 index 00000000000..089d628b9a9 --- /dev/null +++ b/tests/task/test_healthcheck_server.py @@ -0,0 +1,36 @@ +import time + +import pytest +import requests +from loguru import logger + + +class TestCeleryHealthCheckServer: + def test_responds_to_ping_properly(self, celery_session_app, celery_session_worker): + try: + response = requests.get("http://127.0.0.1:9000/") + assert response.status_code == 200 + assert response.json()["status"] == "ok" + except requests.exceptions.ConnectionError: + pytest.fail("Connection error") + + +class TestCeleryHealthCheckWorker: + @pytest.fixture(autouse=True) + def setup_teardown(self): + yield + with pytest.raises(Exception): + requests.get("http://127.0.0.1:9000/", timeout=1) + + def test_shutdown_gracefully(self, celery_session_app, celery_session_worker): + try: + logger.info("Shutdown gracefully") + celery_session_worker.stop() + logger.info("Shutdown gracefully finished") + except Exception: + pytest.fail("Failed to stop health check server") + + + + + From 6af3d6d6135ebfd0a9579c61fc6cd7705f3f2d70 Mon Sep 17 00:00:00 2001 From: John Ewart Date: Tue, 9 Dec 2025 12:53:15 -0800 Subject: [PATCH 07/13] Comment out celery_worker_parameters fixture It's being applied in the worker fixture, commenting out to verify it fixes the tests before removing. --- tests/conftest.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index dcdda64a9b7..cb61cb9dd9c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -807,15 +807,15 @@ def celery_session_worker( except RuntimeError as re: logger.warning("Failed to stop the celery worker: " + str(re)) -@pytest.fixture(scope="session") -def celery_worker_parameters(): - """Configure celery worker parameters for testing. - - Increase shutdown_timeout to avoid flaky test failures when the worker - takes longer to shut down, especially during parallel test runs with pytest-xdist. - The CI environment can be slow, so we use a generous timeout. - """ - return {"shutdown_timeout": 180.0} +# @pytest.fixture(scope="session") +# def celery_worker_parameters(): +# """Configure celery worker parameters for testing. + +# Increase shutdown_timeout to avoid flaky test failures when the worker +# takes longer to shut down, especially during parallel test runs with pytest-xdist. +# The CI environment can be slow, so we use a generous timeout. +# """ +# return {"shutdown_timeout": 180.0} @pytest.fixture(autouse=True, scope="session") From 4153a687ce9b12e9d00c4c1a3fd5a302d524f8e7 Mon Sep 17 00:00:00 2001 From: John Ewart Date: Wed, 10 Dec 2025 23:06:40 -0800 Subject: [PATCH 08/13] Update tests/conftest.py Co-authored-by: Adrian Galvan --- tests/conftest.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index cb61cb9dd9c..b559fe4c9c5 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -807,15 +807,6 @@ def celery_session_worker( except RuntimeError as re: logger.warning("Failed to stop the celery worker: " + str(re)) -# @pytest.fixture(scope="session") -# def celery_worker_parameters(): -# """Configure celery worker parameters for testing. - -# Increase shutdown_timeout to avoid flaky test failures when the worker -# takes longer to shut down, especially during parallel test runs with pytest-xdist. -# The CI environment can be slow, so we use a generous timeout. -# """ -# return {"shutdown_timeout": 180.0} @pytest.fixture(autouse=True, scope="session") From 195868e54277652c9f9988af4bd17248291f2a18 Mon Sep 17 00:00:00 2001 From: John Ewart Date: Wed, 10 Dec 2025 23:10:00 -0800 Subject: [PATCH 09/13] Update src/fides/api/tasks/celery_healthcheck/server.py Co-authored-by: Adrian Galvan --- src/fides/api/tasks/celery_healthcheck/server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/fides/api/tasks/celery_healthcheck/server.py b/src/fides/api/tasks/celery_healthcheck/server.py index 7640d261b22..59dc9001a60 100644 --- a/src/fides/api/tasks/celery_healthcheck/server.py +++ b/src/fides/api/tasks/celery_healthcheck/server.py @@ -31,7 +31,7 @@ def do_GET(self): result = insp.ping() data = json.dumps({"status": "ok", "data": result}) - logger.info(f"Healthcheck ping result: {data}") + logger.debug(f"Healthcheck ping result: {data}") self.send_response(200) self.send_header("Content-type", "application/json") From 8da1f48e2414691a5f5bf3bf0e33a74c3cf96ab2 Mon Sep 17 00:00:00 2001 From: John Ewart Date: Wed, 10 Dec 2025 23:10:15 -0800 Subject: [PATCH 10/13] Update tests/task/test_healthcheck_server.py Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com> --- tests/task/test_healthcheck_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/task/test_healthcheck_server.py b/tests/task/test_healthcheck_server.py index 089d628b9a9..171ffba9680 100644 --- a/tests/task/test_healthcheck_server.py +++ b/tests/task/test_healthcheck_server.py @@ -1,4 +1,4 @@ -import time +import pytest import pytest import requests From b6a4eee089e1bde21aee75c48cf5e20505bb8766 Mon Sep 17 00:00:00 2001 From: John Ewart Date: Wed, 10 Dec 2025 23:10:31 -0800 Subject: [PATCH 11/13] Update src/fides/api/tasks/celery_healthcheck/server.py Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com> --- src/fides/api/tasks/celery_healthcheck/server.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/fides/api/tasks/celery_healthcheck/server.py b/src/fides/api/tasks/celery_healthcheck/server.py index 59dc9001a60..08c47c859c6 100644 --- a/src/fides/api/tasks/celery_healthcheck/server.py +++ b/src/fides/api/tasks/celery_healthcheck/server.py @@ -21,7 +21,9 @@ def __init__(self, parent: WorkController, healthcheck_ping_timeout: float, *arg def do_GET(self): """Handle GET requests""" - # Do something + def do_GET(self): + """Handle GET requests""" + try: try: try: parent = self.parent From 073e0b8adc20fe5012a137c5684ef2a8c2259fa7 Mon Sep 17 00:00:00 2001 From: John Ewart Date: Thu, 11 Dec 2025 13:33:31 -0800 Subject: [PATCH 12/13] Clean up some variable names, add typing / linting fixes --- .../api/tasks/celery_healthcheck/server.py | 56 +++++++++++++------ 1 file changed, 38 insertions(+), 18 deletions(-) diff --git a/src/fides/api/tasks/celery_healthcheck/server.py b/src/fides/api/tasks/celery_healthcheck/server.py index 08c47c859c6..c21241201f8 100644 --- a/src/fides/api/tasks/celery_healthcheck/server.py +++ b/src/fides/api/tasks/celery_healthcheck/server.py @@ -1,6 +1,7 @@ import json import threading from http.server import HTTPServer, SimpleHTTPRequestHandler +from typing import Any, Optional from celery import bootsteps from celery.worker import WorkController @@ -8,22 +9,21 @@ HEALTHCHECK_DEFAULT_PORT = 9000 HEALTHCHECK_DEFAULT_PING_TIMEOUT = 2.0 -DEFAULT_SHUTDOWN_TIMEOUT = 2.0 +HEALTHCHECK_DEFAULT_HTTP_SERVER_SHUTDOWN_TIMEOUT = 2.0 class HealthcheckHandler(SimpleHTTPRequestHandler): """HTTP request handler with additional properties and functions""" - def __init__(self, parent: WorkController, healthcheck_ping_timeout: float, *args): + def __init__( + self, parent: WorkController, healthcheck_ping_timeout: float, *args: Any + ): self.parent = parent self.healthcheck_ping_timeout = healthcheck_ping_timeout super().__init__(*args) - def do_GET(self): + def do_GET(self) -> None: """Handle GET requests""" - def do_GET(self): - """Handle GET requests""" - try: try: try: parent = self.parent @@ -52,9 +52,13 @@ def do_GET(self): class HealthCheckServer(bootsteps.StartStopStep): - def __init__(self, parent: WorkController, **kwargs): - self.thread = None + # ignore kwargs type + def __init__(self, parent: WorkController, **kwargs): # type: ignore [arg-type, no-untyped-def] + self.thread: Optional[threading.Thread] = None + self.http_server: Optional[HTTPServer] = None + self.parent = parent + # config self.healthcheck_port = int( getattr(parent.app.conf, "healthcheck_port", HEALTHCHECK_DEFAULT_PORT) @@ -70,16 +74,21 @@ def __init__(self, parent: WorkController, **kwargs): getattr( parent.app.conf, "shutdown_timeout", - DEFAULT_SHUTDOWN_TIMEOUT, + HEALTHCHECK_DEFAULT_HTTP_SERVER_SHUTDOWN_TIMEOUT, ) ) - def http_handler(self, *args): + super().__init__(**kwargs) + + # The mypy hints for an HTTP handler are strange, so ignoring them here + def http_handler(self, *args) -> None: # type: ignore [arg-type, no-untyped-def] HealthcheckHandler(self.parent, self.healthcheck_ping_timeout, *args) - def start(self, parent: WorkController): + def start(self, parent: WorkController) -> None: + # Ignore mypy hints here as the constructed object immediately handles the request + # (if you look in the source code for SimpleHTTPRequestHandler, specifically the finalize request method) self.http_server = HTTPServer( - ("0.0.0.0", self.healthcheck_port), self.http_handler + ("0.0.0.0", self.healthcheck_port), self.http_handler # type: ignore [arg-type] ) self.thread = threading.Thread( @@ -87,10 +96,21 @@ def start(self, parent: WorkController): ) self.thread.start() - def stop(self, parent: WorkController): - logger.info( - f"Stopping health check server with a timeout of {self.shutdown_timeout} seconds" - ) - self.http_server.shutdown() - self.thread.join(self.shutdown_timeout) + def stop(self, parent: WorkController) -> None: + if self.http_server is None: + logger.warning( + "Requested stop of HTTP healthcheck server, but no server was started" + ) + else: + logger.info( + f"Stopping health check server with a timeout of {self.shutdown_timeout} seconds" + ) + self.http_server.shutdown() + + # Really this should not happen if the HTTP server is None, but just in case, we should check. + if self.thread is None: + logger.warning("No thread in HTTP healthcheck server to shutdown...") + else: + self.thread.join(self.shutdown_timeout) + logger.info(f"Health check server stopped on port {self.healthcheck_port}") From b2648c6d22eecc91b000adff57d20c2edb9210c1 Mon Sep 17 00:00:00 2001 From: John Ewart Date: Thu, 11 Dec 2025 13:34:40 -0800 Subject: [PATCH 13/13] Add note for port override --- docker-compose.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docker-compose.yml b/docker-compose.yml index 8190217ef46..48de8683c0b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -135,6 +135,8 @@ services: FIDES__CONFIG_PATH: ${FIDES__CONFIG_PATH:-/fides/.fides/fides.toml} FIDES__LOGGING__COLORIZE: "True" FIDES__USER__ANALYTICS_OPT_OUT: "True" + # The default HTTP health check port is 9000, override it here to ensure that + # the override works as expected. FIDES__CELERY__HEALTHCHECK_PORT: "9001" expose: - 9001