Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -137,4 +137,40 @@ public Response savePushNotificationConfigInStore(@PathParam("taskId") String ta
testUtilsBean.saveTaskPushNotificationConfig(taskId, notificationConfig);
return Response.ok().build();
}

/**
 * Test-support endpoint that blocks until the queue poller for a task has started.
 * <p>
 * The request is handed to {@code testUtilsBean.awaitQueuePollerStart(taskId)}; the HTTP
 * response is only produced after that call has returned.
 *
 * @param taskId the task ID whose queue poller to wait for
 * @return an empty HTTP 200 response once the wait has completed
 * @throws InterruptedException if the request thread is interrupted while waiting
 */
@POST
@Path("/queue/awaitPollerStart/{taskId}")
public Response awaitQueuePollerStart(@PathParam("taskId") String taskId) throws InterruptedException {
    // Block first; the response is only built after the wait has finished.
    testUtilsBean.awaitQueuePollerStart(taskId);
    Response.ResponseBuilder ok = Response.ok();
    return ok.build();
}

/**
 * Test-support endpoint that waits until a task's child queue count has settled.
 * <p>
 * Delegates to {@code testUtilsBean.awaitChildQueueCountStable(...)} and reports its boolean
 * result as a plain-text body.
 *
 * @param taskId the task ID whose child queues to monitor
 * @param expectedCount the expected number of active child queues
 * @param timeoutMs maximum time to wait in milliseconds
 * @return HTTP 200 whose body is {@code "true"} if the count stabilized, or {@code "false"}
 *         if the wait timed out
 * @throws InterruptedException if the request thread is interrupted while waiting
 */
@POST
@Path("/queue/awaitChildCountStable/{taskId}/{expectedCount}/{timeoutMs}")
public Response awaitChildQueueCountStable(@PathParam("taskId") String taskId,
        @PathParam("expectedCount") int expectedCount,
        @PathParam("timeoutMs") long timeoutMs) throws InterruptedException {
    final boolean reachedStableCount =
            testUtilsBean.awaitChildQueueCountStable(taskId, expectedCount, timeoutMs);
    // The outcome travels in the body; the status is 200 for both outcomes.
    final String body = Boolean.toString(reachedStableCount);
    return Response.ok(body, TEXT_PLAIN).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,30 @@ public void saveTaskPushNotificationConfig(@Param String taskId, @Body String bo
}
}

/**
 * Blocking test route that waits until a task's child queue count has settled.
 * <p>
 * The numeric path parameters arrive as strings and are parsed here; the wait itself is
 * delegated to {@code testUtilsBean.awaitChildQueueCountStable(...)}. On success the response
 * is HTTP 200 with the body {@code "true"} or {@code "false"}. Any failure, including an
 * unparsable number, is passed to {@code errorResponse}.
 *
 * @param taskId the task ID whose child queues to monitor
 * @param expectedCountStr the expected number of active child queues (decimal string)
 * @param timeoutMsStr maximum time to wait in milliseconds (decimal string)
 * @param rc the Vert.x routing context used to write the response
 */
@Route(path = "/test/queue/awaitChildCountStable/:taskId/:expectedCount/:timeoutMs", methods = {Route.HttpMethod.POST}, type = Route.HandlerType.BLOCKING)
public void awaitChildQueueCountStable(@Param("taskId") String taskId, @Param("expectedCount") String expectedCountStr, @Param("timeoutMs") String timeoutMsStr, RoutingContext rc) {
    try {
        // Arguments are evaluated left to right, so the count is parsed before the timeout.
        final boolean reachedStableCount = testUtilsBean.awaitChildQueueCountStable(
                taskId,
                Integer.parseInt(expectedCountStr),
                Long.parseLong(timeoutMsStr));
        final String body = Boolean.toString(reachedStableCount);
        rc.response().setStatusCode(200).end(body);
    } catch (Throwable failure) {
        errorResponse(failure, rc);
    }
}

private void errorResponse(Throwable t, RoutingContext rc) {
t.printStackTrace();
rc.response()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,30 @@ public void saveTaskPushNotificationConfig(@Param String taskId, @Body String bo
}
}

/**
 * Blocking test route that reports whether a task's child queue count became stable.
 * <p>
 * Parses the string path parameters into numbers, delegates the wait to
 * {@code testUtilsBean.awaitChildQueueCountStable(...)}, and writes the boolean result as the
 * body of an HTTP 200 response. Every {@link Throwable} raised along the way, including a
 * malformed number, is routed to {@code errorResponse}.
 *
 * @param taskId the task ID whose child queues to monitor
 * @param expectedCountStr the expected number of active child queues (decimal string)
 * @param timeoutMsStr maximum time to wait in milliseconds (decimal string)
 * @param rc the Vert.x routing context used to write the response
 */
@Route(path = "/test/queue/awaitChildCountStable/:taskId/:expectedCount/:timeoutMs", methods = {Route.HttpMethod.POST}, type = Route.HandlerType.BLOCKING)
public void awaitChildQueueCountStable(@Param("taskId") String taskId, @Param("expectedCount") String expectedCountStr, @Param("timeoutMs") String timeoutMsStr, RoutingContext rc) {
    try {
        final int wantedCount = Integer.parseInt(expectedCountStr);
        final long waitLimitMs = Long.parseLong(timeoutMsStr);
        final String outcome = Boolean.toString(
                testUtilsBean.awaitChildQueueCountStable(taskId, wantedCount, waitLimitMs));
        rc.response()
                .setStatusCode(200)
                .end(outcome);
    } catch (Throwable problem) {
        errorResponse(problem, rc);
    }
}

private void errorResponse(Throwable t, RoutingContext rc) {
t.printStackTrace();
rc.response()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1401,6 +1401,19 @@ public void testNonBlockingWithMultipleMessages() throws Exception {

assertTrue(subscriptionLatch.await(15, TimeUnit.SECONDS));

// CRITICAL SYNCHRONIZATION: Wait for subscribeToTask's EventConsumer polling loop to start
//
// Race condition: awaitStreamingSubscription() only guarantees transport-level subscription
// (Flow.Subscriber.onSubscribe() called), but EventConsumer polling starts asynchronously
// on a separate thread. Without this check, the agent could emit events before any consumer
// is ready to receive them, causing events to be lost.
//
// This stability check waits for the child queue count to match expectedCount for 3
// consecutive checks (150ms), ensuring the EventConsumer is actively polling and won't
// miss events when the agent executes.
assertTrue(awaitChildQueueCountStable(taskId, 1, 15000),
"subscribeToTask child queue should be created and stable");

// 3. Send second streaming message to same taskId
Message message2 = Message.builder(MESSAGE)
.taskId(multiEventTaskId) // Same taskId
Expand Down Expand Up @@ -1435,8 +1448,8 @@ public void testNonBlockingWithMultipleMessages() throws Exception {
"Stream subscription should be established");

// 4. Verify both consumers received artifact-2 and completion
assertTrue(resubEventLatch.await(10, TimeUnit.SECONDS));
assertTrue(streamEventLatch.await(10, TimeUnit.SECONDS));
assertTrue(resubEventLatch.await(15, TimeUnit.SECONDS));
assertTrue(streamEventLatch.await(15, TimeUnit.SECONDS));

assertFalse(resubUnexpectedEvent.get());
assertFalse(streamUnexpectedEvent.get());
Expand Down Expand Up @@ -1492,6 +1505,34 @@ public void testNonBlockingWithMultipleMessages() throws Exception {
}
}

/**
 * Asks the server's test endpoint to wait until the child queue count for a task has settled
 * at the expected value, and returns the server's verdict.
 * <p>
 * Issues an HTTP/2 {@code POST} with no body to
 * {@code /test/queue/awaitChildCountStable/{taskId}/{expectedCount}/{timeoutMs}} on
 * {@code localhost:serverPort} and parses the response body as a boolean.
 *
 * @param taskId the task ID whose child queues to monitor
 * @param expectedCount the expected number of active child queues
 * @param timeoutMs maximum time to wait in milliseconds
 * @return true if the server reported a stable count, false if it reported a timeout
 * @throws IOException if the HTTP request fails
 * @throws InterruptedException if interrupted while waiting for the response
 * @throws RuntimeException if the server answers with any status other than 200
 */
private boolean awaitChildQueueCountStable(String taskId, int expectedCount, long timeoutMs) throws IOException, InterruptedException {
    final String endpoint = "http://localhost:" + serverPort + "/test/queue/awaitChildCountStable/" +
            taskId + "/" + expectedCount + "/" + timeoutMs;

    final HttpClient http2Client = HttpClient.newBuilder()
            .version(HttpClient.Version.HTTP_2)
            .build();
    final HttpRequest post = HttpRequest.newBuilder()
            .uri(URI.create(endpoint))
            .POST(HttpRequest.BodyPublishers.noBody())
            .build();

    final HttpResponse<String> reply =
            http2Client.send(post, HttpResponse.BodyHandlers.ofString(StandardCharsets.UTF_8));
    final int status = reply.statusCode();
    if (status == 200) {
        return Boolean.parseBoolean(reply.body());
    }
    // Any non-200 status means the endpoint itself failed, not that the count was unstable.
    throw new RuntimeException(status + ": Awaiting child queue count failed! " + reply.body());
}

@Test
@Timeout(value = 1, unit = TimeUnit.MINUTES)
public void testInputRequiredWorkflow() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,53 @@ public void deleteTaskPushNotificationConfig(String taskId, String configId) {
public void saveTaskPushNotificationConfig(String taskId, PushNotificationConfig notificationConfig) {
// Pure delegation: hands the config for this task to the push-notification config store.
pushNotificationConfigStore.setInfo(taskId, notificationConfig);
}

/**
 * Blocks until the poller for the given task's queue has started.
 * <p>
 * Looks the queue up via {@code queueManager.get(taskId)} and passes it to
 * {@code queueManager.awaitQueuePollerStart(...)}.
 *
 * @param taskId the task ID whose queue poller to wait for
 * @throws InterruptedException if interrupted while waiting
 */
public void awaitQueuePollerStart(String taskId) throws InterruptedException {
    // NOTE(review): the lookup result is passed on unchecked - confirm how the queue
    // manager treats a task that has no queue.
    var taskQueue = queueManager.get(taskId);
    queueManager.awaitQueuePollerStart(taskQueue);
}

/**
 * Waits for the child queue count to stabilize at the expected value.
 * <p>
 * This method addresses a race condition where EventConsumer polling loops may not have started
 * yet when events are emitted. The active child queue count is polled every 50 ms and is
 * considered stable once it has matched {@code expectedCount} on 3 consecutive polls, i.e.
 * roughly 100 ms after the first matching poll. Any non-matching poll resets the streak.
 * <p>
 * Use this after operations that create child queues (e.g., subscribeToTask, sendMessage) to
 * ensure their EventConsumer polling loops have started before the agent emits events.
 * <p>
 * Elapsed time is measured with the monotonic {@link System#nanoTime()} clock, so wall-clock
 * adjustments cannot shorten or lengthen the wait, and very large timeouts do not overflow.
 * Because three polls 50 ms apart are required, a timeout of about 100 ms or less can never
 * yield {@code true}; a zero or negative timeout returns {@code false} without polling.
 *
 * @param taskId the task ID whose child queues to monitor
 * @param expectedCount the expected number of active child queues
 * @param timeoutMs maximum time to wait in milliseconds
 * @return true if the count stabilized at the expected value, false if timeout occurred
 * @throws InterruptedException if interrupted while waiting
 */
public boolean awaitChildQueueCountStable(String taskId, int expectedCount, long timeoutMs) throws InterruptedException {
    final int requiredMatches = 3;    // consecutive matching polls needed to call the count stable
    final long pollIntervalMs = 50;   // pause between polls

    // toNanos saturates at Long.MAX_VALUE instead of overflowing, and comparing the elapsed
    // difference (rather than an absolute deadline) is the overflow-safe nanoTime idiom.
    final long timeoutNanos = java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    final long startNanos = System.nanoTime();

    int consecutiveMatches = 0;
    while (System.nanoTime() - startNanos < timeoutNanos) {
        if (queueManager.getActiveChildQueueCount(taskId) == expectedCount) {
            consecutiveMatches++;
            if (consecutiveMatches >= requiredMatches) {
                // Count is stable - all child queues exist and haven't closed
                return true;
            }
        } else {
            consecutiveMatches = 0; // Streak broken: the count changed between polls
        }
        Thread.sleep(pollIntervalMs);
    }
    return false;
}
}