diff --git a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java index fdc6ee11d..5e72d224d 100644 --- a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java +++ b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java @@ -1001,7 +1001,7 @@ private CompletableFuture cleanupProducer(@Nullable CompletableFuture { + emitter.startWork(); + agentStarted.countDown(); + try { + agentRelease.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } emitter.complete(); - agentCompleted.countDown(); }; Message initialMessage = Message.builder() @@ -534,6 +543,164 @@ void testRejectMismatchingContextId() throws Exception { .parts(new TextPart("initial message")) .build(); + // returnImmediately=true so onMessageSend returns quickly (on first event) + MessageSendParams initialParams = MessageSendParams.builder() + .message(initialMessage) + .configuration(DEFAULT_CONFIG) + .build(); + + EventKind result = requestHandler.onMessageSend(initialParams, NULL_CONTEXT); + assertInstanceOf(Task.class, result); + Task task = (Task) result; + + // Wait until agent has started (task is in WORKING state, not terminal) + assertTrue(agentStarted.await(5, TimeUnit.SECONDS)); + + try { + // Act & Assert: Send a follow-up message with matching taskId but wrong contextId + // The task is still WORKING, so the terminal guard does NOT fire. + // The contextId mismatch guard should fire instead. + Message mismatchedMessage = Message.builder() + .messageId("msg-2") + .role(Message.Role.ROLE_USER) + .taskId(task.id()) + .contextId("wrong-context-does-not-exist") + .parts(new TextPart("follow-up message")) + .build(); + + MessageSendParams mismatchedParams = MessageSendParams.builder() + .message(mismatchedMessage) + .configuration(DEFAULT_CONFIG) + .build(); + + InvalidParamsError error = assertThrows(InvalidParamsError.class, + () -> requestHandler.onMessageSend(mismatchedParams, NULL_CONTEXT)); + assertTrue(error.getMessage().contains(task.id())); + } finally { + // Release agent so it completes and doesn't leak + agentRelease.countDown(); + } + } + + /** + * Helper: creates a task, drives it to the given terminal state, then asserts that a + * follow-up SendMessage to that task throws UnsupportedOperationError (CORE-SEND-002). + */ + private void assertSendMessageToTerminalStateThrows(TaskState terminalState) throws Exception { + CountDownLatch agentCompleted = new CountDownLatch(1); + + agentExecutorExecute = (context, emitter) -> { + switch (terminalState) { + case TASK_STATE_COMPLETED -> emitter.complete(); + case TASK_STATE_CANCELED -> emitter.cancel(); + case TASK_STATE_REJECTED -> emitter.reject(); + // Use fail() (no-arg) which emits TaskStatusUpdateEvent(FAILED) via the normal path, + // ensuring the task state is persisted to the store before we query it. + case TASK_STATE_FAILED -> emitter.fail(); + default -> throw new IllegalArgumentException("Unexpected state: " + terminalState); + } + agentCompleted.countDown(); + }; + + Message initialMessage = Message.builder() + .messageId("msg-initial-" + terminalState) + .role(Message.Role.ROLE_USER) + .parts(new TextPart("create task")) + .build(); + + EventKind result = requestHandler.onMessageSend( + MessageSendParams.builder().message(initialMessage).configuration(DEFAULT_CONFIG).build(), + NULL_CONTEXT); + assertInstanceOf(Task.class, result); + Task task = (Task) result; + final String finalTaskId = task.id(); + + assertTrue(agentCompleted.await(5, TimeUnit.SECONDS), "Agent should complete"); + Thread.sleep(200); // allow MainEventBusProcessor to persist the final state + + Task storedTask = taskStore.get(finalTaskId); + assertNotNull(storedTask); + assertEquals(terminalState, storedTask.status().state(), + "Task should be in " + terminalState + " state"); + + // Reset: agent executor must NOT be called on the follow-up + agentExecutorExecute = (context, emitter) -> { + throw new AssertionError("AgentExecutor must NOT be invoked for a terminal task"); + }; + + Message followUpMessage = Message.builder() + .messageId("msg-followup-" + terminalState) + .role(Message.Role.ROLE_USER) + .taskId(finalTaskId) + .parts(new TextPart("follow-up to terminal task")) + .build(); + + MessageSendParams followUpParams = MessageSendParams.builder() + .message(followUpMessage) + .configuration(DEFAULT_CONFIG) + .build(); + + UnsupportedOperationError error = assertThrows(UnsupportedOperationError.class, + () -> requestHandler.onMessageSend(followUpParams, NULL_CONTEXT), + "Expected UnsupportedOperationError when sending message to a " + terminalState + " task"); + + assertNotNull(error.getMessage(), "Error message should not be null"); + assertTrue(error.getMessage().contains(finalTaskId), + "Error message should reference the task id"); + } + + /** + * CORE-SEND-002: SendMessage to a completed task must return UnsupportedOperationError. + */ + @Test + void testSendMessage_ToCompletedTask_ThrowsUnsupportedOperationError() throws Exception { + assertSendMessageToTerminalStateThrows(TaskState.TASK_STATE_COMPLETED); + } + + /** + * CORE-SEND-002: SendMessage to a canceled task must return UnsupportedOperationError. + */ + @Test + void testSendMessage_ToCanceledTask_ThrowsUnsupportedOperationError() throws Exception { + assertSendMessageToTerminalStateThrows(TaskState.TASK_STATE_CANCELED); + } + + /** + * CORE-SEND-002: SendMessage to a rejected task must return UnsupportedOperationError. + */ + @Test + void testSendMessage_ToRejectedTask_ThrowsUnsupportedOperationError() throws Exception { + assertSendMessageToTerminalStateThrows(TaskState.TASK_STATE_REJECTED); + } + + /** + * CORE-SEND-002: SendMessage to a failed task must return UnsupportedOperationError. + */ + @Test + void testSendMessage_ToFailedTask_ThrowsUnsupportedOperationError() throws Exception { + assertSendMessageToTerminalStateThrows(TaskState.TASK_STATE_FAILED); + } + + /** + * Test: SendStreamingMessage to a task in a terminal state must also return UnsupportedOperationError + * (CORE-SEND-002, streaming path). + */ + @Test + void testSendMessageStream_ToCompletedTask_ThrowsUnsupportedOperationError() throws Exception { + // Arrange: Create and complete an initial task + CountDownLatch agentCompleted = new CountDownLatch(1); + + agentExecutorExecute = (context, emitter) -> { + emitter.complete(); + agentCompleted.countDown(); + }; + + Message initialMessage = Message.builder() + .messageId("msg-initial-stream") + .role(Message.Role.ROLE_USER) + .parts(new TextPart("create task for stream test")) + .build(); + MessageSendParams initialParams = MessageSendParams.builder() .message(initialMessage) .configuration(DEFAULT_CONFIG) @@ -542,24 +709,35 @@ void testRejectMismatchingContextId() throws Exception { EventKind result = requestHandler.onMessageSend(initialParams, NULL_CONTEXT); assertInstanceOf(Task.class, result); Task task = (Task) result; - assertTrue(agentCompleted.await(5, TimeUnit.SECONDS)); - // Act & Assert: Send a follow-up message with matching taskId but wrong contextId - Message mismatchedMessage = Message.builder() - .messageId("msg-2") + assertTrue(agentCompleted.await(5, TimeUnit.SECONDS), "Agent should complete"); + Thread.sleep(200); // allow MainEventBusProcessor to persist + + // Verify task is in terminal state + Task storedTask = taskStore.get(task.id()); + assertNotNull(storedTask); + assertEquals(TaskState.TASK_STATE_COMPLETED, storedTask.status().state()); + + // Reset: agent executor must NOT be called + agentExecutorExecute = (context, emitter) -> { + throw new AssertionError("AgentExecutor must NOT be invoked for a terminal task"); + }; + + // Act & Assert: streaming follow-up to a terminal task must also be rejected + Message followUpMessage = Message.builder() + .messageId("msg-followup-stream") .role(Message.Role.ROLE_USER) .taskId(task.id()) - .contextId("wrong-context-does-not-exist") - .parts(new TextPart("follow-up message")) + .parts(new TextPart("streaming follow-up to completed task")) .build(); - MessageSendParams mismatchedParams = MessageSendParams.builder() - .message(mismatchedMessage) + MessageSendParams followUpParams = MessageSendParams.builder() + .message(followUpMessage) .configuration(DEFAULT_CONFIG) .build(); - InvalidParamsError error = assertThrows(InvalidParamsError.class, - () -> requestHandler.onMessageSend(mismatchedParams, NULL_CONTEXT)); - assertTrue(error.getMessage().contains(task.id())); + assertThrows(UnsupportedOperationError.class, + () -> requestHandler.onMessageSendStream(followUpParams, NULL_CONTEXT), + "Expected UnsupportedOperationError when streaming message to a completed task"); } }