diff --git a/README.md b/README.md
index bd86952da..48d28a8cd 100644
--- a/README.md
+++ b/README.md
@@ -143,7 +143,7 @@ public class WeatherAgentCardProducer {
import io.a2a.server.agentexecution.AgentExecutor;
import io.a2a.server.agentexecution.RequestContext;
import io.a2a.server.events.EventQueue;
-import io.a2a.server.tasks.TaskUpdater;
+import io.a2a.server.tasks.AgentEmitter;
import io.a2a.spec.JSONRPCError;
import io.a2a.spec.Message;
import io.a2a.spec.Part;
@@ -173,14 +173,12 @@ public class WeatherAgentExecutorProducer {
}
@Override
- public void execute(RequestContext context, EventQueue eventQueue) throws JSONRPCError {
- TaskUpdater updater = new TaskUpdater(context, eventQueue);
-
+ public void execute(RequestContext context, AgentEmitter agentEmitter) throws JSONRPCError {
// mark the task as submitted and start working on it
if (context.getTask() == null) {
- updater.submit();
+ agentEmitter.submit();
}
- updater.startWork();
+ agentEmitter.startWork();
// extract the text from the message
String userMessage = extractTextFromMessage(context.getMessage());
@@ -189,16 +187,16 @@ public class WeatherAgentExecutorProducer {
String response = weatherAgent.chat(userMessage);
// create the response part
- TextPart responsePart = new TextPart(response, null);
+ TextPart responsePart = new TextPart(response);
List
- *
*
@@ -56,14 +56,12 @@
* }
*
* @Override
- * public void execute(RequestContext context, EventQueue eventQueue) {
- * TaskUpdater updater = new TaskUpdater(context, eventQueue);
- *
+ * public void execute(RequestContext context, AgentEmitter emitter) {
* // Initialize task if this is a new conversation
* if (context.getTask() == null) {
- * updater.submit();
+ * emitter.submit();
* }
- * updater.startWork();
+ * emitter.startWork();
*
* // Extract user input from the message
* String userMessage = context.getUserInput("\n");
@@ -72,14 +70,14 @@
* String weatherData = weatherService.getWeather(userMessage);
*
* // Return result as artifact
- * updater.addArtifact(List.of(new TextPart(weatherData, null)));
- * updater.complete();
+ * emitter.addArtifact(List.of(new TextPart(weatherData, null)));
+ * emitter.complete();
* }
*
* @Override
- * public void cancel(RequestContext context, EventQueue eventQueue) {
+ * public void cancel(RequestContext context, AgentEmitter emitter) {
* // Clean up resources and mark as canceled
- * new TaskUpdater(context, eventQueue).cancel();
+ * emitter.cancel();
* }
* }
* }
@@ -87,16 +85,15 @@
* Streaming Results
* For long-running operations or LLM streaming, enqueue multiple artifacts:
* {@code
- * updater.startWork();
+ * emitter.startWork();
* for (String chunk : llmService.stream(userInput)) {
- * updater.addArtifact(List.of(new TextPart(chunk, null)));
+ * emitter.addArtifact(List.of(new TextPart(chunk, null)));
* }
- * updater.complete(); // Final event closes the queue
+ * emitter.complete(); // Final event closes the queue
* }
*
* @see RequestContext
- * @see io.a2a.server.tasks.TaskUpdater
- * @see io.a2a.server.events.EventQueue
+ * @see AgentEmitter
* @see io.a2a.server.requesthandlers.DefaultRequestHandler
* @see io.a2a.spec.AgentCard
*/
@@ -111,15 +108,15 @@ public interface AgentExecutor {
*
* Important: Don't throw exceptions for business logic errors. Instead, use - * {@code updater.fail(errorMessage)} to communicate failures to the client gracefully. + * {@code emitter.fail(errorMessage)} to communicate failures to the client gracefully. * Only throw {@link A2AError} for truly exceptional conditions. *
* * @param context the request context containing the message, task state, and configuration - * @param eventQueue the queue for enqueueing status updates and artifacts + * @param emitter the agent emitter for sending messages, updating status, and streaming artifacts * @throws A2AError if execution fails catastrophically (exception propagates to client) */ - void execute(RequestContext context, EventQueue eventQueue) throws A2AError; + void execute(RequestContext context, AgentEmitter emitter) throws A2AError; /** * Cancels an ongoing agent execution. @@ -128,11 +125,11 @@ public interface AgentExecutor { * You should: *- * Note: The {@link #execute(RequestContext, EventQueue)} method may still be + * Note: The {@link #execute(RequestContext, AgentEmitter)} method may still be * running on another thread. Use appropriate synchronization or interruption mechanisms * if your agent maintains cancellable state. *
@@ -146,9 +143,9 @@ public interface AgentExecutor { * * * @param context the request context for the task being canceled - * @param eventQueue the queue for enqueueing the cancellation event + * @param emitter the agent emitter for sending the cancellation event * @throws io.a2a.spec.TaskNotCancelableError if this agent does not support cancellation * @throws A2AError if cancellation is supported but failed to execute */ - void cancel(RequestContext context, EventQueue eventQueue) throws A2AError; + void cancel(RequestContext context, AgentEmitter emitter) throws A2AError; } diff --git a/server-common/src/main/java/io/a2a/server/agentexecution/RequestContext.java b/server-common/src/main/java/io/a2a/server/agentexecution/RequestContext.java index e79298703..82a8b9e17 100644 --- a/server-common/src/main/java/io/a2a/server/agentexecution/RequestContext.java +++ b/server-common/src/main/java/io/a2a/server/agentexecution/RequestContext.java @@ -35,7 +35,7 @@ * *
{@code
- * public void execute(RequestContext context, EventQueue queue) {
+ * public void execute(RequestContext context, AgentEmitter emitter) {
* // Check if this is a new conversation or continuation
* Task existingTask = context.getTask();
* if (existingTask == null) {
diff --git a/server-common/src/main/java/io/a2a/server/events/EnhancedRunnable.java b/server-common/src/main/java/io/a2a/server/events/EnhancedRunnable.java
index 779ad02da..2ccf3fb60 100644
--- a/server-common/src/main/java/io/a2a/server/events/EnhancedRunnable.java
+++ b/server-common/src/main/java/io/a2a/server/events/EnhancedRunnable.java
@@ -2,12 +2,14 @@
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.jspecify.annotations.Nullable;
public abstract class EnhancedRunnable implements Runnable {
private volatile @Nullable Throwable error;
private final List doneCallbacks = new CopyOnWriteArrayList<>();
+ private final AtomicBoolean started = new AtomicBoolean(false);
public @Nullable Throwable getError() {
return error;
@@ -18,9 +20,22 @@ public void setError(Throwable error) {
}
public void addDoneCallback(DoneCallback doneCallback) {
+ if (started.get()) {
+ throw new IllegalStateException(
+ "Cannot add callback after runnable has started execution. " +
+ "Callbacks must be registered before CompletableFuture.runAsync() is called.");
+ }
doneCallbacks.add(doneCallback);
}
+ /**
+ * Marks this runnable as started, preventing further callback additions.
+ * This should be called immediately before submitting to CompletableFuture.runAsync().
+ */
+ public void markStarted() {
+ started.set(true);
+ }
+
public void invokeDoneCallbacks() {
for (DoneCallback doneCallback : doneCallbacks) {
doneCallback.done(this);
diff --git a/server-common/src/main/java/io/a2a/server/events/EventConsumer.java b/server-common/src/main/java/io/a2a/server/events/EventConsumer.java
index 7c7b28452..239385a39 100644
--- a/server-common/src/main/java/io/a2a/server/events/EventConsumer.java
+++ b/server-common/src/main/java/io/a2a/server/events/EventConsumer.java
@@ -2,6 +2,7 @@
import java.util.concurrent.Flow;
+import io.a2a.spec.A2AError;
import io.a2a.spec.A2AServerException;
import io.a2a.spec.Event;
import io.a2a.spec.Message;
@@ -136,6 +137,10 @@ public Flow.Publisher consumeAll() {
LOGGER.debug("Received QueueClosedEvent for task {}, treating as final event",
((QueueClosedEvent) event).getTaskId());
isFinalEvent = true;
+ } else if (event instanceof A2AError) {
+ // A2AError events are terminal - they trigger automatic FAILED state transition
+ LOGGER.debug("Received A2AError event, treating as final event");
+ isFinalEvent = true;
}
// Only send event if it's not a QueueClosedEvent
diff --git a/server-common/src/main/java/io/a2a/server/events/EventQueue.java b/server-common/src/main/java/io/a2a/server/events/EventQueue.java
index 4899a926d..53ae22da4 100644
--- a/server-common/src/main/java/io/a2a/server/events/EventQueue.java
+++ b/server-common/src/main/java/io/a2a/server/events/EventQueue.java
@@ -13,6 +13,7 @@
import io.a2a.server.tasks.TaskStateProvider;
import io.a2a.spec.Event;
import io.a2a.spec.Task;
+import io.a2a.spec.TaskArtifactUpdateEvent;
import io.a2a.spec.TaskStatusUpdateEvent;
import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
@@ -431,6 +432,9 @@ public void enqueueItem(EventQueueItem item) {
// We bypass the parent's closed check and enqueue directly
Event event = item.getEvent();
+ // Validate event taskId matches queue taskId
+ validateEventIds(event);
+
// Check if this is a final event BEFORE submitting to MainEventBus
// If it is, notify all children to expect it (so they wait for MainEventBusProcessor)
if (isFinalEvent(event)) {
@@ -458,6 +462,47 @@ public void enqueueItem(EventQueueItem item) {
mainEventBus.submit(taskId, this, item);
}
+ /**
+ * Validates that events with taskId fields match this queue's taskId.
+ *
+ * Validation Rules:
+ *
+ * - Task, TaskStatusUpdateEvent, TaskArtifactUpdateEvent: MUST match queue taskId
+ * - Message: taskId is OPTIONAL, not validated (can exist without tasks)
+ * - Other events: no validation
+ * - Null queue taskId: skip validation (initialization phase)
+ *
+ *
+ * @param event the event to validate
+ * @throws IllegalArgumentException if event has mismatched taskId
+ */
+ private void validateEventIds(Event event) {
+ if (taskId == null) {
+ return; // Allow any event during initialization
+ }
+
+ String eventTaskId = null;
+ String eventType = null;
+
+ if (event instanceof Task task) {
+ eventTaskId = task.id();
+ eventType = "Task";
+ } else if (event instanceof TaskStatusUpdateEvent statusEvent) {
+ eventTaskId = statusEvent.taskId();
+ eventType = "TaskStatusUpdateEvent";
+ } else if (event instanceof TaskArtifactUpdateEvent artifactEvent) {
+ eventTaskId = artifactEvent.taskId();
+ eventType = "TaskArtifactUpdateEvent";
+ }
+ // Note: Message.taskId is NOT validated - messages can exist independently
+
+ if (eventTaskId != null && !eventTaskId.equals(taskId)) {
+ throw new IllegalArgumentException(
+ String.format("Event taskId mismatch: queue=%s, event=%s, eventType=%s",
+ taskId, eventTaskId, eventType));
+ }
+ }
+
/**
* Checks if an event represents a final task state.
*/
diff --git a/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessor.java b/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessor.java
index 0fc7b3aa9..575078679 100644
--- a/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessor.java
+++ b/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessor.java
@@ -11,6 +11,7 @@
import io.a2a.server.tasks.PushNotificationSender;
import io.a2a.server.tasks.TaskManager;
import io.a2a.server.tasks.TaskStore;
+import io.a2a.spec.A2AError;
import io.a2a.spec.A2AServerException;
import io.a2a.spec.Event;
import io.a2a.spec.InternalError;
@@ -377,7 +378,7 @@ private String extractContextId(Event event) {
* Checks if an event represents a final task state.
*
* @param event the event to check
- * @return true if the event represents a final state (COMPLETED, FAILED, CANCELED, REJECTED, UNKNOWN)
+ * @return true if the event represents a final state (COMPLETED, FAILED, CANCELED, REJECTED, UNKNOWN, or A2AError)
*/
private boolean isFinalEvent(Event event) {
if (event instanceof Task task) {
@@ -385,6 +386,9 @@ private boolean isFinalEvent(Event event) {
&& task.status().state().isFinal();
} else if (event instanceof TaskStatusUpdateEvent statusUpdate) {
return statusUpdate.isFinal();
+ } else if (event instanceof A2AError) {
+ // A2AError events are terminal - they trigger FAILED state transition
+ return true;
}
return false;
}
diff --git a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java
index 31a1a6670..974482ce2 100644
--- a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java
+++ b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java
@@ -36,6 +36,7 @@
import io.a2a.server.events.EventQueueItem;
import io.a2a.server.events.MainEventBusProcessor;
import io.a2a.server.events.QueueManager;
+import io.a2a.server.tasks.AgentEmitter;
import io.a2a.server.tasks.PushNotificationConfigStore;
import io.a2a.server.tasks.PushNotificationSender;
import io.a2a.server.tasks.ResultAggregator;
@@ -98,7 +99,7 @@
* Transport calls {@link #onMessageSend(MessageSendParams, ServerCallContext)}
* Initialize {@link TaskManager} and {@link RequestContext}
* Create or tap {@link EventQueue} via {@link QueueManager}
- * Execute {@link AgentExecutor#execute(RequestContext, EventQueue)} asynchronously in background thread pool
+ * Execute {@link AgentExecutor#execute(RequestContext, AgentEmitter)} asynchronously in background thread pool
* Consume events from queue on Vert.x worker thread via {@link EventConsumer}
* For blocking=true: wait for agent completion and full event consumption
* Return {@link Task} or {@link Message} to transport
@@ -109,7 +110,7 @@
*
* - Transport calls {@link #onMessageSendStream(MessageSendParams, ServerCallContext)}
* - Initialize components (same as blocking)
- * - Execute {@link AgentExecutor#execute(RequestContext, EventQueue)} asynchronously
+ * - Execute {@link AgentExecutor#execute(RequestContext, AgentEmitter)} asynchronously
* - Return {@link java.util.concurrent.Flow.Publisher Flow.Publisher}<StreamingEventKind> immediately
* - Events stream to client as they arrive in the queue
* - On client disconnect: continue consumption in background (fire-and-forget)
@@ -128,7 +129,7 @@
* Threading Model
*
* - Vert.x worker threads: Execute request handler methods (onMessageSend, etc.)
- * - Agent-executor pool (@Internal): Execute {@link AgentExecutor#execute(RequestContext, EventQueue)}
+ * - Agent-executor pool (@Internal): Execute {@link AgentExecutor#execute(RequestContext, AgentEmitter)}
* - Background cleanup: {@link java.util.concurrent.CompletableFuture CompletableFuture} async tasks
*
*
@@ -378,14 +379,29 @@ public Task onCancelTask(TaskIdParams params, ServerCallContext context) throws
ResultAggregator resultAggregator = new ResultAggregator(taskManager, null, executor, eventConsumerExecutor);
EventQueue queue = queueManager.createOrTap(task.id());
- agentExecutor.cancel(
- requestContextBuilder.get()
- .setTaskId(task.id())
- .setContextId(task.contextId())
- .setTask(task)
- .setServerCallContext(context)
- .build(),
- queue);
+ RequestContext cancelRequestContext = requestContextBuilder.get()
+ .setTaskId(task.id())
+ .setContextId(task.contextId())
+ .setTask(task)
+ .setServerCallContext(context)
+ .build();
+ AgentEmitter emitter = new AgentEmitter(cancelRequestContext, queue);
+ try {
+ agentExecutor.cancel(cancelRequestContext, emitter);
+ } catch (TaskNotCancelableError e) {
+ // Expected error - log at INFO level
+ LOGGER.info("Task {} is not cancelable", task.id());
+ throw e;
+ } catch (A2AError e) {
+ // Other A2A errors - log at WARN level with stack trace
+ LOGGER.warn("Agent cancellation threw A2AError for task {}: {} - {}",
+ task.id(), e.getClass().getSimpleName(), e.getMessage(), e);
+ throw e;
+ } catch (Exception e) {
+ // Unexpected errors - log at ERROR level
+ LOGGER.error("Agent cancellation threw unexpected exception for task {}", task.id(), e);
+ throw new io.a2a.spec.InternalError("Agent cancellation failed: " + e.getMessage());
+ }
Optional.ofNullable(runningAgents.get(task.id()))
.ifPresent(cf -> cf.cancel(true));
@@ -439,15 +455,14 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
boolean interruptedOrNonBlocking = false;
- EnhancedRunnable producerRunnable = registerAndExecuteAgentAsync(queueTaskId, mss.requestContext, queue);
+ // Create consumer BEFORE starting agent - callback is registered inside registerAndExecuteAgentAsync
+ EventConsumer consumer = new EventConsumer(queue);
+
+ EnhancedRunnable producerRunnable = registerAndExecuteAgentAsync(queueTaskId, mss.requestContext, queue, consumer.createAgentRunnableDoneCallback());
+
ResultAggregator.EventTypeAndInterrupt etai = null;
EventKind kind = null; // Declare outside try block so it's in scope for return
try {
- EventConsumer consumer = new EventConsumer(queue);
-
- // This callback must be added before we start consuming. Otherwise,
- // any errors thrown by the producerRunnable are not picked up by the consumer
- producerRunnable.addDoneCallback(consumer.createAgentRunnableDoneCallback());
// Get agent future before consuming (for blocking calls to wait for agent completion)
CompletableFuture agentFuture = runningAgents.get(queueTaskId);
@@ -621,11 +636,10 @@ public Flow.Publisher onMessageSendStream(
ResultAggregator resultAggregator = new ResultAggregator(mss.taskManager, null, executor, eventConsumerExecutor);
- EnhancedRunnable producerRunnable = registerAndExecuteAgentAsync(queueTaskId, mss.requestContext, queue);
-
- // Move consumer creation and callback registration outside try block
+ // Create consumer BEFORE starting agent - callback is registered inside registerAndExecuteAgentAsync
EventConsumer consumer = new EventConsumer(queue);
- producerRunnable.addDoneCallback(consumer.createAgentRunnableDoneCallback());
+
+ EnhancedRunnable producerRunnable = registerAndExecuteAgentAsync(queueTaskId, mss.requestContext, queue, consumer.createAgentRunnableDoneCallback());
// Store cancel callback in context for closeHandler to access
// When client disconnects, closeHandler can call this to stop EventConsumer polling loop
@@ -863,21 +877,48 @@ private boolean shouldAddPushInfo(MessageSendParams params) {
*
* This design avoids blocking agent-executor threads waiting for consumer polling to start,
* eliminating cascading delays when Vert.x worker threads are busy.
+ *
+ * @param doneCallback Callback to invoke when agent completes - MUST be added before starting CompletableFuture
*/
- private EnhancedRunnable registerAndExecuteAgentAsync(String taskId, RequestContext requestContext, EventQueue queue) {
+ private EnhancedRunnable registerAndExecuteAgentAsync(String taskId, RequestContext requestContext, EventQueue queue, EnhancedRunnable.DoneCallback doneCallback) {
LOGGER.debug("Registering agent execution for task {}, runningAgents.size() before: {}", taskId, runningAgents.size());
logThreadStats("AGENT START");
EnhancedRunnable runnable = new EnhancedRunnable() {
@Override
public void run() {
LOGGER.debug("Agent execution starting for task {}", taskId);
- agentExecutor.execute(requestContext, queue);
+ AgentEmitter emitter = new AgentEmitter(requestContext, queue);
+ try {
+ agentExecutor.execute(requestContext, emitter);
+ } catch (A2AError e) {
+ // Log A2A errors at WARN level with full stack trace
+ // These are expected business errors but should be tracked
+ LOGGER.warn("Agent execution threw A2AError for task {}: {} - {}",
+ taskId, e.getClass().getSimpleName(), e.getMessage(), e);
+ emitter.fail(e);
+ } catch (RuntimeException e) {
+ // Log unexpected runtime exceptions at ERROR level
+ // These indicate bugs in agent implementation
+ LOGGER.error("Agent execution threw unexpected RuntimeException for task {}", taskId, e);
+ emitter.fail(new io.a2a.spec.InternalError("Agent execution failed: " + e.getMessage()));
+ } catch (Exception e) {
+ // Log other exceptions at ERROR level
+ LOGGER.error("Agent execution threw unexpected Exception for task {}", taskId, e);
+ emitter.fail(new io.a2a.spec.InternalError("Agent execution failed: " + e.getMessage()));
+ }
LOGGER.debug("Agent execution completed for task {}", taskId);
// The consumer (running on the Vert.x worker thread) handles queue lifecycle.
// This avoids blocking agent-executor threads waiting for worker threads.
}
};
+ // CRITICAL: Add callback BEFORE starting CompletableFuture to avoid race condition
+ // If agent completes very fast, whenComplete can fire before caller adds callbacks
+ runnable.addDoneCallback(doneCallback);
+
+ // Mark as started to prevent further callback additions (enforced by runtime check)
+ runnable.markStarted();
+
CompletableFuture cf = CompletableFuture.runAsync(runnable, executor)
.whenComplete((v, err) -> {
if (err != null) {
diff --git a/server-common/src/main/java/io/a2a/server/tasks/AgentEmitter.java b/server-common/src/main/java/io/a2a/server/tasks/AgentEmitter.java
new file mode 100644
index 000000000..57ff6372c
--- /dev/null
+++ b/server-common/src/main/java/io/a2a/server/tasks/AgentEmitter.java
@@ -0,0 +1,628 @@
+package io.a2a.server.tasks;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import io.a2a.server.agentexecution.RequestContext;
+import io.a2a.server.events.EventQueue;
+import io.a2a.spec.A2AError;
+import io.a2a.spec.Artifact;
+import io.a2a.spec.Event;
+import io.a2a.spec.Message;
+import io.a2a.spec.Part;
+import io.a2a.spec.Task;
+import io.a2a.spec.TaskArtifactUpdateEvent;
+import io.a2a.spec.TaskState;
+import io.a2a.spec.TaskStatus;
+import io.a2a.spec.TaskStatusUpdateEvent;
+import io.a2a.spec.TextPart;
+import org.jspecify.annotations.Nullable;
+
+/**
+ * Helper for emitting events from AgentExecutor implementations.
+ *
+ * AgentEmitter provides a simplified API for agents to communicate with clients through
+ * the A2A protocol. It handles both task lifecycle management and direct message sending,
+ * automatically populating events with correct task and context IDs from the RequestContext.
+ *
+ *
Core Capabilities
+ *
+ * - Task Lifecycle: {@link #submit()}, {@link #startWork()}, {@link #complete()},
+ * {@link #fail()}, {@link #cancel()}, {@link #reject()}
+ * - Message Sending: {@link #sendMessage(String)}, {@link #sendMessage(List)},
+ * {@link #sendMessage(List, Map)}
+ * - Artifact Streaming: {@link #addArtifact(List)}, {@link #addArtifact(List, String, String, Map)}
+ * - Auth/Input Requirements: {@link #requiresAuth()}, {@link #requiresInput()}
+ * - Custom Events: {@link #taskBuilder()}, {@link #messageBuilder()}, {@link #addTask(Task)}, {@link #emitEvent(Event)}
+ *
+ *
+ * Usage Patterns
+ *
+ * Simple Message Response (No Task)
+ * {@code
+ * public void execute(RequestContext context, AgentEmitter emitter) {
+ * String response = processRequest(context.getUserInput("\n"));
+ * emitter.sendMessage(response);
+ * }
+ * }
+ *
+ * Task Lifecycle with Artifacts
+ * {@code
+ * public void execute(RequestContext context, AgentEmitter emitter) {
+ * if (context.getTask() == null) {
+ * emitter.submit(); // Create task in SUBMITTED state
+ * }
+ * emitter.startWork(); // Transition to WORKING
+ *
+ * // Process and stream results
+ * List> results = doWork(context.getUserInput("\n"));
+ * emitter.addArtifact(results);
+ *
+ * emitter.complete(); // Mark as COMPLETED
+ * }
+ * }
+ *
+ * Streaming Response (LLM)
+ * {@code
+ * public void execute(RequestContext context, AgentEmitter emitter) {
+ * emitter.startWork();
+ *
+ * for (String chunk : llmService.stream(context.getUserInput("\n"))) {
+ * emitter.addArtifact(List.of(new TextPart(chunk)));
+ * }
+ *
+ * emitter.complete();
+ * }
+ * }
+ *
+ * Event ID Management
+ * All emitted events are automatically populated with:
+ *
+ * - taskId: From RequestContext (may be null for message-only responses)
+ * - contextId: From RequestContext
+ * - messageId: Generated UUID for messages
+ * - artifactId: Generated UUID for artifacts (unless explicitly provided)
+ *
+ *
+ * Events are validated by the EventQueue to ensure taskId correctness.
+ *
+ * @see io.a2a.server.agentexecution.AgentExecutor
+ * @see RequestContext
+ * @see EventQueue
+ * @since 1.0.0
+ */
+public class AgentEmitter {
+ private final EventQueue eventQueue;
+ private final @Nullable String taskId;
+ private final @Nullable String contextId;
+ private final AtomicBoolean terminalStateReached = new AtomicBoolean(false);
+
+ /**
+ * Creates a new AgentEmitter for the given request context and event queue.
+ *
+ * @param context the request context containing task and context IDs
+ * @param eventQueue the event queue for enqueueing events
+ */
+ public AgentEmitter(RequestContext context, EventQueue eventQueue) {
+ this.eventQueue = eventQueue;
+ this.taskId = context.getTaskId();
+ this.contextId = context.getContextId();
+ }
+
+ private void updateStatus(TaskState taskState) {
+ updateStatus(taskState, null, taskState.isFinal());
+ }
+
+ /**
+ * Updates the task status to the given state with an optional message.
+ *
+ * @param taskState the new task state
+ * @param message optional message to include with the status update
+ */
+ public void updateStatus(TaskState taskState, @Nullable Message message) {
+ updateStatus(taskState, message, taskState.isFinal());
+ }
+
+ /**
+ * Updates the task status to the given state with an optional message and finality flag.
+ *
+ * @param state the new task state
+ * @param message optional message to include with the status update
+ * @param isFinal whether this is a final status (prevents further updates)
+ */
+ private void updateStatus(TaskState state, @Nullable Message message, boolean isFinal) {
+ // Check terminal state first (fail fast)
+ if (terminalStateReached.get()) {
+ throw new IllegalStateException("Cannot update task status - terminal state already reached");
+ }
+
+ // For final states, atomically set the flag
+ if (isFinal) {
+ if (!terminalStateReached.compareAndSet(false, true)) {
+ throw new IllegalStateException("Cannot update task status - terminal state already reached");
+ }
+ }
+
+ TaskStatusUpdateEvent event = TaskStatusUpdateEvent.builder()
+ .taskId(taskId)
+ .contextId(contextId)
+ .status(new TaskStatus(state, message, null))
+ .build();
+ eventQueue.enqueueEvent(event);
+ }
+
+ /**
+ * Returns the context ID for this emitter.
+ *
+ * @return the context ID, or null if not available
+ */
+ public @Nullable String getContextId() {
+ return this.contextId;
+ }
+
+ /**
+ * Returns the task ID for this emitter.
+ *
+ * @return the task ID, or null if no task is associated
+ */
+ public @Nullable String getTaskId() {
+ return this.taskId;
+ }
+
+ /**
+ * Adds an artifact with the given parts to the task.
+ *
+ * @param parts the parts to include in the artifact
+ */
+ public void addArtifact(List> parts) {
+ addArtifact(parts, null, null, null);
+ }
+
+ /**
+ * Adds an artifact with the given parts, artifact ID, name, and metadata.
+ *
+ * @param parts the parts to include in the artifact
+ * @param artifactId optional artifact ID (generated if null)
+ * @param name optional artifact name
+ * @param metadata optional metadata map
+ */
+ public void addArtifact(List> parts, @Nullable String artifactId, @Nullable String name, @Nullable Map metadata) {
+ addArtifact(parts, artifactId, name, metadata, null, null);
+ }
+
+ /**
+ * Adds an artifact with all optional parameters.
+ *
+ * @param parts the parts to include in the artifact
+ * @param artifactId optional artifact ID (generated if null)
+ * @param name optional artifact name
+ * @param metadata optional metadata map
+ * @param append whether to append to an existing artifact
+ * @param lastChunk whether this is the last chunk in a streaming sequence
+ */
+ public void addArtifact(List> parts, @Nullable String artifactId, @Nullable String name, @Nullable Map metadata,
+ @Nullable Boolean append, @Nullable Boolean lastChunk) {
+ if (artifactId == null) {
+ artifactId = UUID.randomUUID().toString();
+ }
+ TaskArtifactUpdateEvent event = TaskArtifactUpdateEvent.builder()
+ .taskId(taskId)
+ .contextId(contextId)
+ .artifact(
+ Artifact.builder()
+ .artifactId(artifactId)
+ .name(name)
+ .parts(parts)
+ .metadata(metadata)
+ .build()
+ )
+ .append(append)
+ .lastChunk(lastChunk)
+ .build();
+ eventQueue.enqueueEvent(event);
+ }
+
+ /**
+ * Marks the task as COMPLETED.
+ */
+ public void complete() {
+ complete(null);
+ }
+
+ /**
+ * Marks the task as COMPLETED with an optional message.
+ *
+ * @param message optional message to include with completion
+ */
+ public void complete(@Nullable Message message) {
+ updateStatus(TaskState.COMPLETED, message);
+ }
+
+ /**
+ * Marks the task as FAILED.
+ */
+ public void fail() {
+ fail((Message) null);
+ }
+
+ /**
+ * Marks the task as FAILED with an optional message.
+ *
+ * @param message optional message to include with failure
+ */
+ public void fail(@Nullable Message message) {
+ updateStatus(TaskState.FAILED, message);
+ }
+
+ /**
+ * Enqueues an A2A error event which will automatically transition the task to FAILED.
+ *
+ * Use this when you need to fail the task with a specific A2A error (such as
+ * {@link io.a2a.spec.UnsupportedOperationError}, {@link io.a2a.spec.InvalidRequestError},
+ * {@link io.a2a.spec.TaskNotFoundError}, etc.) that should be sent to the client.
+ *
+ *
+ * The error event is enqueued and the MainEventBusProcessor will automatically transition
+ * the task to FAILED state. This ensures thread-safe state transitions without race conditions,
+ * as the single-threaded MainEventBusProcessor handles all state updates.
+ *
+ *
+ * Error events are terminal (stop event consumption) and trigger automatic FAILED state transition.
+ * The error details are sent to the originating client only, while the FAILED status is replicated
+ * to all nodes in multi-instance deployments.
+ *
+ * Example usage:
+ *
{@code
+ * public void execute(RequestContext context, AgentEmitter emitter) {
+ * if (!isSupported(context.getMessage())) {
+ * emitter.fail(new UnsupportedOperationError("Feature not supported"));
+ * return;
+ * }
+ * // ... normal processing
+ * }
+ * }
+ *
+ * @param error the A2A error to enqueue and send to the client
+ * @since 1.0.0
+ */
+ public void fail(A2AError error) {
+ // Set terminal state flag BEFORE enqueueing error
+ // This prevents race conditions where agent calls fail(error) then complete()
+ if (!terminalStateReached.compareAndSet(false, true)) {
+ throw new IllegalStateException("Cannot update task status - terminal state already reached");
+ }
+
+ eventQueue.enqueueEvent(error);
+ // Status transition happens automatically in MainEventBusProcessor
+ // The error event is terminal and will trigger FAILED state transition
+ }
+
+ /**
+ * Marks the task as SUBMITTED.
+ */
+ public void submit() {
+ submit(null);
+ }
+
+ /**
+ * Marks the task as SUBMITTED with an optional message.
+ *
+ * @param message optional message to include
+ */
+ public void submit(@Nullable Message message) {
+ updateStatus(TaskState.SUBMITTED, message);
+ }
+
+ /**
+ * Marks the task as WORKING (actively being processed).
+ */
+ public void startWork() {
+ startWork(null);
+ }
+
+ /**
+ * Marks the task as WORKING with an optional message.
+ *
+ * @param message optional message to include
+ */
+ public void startWork(@Nullable Message message) {
+ updateStatus(TaskState.WORKING, message);
+ }
+
+ /**
+ * Marks the task as CANCELED.
+ */
+ public void cancel() {
+ cancel(null);
+ }
+
+ /**
+ * Marks the task as CANCELED with an optional message.
+ *
+ * @param message optional message to include
+ */
+ public void cancel(@Nullable Message message) {
+ updateStatus(TaskState.CANCELED, message);
+ }
+
+ /**
+ * Marks the task as REJECTED.
+ */
+ public void reject() {
+ reject(null);
+ }
+
+ /**
+ * Marks the task as REJECTED with an optional message.
+ *
+ * @param message optional message to include
+ */
+ public void reject(@Nullable Message message) {
+ updateStatus(TaskState.REJECTED, message);
+ }
+
+ /**
+ * Marks the task as INPUT_REQUIRED, indicating the agent needs user input to continue.
+ */
+ public void requiresInput() {
+ requiresInput(null, false);
+ }
+
+ /**
+ * Marks the task as INPUT_REQUIRED with an optional message.
+ *
+ * @param message optional message to include
+ */
+ public void requiresInput(@Nullable Message message) {
+ requiresInput(message, false);
+ }
+
+ /**
+ * Marks the task as INPUT_REQUIRED with a finality flag.
+ *
+ * @param isFinal whether this is a final status (prevents further updates)
+ */
+ public void requiresInput(boolean isFinal) {
+ requiresInput(null, isFinal);
+ }
+
+ /**
+ * Marks the task as INPUT_REQUIRED with an optional message and finality flag.
+ *
+ * @param message optional message to include
+ * @param isFinal whether this is a final status (prevents further updates)
+ */
+ public void requiresInput(@Nullable Message message, boolean isFinal) {
+ updateStatus(TaskState.INPUT_REQUIRED, message, isFinal);
+ }
+
+ /**
+ * Marks the task as AUTH_REQUIRED, indicating the agent needs authentication to continue.
+ */
+ public void requiresAuth() {
+ requiresAuth(null, false);
+ }
+
+ /**
+ * Marks the task as AUTH_REQUIRED with an optional message.
+ *
+ * @param message optional message to include
+ */
+ public void requiresAuth(@Nullable Message message) {
+ requiresAuth(message, false);
+ }
+
+ /**
+ * Marks the task as AUTH_REQUIRED with a finality flag.
+ *
+ * @param isFinal whether this is a final status (prevents further updates)
+ */
+ public void requiresAuth(boolean isFinal) {
+ requiresAuth(null, isFinal);
+ }
+
+ /**
+ * Marks the task as AUTH_REQUIRED with an optional message and finality flag.
+ *
+ * @param message optional message to include
+ * @param isFinal whether this is a final status (prevents further updates)
+ */
+ public void requiresAuth(@Nullable Message message, boolean isFinal) {
+ updateStatus(TaskState.AUTH_REQUIRED, message, isFinal);
+ }
+
+ /**
+ * Creates a new agent message with the given parts and metadata.
+ * Pre-populates the message with agent role, task ID, context ID, and a generated message ID.
+ *
+ * @param parts the parts to include in the message
+ * @param metadata optional metadata to attach to the message
+ * @return a new Message object ready to be sent
+ */
+ public Message newAgentMessage(List> parts, @Nullable Map metadata) {
+ return Message.builder()
+ .role(Message.Role.AGENT)
+ .taskId(taskId)
+ .contextId(contextId)
+ .messageId(UUID.randomUUID().toString())
+ .metadata(metadata)
+ .parts(parts)
+ .build();
+ }
+
+ /**
+ * Sends a simple text message to the client.
+ * Convenience method for agents that respond with plain text without creating a task.
+ *
+ * @param text the text content to send
+ */
+ public void sendMessage(String text) {
+ sendMessage(List.of(new TextPart(text)));
+ }
+
+ /**
+ * Sends a message with custom parts (text, images, etc.) to the client.
+ * Use this for rich responses that don't require task lifecycle management.
+ *
+ * @param parts the message parts to send
+ */
+ public void sendMessage(List> parts) {
+ sendMessage(parts, null);
+ }
+
+ /**
+ * Sends a message with parts and metadata to the client.
+ * Creates an agent message with the current task and context IDs (if available)
+ * and enqueues it to the event queue.
+ *
+ * @param parts the message parts to send
+ * @param metadata optional metadata to attach to the message
+ */
+ public void sendMessage(List> parts, @Nullable Map metadata) {
+ Message message = newAgentMessage(parts, metadata);
+ eventQueue.enqueueEvent(message);
+ }
+
+ /**
+ * Sends an existing Message object directly to the client.
+ *
+ * Use this when you need to forward or echo an existing message without creating a new one.
+ * The message is enqueued as-is, preserving its messageId, metadata, and all other fields.
+ *
+ *
+ * Note: This is typically used for forwarding user messages or preserving specific
+ * message properties. For most cases, prefer {@link #sendMessage(String)} or
+ * {@link #sendMessage(List)} which create new agent messages with generated IDs.
+ *
+ * Example usage:
+ *
{@code
+ * public void execute(RequestContext context, AgentEmitter emitter) {
+ * // Echo the user's message back
+ * emitter.sendMessage(context.getMessage());
+ * }
+ * }
+ *
+ * @param message the message to send to the client
+ * @since 1.0.0
+ */
+ public void sendMessage(Message message) {
+ eventQueue.enqueueEvent(message);
+ }
+
+ /**
+ * Adds a custom Task object to be sent to the client.
+ *
+ * Use this when you need to create a Task with specific fields (history, artifacts, etc.)
+ * that the convenience methods like {@link #submit()}, {@link #startWork()}, or
+ * {@link #complete()} don't provide.
+ *
+ *
+ * Typical usage pattern: Build a task with {@link #taskBuilder()}, customize it,
+ * then add it with this method.
+ *
+ * Example usage:
+ *
{@code
+ * public void execute(RequestContext context, AgentEmitter emitter) {
+ * // Create a task with specific status and history
+ * Task task = emitter.taskBuilder()
+ * .status(new TaskStatus(TaskState.SUBMITTED))
+ * .history(List.of(context.getMessage()))
+ * .build();
+ * emitter.addTask(task);
+ * }
+ * }
+ *
+ * @param task the task to add
+ * @since 1.0.0
+ */
+ public void addTask(Task task) {
+ eventQueue.enqueueEvent(task);
+ }
+
+ /**
+ * Emits a custom Event object to the client.
+ *
+ * This is a general-purpose method for emitting any Event type. Most agents should use the
+ * convenience methods ({@link #sendMessage(String)}, {@link #addTask(Task)},
+ * {@link #addArtifact(List)}, {@link #complete()}, etc.), but this method provides flexibility
+ * for agents that need to create and emit custom events using the event builders.
+ *
+ * Example usage:
+ *
{@code
+ * public void execute(RequestContext context, AgentEmitter emitter) {
+ * // Create a custom TaskStatusUpdateEvent
+ * TaskStatusUpdateEvent event = TaskStatusUpdateEvent.builder()
+ * .taskId(context.getTaskId())
+ * .contextId(context.getContextId())
+ * .status(new TaskStatus(TaskState.WORKING))
+ * .isFinal(false)
+ * .build();
+ * emitter.emitEvent(event);
+ * }
+ * }
+ *
+ * @param event the event to emit
+ * @since 1.0.0
+ */
+ public void emitEvent(Event event) {
+ eventQueue.enqueueEvent(event);
+ }
+
+ /**
+ * Creates a Task.Builder pre-populated with the correct task and context IDs.
+ * Agents can customize other Task fields (status, artifacts, etc.) before calling build().
+ *
+ * Example usage:
+ *
{@code
+ * Task task = emitter.taskBuilder()
+ * .status(new TaskStatus(TaskState.WORKING))
+ * .build();
+ * }
+ *
+ * @return a Task.Builder with id and contextId already set
+ */
+ public Task.Builder taskBuilder() {
+ return Task.builder()
+ .id(taskId)
+ .contextId(contextId);
+ }
+
+ /**
+ * Creates a Message.Builder pre-populated with agent defaults.
+ * Sets taskId only if non-null (messages can exist independently of tasks).
+ *
+ * Pre-populated fields:
+ *
+ * - taskId - set only if this AgentEmitter has a non-null taskId
+ * - contextId - current context ID
+ * - role - Message.Role.AGENT
+ * - messageId - generated UUID
+ *
+ *
+ * Example usage:
+ *
{@code
+ * Message msg = emitter.messageBuilder()
+ * .parts(List.of(new TextPart("Hello")))
+ * .metadata(Map.of("key", "value"))
+ * .build();
+ * }
+ *
+ * @return a Message.Builder with common agent fields already set
+ */
+ public Message.Builder messageBuilder() {
+ Message.Builder builder = Message.builder()
+ .contextId(contextId)
+ .role(Message.Role.AGENT)
+ .messageId(UUID.randomUUID().toString());
+
+ // Only set taskId if present (messages can exist without tasks)
+ if (taskId != null) {
+ builder.taskId(taskId);
+ }
+
+ return builder;
+ }
+
+}
diff --git a/server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java b/server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java
index 506b3f3b6..b36868a63 100644
--- a/server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java
+++ b/server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java
@@ -166,7 +166,8 @@ public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer,
// Determine interrupt behavior
boolean shouldInterrupt = false;
boolean isFinalEvent = (event instanceof Task task && task.status().state().isFinal())
- || (event instanceof TaskStatusUpdateEvent tsue && tsue.isFinal());
+ || (event instanceof TaskStatusUpdateEvent tsue && tsue.isFinal())
+ || (event instanceof A2AError); // A2AError events are terminal
boolean isAuthRequired = (event instanceof Task task && task.status().state() == TaskState.AUTH_REQUIRED)
|| (event instanceof TaskStatusUpdateEvent tsue && tsue.status().state() == TaskState.AUTH_REQUIRED);
diff --git a/server-common/src/main/java/io/a2a/server/tasks/TaskManager.java b/server-common/src/main/java/io/a2a/server/tasks/TaskManager.java
index 948ec596c..abb764755 100644
--- a/server-common/src/main/java/io/a2a/server/tasks/TaskManager.java
+++ b/server-common/src/main/java/io/a2a/server/tasks/TaskManager.java
@@ -1,5 +1,6 @@
package io.a2a.server.tasks;
+import static io.a2a.spec.TaskState.FAILED;
import static io.a2a.spec.TaskState.SUBMITTED;
import static io.a2a.util.Assert.checkNotNullParam;
import static io.a2a.util.Utils.appendArtifactToTask;
@@ -10,6 +11,7 @@
import java.util.List;
import java.util.Map;
+import io.a2a.spec.A2AError;
import io.a2a.spec.A2AServerException;
import io.a2a.spec.Event;
import io.a2a.spec.InternalError;
@@ -112,6 +114,42 @@ public boolean process(Event event, boolean isReplicated) throws A2AServerExcept
isFinal = saveTaskEvent(taskStatusUpdateEvent, isReplicated);
} else if (event instanceof TaskArtifactUpdateEvent taskArtifactUpdateEvent) {
isFinal = saveTaskEvent(taskArtifactUpdateEvent, isReplicated);
+ } else if (event instanceof A2AError) {
+ // A2AError events trigger automatic transition to FAILED state
+ // Error details are NOT persisted in TaskStore (client-specific)
+ // Only the FAILED status is persisted and replicated across nodes
+
+ // A2AError events don't have taskId/contextId fields, so we need to ensure
+ // we have these from the existing task or TaskManager state
+ if (taskId == null) {
+ // No task context - A2AError event will be distributed to clients but no state update
+ LOGGER.debug("A2AError event without task context - skipping state update");
+ return true; // Return true (is final) to stop event consumption
+ }
+
+ // Ensure we have contextId - get from existing task if not set
+ String errorContextId = contextId;
+ if (errorContextId == null) {
+ Task existingTask = getTask();
+ if (existingTask != null) {
+ errorContextId = existingTask.contextId();
+ }
+ }
+
+ // Only create status update if we have contextId
+ if (errorContextId != null) {
+ LOGGER.debug("A2AError event detected, transitioning task {} to FAILED", taskId);
+ TaskStatusUpdateEvent failedEvent = TaskStatusUpdateEvent.builder()
+ .taskId(taskId)
+ .contextId(errorContextId)
+ .status(new TaskStatus(FAILED))
+ .build();
+ isFinal = saveTaskEvent(failedEvent, isReplicated);
+ } else {
+ // Can't update status without contextId, but error is still terminal
+ LOGGER.debug("A2AError event for task {} without contextId - skipping state update", taskId);
+ isFinal = true;
+ }
}
return isFinal;
}
diff --git a/server-common/src/main/java/io/a2a/server/tasks/TaskUpdater.java b/server-common/src/main/java/io/a2a/server/tasks/TaskUpdater.java
deleted file mode 100644
index c28e55c5c..000000000
--- a/server-common/src/main/java/io/a2a/server/tasks/TaskUpdater.java
+++ /dev/null
@@ -1,190 +0,0 @@
-package io.a2a.server.tasks;
-
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import io.a2a.server.agentexecution.RequestContext;
-import io.a2a.server.events.EventQueue;
-import io.a2a.spec.Artifact;
-import io.a2a.spec.Message;
-import io.a2a.spec.Part;
-import io.a2a.spec.TaskArtifactUpdateEvent;
-import io.a2a.spec.TaskState;
-import io.a2a.spec.TaskStatus;
-import io.a2a.spec.TaskStatusUpdateEvent;
-import org.jspecify.annotations.Nullable;
-
-public class TaskUpdater {
- private final EventQueue eventQueue;
- private final @Nullable String taskId;
- private final @Nullable String contextId;
- private final AtomicBoolean terminalStateReached = new AtomicBoolean(false);
- private final Object stateLock = new Object();
-
- public TaskUpdater(RequestContext context, EventQueue eventQueue) {
- this.eventQueue = eventQueue;
- this.taskId = context.getTaskId();
- this.contextId = context.getContextId();
- }
-
- private void updateStatus(TaskState taskState) {
- updateStatus(taskState, null, taskState.isFinal());
- }
-
- public void updateStatus(TaskState taskState, @Nullable Message message) {
- updateStatus(taskState, message, taskState.isFinal());
- }
-
- public void updateStatus(TaskState state, @Nullable Message message, boolean isFinal) {
- synchronized (stateLock) {
- // Check if we're already in a terminal state
- if (terminalStateReached.get()) {
- throw new IllegalStateException("Cannot update task status - terminal state already reached");
- }
-
- // If this is a final state, set the flag
- if (isFinal) {
- terminalStateReached.set(true);
- }
-
- TaskStatusUpdateEvent event = TaskStatusUpdateEvent.builder()
- .taskId(taskId)
- .contextId(contextId)
- .status(new TaskStatus(state, message, null))
- .build();
- eventQueue.enqueueEvent(event);
- }
- }
-
- public @Nullable String getContextId() {
- return this.contextId;
- }
-
- public @Nullable String getTaskId() {
- return this.taskId;
- }
-
- public void addArtifact(List> parts) {
- addArtifact(parts, null, null, null);
- }
-
- public void addArtifact(List> parts, @Nullable String artifactId, @Nullable String name, @Nullable Map metadata) {
- addArtifact(parts, artifactId, name, metadata, null, null);
- }
-
- public void addArtifact(List> parts, @Nullable String artifactId, @Nullable String name, @Nullable Map metadata,
- @Nullable Boolean append, @Nullable Boolean lastChunk) {
- if (artifactId == null) {
- artifactId = UUID.randomUUID().toString();
- }
- TaskArtifactUpdateEvent event = TaskArtifactUpdateEvent.builder()
- .taskId(taskId)
- .contextId(contextId)
- .artifact(
- Artifact.builder()
- .artifactId(artifactId)
- .name(name)
- .parts(parts)
- .metadata(metadata)
- .build()
- )
- .append(append)
- .lastChunk(lastChunk)
- .build();
- eventQueue.enqueueEvent(event);
- }
-
- public void complete() {
- complete(null);
- }
-
- public void complete(@Nullable Message message) {
- updateStatus(TaskState.COMPLETED, message);
- }
-
- public void fail() {
- fail(null);
- }
-
- public void fail(@Nullable Message message) {
- updateStatus(TaskState.FAILED, message);
- }
-
- public void submit() {
- submit(null);
- }
-
- public void submit(@Nullable Message message) {
- updateStatus(TaskState.SUBMITTED, message);
- }
-
- public void startWork() {
- startWork(null);
- }
-
- public void startWork(@Nullable Message message) {
- updateStatus(TaskState.WORKING, message);
- }
-
- public void cancel() {
- cancel(null);
- }
-
- public void cancel(@Nullable Message message) {
- updateStatus(TaskState.CANCELED, message);
- }
-
- public void reject() {
- reject(null);
- }
-
- public void reject(@Nullable Message message) {
- updateStatus(TaskState.REJECTED, message);
- }
-
- public void requiresInput() {
- requiresInput(null, false);
- }
-
- public void requiresInput(@Nullable Message message) {
- requiresInput(message, false);
- }
-
- public void requiresInput(boolean isFinal) {
- requiresInput(null, isFinal);
- }
-
- public void requiresInput(@Nullable Message message, boolean isFinal) {
- updateStatus(TaskState.INPUT_REQUIRED, message, isFinal);
- }
-
- public void requiresAuth() {
- requiresAuth(null, false);
- }
-
- public void requiresAuth(@Nullable Message message) {
- requiresAuth(message, false);
- }
-
- public void requiresAuth(boolean isFinal) {
- requiresAuth(null, isFinal);
- }
-
- public void requiresAuth(@Nullable Message message, boolean isFinal) {
- updateStatus(TaskState.AUTH_REQUIRED, message, isFinal);
- }
-
- public Message newAgentMessage(List> parts, @Nullable Map metadata) {
- return Message.builder()
- .role(Message.Role.AGENT)
- .taskId(taskId)
- .contextId(contextId)
- .messageId(UUID.randomUUID().toString())
- .metadata(metadata)
- .parts(parts)
- .build();
- }
-
-}
diff --git a/server-common/src/test/java/io/a2a/server/events/EnhancedRunnableTest.java b/server-common/src/test/java/io/a2a/server/events/EnhancedRunnableTest.java
new file mode 100644
index 000000000..2b1913355
--- /dev/null
+++ b/server-common/src/test/java/io/a2a/server/events/EnhancedRunnableTest.java
@@ -0,0 +1,136 @@
+package io.a2a.server.events;
+
+import org.junit.jupiter.api.Test;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * Tests for EnhancedRunnable to verify callback registration enforcement.
+ * These tests ensure that callbacks cannot be added after execution starts,
+ * preventing race conditions in agent execution.
+ */
+public class EnhancedRunnableTest {
+
+ @Test
+ public void testCannotAddCallbackAfterStart() {
+ EnhancedRunnable runnable = new EnhancedRunnable() {
+ @Override
+ public void run() {
+ // Empty
+ }
+ };
+
+ // Add callback before start - should succeed
+ AtomicBoolean called = new AtomicBoolean(false);
+ runnable.addDoneCallback((r) -> called.set(true));
+
+ // Mark as started
+ runnable.markStarted();
+
+ // Try to add callback after start - should fail
+ IllegalStateException exception = assertThrows(IllegalStateException.class,
+ () -> runnable.addDoneCallback((r) -> {}));
+
+ assertTrue(exception.getMessage().contains("Cannot add callback after runnable has started"));
+ }
+
+ @Test
+ public void testCallbacksInvokedAfterCompletion() throws Exception {
+ EnhancedRunnable runnable = new EnhancedRunnable() {
+ @Override
+ public void run() {
+ // Empty
+ }
+ };
+
+ AtomicBoolean callback1Called = new AtomicBoolean(false);
+ AtomicBoolean callback2Called = new AtomicBoolean(false);
+
+ runnable.addDoneCallback((r) -> callback1Called.set(true));
+ runnable.addDoneCallback((r) -> callback2Called.set(true));
+ runnable.markStarted();
+
+ CompletableFuture.runAsync(runnable, Executors.newSingleThreadExecutor())
+ .thenRun(runnable::invokeDoneCallbacks)
+ .get();
+
+ assertTrue(callback1Called.get());
+ assertTrue(callback2Called.get());
+ }
+
+ @Test
+ public void testMultipleCallbacksBeforeStart() {
+ EnhancedRunnable runnable = new EnhancedRunnable() {
+ @Override
+ public void run() {
+ // Empty
+ }
+ };
+
+ // Should be able to add multiple callbacks before start
+ assertDoesNotThrow(() -> {
+ runnable.addDoneCallback((r) -> {});
+ runnable.addDoneCallback((r) -> {});
+ runnable.addDoneCallback((r) -> {});
+ });
+
+ // Mark as started
+ runnable.markStarted();
+
+ // Now adding should fail
+ assertThrows(IllegalStateException.class,
+ () -> runnable.addDoneCallback((r) -> {}));
+ }
+
+ @Test
+ public void testErrorHandling() {
+ EnhancedRunnable runnable = new EnhancedRunnable() {
+ @Override
+ public void run() {
+ throw new RuntimeException("Test error");
+ }
+ };
+
+ AtomicBoolean callbackInvoked = new AtomicBoolean(false);
+ runnable.addDoneCallback((r) -> {
+ callbackInvoked.set(true);
+ assertNotNull(r.getError());
+ assertEquals("Test error", r.getError().getMessage());
+ });
+
+ runnable.markStarted();
+
+ try {
+ runnable.run();
+ } catch (RuntimeException e) {
+ runnable.setError(e);
+ }
+
+ runnable.invokeDoneCallbacks();
+ assertTrue(callbackInvoked.get());
+ }
+
+ @Test
+ public void testMarkStartedIdempotent() {
+ EnhancedRunnable runnable = new EnhancedRunnable() {
+ @Override
+ public void run() {
+ // Empty
+ }
+ };
+
+ // Should be able to call markStarted multiple times
+ assertDoesNotThrow(() -> {
+ runnable.markStarted();
+ runnable.markStarted();
+ runnable.markStarted();
+ });
+
+ // But callbacks should still be blocked
+ assertThrows(IllegalStateException.class,
+ () -> runnable.addDoneCallback((r) -> {}));
+ }
+}
diff --git a/server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java b/server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java
index e428c679e..39ff07f09 100644
--- a/server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java
+++ b/server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java
@@ -23,7 +23,7 @@
import io.a2a.jsonrpc.common.json.JsonUtil;
import io.a2a.server.agentexecution.AgentExecutor;
import io.a2a.server.agentexecution.RequestContext;
-import io.a2a.server.events.EventQueue;
+import io.a2a.server.tasks.AgentEmitter;
import io.a2a.server.events.EventQueueItem;
import io.a2a.server.events.EventQueueUtil;
import io.a2a.server.events.InMemoryQueueManager;
@@ -87,16 +87,16 @@ public class AbstractA2ARequestHandlerTest {
public void init() {
executor = new AgentExecutor() {
@Override
- public void execute(RequestContext context, EventQueue eventQueue) throws A2AError {
+ public void execute(RequestContext context, AgentEmitter agentEmitter) throws A2AError {
if (agentExecutorExecute != null) {
- agentExecutorExecute.invoke(context, eventQueue);
+ agentExecutorExecute.invoke(context, agentEmitter);
}
}
@Override
- public void cancel(RequestContext context, EventQueue eventQueue) throws A2AError {
+ public void cancel(RequestContext context, AgentEmitter agentEmitter) throws A2AError {
if (agentExecutorCancel != null) {
- agentExecutorCancel.invoke(context, eventQueue);
+ agentExecutorCancel.invoke(context, agentEmitter);
}
}
};
@@ -166,7 +166,7 @@ private static String loadPreferredTransportFromProperties() {
}
protected interface AgentExecutorMethod {
- void invoke(RequestContext context, EventQueue eventQueue) throws A2AError;
+ void invoke(RequestContext context, AgentEmitter agentEmitter) throws A2AError;
}
/**
diff --git a/server-common/src/test/java/io/a2a/server/tasks/AgentEmitterConcurrencyTest.java b/server-common/src/test/java/io/a2a/server/tasks/AgentEmitterConcurrencyTest.java
new file mode 100644
index 000000000..eafcc712a
--- /dev/null
+++ b/server-common/src/test/java/io/a2a/server/tasks/AgentEmitterConcurrencyTest.java
@@ -0,0 +1,248 @@
+package io.a2a.server.tasks;
+
+import io.a2a.server.agentexecution.RequestContext;
+import io.a2a.server.events.EventQueue;
+import io.a2a.spec.UnsupportedOperationError;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * Concurrency tests for AgentEmitter to verify thread-safety of terminal state management.
+ * These tests ensure that the AtomicBoolean-based terminal state flag prevents race conditions
+ * when multiple threads attempt to set terminal states concurrently.
+ */
+public class AgentEmitterConcurrencyTest {
+
+ @Test
+ public void testConcurrentTerminalStateUpdates() throws InterruptedException {
+ // Setup
+ RequestContext context = mock(RequestContext.class);
+ when(context.getTaskId()).thenReturn("test-task-123");
+ when(context.getContextId()).thenReturn("test-context-456");
+
+ EventQueue eventQueue = mock(EventQueue.class);
+ AgentEmitter emitter = new AgentEmitter(context, eventQueue);
+
+ // Test concurrent completion attempts
+ int threadCount = 10;
+ ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+ CountDownLatch startLatch = new CountDownLatch(1);
+ CountDownLatch doneLatch = new CountDownLatch(threadCount);
+ AtomicInteger successCount = new AtomicInteger(0);
+ AtomicInteger failureCount = new AtomicInteger(0);
+
+ for (int i = 0; i < threadCount; i++) {
+ executor.submit(() -> {
+ try {
+ startLatch.await(); // Wait for all threads to be ready
+ emitter.complete();
+ successCount.incrementAndGet();
+ } catch (IllegalStateException e) {
+ failureCount.incrementAndGet();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ doneLatch.countDown();
+ }
+ });
+ }
+
+ startLatch.countDown(); // Start all threads simultaneously
+ assertTrue(doneLatch.await(5, TimeUnit.SECONDS), "All threads should complete");
+
+ // Verify: exactly one success, rest failures
+ assertEquals(1, successCount.get(), "Exactly one thread should succeed");
+ assertEquals(threadCount - 1, failureCount.get(), "All other threads should fail");
+
+ // Verify: only one event was enqueued
+ verify(eventQueue, times(1)).enqueueEvent(any());
+
+ executor.shutdown();
+ }
+
+ @Test
+ public void testConcurrentMixedTerminalStates() throws InterruptedException {
+ // Setup
+ RequestContext context = mock(RequestContext.class);
+ when(context.getTaskId()).thenReturn("test-task-123");
+ when(context.getContextId()).thenReturn("test-context-456");
+
+ EventQueue eventQueue = mock(EventQueue.class);
+ AgentEmitter emitter = new AgentEmitter(context, eventQueue);
+
+ // Test concurrent different terminal state attempts
+ ExecutorService executor = Executors.newFixedThreadPool(3);
+ CountDownLatch startLatch = new CountDownLatch(1);
+ CountDownLatch doneLatch = new CountDownLatch(3);
+ AtomicInteger successCount = new AtomicInteger(0);
+
+ // Thread 1: complete
+ executor.submit(() -> {
+ try {
+ startLatch.await();
+ emitter.complete();
+ successCount.incrementAndGet();
+ } catch (Exception e) {
+ // Expected for 2 out of 3 threads
+ } finally {
+ doneLatch.countDown();
+ }
+ });
+
+ // Thread 2: fail
+ executor.submit(() -> {
+ try {
+ startLatch.await();
+ emitter.fail();
+ successCount.incrementAndGet();
+ } catch (Exception e) {
+ // Expected for 2 out of 3 threads
+ } finally {
+ doneLatch.countDown();
+ }
+ });
+
+ // Thread 3: cancel
+ executor.submit(() -> {
+ try {
+ startLatch.await();
+ emitter.cancel();
+ successCount.incrementAndGet();
+ } catch (Exception e) {
+ // Expected for 2 out of 3 threads
+ } finally {
+ doneLatch.countDown();
+ }
+ });
+
+ startLatch.countDown();
+ assertTrue(doneLatch.await(5, TimeUnit.SECONDS));
+
+ // Verify: exactly one success
+ assertEquals(1, successCount.get(), "Exactly one terminal state should succeed");
+ verify(eventQueue, times(1)).enqueueEvent(any());
+
+ executor.shutdown();
+ }
+
+ @Test
+ public void testConcurrentFailWithErrorAndComplete() throws InterruptedException {
+ // Setup
+ RequestContext context = mock(RequestContext.class);
+ when(context.getTaskId()).thenReturn("test-task-123");
+ when(context.getContextId()).thenReturn("test-context-456");
+
+ EventQueue eventQueue = mock(EventQueue.class);
+ AgentEmitter emitter = new AgentEmitter(context, eventQueue);
+
+ ExecutorService executor = Executors.newFixedThreadPool(2);
+ CountDownLatch startLatch = new CountDownLatch(1);
+ CountDownLatch doneLatch = new CountDownLatch(2);
+ AtomicInteger successCount = new AtomicInteger(0);
+
+ executor.submit(() -> {
+ try {
+ startLatch.await();
+ emitter.fail(new UnsupportedOperationError());
+ successCount.incrementAndGet();
+ } catch (Exception e) {
+ // Expected for one thread
+ } finally {
+ doneLatch.countDown();
+ }
+ });
+
+ executor.submit(() -> {
+ try {
+ startLatch.await();
+ emitter.complete();
+ successCount.incrementAndGet();
+ } catch (Exception e) {
+ // Expected for one thread
+ } finally {
+ doneLatch.countDown();
+ }
+ });
+
+ startLatch.countDown();
+ assertTrue(doneLatch.await(5, TimeUnit.SECONDS));
+ assertEquals(1, successCount.get(), "Exactly one terminal operation should succeed");
+
+ executor.shutdown();
+ }
+
+ @Test
+ public void testFailWithErrorSetsTerminalState() {
+ // Setup
+ RequestContext context = mock(RequestContext.class);
+ when(context.getTaskId()).thenReturn("test-task-123");
+ when(context.getContextId()).thenReturn("test-context-456");
+
+ EventQueue eventQueue = mock(EventQueue.class);
+ AgentEmitter emitter = new AgentEmitter(context, eventQueue);
+
+ // Call fail with error
+ emitter.fail(new UnsupportedOperationError());
+
+ // Verify terminal state is set - subsequent calls should throw
+ IllegalStateException exception = assertThrows(IllegalStateException.class,
+ () -> emitter.complete());
+ assertEquals("Cannot update task status - terminal state already reached",
+ exception.getMessage());
+ }
+
+ @Test
+ public void testFailWithErrorThenFailWithMessage() {
+ // Setup
+ RequestContext context = mock(RequestContext.class);
+ when(context.getTaskId()).thenReturn("test-task-123");
+ when(context.getContextId()).thenReturn("test-context-456");
+
+ EventQueue eventQueue = mock(EventQueue.class);
+ AgentEmitter emitter = new AgentEmitter(context, eventQueue);
+
+ // Call fail with error
+ emitter.fail(new UnsupportedOperationError());
+
+ // Second fail should throw
+ IllegalStateException exception = assertThrows(IllegalStateException.class,
+ () -> emitter.fail());
+ assertEquals("Cannot update task status - terminal state already reached",
+ exception.getMessage());
+ }
+
+ @Test
+ public void testNonTerminalThenTerminalState() throws InterruptedException {
+ // Setup
+ RequestContext context = mock(RequestContext.class);
+ when(context.getTaskId()).thenReturn("test-task-123");
+ when(context.getContextId()).thenReturn("test-context-456");
+
+ EventQueue eventQueue = mock(EventQueue.class);
+ AgentEmitter emitter = new AgentEmitter(context, eventQueue);
+
+ // Non-terminal states should work
+ emitter.submit();
+ emitter.startWork();
+
+ // Terminal state should work
+ emitter.complete();
+
+ // Verify events were enqueued
+ verify(eventQueue, times(3)).enqueueEvent(any());
+
+ // Further updates should fail
+ IllegalStateException exception = assertThrows(IllegalStateException.class,
+ () -> emitter.startWork());
+ assertEquals("Cannot update task status - terminal state already reached",
+ exception.getMessage());
+ }
+}
diff --git a/server-common/src/test/java/io/a2a/server/tasks/TaskUpdaterTest.java b/server-common/src/test/java/io/a2a/server/tasks/AgentEmitterTest.java
similarity index 87%
rename from server-common/src/test/java/io/a2a/server/tasks/TaskUpdaterTest.java
rename to server-common/src/test/java/io/a2a/server/tasks/AgentEmitterTest.java
index a5ec77f01..853037a9c 100644
--- a/server-common/src/test/java/io/a2a/server/tasks/TaskUpdaterTest.java
+++ b/server-common/src/test/java/io/a2a/server/tasks/AgentEmitterTest.java
@@ -30,7 +30,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-public class TaskUpdaterTest {
+public class AgentEmitterTest {
public static final String TEST_TASK_ID = "test-task-id";
public static final String TEST_TASK_CONTEXT_ID = "test-task-context-id";
@@ -48,7 +48,7 @@ public class TaskUpdaterTest {
EventQueue eventQueue;
private MainEventBus mainEventBus;
private MainEventBusProcessor mainEventBusProcessor;
- private TaskUpdater taskUpdater;
+ private AgentEmitter agentEmitter;
@@ -69,7 +69,7 @@ public void init() {
.setTaskId(TEST_TASK_ID)
.setContextId(TEST_TASK_CONTEXT_ID)
.build();
- taskUpdater = new TaskUpdater(context, eventQueue);
+ agentEmitter = new AgentEmitter(context, eventQueue);
}
@AfterEach
@@ -81,7 +81,7 @@ public void cleanup() {
@Test
public void testAddArtifactWithCustomIdAndName() throws Exception {
- taskUpdater.addArtifact(SAMPLE_PARTS, "custom-artifact-id", "Custom Artifact", null);
+ agentEmitter.addArtifact(SAMPLE_PARTS, "custom-artifact-id", "Custom Artifact", null);
EventQueueItem item = eventQueue.dequeueEventItem(5000);
assertNotNull(item);
Event event = item.getEvent();
@@ -101,147 +101,147 @@ public void testAddArtifactWithCustomIdAndName() throws Exception {
@Test
public void testCompleteWithoutMessage() throws Exception {
- taskUpdater.complete();
+ agentEmitter.complete();
checkTaskStatusUpdateEventOnQueue(true, TaskState.COMPLETED, null);
}
@Test
public void testCompleteWithMessage() throws Exception {
- taskUpdater.complete(SAMPLE_MESSAGE);
+ agentEmitter.complete(SAMPLE_MESSAGE);
checkTaskStatusUpdateEventOnQueue(true, TaskState.COMPLETED, SAMPLE_MESSAGE);
}
@Test
public void testSubmitWithoutMessage() throws Exception {
- taskUpdater.submit();
+ agentEmitter.submit();
checkTaskStatusUpdateEventOnQueue(false, TaskState.SUBMITTED, null);
}
@Test
public void testSubmitWithMessage() throws Exception {
- taskUpdater.submit(SAMPLE_MESSAGE);
+ agentEmitter.submit(SAMPLE_MESSAGE);
checkTaskStatusUpdateEventOnQueue(false, TaskState.SUBMITTED, SAMPLE_MESSAGE);
}
@Test
public void testStartWorkWithoutMessage() throws Exception {
- taskUpdater.startWork();
+ agentEmitter.startWork();
checkTaskStatusUpdateEventOnQueue(false, TaskState.WORKING, null);
}
@Test
public void testStartWorkWithMessage() throws Exception {
- taskUpdater.startWork(SAMPLE_MESSAGE);
+ agentEmitter.startWork(SAMPLE_MESSAGE);
checkTaskStatusUpdateEventOnQueue(false, TaskState.WORKING, SAMPLE_MESSAGE);
}
@Test
public void testFailedWithoutMessage() throws Exception {
- taskUpdater.fail();
+ agentEmitter.fail();
checkTaskStatusUpdateEventOnQueue(true, TaskState.FAILED, null);
}
@Test
public void testFailedWithMessage() throws Exception {
- taskUpdater.fail(SAMPLE_MESSAGE);
+ agentEmitter.fail(SAMPLE_MESSAGE);
checkTaskStatusUpdateEventOnQueue(true, TaskState.FAILED, SAMPLE_MESSAGE);
}
@Test
public void testCanceledWithoutMessage() throws Exception {
- taskUpdater.cancel();
+ agentEmitter.cancel();
checkTaskStatusUpdateEventOnQueue(true, TaskState.CANCELED, null);
}
@Test
public void testCanceledWithMessage() throws Exception {
- taskUpdater.cancel(SAMPLE_MESSAGE);
+ agentEmitter.cancel(SAMPLE_MESSAGE);
checkTaskStatusUpdateEventOnQueue(true, TaskState.CANCELED, SAMPLE_MESSAGE);
}
@Test
public void testRejectWithoutMessage() throws Exception {
- taskUpdater.reject();
+ agentEmitter.reject();
checkTaskStatusUpdateEventOnQueue(true, TaskState.REJECTED, null);
}
@Test
public void testRejectWithMessage() throws Exception {
- taskUpdater.reject(SAMPLE_MESSAGE);
+ agentEmitter.reject(SAMPLE_MESSAGE);
checkTaskStatusUpdateEventOnQueue(true, TaskState.REJECTED, SAMPLE_MESSAGE);
}
@Test
public void testRequiresInputWithoutMessage() throws Exception {
- taskUpdater.requiresInput();
+ agentEmitter.requiresInput();
checkTaskStatusUpdateEventOnQueue(false, TaskState.INPUT_REQUIRED, null);
}
@Test
public void testRequiresInputWithMessage() throws Exception {
- taskUpdater.requiresInput(SAMPLE_MESSAGE);
+ agentEmitter.requiresInput(SAMPLE_MESSAGE);
checkTaskStatusUpdateEventOnQueue(false, TaskState.INPUT_REQUIRED, SAMPLE_MESSAGE);
}
@Test
public void testRequiresInputWithFinalTrue() throws Exception {
- taskUpdater.requiresInput(true);
+ agentEmitter.requiresInput(true);
checkTaskStatusUpdateEventOnQueue(false, TaskState.INPUT_REQUIRED, null);
}
@Test
public void testRequiresInputWithMessageAndFinalTrue() throws Exception {
- taskUpdater.requiresInput(SAMPLE_MESSAGE, true);
+ agentEmitter.requiresInput(SAMPLE_MESSAGE, true);
checkTaskStatusUpdateEventOnQueue(false, TaskState.INPUT_REQUIRED, SAMPLE_MESSAGE);
}
@Test
public void testRequiresAuthWithoutMessage() throws Exception {
- taskUpdater.requiresAuth();
+ agentEmitter.requiresAuth();
checkTaskStatusUpdateEventOnQueue(false, TaskState.AUTH_REQUIRED, null);
}
@Test
public void testRequiresAuthWithMessage() throws Exception {
- taskUpdater.requiresAuth(SAMPLE_MESSAGE);
+ agentEmitter.requiresAuth(SAMPLE_MESSAGE);
checkTaskStatusUpdateEventOnQueue(false, TaskState.AUTH_REQUIRED, SAMPLE_MESSAGE);
}
@Test
public void testRequiresAuthWithFinalTrue() throws Exception {
- taskUpdater.requiresAuth(true);
+ agentEmitter.requiresAuth(true);
checkTaskStatusUpdateEventOnQueue(false, TaskState.AUTH_REQUIRED, null);
}
@Test
public void testRequiresAuthWithMessageAndFinalTrue() throws Exception {
- taskUpdater.requiresAuth(SAMPLE_MESSAGE, true);
+ agentEmitter.requiresAuth(SAMPLE_MESSAGE, true);
checkTaskStatusUpdateEventOnQueue(false, TaskState.AUTH_REQUIRED, SAMPLE_MESSAGE);
}
@Test
public void testNonTerminalStateUpdatesAllowed() throws Exception {
// Non-terminal states should be allowed multiple times
- taskUpdater.submit();
+ agentEmitter.submit();
checkTaskStatusUpdateEventOnQueue(false, TaskState.SUBMITTED, null);
- taskUpdater.startWork();
+ agentEmitter.startWork();
checkTaskStatusUpdateEventOnQueue(false, TaskState.WORKING, null);
- taskUpdater.requiresInput();
+ agentEmitter.requiresInput();
checkTaskStatusUpdateEventOnQueue(false, TaskState.INPUT_REQUIRED, null);
- taskUpdater.requiresAuth();
+ agentEmitter.requiresAuth();
checkTaskStatusUpdateEventOnQueue(false, TaskState.AUTH_REQUIRED, null);
// Should still be able to complete
- taskUpdater.complete();
+ agentEmitter.complete();
checkTaskStatusUpdateEventOnQueue(true, TaskState.COMPLETED, null);
}
@Test
public void testNewAgentMessage() throws Exception {
- Message message = taskUpdater.newAgentMessage(SAMPLE_PARTS, null);
+ Message message = agentEmitter.newAgentMessage(SAMPLE_PARTS, null);
assertEquals(AGENT, message.role());
assertEquals(TEST_TASK_ID, message.taskId());
@@ -254,7 +254,7 @@ public void testNewAgentMessage() throws Exception {
@Test
public void testNewAgentMessageWithMetadata() throws Exception {
Map metadata = Map.of("key", "value");
- Message message = taskUpdater.newAgentMessage(SAMPLE_PARTS, metadata);
+ Message message = agentEmitter.newAgentMessage(SAMPLE_PARTS, metadata);
assertEquals(AGENT, message.role());
assertEquals(TEST_TASK_ID, message.taskId());
@@ -266,7 +266,7 @@ public void testNewAgentMessageWithMetadata() throws Exception {
@Test
public void testAddArtifactWithAppendTrue() throws Exception {
- taskUpdater.addArtifact(SAMPLE_PARTS, "artifact-id", "Test Artifact", null, true, null);
+ agentEmitter.addArtifact(SAMPLE_PARTS, "artifact-id", "Test Artifact", null, true, null);
EventQueueItem item = eventQueue.dequeueEventItem(5000);
assertNotNull(item);
Event event = item.getEvent();
@@ -287,7 +287,7 @@ public void testAddArtifactWithAppendTrue() throws Exception {
@Test
public void testAddArtifactWithLastChunkTrue() throws Exception {
- taskUpdater.addArtifact(SAMPLE_PARTS, "artifact-id", "Test Artifact", null, null, true);
+ agentEmitter.addArtifact(SAMPLE_PARTS, "artifact-id", "Test Artifact", null, null, true);
EventQueueItem item = eventQueue.dequeueEventItem(5000);
assertNotNull(item);
Event event = item.getEvent();
@@ -304,7 +304,7 @@ public void testAddArtifactWithLastChunkTrue() throws Exception {
@Test
public void testAddArtifactWithAppendAndLastChunk() throws Exception {
- taskUpdater.addArtifact(SAMPLE_PARTS, "artifact-id", "Test Artifact", null, true, false);
+ agentEmitter.addArtifact(SAMPLE_PARTS, "artifact-id", "Test Artifact", null, true, false);
EventQueueItem item = eventQueue.dequeueEventItem(5000);
assertNotNull(item);
Event event = item.getEvent();
@@ -320,7 +320,7 @@ public void testAddArtifactWithAppendAndLastChunk() throws Exception {
@Test
public void testAddArtifactGeneratesIdWhenNull() throws Exception {
- taskUpdater.addArtifact(SAMPLE_PARTS, null, "Test Artifact", null);
+ agentEmitter.addArtifact(SAMPLE_PARTS, null, "Test Artifact", null);
EventQueueItem item = eventQueue.dequeueEventItem(5000);
assertNotNull(item);
Event event = item.getEvent();
@@ -340,11 +340,11 @@ public void testAddArtifactGeneratesIdWhenNull() throws Exception {
@Test
public void testTerminalStateProtectionAfterComplete() throws Exception {
// Complete the task first
- taskUpdater.complete();
+ agentEmitter.complete();
checkTaskStatusUpdateEventOnQueue(true, TaskState.COMPLETED, null);
// Try to update status again - should throw RuntimeException
- RuntimeException exception = assertThrows(RuntimeException.class, () -> taskUpdater.startWork());
+ RuntimeException exception = assertThrows(RuntimeException.class, () -> agentEmitter.startWork());
assertEquals("Cannot update task status - terminal state already reached", exception.getMessage());
// Verify no additional events were queued
@@ -354,11 +354,11 @@ public void testTerminalStateProtectionAfterComplete() throws Exception {
@Test
public void testTerminalStateProtectionAfterFail() throws Exception {
// Fail the task first
- taskUpdater.fail();
+ agentEmitter.fail();
checkTaskStatusUpdateEventOnQueue(true, TaskState.FAILED, null);
// Try to update status again - should throw RuntimeException
- RuntimeException exception = assertThrows(RuntimeException.class, () -> taskUpdater.complete());
+ RuntimeException exception = assertThrows(RuntimeException.class, () -> agentEmitter.complete());
assertEquals("Cannot update task status - terminal state already reached", exception.getMessage());
// Verify no additional events were queued
@@ -368,11 +368,11 @@ public void testTerminalStateProtectionAfterFail() throws Exception {
@Test
public void testTerminalStateProtectionAfterReject() throws Exception {
// Reject the task first
- taskUpdater.reject();
+ agentEmitter.reject();
checkTaskStatusUpdateEventOnQueue(true, TaskState.REJECTED, null);
// Try to update status again - should throw RuntimeException
- RuntimeException exception = assertThrows(RuntimeException.class, () -> taskUpdater.startWork());
+ RuntimeException exception = assertThrows(RuntimeException.class, () -> agentEmitter.startWork());
assertEquals("Cannot update task status - terminal state already reached", exception.getMessage());
// Verify no additional events were queued
@@ -382,11 +382,11 @@ public void testTerminalStateProtectionAfterReject() throws Exception {
@Test
public void testTerminalStateProtectionAfterCancel() throws Exception {
// Cancel the task first
- taskUpdater.cancel();
+ agentEmitter.cancel();
checkTaskStatusUpdateEventOnQueue(true, TaskState.CANCELED, null);
// Try to update status again - should throw RuntimeException
- RuntimeException exception = assertThrows(RuntimeException.class, () -> taskUpdater.submit());
+ RuntimeException exception = assertThrows(RuntimeException.class, () -> agentEmitter.submit());
assertEquals("Cannot update task status - terminal state already reached", exception.getMessage());
// Verify no additional events were queued
@@ -398,7 +398,7 @@ public void testConcurrentCompletionAttempts() throws Exception {
// This test simulates race condition between multiple completion attempts
Thread thread1 = new Thread(() -> {
try {
- taskUpdater.complete();
+ agentEmitter.complete();
} catch (RuntimeException e) {
// Expected for one of the threads
}
@@ -406,7 +406,7 @@ public void testConcurrentCompletionAttempts() throws Exception {
Thread thread2 = new Thread(() -> {
try {
- taskUpdater.fail();
+ agentEmitter.fail();
} catch (RuntimeException e) {
// Expected for one of the threads
}
diff --git a/tck/src/main/java/io/a2a/tck/server/AgentExecutorProducer.java b/tck/src/main/java/io/a2a/tck/server/AgentExecutorProducer.java
index 848ced9ec..9d16a86ea 100644
--- a/tck/src/main/java/io/a2a/tck/server/AgentExecutorProducer.java
+++ b/tck/src/main/java/io/a2a/tck/server/AgentExecutorProducer.java
@@ -1,20 +1,20 @@
package io.a2a.tck.server;
+import java.util.List;
+
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import io.a2a.server.agentexecution.AgentExecutor;
import io.a2a.server.agentexecution.RequestContext;
-import io.a2a.server.events.EventQueue;
-import io.a2a.server.tasks.TaskUpdater;
+import io.a2a.server.tasks.AgentEmitter;
import io.a2a.spec.A2AError;
import io.a2a.spec.Task;
import io.a2a.spec.TaskNotCancelableError;
import io.a2a.spec.TaskState;
import io.a2a.spec.TaskStatus;
import io.a2a.spec.TaskStatusUpdateEvent;
-import java.util.List;
@ApplicationScoped
public class AgentExecutorProducer {
@@ -27,7 +27,7 @@ public AgentExecutor agentExecutor() {
private static class FireAndForgetAgentExecutor implements AgentExecutor {
@Override
- public void execute(RequestContext context, EventQueue eventQueue) throws A2AError {
+ public void execute(RequestContext context, AgentEmitter agentEmitter) throws A2AError {
Task task = context.getTask();
if (task == null) {
@@ -46,7 +46,7 @@ public void execute(RequestContext context, EventQueue eventQueue) throws A2AErr
.status(new TaskStatus(TaskState.SUBMITTED))
.history(List.of(context.getMessage()))
.build();
- eventQueue.enqueueEvent(task);
+ agentEmitter.addTask(task);
}
// Sleep to allow task state persistence before TCK subscribe test
@@ -59,10 +59,9 @@ public void execute(RequestContext context, EventQueue eventQueue) throws A2AErr
Thread.currentThread().interrupt();
}
}
- TaskUpdater updater = new TaskUpdater(context, eventQueue);
// Immediately set to WORKING state
- updater.startWork();
+ agentEmitter.startWork();
System.out.println("====> task set to WORKING, starting background execution");
// Method returns immediately - task continues in background
@@ -70,7 +69,7 @@ public void execute(RequestContext context, EventQueue eventQueue) throws A2AErr
}
@Override
- public void cancel(RequestContext context, EventQueue eventQueue) throws A2AError {
+ public void cancel(RequestContext context, AgentEmitter agentEmitter) throws A2AError {
System.out.println("====> task cancel request received");
Task task = context.getTask();
if (task == null) {
@@ -87,15 +86,7 @@ public void cancel(RequestContext context, EventQueue eventQueue) throws A2AErro
throw new TaskNotCancelableError();
}
- TaskUpdater updater = new TaskUpdater(context, eventQueue);
- updater.cancel();
- eventQueue.enqueueEvent(new TaskStatusUpdateEvent(
- task.id(),
- new TaskStatus(TaskState.CANCELED),
- task.contextId(),
- true, // isFinal - TaskState.CANCELED is a final state
- null));
-
+ agentEmitter.cancel();
System.out.println("====> task canceled");
}
diff --git a/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentExecutorProducer.java b/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentExecutorProducer.java
index 364d2275f..40839e7fb 100644
--- a/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentExecutorProducer.java
+++ b/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentExecutorProducer.java
@@ -7,8 +7,7 @@
import io.a2a.server.agentexecution.AgentExecutor;
import io.a2a.server.agentexecution.RequestContext;
-import io.a2a.server.events.EventQueue;
-import io.a2a.server.tasks.TaskUpdater;
+import io.a2a.server.tasks.AgentEmitter;
import io.a2a.spec.A2AError;
import io.a2a.spec.InvalidParamsError;
import io.a2a.spec.Message;
@@ -25,23 +24,22 @@ public class AgentExecutorProducer {
public AgentExecutor agentExecutor() {
return new AgentExecutor() {
@Override
- public void execute(RequestContext context, EventQueue eventQueue) throws A2AError {
- TaskUpdater updater = new TaskUpdater(context, eventQueue);
+ public void execute(RequestContext context, AgentEmitter agentEmitter) throws A2AError {
String taskId = context.getTaskId();
// Special handling for multi-event test
if (taskId != null && taskId.startsWith("multi-event-test")) {
// First call: context.getTask() == null (new task)
if (context.getTask() == null) {
- updater.startWork();
+ agentEmitter.startWork();
// Return immediately - queue stays open because task is in WORKING state
return;
} else {
// Second call: context.getTask() != null (existing task)
- updater.addArtifact(
+ agentEmitter.addArtifact(
List.of(new TextPart("Second message artifact")),
"artifact-2", "Second Artifact", null);
- updater.complete();
+ agentEmitter.complete();
return;
}
}
@@ -50,8 +48,8 @@ public void execute(RequestContext context, EventQueue eventQueue) throws A2AErr
if (taskId != null && taskId.startsWith("input-required-test")) {
// First call: context.getTask() == null (new task)
if (context.getTask() == null) {
- updater.startWork();
- updater.requiresInput(updater.newAgentMessage(
+ agentEmitter.startWork();
+ agentEmitter.requiresInput(agentEmitter.newAgentMessage(
List.of(new TextPart("Please provide additional information")),
context.getMessage().metadata()));
// Return immediately - queue stays open because task is in INPUT_REQUIRED state
@@ -62,23 +60,26 @@ public void execute(RequestContext context, EventQueue eventQueue) throws A2AErr
throw new InvalidParamsError("We didn't get the expected input");
}
// Second call: context.getTask() != null (input provided)
- updater.startWork();
- updater.complete();
+ agentEmitter.startWork();
+ agentEmitter.complete();
return;
}
}
if (context.getTaskId().equals("task-not-supported-123")) {
- eventQueue.enqueueEvent(new UnsupportedOperationError());
+ throw new UnsupportedOperationError();
+ }
+ if (context.getMessage() != null) {
+ agentEmitter.sendMessage(context.getMessage());
+ } else {
+ agentEmitter.addTask(context.getTask());
}
- eventQueue.enqueueEvent(context.getMessage() != null ? context.getMessage() : context.getTask());
}
@Override
- public void cancel(RequestContext context, EventQueue eventQueue) throws A2AError {
+ public void cancel(RequestContext context, AgentEmitter agentEmitter) throws A2AError {
if (context.getTask().id().equals("cancel-task-123")) {
- TaskUpdater taskUpdater = new TaskUpdater(context, eventQueue);
- taskUpdater.cancel();
+ agentEmitter.cancel();
} else if (context.getTask().id().equals("cancel-task-not-supported-123")) {
throw new UnsupportedOperationError();
}
diff --git a/transport/grpc/src/test/java/io/a2a/transport/grpc/handler/GrpcHandlerTest.java b/transport/grpc/src/test/java/io/a2a/transport/grpc/handler/GrpcHandlerTest.java
index 350e8b99a..80d7d7f39 100644
--- a/transport/grpc/src/test/java/io/a2a/transport/grpc/handler/GrpcHandlerTest.java
+++ b/transport/grpc/src/test/java/io/a2a/transport/grpc/handler/GrpcHandlerTest.java
@@ -23,7 +23,7 @@
import io.a2a.server.requesthandlers.AbstractA2ARequestHandlerTest;
import io.a2a.server.requesthandlers.DefaultRequestHandler;
import io.a2a.server.requesthandlers.RequestHandler;
-import io.a2a.server.tasks.TaskUpdater;
+import io.a2a.server.tasks.AgentEmitter;
import io.a2a.spec.AgentCapabilities;
import io.a2a.spec.AgentCard;
import io.a2a.spec.AgentExtension;
@@ -36,7 +36,6 @@
import io.a2a.spec.TaskStatusUpdateEvent;
import io.a2a.spec.TextPart;
import io.a2a.spec.UnsupportedOperationError;
-
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.internal.testing.StreamRecorder;
@@ -102,13 +101,12 @@ public void testOnCancelTaskSuccess() throws Exception {
GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor);
taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK, false);
- agentExecutorCancel = (context, eventQueue) -> {
+ agentExecutorCancel = (context, agentEmitter) -> {
// We need to cancel the task or the EventConsumer never finds a 'final' event.
// Looking at the Python implementation, they typically use AgentExecutors that
// don't support cancellation. So my theory is the Agent updates the task to the CANCEL status
io.a2a.spec.Task task = context.getTask();
- TaskUpdater taskUpdater = new TaskUpdater(context, eventQueue);
- taskUpdater.cancel();
+ agentEmitter.cancel();
};
CancelTaskRequest request = CancelTaskRequest.newBuilder()
@@ -133,7 +131,7 @@ public void testOnCancelTaskNotSupported() throws Exception {
GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor);
taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK, false);
- agentExecutorCancel = (context, eventQueue) -> {
+ agentExecutorCancel = (context, agentEmitter) -> {
throw new UnsupportedOperationError();
};
@@ -163,8 +161,8 @@ public void testOnCancelTaskNotFound() throws Exception {
@Test
public void testOnMessageNewMessageSuccess() throws Exception {
GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.sendMessage(context.getMessage());
};
StreamRecorder streamRecorder = sendMessageRequest(handler);
@@ -180,8 +178,8 @@ public void testOnMessageNewMessageSuccess() throws Exception {
public void testOnMessageNewMessageWithExistingTaskSuccess() throws Exception {
GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor);
taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK, false);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.sendMessage(context.getMessage());
};
StreamRecorder streamRecorder = sendMessageRequest(handler);
Assertions.assertNull(streamRecorder.getError());
@@ -195,8 +193,8 @@ public void testOnMessageNewMessageWithExistingTaskSuccess() throws Exception {
@Test
public void testOnMessageError() throws Exception {
GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(new UnsupportedOperationError());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.fail(new UnsupportedOperationError());
};
StreamRecorder streamRecorder = sendMessageRequest(handler);
assertGrpcError(streamRecorder, Status.Code.UNIMPLEMENTED);
@@ -225,8 +223,8 @@ public void testSetPushNotificationConfigSuccess() throws Exception {
@Test
public void testGetPushNotificationConfigSuccess() throws Exception {
GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage());
};
// first set the task push notification config
@@ -284,8 +282,8 @@ public void testOnSetPushNotificationNoPushNotifierConfig() throws Exception {
@Test
public void testOnMessageStreamNewMessageSuccess() throws Exception {
GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage());
};
StreamRecorder streamRecorder = sendStreamingMessageRequest(handler);
@@ -302,8 +300,8 @@ public void testOnMessageStreamNewMessageSuccess() throws Exception {
@Test
public void testOnMessageStreamNewMessageExistingTaskSuccess() throws Exception {
GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage());
};
io.a2a.spec.Task task = io.a2a.spec.Task.builder(AbstractA2ARequestHandlerTest.MINIMAL_TASK)
@@ -426,10 +424,10 @@ public void testOnMessageStreamNewMessageSendPushNotificationSuccess() throws Ex
.status(new io.a2a.spec.TaskStatus(io.a2a.spec.TaskState.COMPLETED))
.build());
- agentExecutorExecute = (context, eventQueue) -> {
+ agentExecutorExecute = (context, agentEmitter) -> {
// Hardcode the events to send here
for (Event event : events) {
- eventQueue.enqueueEvent(event);
+ agentEmitter.emitEvent(event);
}
};
@@ -508,8 +506,8 @@ public void testOnSubscribeExistingTaskSuccess() throws Exception {
taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK, false);
queueManager.createOrTap(AbstractA2ARequestHandlerTest.MINIMAL_TASK.id());
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.sendMessage(context.getMessage());
};
StreamRecorder streamRecorder = StreamRecorder.create();
@@ -626,8 +624,8 @@ public void testOnMessageStreamInternalError() throws Exception {
public void testListPushNotificationConfig() throws Exception {
GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor);
taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK, false);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage());
};
StreamRecorder pushRecorder = createTaskPushNotificationConfigRequest(handler,
@@ -652,8 +650,8 @@ public void testListPushNotificationConfigNotSupported() throws Exception {
AgentCard card = AbstractA2ARequestHandlerTest.createAgentCard(true, false);
GrpcHandler handler = new TestGrpcHandler(card, requestHandler, internalExecutor);
taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK, false);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage());
};
ListTaskPushNotificationConfigRequest request = ListTaskPushNotificationConfigRequest.newBuilder()
@@ -669,8 +667,8 @@ public void testListPushNotificationConfigNoPushConfigStore() {
DefaultRequestHandler requestHandler = DefaultRequestHandler.create(executor, taskStore, queueManager, null, mainEventBusProcessor, internalExecutor, internalExecutor);
GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor);
taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK, false);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage());
};
ListTaskPushNotificationConfigRequest request = ListTaskPushNotificationConfigRequest.newBuilder()
@@ -684,8 +682,8 @@ public void testListPushNotificationConfigNoPushConfigStore() {
@Test
public void testListPushNotificationConfigTaskNotFound() {
GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage());
};
ListTaskPushNotificationConfigRequest request = ListTaskPushNotificationConfigRequest.newBuilder()
@@ -700,8 +698,8 @@ public void testListPushNotificationConfigTaskNotFound() {
public void testDeletePushNotificationConfig() throws Exception {
GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor);
taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK, false);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage());
};
StreamRecorder pushRecorder = createTaskPushNotificationConfigRequest(handler, AbstractA2ARequestHandlerTest.MINIMAL_TASK.id(),
AbstractA2ARequestHandlerTest.MINIMAL_TASK.id());
@@ -723,8 +721,8 @@ public void testDeletePushNotificationConfigNotSupported() throws Exception {
AgentCard card = AbstractA2ARequestHandlerTest.createAgentCard(true, false);
GrpcHandler handler = new TestGrpcHandler(card, requestHandler, internalExecutor);
taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK, false);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage());
};
DeleteTaskPushNotificationConfigRequest request = DeleteTaskPushNotificationConfigRequest.newBuilder()
.setId(AbstractA2ARequestHandlerTest.MINIMAL_TASK.id())
@@ -762,15 +760,14 @@ public void testStreamingDoesNotBlockMainThread() throws Exception {
CountDownLatch streamStarted = new CountDownLatch(1);
GrpcHandler.setStreamingSubscribedRunnable(streamStarted::countDown);
CountDownLatch eventProcessed = new CountDownLatch(1);
-
- agentExecutorExecute = (context, eventQueue) -> {
+ agentExecutorExecute = (context, agentEmitter) -> {
// Wait a bit to ensure the main thread continues
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
- eventQueue.enqueueEvent(context.getMessage());
+ agentEmitter.sendMessage(context.getMessage());
};
// Start streaming with a custom StreamObserver
@@ -928,8 +925,8 @@ public ServerCallContext create(StreamObserver streamObserver) {
}
};
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.sendMessage(context.getMessage());
};
SendMessageRequest request = SendMessageRequest.newBuilder()
@@ -1069,8 +1066,8 @@ public ServerCallContext create(StreamObserver streamObserver) {
}
};
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.sendMessage(context.getMessage());
};
SendMessageRequest request = SendMessageRequest.newBuilder()
@@ -1120,8 +1117,8 @@ public ServerCallContext create(StreamObserver streamObserver) {
}
};
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.sendMessage(context.getMessage());
};
SendMessageRequest request = SendMessageRequest.newBuilder()
diff --git a/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java b/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java
index a97883e5a..ea58beba4 100644
--- a/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java
+++ b/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java
@@ -22,6 +22,8 @@
import io.a2a.jsonrpc.common.wrappers.CancelTaskRequest;
import io.a2a.jsonrpc.common.wrappers.CancelTaskResponse;
+import io.a2a.jsonrpc.common.wrappers.CreateTaskPushNotificationConfigRequest;
+import io.a2a.jsonrpc.common.wrappers.CreateTaskPushNotificationConfigResponse;
import io.a2a.jsonrpc.common.wrappers.DeleteTaskPushNotificationConfigRequest;
import io.a2a.jsonrpc.common.wrappers.DeleteTaskPushNotificationConfigResponse;
import io.a2a.jsonrpc.common.wrappers.GetExtendedAgentCardRequest;
@@ -39,8 +41,6 @@
import io.a2a.jsonrpc.common.wrappers.SendMessageResponse;
import io.a2a.jsonrpc.common.wrappers.SendStreamingMessageRequest;
import io.a2a.jsonrpc.common.wrappers.SendStreamingMessageResponse;
-import io.a2a.jsonrpc.common.wrappers.CreateTaskPushNotificationConfigRequest;
-import io.a2a.jsonrpc.common.wrappers.CreateTaskPushNotificationConfigResponse;
import io.a2a.jsonrpc.common.wrappers.SubscribeToTaskRequest;
import io.a2a.server.ServerCallContext;
import io.a2a.server.auth.UnauthenticatedUser;
@@ -48,7 +48,6 @@
import io.a2a.server.requesthandlers.AbstractA2ARequestHandlerTest;
import io.a2a.server.requesthandlers.DefaultRequestHandler;
import io.a2a.server.tasks.ResultAggregator;
-import io.a2a.server.tasks.TaskUpdater;
import io.a2a.spec.AgentCapabilities;
import io.a2a.spec.AgentCard;
import io.a2a.spec.AgentExtension;
@@ -119,13 +118,12 @@ public void testOnCancelTaskSuccess() throws Exception {
JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor);
taskStore.save(MINIMAL_TASK, false);
- agentExecutorCancel = (context, eventQueue) -> {
+ agentExecutorCancel = (context, agentEmitter) -> {
// We need to cancel the task or the EventConsumer never finds a 'final' event.
// Looking at the Python implementation, they typically use AgentExecutors that
// don't support cancellation. So my theory is the Agent updates the task to the CANCEL status
Task task = context.getTask();
- TaskUpdater taskUpdater = new TaskUpdater(context, eventQueue);
- taskUpdater.cancel();
+ agentEmitter.cancel();
};
CancelTaskRequest request = new CancelTaskRequest("111", new TaskIdParams(MINIMAL_TASK.id()));
@@ -144,7 +142,7 @@ public void testOnCancelTaskNotSupported() {
JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor);
taskStore.save(MINIMAL_TASK, false);
- agentExecutorCancel = (context, eventQueue) -> {
+ agentExecutorCancel = (context, agentEmitter) -> {
throw new UnsupportedOperationError();
};
@@ -168,8 +166,8 @@ public void testOnCancelTaskNotFound() {
@Test
public void testOnMessageNewMessageSuccess() {
JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.sendMessage(context.getMessage());
};
Message message = Message.builder(MESSAGE)
.taskId(MINIMAL_TASK.id())
@@ -185,8 +183,8 @@ public void testOnMessageNewMessageSuccess() {
public void testOnMessageNewMessageWithExistingTaskSuccess() {
JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor);
taskStore.save(MINIMAL_TASK, false);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.sendMessage(context.getMessage());
};
Message message = Message.builder(MESSAGE)
.taskId(MINIMAL_TASK.id())
@@ -203,8 +201,8 @@ public void testOnMessageError() {
// See testMessageOnErrorMocks() for a test more similar to the Python implementation, using mocks for
// EventConsumer.consumeAll()
JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(new UnsupportedOperationError());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.fail(new UnsupportedOperationError());
};
Message message = Message.builder(MESSAGE)
.taskId(MINIMAL_TASK.id())
@@ -242,8 +240,8 @@ public void testOnMessageErrorMocks() {
@Test
public void testOnMessageStreamNewMessageSuccess() throws InterruptedException {
JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage());
};
Message message = Message.builder(MESSAGE)
@@ -323,13 +321,13 @@ public void testOnMessageStreamNewMessageMultipleEventsSuccess() throws Interrup
.build();
// Configure the agent executor to enqueue multiple events
- agentExecutorExecute = (context, eventQueue) -> {
+ agentExecutorExecute = (context, agentEmitter) -> {
// Enqueue the task with WORKING state
- eventQueue.enqueueEvent(taskEvent);
+ agentEmitter.emitEvent(taskEvent);
// Enqueue an artifact update event
- eventQueue.enqueueEvent(artifactEvent);
+ agentEmitter.emitEvent(artifactEvent);
// Enqueue a status update event to complete the task (this is the "final" event)
- eventQueue.enqueueEvent(statusEvent);
+ agentEmitter.emitEvent(statusEvent);
};
Message message = Message.builder(MESSAGE)
@@ -490,8 +488,8 @@ public void onComplete() {
@Test
public void testOnMessageStreamNewMessageExistingTaskSuccess() throws Exception {
JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage());
};
Task task = Task.builder(MINIMAL_TASK)
@@ -664,8 +662,8 @@ public void testSetPushNotificationConfigSuccess() {
public void testGetPushNotificationConfigSuccess() {
JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor);
taskStore.save(MINIMAL_TASK, false);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage());
};
TaskPushNotificationConfig taskPushConfig
@@ -716,10 +714,11 @@ public void testOnMessageStreamNewMessageSendPushNotificationSuccess() throws Ex
.status(new TaskStatus(TaskState.COMPLETED))
.build());
- agentExecutorExecute = (context, eventQueue) -> {
+
+ agentExecutorExecute = (context, agentEmitter) -> {
// Hardcode the events to send here
for (Event event : events) {
- eventQueue.enqueueEvent(event);
+ agentEmitter.emitEvent(event);
}
};
@@ -811,10 +810,10 @@ public void testOnSubscribeExistingTaskSuccess() {
taskStore.save(MINIMAL_TASK, false);
queueManager.createOrTap(MINIMAL_TASK.id());
- agentExecutorExecute = (context, eventQueue) -> {
+ agentExecutorExecute = (context, agentEmitter) -> {
// The only thing hitting the agent is the onMessageSend() and we should use the message
- eventQueue.enqueueEvent(context.getMessage());
- //eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage());
+ agentEmitter.sendMessage(context.getMessage());
+ //agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage());
};
SubscribeToTaskRequest request = new SubscribeToTaskRequest("1", new TaskIdParams(MINIMAL_TASK.id()));
@@ -1267,8 +1266,8 @@ public void testOnMessageSendTaskIdMismatch() {
JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor);
taskStore.save(MINIMAL_TASK, false);
- agentExecutorExecute = ((context, eventQueue) -> {
- eventQueue.enqueueEvent(MINIMAL_TASK);
+ agentExecutorExecute = ((context, agentEmitter) -> {
+ agentEmitter.emitEvent(MINIMAL_TASK);
});
SendMessageRequest request = new SendMessageRequest("1",
new MessageSendParams(MESSAGE, null, null));
@@ -1283,8 +1282,8 @@ public void testOnMessageStreamTaskIdMismatch() throws InterruptedException {
JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor);
taskStore.save(MINIMAL_TASK, false);
- agentExecutorExecute = ((context, eventQueue) -> {
- eventQueue.enqueueEvent(MINIMAL_TASK);
+ agentExecutorExecute = ((context, agentEmitter) -> {
+ agentEmitter.emitEvent(MINIMAL_TASK);
});
SendStreamingMessageRequest request = new SendStreamingMessageRequest("1", new MessageSendParams(MESSAGE, null, null));
@@ -1334,8 +1333,8 @@ public void onComplete() {
public void testListPushNotificationConfig() {
JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor);
taskStore.save(MINIMAL_TASK, false);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage());
};
TaskPushNotificationConfig taskPushConfig
@@ -1365,8 +1364,8 @@ public void testListPushNotificationConfigNotSupported() {
AgentCard card = createAgentCard(true, false);
JSONRPCHandler handler = new JSONRPCHandler(card, requestHandler, internalExecutor);
taskStore.save(MINIMAL_TASK, false);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage());
};
TaskPushNotificationConfig taskPushConfig
@@ -1393,8 +1392,8 @@ public void testListPushNotificationConfigNoPushConfigStore() {
DefaultRequestHandler requestHandler = DefaultRequestHandler.create(executor, taskStore, queueManager, null, mainEventBusProcessor, internalExecutor, internalExecutor);
JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor);
taskStore.save(MINIMAL_TASK, false);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage());
};
ListTaskPushNotificationConfigRequest listRequest
@@ -1410,8 +1409,8 @@ public void testListPushNotificationConfigNoPushConfigStore() {
@Test
public void testListPushNotificationConfigTaskNotFound() {
JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage());
};
ListTaskPushNotificationConfigRequest listRequest
@@ -1428,8 +1427,8 @@ public void testListPushNotificationConfigTaskNotFound() {
public void testDeletePushNotificationConfig() {
JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor);
taskStore.save(MINIMAL_TASK, false);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage());
};
TaskPushNotificationConfig taskPushConfig
@@ -1456,8 +1455,8 @@ public void testDeletePushNotificationConfigNotSupported() {
AgentCard card = createAgentCard(true, false);
JSONRPCHandler handler = new JSONRPCHandler(card, requestHandler, internalExecutor);
taskStore.save(MINIMAL_TASK, false);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage());
};
TaskPushNotificationConfig taskPushConfig
@@ -1485,8 +1484,8 @@ public void testDeletePushNotificationConfigNoPushConfigStore() {
DefaultRequestHandler.create(executor, taskStore, queueManager, null, mainEventBusProcessor, internalExecutor, internalExecutor);
JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor);
taskStore.save(MINIMAL_TASK, false);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage());
};
TaskPushNotificationConfig taskPushConfig
@@ -1528,14 +1527,14 @@ public void testStreamingDoesNotBlockMainThread() throws Exception {
CountDownLatch streamStarted = new CountDownLatch(1);
CountDownLatch eventProcessed = new CountDownLatch(1);
- agentExecutorExecute = (context, eventQueue) -> {
+ agentExecutorExecute = (context, agentEmitter) -> {
// Wait a bit to ensure the main thread continues
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
- eventQueue.enqueueEvent(context.getMessage());
+ agentEmitter.sendMessage(context.getMessage());
};
Message message = Message.builder(MESSAGE)
@@ -1736,8 +1735,8 @@ public void testRequiredExtensionProvidedSuccess() {
requestedExtensions
);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.sendMessage(context.getMessage());
};
Message message = Message.builder(MESSAGE)
@@ -1894,8 +1893,8 @@ public void testCompatibleVersionSuccess() {
"1.1" // Compatible version (same major version)
);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.sendMessage(context.getMessage());
};
Message message = Message.builder(MESSAGE)
@@ -1930,8 +1929,8 @@ public void testNoVersionDefaultsToCurrentVersionSuccess() {
JSONRPCHandler handler = new JSONRPCHandler(agentCard, requestHandler, internalExecutor);
// Use default callContext (no version - should default to 1.0)
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.sendMessage(context.getMessage());
};
Message message = Message.builder(MESSAGE)
diff --git a/transport/rest/src/test/java/io/a2a/transport/rest/handler/RestHandlerTest.java b/transport/rest/src/test/java/io/a2a/transport/rest/handler/RestHandlerTest.java
index 2d1c19b84..b0dc58141 100644
--- a/transport/rest/src/test/java/io/a2a/transport/rest/handler/RestHandlerTest.java
+++ b/transport/rest/src/test/java/io/a2a/transport/rest/handler/RestHandlerTest.java
@@ -15,7 +15,7 @@
import io.a2a.server.ServerCallContext;
import io.a2a.server.auth.UnauthenticatedUser;
import io.a2a.server.requesthandlers.AbstractA2ARequestHandlerTest;
-import io.a2a.server.tasks.TaskUpdater;
+import io.a2a.server.tasks.AgentEmitter;
import io.a2a.spec.AgentCapabilities;
import io.a2a.spec.AgentCard;
import io.a2a.spec.AgentExtension;
@@ -87,8 +87,8 @@ public void testListTasksInvalidStatus() {
@Test
public void testSendMessage() throws InvalidProtocolBufferException {
RestHandler handler = new RestHandler(CARD, requestHandler, internalExecutor);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.sendMessage(context.getMessage());
};
String requestBody = """
{
@@ -167,13 +167,12 @@ public void testCancelTaskSuccess() {
RestHandler handler = new RestHandler(CARD, requestHandler, internalExecutor);
taskStore.save(MINIMAL_TASK, false);
- agentExecutorCancel = (context, eventQueue) -> {
+ agentExecutorCancel = (context, agentEmitter) -> {
// We need to cancel the task or the EventConsumer never finds a 'final' event.
// Looking at the Python implementation, they typically use AgentExecutors that
// don't support cancellation. So my theory is the Agent updates the task to the CANCEL status
Task task = context.getTask();
- TaskUpdater taskUpdater = new TaskUpdater(context, eventQueue);
- taskUpdater.cancel();
+ agentEmitter.cancel();
};
RestHandler.HTTPRestResponse response = handler.cancelTask(MINIMAL_TASK.id(), "", callContext);
@@ -197,8 +196,8 @@ public void testCancelTaskNotFound() {
@Test
public void testSendStreamingMessageSuccess() {
RestHandler handler = new RestHandler(CARD, requestHandler, internalExecutor);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.sendMessage(context.getMessage());
};
String requestBody = """
{
@@ -357,14 +356,14 @@ public void testStreamingDoesNotBlockMainThread() throws Exception {
AtomicBoolean eventReceived = new AtomicBoolean(false);
CountDownLatch streamStarted = new CountDownLatch(1);
CountDownLatch eventProcessed = new CountDownLatch(1);
- agentExecutorExecute = (context, eventQueue) -> {
+ agentExecutorExecute = (context, agentEmitter) -> {
// Wait a bit to ensure the main thread continues
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
- eventQueue.enqueueEvent(context.getMessage());
+ agentEmitter.sendMessage(context.getMessage());
};
String requestBody = """
@@ -602,8 +601,8 @@ public void testRequiredExtensionProvidedSuccess() {
requestedExtensions
);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.sendMessage(context.getMessage());
};
String requestBody = """
@@ -796,8 +795,8 @@ public void testCompatibleVersionSuccess() {
"1.1" // Compatible version (same major version)
);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.sendMessage(context.getMessage());
};
String requestBody = """
@@ -844,8 +843,8 @@ public void testNoVersionDefaultsToCurrentVersionSuccess() {
RestHandler handler = new RestHandler(agentCard, requestHandler, internalExecutor);
// Use default callContext (no version - should default to 1.0)
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.sendMessage(context.getMessage());
};
String requestBody = """