From 636a749d968218fcb5d115d5b5daed07a5da4ff8 Mon Sep 17 00:00:00 2001 From: brucearctor Date: Sun, 15 Mar 2026 16:52:33 -0700 Subject: [PATCH] fix: reject SendMessage to tasks in terminal state with UnsupportedOperationError Per A2A spec section 3.1.1 (CORE-SEND-002), the SDK MUST return an UnsupportedOperationError when a client attempts to send a message to a task that is already in a terminal state (completed, canceled, rejected, or failed). The guard is added in DefaultRequestHandler.initMessageSend(), which is called by both onMessageSend (blocking) and onMessageSendStream (streaming). This ensures the check happens before the message is forwarded to the AgentExecutor on all three transports (JSON-RPC, gRPC, HTTP+JSON). This fixes #741 --- .../DefaultRequestHandler.java | 13 +- .../DefaultRequestHandlerTest.java | 206 ++++++++++++++++-- 2 files changed, 204 insertions(+), 15 deletions(-) diff --git a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java index fdc6ee11d..5e72d224d 100644 --- a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java +++ b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java @@ -1001,7 +1001,7 @@ private CompletableFuture cleanupProducer(@Nullable CompletableFuture { + emitter.startWork(); + agentStarted.countDown(); + try { + agentRelease.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } emitter.complete(); - agentCompleted.countDown(); }; Message initialMessage = Message.builder() @@ -534,6 +543,164 @@ void testRejectMismatchingContextId() throws Exception { .parts(new TextPart("initial message")) .build(); + // returnImmediately=true so onMessageSend returns quickly (on first event) + MessageSendParams initialParams = MessageSendParams.builder() + .message(initialMessage) + .configuration(DEFAULT_CONFIG) + .build(); + + EventKind result = requestHandler.onMessageSend(initialParams, NULL_CONTEXT); + assertInstanceOf(Task.class, result); + Task task = (Task) result; + + // Wait until agent has started (task is in WORKING state, not terminal) + assertTrue(agentStarted.await(5, TimeUnit.SECONDS)); + + try { + // Act & Assert: Send a follow-up message with matching taskId but wrong contextId + // The task is still WORKING, so the terminal guard does NOT fire. + // The contextId mismatch guard should fire instead. + Message mismatchedMessage = Message.builder() + .messageId("msg-2") + .role(Message.Role.ROLE_USER) + .taskId(task.id()) + .contextId("wrong-context-does-not-exist") + .parts(new TextPart("follow-up message")) + .build(); + + MessageSendParams mismatchedParams = MessageSendParams.builder() + .message(mismatchedMessage) + .configuration(DEFAULT_CONFIG) + .build(); + + InvalidParamsError error = assertThrows(InvalidParamsError.class, + () -> requestHandler.onMessageSend(mismatchedParams, NULL_CONTEXT)); + assertTrue(error.getMessage().contains(task.id())); + } finally { + // Release agent so it completes and doesn't leak + agentRelease.countDown(); + } + } + + /** + * Helper: creates a task, drives it to the given terminal state, then asserts that a + * follow-up SendMessage to that task throws UnsupportedOperationError (CORE-SEND-002). + */ + private void assertSendMessageToTerminalStateThrows(TaskState terminalState) throws Exception { + CountDownLatch agentCompleted = new CountDownLatch(1); + + agentExecutorExecute = (context, emitter) -> { + switch (terminalState) { + case TASK_STATE_COMPLETED -> emitter.complete(); + case TASK_STATE_CANCELED -> emitter.cancel(); + case TASK_STATE_REJECTED -> emitter.reject(); + // Use fail() (no-arg) which emits TaskStatusUpdateEvent(FAILED) via the normal path, + // ensuring the task state is persisted to the store before we query it. + case TASK_STATE_FAILED -> emitter.fail(); + default -> throw new IllegalArgumentException("Unexpected state: " + terminalState); + } + agentCompleted.countDown(); + }; + + Message initialMessage = Message.builder() + .messageId("msg-initial-" + terminalState) + .role(Message.Role.ROLE_USER) + .parts(new TextPart("create task")) + .build(); + + EventKind result = requestHandler.onMessageSend( + MessageSendParams.builder().message(initialMessage).configuration(DEFAULT_CONFIG).build(), + NULL_CONTEXT); + assertInstanceOf(Task.class, result); + Task task = (Task) result; + final String finalTaskId = task.id(); + + assertTrue(agentCompleted.await(5, TimeUnit.SECONDS), "Agent should complete"); + Thread.sleep(200); // allow MainEventBusProcessor to persist the final state + + Task storedTask = taskStore.get(finalTaskId); + assertNotNull(storedTask); + assertEquals(terminalState, storedTask.status().state(), + "Task should be in " + terminalState + " state"); + + // Reset: agent executor must NOT be called on the follow-up + agentExecutorExecute = (context, emitter) -> { + throw new AssertionError("AgentExecutor must NOT be invoked for a terminal task"); + }; + + Message followUpMessage = Message.builder() + .messageId("msg-followup-" + terminalState) + .role(Message.Role.ROLE_USER) + .taskId(finalTaskId) + .parts(new TextPart("follow-up to terminal task")) + .build(); + + MessageSendParams followUpParams = MessageSendParams.builder() + .message(followUpMessage) + .configuration(DEFAULT_CONFIG) + .build(); + + UnsupportedOperationError error = assertThrows(UnsupportedOperationError.class, + () -> requestHandler.onMessageSend(followUpParams, NULL_CONTEXT), + "Expected UnsupportedOperationError when sending message to a " + terminalState + " task"); + + assertNotNull(error.getMessage(), "Error message should not be null"); + assertTrue(error.getMessage().contains(finalTaskId), + "Error message should reference the task id"); + } + + /** + * CORE-SEND-002: SendMessage to a completed task must return UnsupportedOperationError. + */ + @Test + void testSendMessage_ToCompletedTask_ThrowsUnsupportedOperationError() throws Exception { + assertSendMessageToTerminalStateThrows(TaskState.TASK_STATE_COMPLETED); + } + + /** + * CORE-SEND-002: SendMessage to a canceled task must return UnsupportedOperationError. + */ + @Test + void testSendMessage_ToCanceledTask_ThrowsUnsupportedOperationError() throws Exception { + assertSendMessageToTerminalStateThrows(TaskState.TASK_STATE_CANCELED); + } + + /** + * CORE-SEND-002: SendMessage to a rejected task must return UnsupportedOperationError. + */ + @Test + void testSendMessage_ToRejectedTask_ThrowsUnsupportedOperationError() throws Exception { + assertSendMessageToTerminalStateThrows(TaskState.TASK_STATE_REJECTED); + } + + /** + * CORE-SEND-002: SendMessage to a failed task must return UnsupportedOperationError. + */ + @Test + void testSendMessage_ToFailedTask_ThrowsUnsupportedOperationError() throws Exception { + assertSendMessageToTerminalStateThrows(TaskState.TASK_STATE_FAILED); + } + + /** + * Test: SendStreamingMessage to a task in a terminal state must also return UnsupportedOperationError + * (CORE-SEND-002, streaming path). + */ + @Test + void testSendMessageStream_ToCompletedTask_ThrowsUnsupportedOperationError() throws Exception { + // Arrange: Create and complete an initial task + CountDownLatch agentCompleted = new CountDownLatch(1); + + agentExecutorExecute = (context, emitter) -> { + emitter.complete(); + agentCompleted.countDown(); + }; + + Message initialMessage = Message.builder() + .messageId("msg-initial-stream") + .role(Message.Role.ROLE_USER) + .parts(new TextPart("create task for stream test")) + .build(); + MessageSendParams initialParams = MessageSendParams.builder() .message(initialMessage) .configuration(DEFAULT_CONFIG) @@ -542,24 +709,35 @@ void testRejectMismatchingContextId() throws Exception { EventKind result = requestHandler.onMessageSend(initialParams, NULL_CONTEXT); assertInstanceOf(Task.class, result); Task task = (Task) result; - assertTrue(agentCompleted.await(5, TimeUnit.SECONDS)); - // Act & Assert: Send a follow-up message with matching taskId but wrong contextId - Message mismatchedMessage = Message.builder() - .messageId("msg-2") + assertTrue(agentCompleted.await(5, TimeUnit.SECONDS), "Agent should complete"); + Thread.sleep(200); // allow MainEventBusProcessor to persist + + // Verify task is in terminal state + Task storedTask = taskStore.get(task.id()); + assertNotNull(storedTask); + assertEquals(TaskState.TASK_STATE_COMPLETED, storedTask.status().state()); + + // Reset: agent executor must NOT be called + agentExecutorExecute = (context, emitter) -> { + throw new AssertionError("AgentExecutor must NOT be invoked for a terminal task"); + }; + + // Act & Assert: streaming follow-up to a terminal task must also be rejected + Message followUpMessage = Message.builder() + .messageId("msg-followup-stream") .role(Message.Role.ROLE_USER) .taskId(task.id()) - .contextId("wrong-context-does-not-exist") - .parts(new TextPart("follow-up message")) + .parts(new TextPart("streaming follow-up to completed task")) .build(); - MessageSendParams mismatchedParams = MessageSendParams.builder() - .message(mismatchedMessage) + MessageSendParams followUpParams = MessageSendParams.builder() + .message(followUpMessage) .configuration(DEFAULT_CONFIG) .build(); - InvalidParamsError error = assertThrows(InvalidParamsError.class, - () -> requestHandler.onMessageSend(mismatchedParams, NULL_CONTEXT)); - assertTrue(error.getMessage().contains(task.id())); + assertThrows(UnsupportedOperationError.class, + () -> requestHandler.onMessageSendStream(followUpParams, NULL_CONTEXT), + "Expected UnsupportedOperationError when streaming message to a completed task"); } }