Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1001,7 +1001,7 @@ private CompletableFuture<Void> cleanupProducer(@Nullable CompletableFuture<Void
});
}

private MessageSendSetup initMessageSend(MessageSendParams params, ServerCallContext context) {
private MessageSendSetup initMessageSend(MessageSendParams params, ServerCallContext context) throws A2AError {
// Build RequestContext FIRST to get the real taskId (auto-generated if not provided)
// This eliminates the need for temporary IDs - we use the same UUID throughout
RequestContext requestContext = requestContextBuilder.get()
Expand All @@ -1026,6 +1026,17 @@ private MessageSendSetup initMessageSend(MessageSendParams params, ServerCallCon

Task task = taskManager.getTask();
if (task != null) {
// Reject messages to tasks that are in a terminal state (completed, canceled, rejected, failed).
// Per A2A spec section 3.1.1 (CORE-SEND-002): the SDK MUST return UnsupportedOperationError
// before forwarding the message to the AgentExecutor.
if (task.status().state().isFinal()) {
throw new UnsupportedOperationError(
null,
"Cannot send message to task " + task.id() +
" - task is in a terminal state: " + task.status().state(),
null);
}

// Validate contextId matches the existing task's contextId
String messageContextId = params.message().contextId();
if (messageContextId != null && !messageContextId.equals(task.contextId())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import io.a2a.spec.TaskStatus;
import io.a2a.spec.TaskStatusUpdateEvent;
import io.a2a.spec.TextPart;
import io.a2a.spec.UnsupportedOperationError;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -516,15 +517,23 @@ void testAuthRequired_Resubscription() throws Exception {
* Test: Reject SendMessage with mismatching contextId and taskId.
* When a message references an existing task but provides a different contextId,
* the request must be rejected with an InvalidParamsError.
* The task must not be in a terminal state, or the terminal-state guard fires first.
*/
@Test
void testRejectMismatchingContextId() throws Exception {
// Arrange: Create an initial task to get valid identifiers
CountDownLatch agentCompleted = new CountDownLatch(1);
// Arrange: Create an initial task – agent stays active (working) so the task is NOT terminal
CountDownLatch agentStarted = new CountDownLatch(1);
CountDownLatch agentRelease = new CountDownLatch(1);

agentExecutorExecute = (context, emitter) -> {
emitter.startWork();
agentStarted.countDown();
try {
agentRelease.await(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
emitter.complete();
agentCompleted.countDown();
};

Message initialMessage = Message.builder()
Expand All @@ -534,6 +543,164 @@ void testRejectMismatchingContextId() throws Exception {
.parts(new TextPart("initial message"))
.build();

// returnImmediately=true so onMessageSend returns quickly (on first event)
MessageSendParams initialParams = MessageSendParams.builder()
.message(initialMessage)
.configuration(DEFAULT_CONFIG)
.build();

EventKind result = requestHandler.onMessageSend(initialParams, NULL_CONTEXT);
assertInstanceOf(Task.class, result);
Task task = (Task) result;

// Wait until agent has started (task is in WORKING state, not terminal)
assertTrue(agentStarted.await(5, TimeUnit.SECONDS));

try {
// Act & Assert: Send a follow-up message with matching taskId but wrong contextId
// The task is still WORKING, so the terminal guard does NOT fire.
// The contextId mismatch guard should fire instead.
Message mismatchedMessage = Message.builder()
.messageId("msg-2")
.role(Message.Role.ROLE_USER)
.taskId(task.id())
.contextId("wrong-context-does-not-exist")
.parts(new TextPart("follow-up message"))
.build();

MessageSendParams mismatchedParams = MessageSendParams.builder()
.message(mismatchedMessage)
.configuration(DEFAULT_CONFIG)
.build();

InvalidParamsError error = assertThrows(InvalidParamsError.class,
() -> requestHandler.onMessageSend(mismatchedParams, NULL_CONTEXT));
assertTrue(error.getMessage().contains(task.id()));
} finally {
// Release agent so it completes and doesn't leak
agentRelease.countDown();
}
}

/**
* Helper: creates a task, drives it to the given terminal state, then asserts that a
* follow-up SendMessage to that task throws UnsupportedOperationError (CORE-SEND-002).
*/
private void assertSendMessageToTerminalStateThrows(TaskState terminalState) throws Exception {
CountDownLatch agentCompleted = new CountDownLatch(1);

agentExecutorExecute = (context, emitter) -> {
switch (terminalState) {
case TASK_STATE_COMPLETED -> emitter.complete();
case TASK_STATE_CANCELED -> emitter.cancel();
case TASK_STATE_REJECTED -> emitter.reject();
// Use fail() (no-arg) which emits TaskStatusUpdateEvent(FAILED) via the normal path,
// ensuring the task state is persisted to the store before we query it.
case TASK_STATE_FAILED -> emitter.fail();
default -> throw new IllegalArgumentException("Unexpected state: " + terminalState);
}
agentCompleted.countDown();
};

Message initialMessage = Message.builder()
.messageId("msg-initial-" + terminalState)
.role(Message.Role.ROLE_USER)
.parts(new TextPart("create task"))
.build();

EventKind result = requestHandler.onMessageSend(
MessageSendParams.builder().message(initialMessage).configuration(DEFAULT_CONFIG).build(),
NULL_CONTEXT);
assertInstanceOf(Task.class, result);
Task task = (Task) result;
final String finalTaskId = task.id();

assertTrue(agentCompleted.await(5, TimeUnit.SECONDS), "Agent should complete");
Thread.sleep(200); // allow MainEventBusProcessor to persist the final state

Task storedTask = taskStore.get(finalTaskId);
assertNotNull(storedTask);
assertEquals(terminalState, storedTask.status().state(),
"Task should be in " + terminalState + " state");

// Reset: agent executor must NOT be called on the follow-up
agentExecutorExecute = (context, emitter) -> {
throw new AssertionError("AgentExecutor must NOT be invoked for a terminal task");
};

Message followUpMessage = Message.builder()
.messageId("msg-followup-" + terminalState)
.role(Message.Role.ROLE_USER)
.taskId(finalTaskId)
.parts(new TextPart("follow-up to terminal task"))
.build();

MessageSendParams followUpParams = MessageSendParams.builder()
.message(followUpMessage)
.configuration(DEFAULT_CONFIG)
.build();

UnsupportedOperationError error = assertThrows(UnsupportedOperationError.class,
() -> requestHandler.onMessageSend(followUpParams, NULL_CONTEXT),
"Expected UnsupportedOperationError when sending message to a " + terminalState + " task");

assertNotNull(error.getMessage(), "Error message should not be null");
assertTrue(error.getMessage().contains(finalTaskId),
"Error message should reference the task id");
}

/**
* CORE-SEND-002: SendMessage to a completed task must return UnsupportedOperationError.
*/
@Test
void testSendMessage_ToCompletedTask_ThrowsUnsupportedOperationError() throws Exception {
assertSendMessageToTerminalStateThrows(TaskState.TASK_STATE_COMPLETED);
}

/**
* CORE-SEND-002: SendMessage to a canceled task must return UnsupportedOperationError.
*/
@Test
void testSendMessage_ToCanceledTask_ThrowsUnsupportedOperationError() throws Exception {
assertSendMessageToTerminalStateThrows(TaskState.TASK_STATE_CANCELED);
}

/**
* CORE-SEND-002: SendMessage to a rejected task must return UnsupportedOperationError.
*/
@Test
void testSendMessage_ToRejectedTask_ThrowsUnsupportedOperationError() throws Exception {
assertSendMessageToTerminalStateThrows(TaskState.TASK_STATE_REJECTED);
}

/**
* CORE-SEND-002: SendMessage to a failed task must return UnsupportedOperationError.
*/
@Test
void testSendMessage_ToFailedTask_ThrowsUnsupportedOperationError() throws Exception {
assertSendMessageToTerminalStateThrows(TaskState.TASK_STATE_FAILED);
}

/**
* Test: SendStreamingMessage to a task in a terminal state must also return UnsupportedOperationError
* (CORE-SEND-002, streaming path).
*/
@Test
void testSendMessageStream_ToCompletedTask_ThrowsUnsupportedOperationError() throws Exception {
// Arrange: Create and complete an initial task
CountDownLatch agentCompleted = new CountDownLatch(1);

agentExecutorExecute = (context, emitter) -> {
emitter.complete();
agentCompleted.countDown();
};

Message initialMessage = Message.builder()
.messageId("msg-initial-stream")
.role(Message.Role.ROLE_USER)
.parts(new TextPart("create task for stream test"))
.build();

MessageSendParams initialParams = MessageSendParams.builder()
.message(initialMessage)
.configuration(DEFAULT_CONFIG)
Expand All @@ -542,24 +709,35 @@ void testRejectMismatchingContextId() throws Exception {
EventKind result = requestHandler.onMessageSend(initialParams, NULL_CONTEXT);
assertInstanceOf(Task.class, result);
Task task = (Task) result;
assertTrue(agentCompleted.await(5, TimeUnit.SECONDS));

// Act & Assert: Send a follow-up message with matching taskId but wrong contextId
Message mismatchedMessage = Message.builder()
.messageId("msg-2")
assertTrue(agentCompleted.await(5, TimeUnit.SECONDS), "Agent should complete");
Thread.sleep(200); // allow MainEventBusProcessor to persist

// Verify task is in terminal state
Task storedTask = taskStore.get(task.id());
assertNotNull(storedTask);
assertEquals(TaskState.TASK_STATE_COMPLETED, storedTask.status().state());

// Reset: agent executor must NOT be called
agentExecutorExecute = (context, emitter) -> {
throw new AssertionError("AgentExecutor must NOT be invoked for a terminal task");
};

// Act & Assert: streaming follow-up to a terminal task must also be rejected
Message followUpMessage = Message.builder()
.messageId("msg-followup-stream")
.role(Message.Role.ROLE_USER)
.taskId(task.id())
.contextId("wrong-context-does-not-exist")
.parts(new TextPart("follow-up message"))
.parts(new TextPart("streaming follow-up to completed task"))
.build();

MessageSendParams mismatchedParams = MessageSendParams.builder()
.message(mismatchedMessage)
MessageSendParams followUpParams = MessageSendParams.builder()
.message(followUpMessage)
.configuration(DEFAULT_CONFIG)
.build();

InvalidParamsError error = assertThrows(InvalidParamsError.class,
() -> requestHandler.onMessageSend(mismatchedParams, NULL_CONTEXT));
assertTrue(error.getMessage().contains(task.id()));
assertThrows(UnsupportedOperationError.class,
() -> requestHandler.onMessageSendStream(followUpParams, NULL_CONTEXT),
"Expected UnsupportedOperationError when streaming message to a completed task");
}
}
Loading