Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public class EventConsumer {
private volatile boolean cancelled = false;
private volatile boolean agentCompleted = false;
private volatile int pollTimeoutsAfterAgentCompleted = 0;
private volatile @Nullable TaskState lastSeenTaskState = null;

private static final String ERROR_MSG = "Agent did not return any response";
private static final int NO_WAIT = -1;
Expand Down Expand Up @@ -89,7 +90,12 @@ public Flow.Publisher<EventQueueItem> consumeAll() {
//
// IMPORTANT: In replicated scenarios, remote events may arrive AFTER local agent completes!
// Use grace period to allow for Kafka replication delays (can be 400-500ms)
if (agentCompleted && queueSize == 0) {
//
// CRITICAL: Do NOT close if task is in interrupted state (INPUT_REQUIRED, AUTH_REQUIRED)
// Per A2A spec, interrupted states are NOT terminal - the stream must stay open
// for future state updates even after agent completes (agent will be re-invoked later).
boolean isInterruptedState = lastSeenTaskState != null && lastSeenTaskState.isInterrupted();
if (agentCompleted && queueSize == 0 && !isInterruptedState) {
pollTimeoutsAfterAgentCompleted++;
if (pollTimeoutsAfterAgentCompleted >= MAX_POLL_TIMEOUTS_AFTER_AGENT_COMPLETED) {
LOGGER.debug("Agent completed with {} consecutive poll timeouts and empty queue, closing for graceful completion (queue={})",
Expand All @@ -102,6 +108,10 @@ public Flow.Publisher<EventQueueItem> consumeAll() {
LOGGER.debug("Agent completed but grace period active ({}/{} timeouts), continuing to poll (queue={})",
pollTimeoutsAfterAgentCompleted, MAX_POLL_TIMEOUTS_AFTER_AGENT_COMPLETED, System.identityHashCode(queue));
}
} else if (agentCompleted && isInterruptedState) {
LOGGER.debug("Agent completed but task is in interrupted state ({}), stream must remain open (queue={})",
lastSeenTaskState, System.identityHashCode(queue));
pollTimeoutsAfterAgentCompleted = 0; // Reset counter
} else if (agentCompleted && queueSize > 0) {
LOGGER.debug("Agent completed but queue has {} pending events, resetting timeout counter and continuing to poll (queue={})",
queueSize, System.identityHashCode(queue));
Expand All @@ -115,6 +125,13 @@ public Flow.Publisher<EventQueueItem> consumeAll() {
LOGGER.debug("EventConsumer received event: {} (queue={})",
event.getClass().getSimpleName(), System.identityHashCode(queue));

// Track the latest task state for grace period logic
if (event instanceof Task task) {
lastSeenTaskState = task.status().state();
} else if (event instanceof TaskStatusUpdateEvent tue) {
lastSeenTaskState = tue.status().state();
}

// Defensive logging for error handling
if (event instanceof Throwable thr) {
LOGGER.debug("EventConsumer detected Throwable event: {} - triggering tube.fail()",
Expand Down Expand Up @@ -195,17 +212,21 @@ public Flow.Publisher<EventQueueItem> consumeAll() {

/**
* Determines if a task is in a state for terminating the stream.
* <p>A task is terminating if:</p>
* <ul>
* <li>Its state is final (e.g., completed, canceled, rejected, failed), OR</li>
* <li>Its state is interrupted (e.g., input-required)</li>
* </ul>
* <p>
* Per A2A Protocol Specification 3.1.6 (SubscribeToTask):
* "The stream MUST terminate when the task reaches a terminal state
* (completed, failed, canceled, or rejected)."
* <p>
* Interrupted states (INPUT_REQUIRED, AUTH_REQUIRED) are NOT terminal.
* The stream should remain open to deliver future state updates when
* the task resumes after receiving the required input or authorization.
*
* @param task the task to check
* @return true if the task has a final state or an interrupted state, false otherwise
* @return true if the task has a terminal/final state, false otherwise
*/
private boolean isStreamTerminatingTask(Task task) {
TaskState state = task.status().state();
return state.isFinal() || state == TaskState.TASK_STATE_INPUT_REQUIRED;
return state.isFinal();
}

public EnhancedRunnable.DoneCallback createAgentRunnableDoneCallback() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,26 +238,32 @@ public void testConsumeMessageEvents() throws Exception {

@Test
public void testConsumeTaskInputRequired() {
// Per A2A Protocol Specification 3.1.6 (SubscribeToTask):
// "The stream MUST terminate when the task reaches a terminal state
// (completed, failed, canceled, or rejected)."
//
// INPUT_REQUIRED is an interrupted state, NOT a terminal state.
// The stream should remain open to deliver future state updates.
Task task = Task.builder()
.id(TASK_ID)
.contextId("session-xyz")
.status(new TaskStatus(TaskState.TASK_STATE_INPUT_REQUIRED))
.build();
List<Event> events = List.of(
task,
TaskArtifactUpdateEvent.builder()
TaskArtifactUpdateEvent artifactEvent = TaskArtifactUpdateEvent.builder()
.taskId(TASK_ID)
.contextId("session-xyz")
.artifact(Artifact.builder()
.artifactId("11")
.parts(new TextPart("text"))
.build())
.build(),
TaskStatusUpdateEvent.builder()
.build();
TaskStatusUpdateEvent completedEvent = TaskStatusUpdateEvent.builder()
.taskId(TASK_ID)
.contextId("session-xyz")
.status(new TaskStatus(TaskState.TASK_STATE_COMPLETED))
.build());
.build();
List<Event> events = List.of(task, artifactEvent, completedEvent);

for (Event event : events) {
eventQueue.enqueueEvent(event);
}
Expand All @@ -269,9 +275,12 @@ public void testConsumeTaskInputRequired() {
publisher.subscribe(getSubscriber(receivedEvents, error));

assertNull(error.get());
// The stream is closed after the input_required task
assertEquals(1, receivedEvents.size());
// Stream should remain open for INPUT_REQUIRED and deliver all events
// until the terminal COMPLETED state is reached
assertEquals(3, receivedEvents.size());
assertSame(task, receivedEvents.get(0));
assertSame(artifactEvent, receivedEvents.get(1));
assertSame(completedEvent, receivedEvents.get(2));
}

private Flow.Subscriber<EventQueueItem> getSubscriber(List<Event> receivedEvents, AtomicReference<Throwable> error) {
Expand Down
59 changes: 45 additions & 14 deletions spec/src/main/java/io/a2a/spec/TaskState.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,17 @@
* TaskState represents the discrete states a task can be in during its execution lifecycle.
* States are categorized as either transitional (non-final) or terminal (final), where
* terminal states indicate that the task has reached its end state and will not transition further.
* A subset of transitional states are also marked as interrupted, indicating the task execution
* has paused and requires external action before proceeding.
* <p>
* <b>Transitional States:</b>
* <b>Active Transitional States:</b>
* <ul>
* <li><b>TASK_STATE_SUBMITTED:</b> Task has been received by the agent and is queued for processing</li>
* <li><b>TASK_STATE_WORKING:</b> Agent is actively processing the task and may produce incremental results</li>
* </ul>
* <p>
* <b>Interrupted States:</b>
* <ul>
* <li><b>TASK_STATE_INPUT_REQUIRED:</b> Agent needs additional input from the user to continue</li>
* <li><b>TASK_STATE_AUTH_REQUIRED:</b> Agent requires authentication or authorization before proceeding</li>
* </ul>
Expand All @@ -25,44 +31,47 @@
* </ul>
* <p>
* The {@link #isFinal()} method can be used to determine if a state is terminal, which is
* important for event queue management and client polling logic.
* important for event queue management and client polling logic. The {@link #isInterrupted()}
* method identifies states where the task is paused awaiting external action.
*
* @see TaskStatus
* @see Task
* @see <a href="https://a2a-protocol.org/latest/">A2A Protocol Specification</a>
*/
public enum TaskState {
/** Task has been received and is queued for processing (transitional state). */
TASK_STATE_SUBMITTED(false),
TASK_STATE_SUBMITTED(false, false),

/** Agent is actively processing the task (transitional state). */
TASK_STATE_WORKING(false),
TASK_STATE_WORKING(false, false),

/** Agent requires additional input from the user to continue (transitional state). */
TASK_STATE_INPUT_REQUIRED(false),
/** Agent requires additional input from the user to continue (interrupted state). */
TASK_STATE_INPUT_REQUIRED(false, true),

/** Agent requires authentication or authorization to proceed (transitional state). */
TASK_STATE_AUTH_REQUIRED(false),
/** Agent requires authentication or authorization to proceed (interrupted state). */
TASK_STATE_AUTH_REQUIRED(false, true),

/** Task completed successfully (terminal state). */
TASK_STATE_COMPLETED(true),
TASK_STATE_COMPLETED(true, false),

/** Task was canceled by user or system (terminal state). */
TASK_STATE_CANCELED(true),
TASK_STATE_CANCELED(true, false),

/** Task failed due to an error (terminal state). */
TASK_STATE_FAILED(true),
TASK_STATE_FAILED(true, false),

/** Task was rejected by the agent (terminal state). */
TASK_STATE_REJECTED(true),
TASK_STATE_REJECTED(true, false),

/** Task state is unknown or cannot be determined (terminal state). */
UNRECOGNIZED(true);
UNRECOGNIZED(true, false);

private final boolean isFinal;
private final boolean isInterrupted;

TaskState(boolean isFinal) {
TaskState(boolean isFinal, boolean isInterrupted) {
this.isFinal = isFinal;
this.isInterrupted = isInterrupted;
}

/**
Expand All @@ -71,10 +80,32 @@ public enum TaskState {
* Terminal states indicate that the task has completed its lifecycle and will
* not transition to any other state. This is used by the event queue system
* to determine when to close queues and by clients to know when to stop polling.
* <p>
* Terminal states: COMPLETED, FAILED, CANCELED, REJECTED, UNRECOGNIZED.
*
* @return {@code true} if this is a terminal state, {@code false} else.
*/
public boolean isFinal(){
return isFinal;
}

/**
* Determines whether this state is an interrupted state.
* <p>
* Interrupted states indicate that the task execution has paused and requires
* external action before proceeding. The task may resume after the required
* action is provided. Interrupted states are NOT terminal - streams should
* remain open to deliver state updates.
* <p>
* Interrupted states: INPUT_REQUIRED, AUTH_REQUIRED.
* <p>
* Per A2A Protocol Specification 4.1.3 (TaskState):
* "TASK_STATE_INPUT_REQUIRED: This is an interrupted state."
* "TASK_STATE_AUTH_REQUIRED: This is an interrupted state."
*
* @return {@code true} if this is an interrupted state, {@code false} else.
*/
public boolean isInterrupted() {
return isInterrupted;
}
}
Loading
Loading