diff --git a/reference/grpc/src/test/java/io/a2a/server/grpc/quarkus/A2ATestResource.java b/reference/grpc/src/test/java/io/a2a/server/grpc/quarkus/A2ATestResource.java index 4136eb70b..fc57c705d 100644 --- a/reference/grpc/src/test/java/io/a2a/server/grpc/quarkus/A2ATestResource.java +++ b/reference/grpc/src/test/java/io/a2a/server/grpc/quarkus/A2ATestResource.java @@ -137,4 +137,40 @@ public Response savePushNotificationConfigInStore(@PathParam("taskId") String ta testUtilsBean.saveTaskPushNotificationConfig(taskId, notificationConfig); return Response.ok().build(); } + + /** + * REST endpoint to wait for queue poller to start. + * Waits for the EventConsumer polling loop to start for the specified task's queue, + * ensuring the queue is ready to receive and process events. + * + * @param taskId the task ID whose queue poller to wait for + * @return HTTP 200 response when poller has started + * @throws InterruptedException if interrupted while waiting + */ + @POST + @Path("/queue/awaitPollerStart/{taskId}") + public Response awaitQueuePollerStart(@PathParam("taskId") String taskId) throws InterruptedException { + testUtilsBean.awaitQueuePollerStart(taskId); + return Response.ok().build(); + } + + /** + * REST endpoint to wait for child queue count to stabilize. + * Waits for the specified task's child queue count to match expectedCount for 3 consecutive + * checks (150ms total), ensuring EventConsumer polling loops have started. + * + * @param taskId the task ID whose child queues to monitor + * @param expectedCount the expected number of active child queues + * @param timeoutMs maximum time to wait in milliseconds + * @return HTTP 200 response with "true" if count stabilized, "false" if timeout occurred + * @throws InterruptedException if interrupted while waiting + */ + @POST + @Path("/queue/awaitChildCountStable/{taskId}/{expectedCount}/{timeoutMs}") + public Response awaitChildQueueCountStable(@PathParam("taskId") String taskId, + @PathParam("expectedCount") int expectedCount, + @PathParam("timeoutMs") long timeoutMs) throws InterruptedException { + boolean stable = testUtilsBean.awaitChildQueueCountStable(taskId, expectedCount, timeoutMs); + return Response.ok(String.valueOf(stable), TEXT_PLAIN).build(); + } } diff --git a/reference/jsonrpc/src/test/java/io/a2a/server/apps/quarkus/A2ATestRoutes.java b/reference/jsonrpc/src/test/java/io/a2a/server/apps/quarkus/A2ATestRoutes.java index 12eae5a4d..937c8a2b1 100644 --- a/reference/jsonrpc/src/test/java/io/a2a/server/apps/quarkus/A2ATestRoutes.java +++ b/reference/jsonrpc/src/test/java/io/a2a/server/apps/quarkus/A2ATestRoutes.java @@ -185,6 +185,30 @@ public void saveTaskPushNotificationConfig(@Param String taskId, @Body String bo } } + /** + * REST endpoint to wait for child queue count to stabilize. + * Waits for the specified task's child queue count to match expectedCount for 3 consecutive + * checks (150ms total), ensuring EventConsumer polling loops have started. + * + * @param taskId the task ID whose child queues to monitor + * @param expectedCountStr the expected number of active child queues (as string) + * @param timeoutMsStr maximum time to wait in milliseconds (as string) + * @param rc the Vert.x routing context + */ + @Route(path = "/test/queue/awaitChildCountStable/:taskId/:expectedCount/:timeoutMs", methods = {Route.HttpMethod.POST}, type = Route.HandlerType.BLOCKING) + public void awaitChildQueueCountStable(@Param("taskId") String taskId, @Param("expectedCount") String expectedCountStr, @Param("timeoutMs") String timeoutMsStr, RoutingContext rc) { + try { + int expectedCount = Integer.parseInt(expectedCountStr); + long timeoutMs = Long.parseLong(timeoutMsStr); + boolean stable = testUtilsBean.awaitChildQueueCountStable(taskId, expectedCount, timeoutMs); + rc.response() + .setStatusCode(200) + .end(String.valueOf(stable)); + } catch (Throwable t) { + errorResponse(t, rc); + } + } + private void errorResponse(Throwable t, RoutingContext rc) { t.printStackTrace(); rc.response() diff --git a/reference/rest/src/test/java/io/a2a/server/rest/quarkus/A2ATestRoutes.java b/reference/rest/src/test/java/io/a2a/server/rest/quarkus/A2ATestRoutes.java index 57d6965a4..0ce5750b3 100644 --- a/reference/rest/src/test/java/io/a2a/server/rest/quarkus/A2ATestRoutes.java +++ b/reference/rest/src/test/java/io/a2a/server/rest/quarkus/A2ATestRoutes.java @@ -185,6 +185,30 @@ public void saveTaskPushNotificationConfig(@Param String taskId, @Body String bo } } + /** + * REST endpoint to wait for child queue count to stabilize. + * Waits for the specified task's child queue count to match expectedCount for 3 consecutive + * checks (150ms total), ensuring EventConsumer polling loops have started. + * + * @param taskId the task ID whose child queues to monitor + * @param expectedCountStr the expected number of active child queues (as string) + * @param timeoutMsStr maximum time to wait in milliseconds (as string) + * @param rc the Vert.x routing context + */ + @Route(path = "/test/queue/awaitChildCountStable/:taskId/:expectedCount/:timeoutMs", methods = {Route.HttpMethod.POST}, type = Route.HandlerType.BLOCKING) + public void awaitChildQueueCountStable(@Param("taskId") String taskId, @Param("expectedCount") String expectedCountStr, @Param("timeoutMs") String timeoutMsStr, RoutingContext rc) { + try { + int expectedCount = Integer.parseInt(expectedCountStr); + long timeoutMs = Long.parseLong(timeoutMsStr); + boolean stable = testUtilsBean.awaitChildQueueCountStable(taskId, expectedCount, timeoutMs); + rc.response() + .setStatusCode(200) + .end(String.valueOf(stable)); + } catch (Throwable t) { + errorResponse(t, rc); + } + } + private void errorResponse(Throwable t, RoutingContext rc) { t.printStackTrace(); rc.response() diff --git a/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java b/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java index 5522e4dbf..1c6dcdde4 100644 --- a/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java +++ b/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java @@ -1401,6 +1401,19 @@ public void testNonBlockingWithMultipleMessages() throws Exception { assertTrue(subscriptionLatch.await(15, TimeUnit.SECONDS)); + // CRITICAL SYNCHRONIZATION: Wait for subscribeToTask's EventConsumer polling loop to start + // + // Race condition: awaitStreamingSubscription() only guarantees transport-level subscription + // (Flow.Subscriber.onSubscribe() called), but EventConsumer polling starts asynchronously + // on a separate thread. Without this check, the agent could emit events before any consumer + // is ready to receive them, causing events to be lost. + // + // This stability check waits for the child queue count to match expectedCount for 3 + // consecutive checks (150ms), ensuring the EventConsumer is actively polling and won't + // miss events when the agent executes. + assertTrue(awaitChildQueueCountStable(taskId, 1, 15000), + "subscribeToTask child queue should be created and stable"); + // 3. Send second streaming message to same taskId Message message2 = Message.builder(MESSAGE) .taskId(multiEventTaskId) // Same taskId @@ -1435,8 +1448,8 @@ public void testNonBlockingWithMultipleMessages() throws Exception { "Stream subscription should be established"); // 4. Verify both consumers received artifact-2 and completion - assertTrue(resubEventLatch.await(10, TimeUnit.SECONDS)); - assertTrue(streamEventLatch.await(10, TimeUnit.SECONDS)); + assertTrue(resubEventLatch.await(15, TimeUnit.SECONDS)); + assertTrue(streamEventLatch.await(15, TimeUnit.SECONDS)); assertFalse(resubUnexpectedEvent.get()); assertFalse(streamUnexpectedEvent.get()); @@ -1492,6 +1505,34 @@ public void testNonBlockingWithMultipleMessages() throws Exception { } } + /** + * Waits for the child queue count to stabilize at the expected value by calling the server's + * test endpoint. This ensures EventConsumer polling loops have started before proceeding. + * + * @param taskId the task ID whose child queues to monitor + * @param expectedCount the expected number of active child queues + * @param timeoutMs maximum time to wait in milliseconds + * @return true if the count stabilized at the expected value, false if timeout occurred + * @throws IOException if the HTTP request fails + * @throws InterruptedException if interrupted while waiting + */ + private boolean awaitChildQueueCountStable(String taskId, int expectedCount, long timeoutMs) throws IOException, InterruptedException { + HttpClient client = HttpClient.newBuilder() + .version(HttpClient.Version.HTTP_2) + .build(); + HttpRequest request = HttpRequest.newBuilder() + .uri(URI.create("http://localhost:" + serverPort + "/test/queue/awaitChildCountStable/" + + taskId + "/" + expectedCount + "/" + timeoutMs)) + .POST(HttpRequest.BodyPublishers.noBody()) + .build(); + + HttpResponse response = client.send(request, HttpResponse.BodyHandlers.ofString(StandardCharsets.UTF_8)); + if (response.statusCode() != 200) { + throw new RuntimeException(response.statusCode() + ": Awaiting child queue count failed! " + response.body()); + } + return Boolean.parseBoolean(response.body()); + } + @Test @Timeout(value = 1, unit = TimeUnit.MINUTES) public void testInputRequiredWorkflow() throws Exception { diff --git a/tests/server-common/src/test/java/io/a2a/server/apps/common/TestUtilsBean.java b/tests/server-common/src/test/java/io/a2a/server/apps/common/TestUtilsBean.java index 45483f214..f94d93043 100644 --- a/tests/server-common/src/test/java/io/a2a/server/apps/common/TestUtilsBean.java +++ b/tests/server-common/src/test/java/io/a2a/server/apps/common/TestUtilsBean.java @@ -61,4 +61,53 @@ public void deleteTaskPushNotificationConfig(String taskId, String configId) { public void saveTaskPushNotificationConfig(String taskId, PushNotificationConfig notificationConfig) { pushNotificationConfigStore.setInfo(taskId, notificationConfig); } + + /** + * Waits for the EventConsumer polling loop to start for the specified task's queue. + * This ensures the queue is ready to receive and process events. + * + * @param taskId the task ID whose queue poller to wait for + * @throws InterruptedException if interrupted while waiting + */ + public void awaitQueuePollerStart(String taskId) throws InterruptedException { + queueManager.awaitQueuePollerStart(queueManager.get(taskId)); + } + + /** + * Waits for the child queue count to stabilize at the expected value. + *

+ * This method addresses a race condition where EventConsumer polling loops may not have started + * yet when events are emitted. It waits for the child queue count to match the expected value + * for 3 consecutive checks (150ms total), ensuring EventConsumers are actively polling and + * won't miss events. + *

+ * Use this after operations that create child queues (e.g., subscribeToTask, sendMessage) to + * ensure their EventConsumer polling loops have started before the agent emits events. + * + * @param taskId the task ID whose child queues to monitor + * @param expectedCount the expected number of active child queues + * @param timeoutMs maximum time to wait in milliseconds + * @return true if the count stabilized at the expected value, false if timeout occurred + * @throws InterruptedException if interrupted while waiting + */ + public boolean awaitChildQueueCountStable(String taskId, int expectedCount, long timeoutMs) throws InterruptedException { + long endTime = System.currentTimeMillis() + timeoutMs; + int consecutiveMatches = 0; + final int requiredMatches = 3; // Count must match 3 times in a row (150ms) to be considered stable + + while (System.currentTimeMillis() < endTime) { + int count = queueManager.getActiveChildQueueCount(taskId); + if (count == expectedCount) { + consecutiveMatches++; + if (consecutiveMatches >= requiredMatches) { + // Count is stable - all child queues exist and haven't closed + return true; + } + } else { + consecutiveMatches = 0; // Reset if count changes + } + Thread.sleep(50); + } + return false; + } }