From 15d920019da5ffe74bfafddf2c3c6d13d47e0734 Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Wed, 10 Sep 2025 21:36:29 +0300 Subject: [PATCH] allow additional health checker coroutine for faststream --- docs/introduction/configuration.md | 6 ++++++ .../bootstrappers/faststream_bootstrapper.py | 8 +++++++- tests/test_fastapi_offline_docs.py | 2 +- tests/test_faststream_bootstrap.py | 20 ++++++++++++++++++- 4 files changed, 33 insertions(+), 3 deletions(-) diff --git a/docs/introduction/configuration.md b/docs/introduction/configuration.md index 7cca083..608557e 100644 --- a/docs/introduction/configuration.md +++ b/docs/introduction/configuration.md @@ -126,3 +126,9 @@ Additional params: - `health_checks_path` - `health_checks_include_in_schema` + +### Health checks FastStream + +Additional params: + +- `health_checks_additional_checker` - additional coroutine to check service health diff --git a/lite_bootstrap/bootstrappers/faststream_bootstrapper.py b/lite_bootstrap/bootstrappers/faststream_bootstrapper.py index 568f5cb..a815f0e 100644 --- a/lite_bootstrap/bootstrappers/faststream_bootstrapper.py +++ b/lite_bootstrap/bootstrappers/faststream_bootstrapper.py @@ -56,6 +56,7 @@ class FastStreamConfig(HealthChecksConfig, LoggingConfig, OpentelemetryConfig, P broker: typing.Optional["BrokerUsecase[typing.Any, typing.Any]"] = None opentelemetry_middleware_cls: type[FastStreamTelemetryMiddlewareProtocol] | None = None prometheus_middleware_cls: type[FastStreamPrometheusMiddlewareProtocol] | None = None + health_checks_additional_checker: typing.Callable[[], typing.Coroutine[bool, typing.Any, typing.Any]] | None = None @dataclasses.dataclass(kw_only=True, slots=True, frozen=True) @@ -79,7 +80,12 @@ async def _define_health_status(self) -> bool: if not self.bootstrap_config.application or not self.bootstrap_config.application.broker: return False - return await self.bootstrap_config.application.broker.ping(timeout=5) + additional_check = ( + await self.bootstrap_config.health_checks_additional_checker() + if self.bootstrap_config.health_checks_additional_checker + else True + ) + return additional_check and await self.bootstrap_config.application.broker.ping(timeout=5) @dataclasses.dataclass(kw_only=True, frozen=True) diff --git a/tests/test_fastapi_offline_docs.py b/tests/test_fastapi_offline_docs.py index 5724425..9b449e2 100644 --- a/tests/test_fastapi_offline_docs.py +++ b/tests/test_fastapi_offline_docs.py @@ -46,5 +46,5 @@ def test_fastapi_offline_docs_root_path() -> None: def test_fastapi_offline_docs_raises_without_openapi_url() -> None: app = FastAPI(openapi_url=None) - with pytest.raises(RuntimeError, match="No app.openapi_url specified"): + with pytest.raises(RuntimeError, match=r"No app.openapi_url specified"): enable_offline_docs(app, static_path="/static") diff --git a/tests/test_faststream_bootstrap.py b/tests/test_faststream_bootstrap.py index b15747c..0c49730 100644 --- a/tests/test_faststream_bootstrap.py +++ b/tests/test_faststream_bootstrap.py @@ -21,7 +21,10 @@ def broker() -> RedisBroker: return RedisBroker() -def build_faststream_config(broker: BrokerUsecase[typing.Any, typing.Any] | None = None) -> FastStreamConfig: +def build_faststream_config( + broker: BrokerUsecase[typing.Any, typing.Any] | None = None, + health_checks_additional_checker: typing.Callable[[], typing.Coroutine[bool, typing.Any, typing.Any]] | None = None, +) -> FastStreamConfig: return FastStreamConfig( service_name="microservice", service_version="2.0.0", @@ -37,6 +40,7 @@ def build_faststream_config(broker: BrokerUsecase[typing.Any, typing.Any] | None health_checks_path="/custom-health/", logging_buffer_capacity=0, broker=broker, + health_checks_additional_checker=health_checks_additional_checker, ) @@ -74,6 +78,20 @@ async def test_faststream_bootstrap_health_check_wo_broker() -> None: assert response.text == "Service is unhealthy" +async def test_faststream_bootstrap_additional_health_checker(broker: RedisBroker) -> None: + async def custom_checker() -> bool: + return False + + bootstrap_config = build_faststream_config(broker=broker, health_checks_additional_checker=custom_checker) + bootstrapper = FastStreamBootstrapper(bootstrap_config=bootstrap_config) + application = bootstrapper.bootstrap() + test_client = TestClient(app=application) + + response = test_client.get(bootstrap_config.health_checks_path) + assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR + assert response.text == "Service is unhealthy" + + def test_faststream_bootstrapper_not_ready() -> None: with emulate_package_missing("faststream"), pytest.raises(RuntimeError, match="faststream is not installed"): FastStreamBootstrapper(bootstrap_config=FastStreamConfig())