Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions common/src/main/java/io/a2a/util/Assert.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.a2a.util;

import org.jspecify.annotations.Nullable;

public final class Assert {

/**
Expand All @@ -11,18 +13,21 @@ public final class Assert {
* @return the value that was passed in
* @throws IllegalArgumentException if the value is {@code null}
*/
@NotNull
public static <T> T checkNotNullParam(String name, T value) throws IllegalArgumentException {
public static <T> @NotNull T checkNotNullParam(String name, @Nullable T value) throws IllegalArgumentException {
checkNotNullParamChecked("name", name);
checkNotNullParamChecked(name, value);
if (value == null) {
throw new IllegalArgumentException("Parameter '" + name + "' may not be null");
}
return value;
}

private static <T> void checkNotNullParamChecked(final String name, final T value) {
if (value == null) throw new IllegalArgumentException("Parameter '" + name + "' may not be null");
private static <T> void checkNotNullParamChecked(final String name, final @Nullable T value) {
if (value == null) {
throw new IllegalArgumentException("Parameter '" + name + "' may not be null");
}
}

public static void isNullOrStringOrInteger(Object value) {
public static void isNullOrStringOrInteger(@Nullable Object value) {
if (! (value == null || value instanceof String || value instanceof Integer)) {
throw new IllegalArgumentException("Id must be null, a String, or an Integer");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,13 @@
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.Map;
import java.util.Set;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonSyntaxException;
import com.google.gson.ToNumberPolicy;
import com.google.gson.TypeAdapter;
import com.google.gson.reflect.TypeToken;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonToken;
import com.google.gson.stream.JsonWriter;
Expand Down Expand Up @@ -435,7 +433,7 @@ private A2AError createErrorInstance(@Nullable Integer code, @Nullable String me
case INVALID_AGENT_RESPONSE_ERROR_CODE ->
new InvalidAgentResponseError(code, message, data);
default ->
new A2AError(code, message, data);
new A2AError(code, message == null ? "" : message, data);
};
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,9 @@ public void getTaskPushNotificationConfiguration(RoutingContext rc) {
try {
if (taskId == null || taskId.isEmpty()) {
response = jsonRestHandler.createErrorResponse(new InvalidParamsError("bad task id"));
} else {
} else if (configId == null || configId.isEmpty()) {
response = jsonRestHandler.createErrorResponse(new InvalidParamsError("bad configuration id"));
}else {
response = jsonRestHandler.getTaskPushNotificationConfiguration(taskId, configId, extractTenant(rc), context);
}
} catch (Throwable t) {
Expand All @@ -299,26 +301,7 @@ public void getTaskPushNotificationConfiguration(RoutingContext rc) {
}
}

@Route(regex = "^\\/(?<tenant>[^\\/]*\\/?)tasks\\/(?<taskId>[^/]+)\\/pushNotificationConfigs\\/$", order = 1, methods = {Route.HttpMethod.GET}, type = Route.HandlerType.BLOCKING)
public void getTaskPushNotificationConfigurationWithoutId(RoutingContext rc) {
String taskId = rc.pathParam("taskId");
ServerCallContext context = createCallContext(rc, GET_TASK_PUSH_NOTIFICATION_CONFIG_METHOD);
HTTPRestResponse response = null;
try {
if (taskId == null || taskId.isEmpty()) {
response = jsonRestHandler.createErrorResponse(new InvalidParamsError("bad task id"));
} else {
// Call get with null configId - trailing slash distinguishes this from list
response = jsonRestHandler.getTaskPushNotificationConfiguration(taskId, null, extractTenant(rc), context);
}
} catch (Throwable t) {
response = jsonRestHandler.createErrorResponse(new InternalError(t.getMessage()));
} finally {
sendResponse(rc, response);
}
}

@Route(regex = "^\\/(?<tenant>[^\\/]*\\/?)tasks\\/(?<taskId>[^/]+)\\/pushNotificationConfigs", order = 3, methods = {Route.HttpMethod.GET}, type = Route.HandlerType.BLOCKING)
@Route(regex = "^\\/(?<tenant>[^\\/]*\\/?)tasks\\/(?<taskId>[^/]+)\\/pushNotificationConfigs\\/?$", order = 3, methods = {Route.HttpMethod.GET}, type = Route.HandlerType.BLOCKING)
public void listTaskPushNotificationConfigurations(RoutingContext rc) {
String taskId = rc.pathParam("taskId");
ServerCallContext context = createCallContext(rc, LIST_TASK_PUSH_NOTIFICATION_CONFIG_METHOD);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,12 @@ public RequestContext(
if (params != null) {
if (taskId != null && !taskId.equals(params.message().taskId())) {
throw new InvalidParamsError("bad task id");
} else {
checkOrGenerateTaskId();
}
this.taskId = checkOrGenerateTaskId();
if (contextId != null && !contextId.equals(params.message().contextId())) {
throw new InvalidParamsError("bad context id");
} else {
checkOrGenerateContextId();
}
this.contextId = checkOrGenerateContextId();
}
}

Expand Down Expand Up @@ -246,9 +244,9 @@ public void attachRelatedTask(Task task) {
relatedTasks.add(task);
}

private void checkOrGenerateTaskId() {
private @Nullable String checkOrGenerateTaskId() {
if (params == null) {
return;
return taskId;
}
if (taskId == null && params.message().taskId() == null) {
// Message is immutable, create new one with generated taskId
Expand All @@ -257,15 +255,17 @@ private void checkOrGenerateTaskId() {
.taskId(generatedTaskId)
.build();
params = new MessageSendParams(updatedMessage, params.configuration(), params.metadata());
this.taskId = generatedTaskId;
} else if (params.message().taskId() != null) {
this.taskId = params.message().taskId();
return generatedTaskId;
}
if (params.message().taskId() != null) {
return params.message().taskId();
}
return taskId;
}

private void checkOrGenerateContextId() {
private @Nullable String checkOrGenerateContextId() {
if (params == null) {
return;
return contextId;
}
if (contextId == null && params.message().contextId() == null) {
// Message is immutable, create new one with generated contextId
Expand All @@ -274,10 +274,12 @@ private void checkOrGenerateContextId() {
.contextId(generatedContextId)
.build();
params = new MessageSendParams(updatedMessage, params.configuration(), params.metadata());
this.contextId = generatedContextId;
} else if (params.message().contextId() != null) {
this.contextId = params.message().contextId();
return generatedContextId;
}
if (params.message().contextId() != null) {
return params.message().contextId();
}
return contextId;
}

private String getMessageText(Message message, String delimiter) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,8 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte

// Store push notification config for newly created tasks (mirrors streaming logic)
// Only for NEW tasks - existing tasks are handled by initMessageSend()
if (mss.task() == null && kind instanceof Task createdTask && shouldAddPushInfo(params)) {
if (mss.task() == null && kind instanceof Task createdTask && pushConfigStore != null
&& params.configuration() != null && params.configuration().pushNotificationConfig() != null) {
LOGGER.debug("Storing push notification config for new task {} (original taskId from params: {})",
createdTask.id(), params.message().taskId());
pushConfigStore.setInfo(createdTask.id(), params.configuration().pushNotificationConfig());
Expand Down Expand Up @@ -565,7 +566,7 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
kind = updatedTask;
LOGGER.debug("DefaultRequestHandler: Step 5 - Fetched current task for {} with state {} and {} artifacts",
taskId.get(), updatedTask.status().state(),
updatedTask.artifacts().size());
updatedTask.artifacts() != null ? updatedTask.artifacts().size() : 0);
} else {
LOGGER.warn("DefaultRequestHandler: Step 5 - Task {} not found in TaskStore!", taskId.get());
}
Expand Down Expand Up @@ -626,7 +627,7 @@ public Flow.Publisher<StreamingEventKind> onMessageSendStream(
// Store push notification config SYNCHRONOUSLY for new tasks before agent starts
// This ensures config is available when MainEventBusProcessor sends push notifications
// For existing tasks, config is stored in initMessageSend()
if (mss.task() == null && shouldAddPushInfo(params)) {
if (mss.task() == null && pushConfigStore != null && params.configuration() != null && params.configuration().pushNotificationConfig() != null) {
// Satisfy Nullaway
Objects.requireNonNull(taskId.get(), "taskId was null");
LOGGER.debug("Storing push notification config for new streaming task {} EARLY (original taskId from params: {})",
Expand Down Expand Up @@ -863,10 +864,6 @@ public void onDeleteTaskPushNotificationConfig(
pushConfigStore.deleteInfo(params.id(), params.pushNotificationConfigId());
}

private boolean shouldAddPushInfo(MessageSendParams params) {
return pushConfigStore != null && params.configuration() != null && params.configuration().pushNotificationConfig() != null;
}

/**
* Register and execute the agent asynchronously in the agent-executor thread pool.
*
Expand Down Expand Up @@ -1014,7 +1011,7 @@ private MessageSendSetup initMessageSend(MessageSendParams params, ServerCallCon
LOGGER.debug("Found task updating with message {}", params.message());
task = taskManager.updateWithMessage(params.message(), task);

if (shouldAddPushInfo(params)) {
if (pushConfigStore != null && params.configuration() != null && params.configuration().pushNotificationConfig() != null) {
LOGGER.debug("Adding push info");
pushConfigStore.setInfo(task.id(), params.configuration().pushNotificationConfig());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.a2a.spec.TaskStatus;
import io.a2a.spec.TaskStatusUpdateEvent;
import io.a2a.spec.TextPart;
import io.a2a.util.Assert;
import org.jspecify.annotations.Nullable;

/**
Expand Down Expand Up @@ -95,8 +96,8 @@
*/
public class AgentEmitter {
private final EventQueue eventQueue;
private final @Nullable String taskId;
private final @Nullable String contextId;
private final String taskId;
private final String contextId;
private final AtomicBoolean terminalStateReached = new AtomicBoolean(false);

/**
Expand All @@ -107,8 +108,8 @@ public class AgentEmitter {
*/
public AgentEmitter(RequestContext context, EventQueue eventQueue) {
this.eventQueue = eventQueue;
this.taskId = context.getTaskId();
this.contextId = context.getContextId();
this.taskId = Assert.checkNotNullParam("taskId",context.getTaskId());
this.contextId = Assert.checkNotNullParam("contextId",context.getContextId());
}

private void updateStatus(TaskState taskState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void sendNotification(StreamingEventKind event) {
String nextPageToken = null;
do {
ListTaskPushNotificationConfigResult pageResult = configStore.getInfo(new ListTaskPushNotificationConfigParams(taskId,
DEFAULT_PAGE_SIZE, nextPageToken, ""));
DEFAULT_PAGE_SIZE, nextPageToken == null ? "" : nextPageToken, ""));
if (!pageResult.configs().isEmpty()) {
configs.addAll(pageResult.configs());
}
Expand Down Expand Up @@ -111,11 +111,14 @@ public void sendNotification(StreamingEventKind event) {
protected @Nullable String extractTaskId(StreamingEventKind event) {
if (event instanceof Task task) {
return task.id();
} else if (event instanceof Message message) {
}
if (event instanceof Message message) {
return message.taskId();
} else if (event instanceof TaskStatusUpdateEvent statusUpdate) {
}
if (event instanceof TaskStatusUpdateEvent statusUpdate) {
return statusUpdate.taskId();
} else if (event instanceof TaskArtifactUpdateEvent artifactUpdate) {
}
if (event instanceof TaskArtifactUpdateEvent artifactUpdate) {
return artifactUpdate.taskId();
}
throw new IllegalStateException("Unknown StreamingEventKind: " + event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public PushNotificationConfig setInfo(String taskId, PushNotificationConfig noti
Iterator<PushNotificationConfig> notificationConfigIterator = notificationConfigList.iterator();
while (notificationConfigIterator.hasNext()) {
PushNotificationConfig config = notificationConfigIterator.next();
if (config.id().equals(notificationConfig.id())) {
if (config.id() != null && config.id().equals(notificationConfig.id())) {
notificationConfigIterator.remove();
break;
}
Expand Down
Loading