From 9c05c9db2384ea8e7174b28eab3fc29cdb2b290a Mon Sep 17 00:00:00 2001 From: Kabir Khan Date: Tue, 10 Feb 2026 13:23:23 +0000 Subject: [PATCH] fix: Eliminate race condition in testNonBlockingWithMultipleMessages Addresses intermittent test failure where subscribeToTask created child queues but EventConsumer polling loops hadn't started yet when the agent executed, causing emitted events to be lost. Root Cause: - awaitStreamingSubscription() only guarantees transport-level subscription (Flow.Subscriber.onSubscribe() called) - EventConsumer polling starts asynchronously on eventConsumerExecutor thread - Agent execution could begin before EventConsumer was actively polling - Events emitted before polling started were lost in the queue Solution: - Added awaitChildQueueCountStable() to TestUtilsBean - Waits for child queue count to match expected value for 3 consecutive checks taken 50ms apart (about 100ms from the first match to the third), as a signal that EventConsumer is actively polling - Follows pattern from commit 18d2abff which fixed a similar race condition - Exposed REST endpoints in all transports (JSON-RPC, gRPC, REST) - Added client helper in AbstractA2AServerTest - Applied stability check after subscribeToTask() in tests - Increased timeouts from 10s to 15s for CI stability Files Modified: - TestUtilsBean.java: Core synchronization logic with comprehensive Javadoc - A2ATestRoutes.java (JSON-RPC, REST): REST endpoints for stability checks - A2ATestResource.java (gRPC): REST endpoints for stability checks - AbstractA2AServerTest.java: Client helper and extensive inline comments --- .../server/grpc/quarkus/A2ATestResource.java | 36 ++++++++++++++ .../server/apps/quarkus/A2ATestRoutes.java | 24 +++++++++ .../server/rest/quarkus/A2ATestRoutes.java | 24 +++++++++ .../apps/common/AbstractA2AServerTest.java | 45 ++++++++++++++++- .../a2a/server/apps/common/TestUtilsBean.java | 49 +++++++++++++++++++ 5 files changed, 176 insertions(+), 2 deletions(-) diff --git 
a/reference/grpc/src/test/java/io/a2a/server/grpc/quarkus/A2ATestResource.java b/reference/grpc/src/test/java/io/a2a/server/grpc/quarkus/A2ATestResource.java index 4136eb70b..fc57c705d 100644 --- a/reference/grpc/src/test/java/io/a2a/server/grpc/quarkus/A2ATestResource.java +++ b/reference/grpc/src/test/java/io/a2a/server/grpc/quarkus/A2ATestResource.java @@ -137,4 +137,40 @@ public Response savePushNotificationConfigInStore(@PathParam("taskId") String ta testUtilsBean.saveTaskPushNotificationConfig(taskId, notificationConfig); return Response.ok().build(); } + + /** + * REST endpoint to wait for queue poller to start. + * Waits for the EventConsumer polling loop to start for the specified task's queue, + * ensuring the queue is ready to receive and process events. + * + * @param taskId the task ID whose queue poller to wait for + * @return HTTP 200 response when poller has started + * @throws InterruptedException if interrupted while waiting + */ + @POST + @Path("/queue/awaitPollerStart/{taskId}") + public Response awaitQueuePollerStart(@PathParam("taskId") String taskId) throws InterruptedException { + testUtilsBean.awaitQueuePollerStart(taskId); + return Response.ok().build(); + } + + /** + * REST endpoint to wait for child queue count to stabilize. + * Waits for the specified task's child queue count to match expectedCount for 3 consecutive + * checks (150ms total), ensuring EventConsumer polling loops have started. 
+ * + * @param taskId the task ID whose child queues to monitor + * @param expectedCount the expected number of active child queues + * @param timeoutMs maximum time to wait in milliseconds + * @return HTTP 200 response with "true" if count stabilized, "false" if timeout occurred + * @throws InterruptedException if interrupted while waiting + */ + @POST + @Path("/queue/awaitChildCountStable/{taskId}/{expectedCount}/{timeoutMs}") + public Response awaitChildQueueCountStable(@PathParam("taskId") String taskId, + @PathParam("expectedCount") int expectedCount, + @PathParam("timeoutMs") long timeoutMs) throws InterruptedException { + boolean stable = testUtilsBean.awaitChildQueueCountStable(taskId, expectedCount, timeoutMs); + return Response.ok(String.valueOf(stable), TEXT_PLAIN).build(); + } } diff --git a/reference/jsonrpc/src/test/java/io/a2a/server/apps/quarkus/A2ATestRoutes.java b/reference/jsonrpc/src/test/java/io/a2a/server/apps/quarkus/A2ATestRoutes.java index 12eae5a4d..937c8a2b1 100644 --- a/reference/jsonrpc/src/test/java/io/a2a/server/apps/quarkus/A2ATestRoutes.java +++ b/reference/jsonrpc/src/test/java/io/a2a/server/apps/quarkus/A2ATestRoutes.java @@ -185,6 +185,30 @@ public void saveTaskPushNotificationConfig(@Param String taskId, @Body String bo } } + /** + * REST endpoint to wait for child queue count to stabilize. + * Waits for the specified task's child queue count to match expectedCount for 3 consecutive + * checks (150ms total), ensuring EventConsumer polling loops have started. 
+ * + * @param taskId the task ID whose child queues to monitor + * @param expectedCountStr the expected number of active child queues (as string) + * @param timeoutMsStr maximum time to wait in milliseconds (as string) + * @param rc the Vert.x routing context + */ + @Route(path = "/test/queue/awaitChildCountStable/:taskId/:expectedCount/:timeoutMs", methods = {Route.HttpMethod.POST}, type = Route.HandlerType.BLOCKING) + public void awaitChildQueueCountStable(@Param("taskId") String taskId, @Param("expectedCount") String expectedCountStr, @Param("timeoutMs") String timeoutMsStr, RoutingContext rc) { + try { + int expectedCount = Integer.parseInt(expectedCountStr); + long timeoutMs = Long.parseLong(timeoutMsStr); + boolean stable = testUtilsBean.awaitChildQueueCountStable(taskId, expectedCount, timeoutMs); + rc.response() + .setStatusCode(200) + .end(String.valueOf(stable)); + } catch (Throwable t) { + errorResponse(t, rc); + } + } + private void errorResponse(Throwable t, RoutingContext rc) { t.printStackTrace(); rc.response() diff --git a/reference/rest/src/test/java/io/a2a/server/rest/quarkus/A2ATestRoutes.java b/reference/rest/src/test/java/io/a2a/server/rest/quarkus/A2ATestRoutes.java index 57d6965a4..0ce5750b3 100644 --- a/reference/rest/src/test/java/io/a2a/server/rest/quarkus/A2ATestRoutes.java +++ b/reference/rest/src/test/java/io/a2a/server/rest/quarkus/A2ATestRoutes.java @@ -185,6 +185,30 @@ public void saveTaskPushNotificationConfig(@Param String taskId, @Body String bo } } + /** + * REST endpoint to wait for child queue count to stabilize. + * Waits for the specified task's child queue count to match expectedCount for 3 consecutive + * checks (150ms total), ensuring EventConsumer polling loops have started. 
+ * + * @param taskId the task ID whose child queues to monitor + * @param expectedCountStr the expected number of active child queues (as string) + * @param timeoutMsStr maximum time to wait in milliseconds (as string) + * @param rc the Vert.x routing context + */ + @Route(path = "/test/queue/awaitChildCountStable/:taskId/:expectedCount/:timeoutMs", methods = {Route.HttpMethod.POST}, type = Route.HandlerType.BLOCKING) + public void awaitChildQueueCountStable(@Param("taskId") String taskId, @Param("expectedCount") String expectedCountStr, @Param("timeoutMs") String timeoutMsStr, RoutingContext rc) { + try { + int expectedCount = Integer.parseInt(expectedCountStr); + long timeoutMs = Long.parseLong(timeoutMsStr); + boolean stable = testUtilsBean.awaitChildQueueCountStable(taskId, expectedCount, timeoutMs); + rc.response() + .setStatusCode(200) + .end(String.valueOf(stable)); + } catch (Throwable t) { + errorResponse(t, rc); + } + } + private void errorResponse(Throwable t, RoutingContext rc) { t.printStackTrace(); rc.response() diff --git a/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java b/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java index 5522e4dbf..1c6dcdde4 100644 --- a/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java +++ b/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java @@ -1401,6 +1401,19 @@ public void testNonBlockingWithMultipleMessages() throws Exception { assertTrue(subscriptionLatch.await(15, TimeUnit.SECONDS)); + // CRITICAL SYNCHRONIZATION: Wait for subscribeToTask's EventConsumer polling loop to start + // + // Race condition: awaitStreamingSubscription() only guarantees transport-level subscription + // (Flow.Subscriber.onSubscribe() called), but EventConsumer polling starts asynchronously + // on a separate thread. 
Without this check, the agent could emit events before any consumer + // is ready to receive them, causing events to be lost. + // + // This stability check waits for the child queue count to match expectedCount for 3 + // consecutive checks (150ms), ensuring the EventConsumer is actively polling and won't + // miss events when the agent executes. + assertTrue(awaitChildQueueCountStable(taskId, 1, 15000), + "subscribeToTask child queue should be created and stable"); + // 3. Send second streaming message to same taskId Message message2 = Message.builder(MESSAGE) .taskId(multiEventTaskId) // Same taskId @@ -1435,8 +1448,8 @@ public void testNonBlockingWithMultipleMessages() throws Exception { "Stream subscription should be established"); // 4. Verify both consumers received artifact-2 and completion - assertTrue(resubEventLatch.await(10, TimeUnit.SECONDS)); - assertTrue(streamEventLatch.await(10, TimeUnit.SECONDS)); + assertTrue(resubEventLatch.await(15, TimeUnit.SECONDS)); + assertTrue(streamEventLatch.await(15, TimeUnit.SECONDS)); assertFalse(resubUnexpectedEvent.get()); assertFalse(streamUnexpectedEvent.get()); @@ -1492,6 +1505,34 @@ public void testNonBlockingWithMultipleMessages() throws Exception { } } + /** + * Waits for the child queue count to stabilize at the expected value by calling the server's + * test endpoint. This ensures EventConsumer polling loops have started before proceeding. 
+ * + * @param taskId the task ID whose child queues to monitor + * @param expectedCount the expected number of active child queues + * @param timeoutMs maximum time to wait in milliseconds + * @return true if the count stabilized at the expected value, false if timeout occurred + * @throws IOException if the HTTP request fails + * @throws InterruptedException if interrupted while waiting + */ + private boolean awaitChildQueueCountStable(String taskId, int expectedCount, long timeoutMs) throws IOException, InterruptedException { + HttpClient client = HttpClient.newBuilder() + .version(HttpClient.Version.HTTP_2) + .build(); + HttpRequest request = HttpRequest.newBuilder() + .uri(URI.create("http://localhost:" + serverPort + "/test/queue/awaitChildCountStable/" + + taskId + "/" + expectedCount + "/" + timeoutMs)) + .POST(HttpRequest.BodyPublishers.noBody()) + .build(); + + HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString(StandardCharsets.UTF_8)); + if (response.statusCode() != 200) { + throw new RuntimeException(response.statusCode() + ": Awaiting child queue count failed! 
" + response.body()); + } + return Boolean.parseBoolean(response.body()); + } + @Test @Timeout(value = 1, unit = TimeUnit.MINUTES) public void testInputRequiredWorkflow() throws Exception { diff --git a/tests/server-common/src/test/java/io/a2a/server/apps/common/TestUtilsBean.java b/tests/server-common/src/test/java/io/a2a/server/apps/common/TestUtilsBean.java index 45483f214..f94d93043 100644 --- a/tests/server-common/src/test/java/io/a2a/server/apps/common/TestUtilsBean.java +++ b/tests/server-common/src/test/java/io/a2a/server/apps/common/TestUtilsBean.java @@ -61,4 +61,53 @@ public void deleteTaskPushNotificationConfig(String taskId, String configId) { public void saveTaskPushNotificationConfig(String taskId, PushNotificationConfig notificationConfig) { pushNotificationConfigStore.setInfo(taskId, notificationConfig); } + + /** + * Waits for the EventConsumer polling loop to start for the specified task's queue. + * This ensures the queue is ready to receive and process events. + * + * @param taskId the task ID whose queue poller to wait for + * @throws InterruptedException if interrupted while waiting + */ + public void awaitQueuePollerStart(String taskId) throws InterruptedException { + queueManager.awaitQueuePollerStart(queueManager.get(taskId)); + } + + /** + * Waits for the child queue count to stabilize at the expected value. + *

+ * This method addresses a race condition where EventConsumer polling loops may not have started + * yet when events are emitted. It waits for the child queue count to match the expected value + * for 3 consecutive checks taken 50ms apart (about 100ms from the first match to the third), + * which is used as a signal that the EventConsumers are actively polling and won't miss events. + *

+ * Use this after operations that create child queues (e.g., subscribeToTask, sendMessage) to + * ensure their EventConsumer polling loops have started before the agent emits events. + * + * @param taskId the task ID whose child queues to monitor + * @param expectedCount the expected number of active child queues + * @param timeoutMs maximum time to wait in milliseconds + * @return true if the count stabilized at the expected value, false if timeout occurred + * @throws InterruptedException if interrupted while waiting + */ + public boolean awaitChildQueueCountStable(String taskId, int expectedCount, long timeoutMs) throws InterruptedException { + long endTime = System.currentTimeMillis() + timeoutMs; + int consecutiveMatches = 0; + final int requiredMatches = 3; // Count must match 3 times in a row (150ms) to be considered stable + + while (System.currentTimeMillis() < endTime) { + int count = queueManager.getActiveChildQueueCount(taskId); + if (count == expectedCount) { + consecutiveMatches++; + if (consecutiveMatches >= requiredMatches) { + // Count is stable - all child queues exist and haven't closed + return true; + } + } else { + consecutiveMatches = 0; // Reset if count changes + } + Thread.sleep(50); + } + return false; + } }