-
Notifications
You must be signed in to change notification settings - Fork 147
Expand file tree
/
Copy pathDefaultRequestHandlerTest.java
More file actions
743 lines (625 loc) · 29.8 KB
/
DefaultRequestHandlerTest.java
File metadata and controls
743 lines (625 loc) · 29.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
package io.a2a.server.requesthandlers;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import io.a2a.server.ServerCallContext;
import io.a2a.server.agentexecution.AgentExecutor;
import io.a2a.server.agentexecution.RequestContext;
import io.a2a.server.events.EventQueue;
import io.a2a.server.events.EventQueueItem;
import io.a2a.server.events.EventQueueUtil;
import io.a2a.server.events.InMemoryQueueManager;
import io.a2a.server.events.MainEventBus;
import io.a2a.server.events.MainEventBusProcessor;
import io.a2a.server.tasks.AgentEmitter;
import io.a2a.server.tasks.InMemoryPushNotificationConfigStore;
import io.a2a.server.tasks.InMemoryTaskStore;
import io.a2a.server.tasks.PushNotificationConfigStore;
import io.a2a.server.tasks.PushNotificationSender;
import io.a2a.server.tasks.TaskStore;
import io.a2a.spec.A2AError;
import io.a2a.spec.Event;
import io.a2a.spec.EventKind;
import io.a2a.spec.InvalidParamsError;
import io.a2a.spec.Message;
import io.a2a.spec.MessageSendConfiguration;
import io.a2a.spec.MessageSendParams;
import io.a2a.spec.Task;
import io.a2a.spec.TaskArtifactUpdateEvent;
import io.a2a.spec.TaskState;
import io.a2a.spec.TaskStatus;
import io.a2a.spec.TaskStatusUpdateEvent;
import io.a2a.spec.TextPart;
import io.a2a.spec.UnsupportedOperationError;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
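/**
 * Integration tests for DefaultRequestHandler.
 * <p>
 * The first group of tests focuses on the AUTH_REQUIRED workflow and verifies the special
 * interrupt behavior where AUTH_REQUIRED tasks:
 * 1. Return immediately to the client
 * 2. Continue agent execution in background
 * 3. Keep queues open for late events
 * 4. Perform async cleanup
 * <p>
 * The remaining tests cover SendMessage validation: a follow-up message whose contextId does not
 * match the referenced task must be rejected, and a follow-up message to a task in a terminal
 * state must be rejected (CORE-SEND-002), on both the blocking and the streaming entry points.
 */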
public class DefaultRequestHandlerTest {
// ---------------------------------------------------------------------
// Shared request fixtures (static, reused by every test)
// ---------------------------------------------------------------------

/** Call context handed to the handler in every test; intentionally {@code null}. */
private static final ServerCallContext NULL_CONTEXT = null;

/** Send configuration used throughout: returnImmediately is on, accepted output modes are empty. */
private static final MessageSendConfiguration DEFAULT_CONFIG = MessageSendConfiguration.builder()
        .returnImmediately(true)
        .acceptedOutputModes(List.of())
        .build();

/**
 * Default request message (no taskId/contextId set) used by the AUTH_REQUIRED tests.
 * NOTE(review): the role is ROLE_AGENT although the other tests send ROLE_USER messages;
 * confirm this is intentional.
 */
private static final Message MESSAGE = Message.builder()
        .messageId("111")
        .role(Message.Role.ROLE_AGENT)
        .parts(new TextPart("test message"))
        .build();

/** Push notification sender that ignores every task it is given. */
private static final PushNotificationSender NOOP_PUSHNOTIFICATION_SENDER = task -> { };

// ---------------------------------------------------------------------
// Per-test infrastructure, (re)built in init()
// ---------------------------------------------------------------------

protected AgentExecutor executor;
protected TaskStore taskStore;
protected RequestHandler requestHandler;
protected InMemoryQueueManager queueManager;
protected MainEventBus mainEventBus;
protected MainEventBusProcessor mainEventBusProcessor;

/** Hook invoked by the test AgentExecutor's execute(); a {@code null} value makes execute() a no-op. */
protected AgentExecutorMethod agentExecutorExecute;

/** Hook invoked by the test AgentExecutor's cancel(); a {@code null} value makes cancel() a no-op. */
protected AgentExecutorMethod agentExecutorCancel;

/**
 * Executor passed (twice) to DefaultRequestHandler.create() in init().
 * NOTE(review): this pool is never shut down by cleanup(); confirm whether that is acceptable.
 */
protected final Executor internalExecutor = Executors.newCachedThreadPool();
/**
 * Builds a fresh request handler and its in-memory collaborators before each test.
 * The AgentExecutor created here forwards execute/cancel to the {@code agentExecutorExecute}
 * and {@code agentExecutorCancel} hooks, so each test can plug in its own agent behavior.
 */
@BeforeEach
public void init() {
    // Test AgentExecutor: delegates to the per-test hooks and does nothing when a hook is unset.
    executor = new AgentExecutor() {
        @Override
        public void execute(RequestContext context, AgentEmitter agentEmitter) throws A2AError {
            if (agentExecutorExecute == null) {
                return;
            }
            agentExecutorExecute.invoke(context, agentEmitter);
        }

        @Override
        public void cancel(RequestContext context, AgentEmitter agentEmitter) throws A2AError {
            if (agentExecutorCancel == null) {
                return;
            }
            agentExecutorCancel.invoke(context, agentEmitter);
        }
    };

    // In-memory stores.
    InMemoryTaskStore store = new InMemoryTaskStore();
    taskStore = store;
    PushNotificationConfigStore pushNotificationConfigs = new InMemoryPushNotificationConfigStore();

    // Event pipeline: bus -> queue manager -> processor (started right away).
    mainEventBus = new MainEventBus();
    queueManager = new InMemoryQueueManager(store, mainEventBus);
    mainEventBusProcessor = new MainEventBusProcessor(
            mainEventBus, taskStore, NOOP_PUSHNOTIFICATION_SENDER, queueManager);
    EventQueueUtil.start(mainEventBusProcessor);

    // Handler under test; the same executor instance is supplied for both executor arguments.
    requestHandler = DefaultRequestHandler.create(
            executor,
            taskStore,
            queueManager,
            pushNotificationConfigs,
            mainEventBusProcessor,
            internalExecutor,
            internalExecutor);
}
/**
 * Resets the agent hooks and stops the MainEventBusProcessor started in init().
 */
@AfterEach
public void cleanup() {
    // Drop the per-test agent behavior so nothing carries over.
    agentExecutorExecute = null;
    agentExecutorCancel = null;

    if (mainEventBusProcessor == null) {
        // Nothing was started, so there is nothing to stop.
        return;
    }
    // Stop MainEventBusProcessor background thread
    EventQueueUtil.stop(mainEventBusProcessor);
}
/**
 * Functional interface for test agent executor methods.
 * <p>
 * Tests assign lambdas of this type to {@code agentExecutorExecute} / {@code agentExecutorCancel};
 * the AgentExecutor built in init() forwards its execute/cancel calls to them. The annotation
 * makes the compiler enforce that this interface keeps exactly one abstract method.
 */
@FunctionalInterface
protected interface AgentExecutorMethod {
    /**
     * Runs the test-supplied agent behavior.
     *
     * @param context the request context passed to the AgentExecutor
     * @param agentEmitter the emitter passed to the AgentExecutor
     * @throws A2AError if the test-supplied behavior raises one
     */
    void invoke(RequestContext context, AgentEmitter agentEmitter) throws A2AError;
}
/**
 * Test 1: Non-streaming AUTH_REQUIRED returns immediately while agent continues.
 * Verifies:
 * - Task returned immediately with AUTH_REQUIRED state
 * - Agent still running in background (its lambda has not exited yet)
 * - TaskStore persisted AUTH_REQUIRED state
 * - Agent completes after release
 * - Final state persisted to TaskStore
 */
@Test
void testAuthRequired_NonStreaming_ReturnsImmediately() throws Exception {
    // Arrange: Set up agent that emits AUTH_REQUIRED then waits
    CountDownLatch authRequiredEmitted = new CountDownLatch(1);
    CountDownLatch continueAgent = new CountDownLatch(1);
    // Counted down only when the agent lambda exits, so the test can observe whether the
    // agent is still running. (continueAgent cannot be used for that: only the test itself
    // ever counts it down.)
    CountDownLatch agentFinished = new CountDownLatch(1);
    agentExecutorExecute = (context, emitter) -> {
        try {
            // Emit AUTH_REQUIRED - client should receive immediately
            emitter.requiresAuth(Message.builder()
                    .role(Message.Role.ROLE_AGENT)
                    .parts(new TextPart("Please authenticate with OAuth provider"))
                    .build());
            authRequiredEmitted.countDown();
            // Agent continues processing (simulating waiting for out-of-band auth)
            try {
                continueAgent.await(10, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            // Complete after "auth received"
            emitter.complete();
        } finally {
            agentFinished.countDown();
        }
    };

    // Create MessageSendParams
    MessageSendParams params = MessageSendParams.builder()
            .message(MESSAGE)
            .configuration(DEFAULT_CONFIG)
            .build();

    // Act: Send message (non-streaming)
    EventKind eventKind = requestHandler.onMessageSend(params, NULL_CONTEXT);

    // Assert: Task returned immediately with AUTH_REQUIRED state
    assertNotNull(eventKind, "Result should not be null");
    assertInstanceOf(Task.class, eventKind, "Result should be a Task");
    Task result = (Task) eventKind;
    assertEquals(TaskState.TASK_STATE_AUTH_REQUIRED, result.status().state(),
            "Task should be in AUTH_REQUIRED state");
    assertTrue(authRequiredEmitted.await(2, TimeUnit.SECONDS),
            "AUTH_REQUIRED should be emitted quickly");

    // Verify agent still running: its lambda must not have exited before it is released
    assertFalse(agentFinished.await(100, TimeUnit.MILLISECONDS),
            "Agent should still be waiting (not completed yet)");

    // Verify TaskStore has AUTH_REQUIRED state
    Task storedTask = taskStore.get(result.id());
    assertNotNull(storedTask, "Task should be persisted in TaskStore");
    assertEquals(TaskState.TASK_STATE_AUTH_REQUIRED, storedTask.status().state(),
            "TaskStore should have AUTH_REQUIRED state");

    // Release agent to complete
    continueAgent.countDown();
    assertTrue(agentFinished.await(5, TimeUnit.SECONDS),
            "Agent should finish once released");

    // Poll (bounded) until the completion has been processed and persisted, instead of
    // sleeping for a fixed amount of time.
    long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
    Task finalTask = taskStore.get(result.id());
    while ((finalTask == null || finalTask.status().state() != TaskState.TASK_STATE_COMPLETED)
            && System.nanoTime() < deadline) {
        Thread.sleep(20);
        finalTask = taskStore.get(result.id());
    }
    assertNotNull(finalTask, "Task should still be present in TaskStore");
    assertEquals(TaskState.TASK_STATE_COMPLETED, finalTask.status().state(),
            "TaskStore should have COMPLETED state after agent finishes");
}
/**
 * Test 2: the task's event queue can still be tapped after the AUTH_REQUIRED response.
 * Flow exercised:
 * - the agent emits AUTH_REQUIRED and then blocks on a latch
 * - the test taps the task's queue while the agent is blocked
 * - once released, the agent adds an artifact and completes
 * - both events are expected, in that order, on the tapped queue
 */
@Test
void testAuthRequired_QueueRemainsOpen() throws Exception {
    CountDownLatch authSignal = new CountDownLatch(1);
    CountDownLatch resumeSignal = new CountDownLatch(1);

    agentExecutorExecute = (ctx, agentEmitter) -> {
        Message authPrompt = Message.builder()
                .role(Message.Role.ROLE_AGENT)
                .parts(new TextPart("Authenticate required"))
                .build();
        agentEmitter.requiresAuth(authPrompt);
        authSignal.countDown();

        // Park until the test has tapped the queue.
        try {
            resumeSignal.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }

        // Late events, emitted after the AUTH_REQUIRED response.
        agentEmitter.addArtifact(List.of(new TextPart("Late artifact after auth")));
        agentEmitter.complete();
    };

    MessageSendParams sendParams = MessageSendParams.builder()
            .message(MESSAGE)
            .configuration(DEFAULT_CONFIG)
            .build();

    // Send the message and take the returned Task.
    EventKind response = requestHandler.onMessageSend(sendParams, NULL_CONTEXT);
    Task task = assertInstanceOf(Task.class, response);
    assertTrue(authSignal.await(2, TimeUnit.SECONDS),
            "AUTH_REQUIRED should be emitted");

    // Tap the queue, standing in for a client that resubscribes after AUTH_REQUIRED.
    EventQueue tap = queueManager.tap(task.id());
    assertNotNull(tap, "Queue should remain open after AUTH_REQUIRED");

    // Let the agent proceed.
    resumeSignal.countDown();

    // Expect the artifact first...
    EventQueueItem artifactItem = tap.dequeueEventItem(5000);
    assertNotNull(artifactItem, "Should receive late artifact event");
    assertInstanceOf(TaskArtifactUpdateEvent.class, artifactItem.getEvent(),
            "First event should be TaskArtifactUpdateEvent");

    // ...followed by the COMPLETED status update.
    EventQueueItem completionItem = tap.dequeueEventItem(5000);
    assertNotNull(completionItem, "Should receive completion event");
    TaskStatusUpdateEvent completion = assertInstanceOf(
            TaskStatusUpdateEvent.class, completionItem.getEvent(),
            "Second event should be TaskStatusUpdateEvent");
    assertEquals(TaskState.TASK_STATE_COMPLETED, completion.status().state(),
            "Task should be completed");
}
/**
 * Test 3: TaskStore persistence through AUTH_REQUIRED lifecycle.
 * Verifies:
 * - AUTH_REQUIRED state persisted correctly
 * - State transitions persisted (AUTH_REQUIRED → WORKING → COMPLETED); WORKING may be
 *   skipped from the test's point of view if processing is fast
 * - TaskStore ends up in COMPLETED
 * <p>
 * The store is polled with a bounded timeout rather than read after fixed sleeps, so the test
 * neither waits longer than needed nor fails just because processing took longer than the sleep.
 */
@Test
void testAuthRequired_TaskStorePersistence() throws Exception {
    // Arrange: Agent emits AUTH_REQUIRED, then WORKING, then COMPLETED
    CountDownLatch authEmitted = new CountDownLatch(1);
    CountDownLatch continueAgent = new CountDownLatch(1);
    agentExecutorExecute = (context, emitter) -> {
        // Emit AUTH_REQUIRED
        emitter.requiresAuth();
        authEmitted.countDown();
        // Wait for test to verify AUTH_REQUIRED persisted
        try {
            continueAgent.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        // Continue working (simulating auth received out-of-band)
        emitter.startWork();
        // Complete the task
        emitter.complete();
    };

    // Create MessageSendParams
    MessageSendParams params = MessageSendParams.builder()
            .message(MESSAGE)
            .configuration(DEFAULT_CONFIG)
            .build();

    // Act: Send message
    EventKind eventKind = requestHandler.onMessageSend(params, NULL_CONTEXT);
    assertInstanceOf(Task.class, eventKind);
    Task task = (Task) eventKind;
    assertTrue(authEmitted.await(2, TimeUnit.SECONDS),
            "AUTH_REQUIRED should be emitted");

    // Assert: Verify AUTH_REQUIRED state persisted
    Task storedTask1 = taskStore.get(task.id());
    assertNotNull(storedTask1, "Task should be in TaskStore");
    assertEquals(TaskState.TASK_STATE_AUTH_REQUIRED, storedTask1.status().state(),
            "TaskStore should have AUTH_REQUIRED state");

    // Release agent to continue
    continueAgent.countDown();

    // Wait (bounded) until the store has moved past AUTH_REQUIRED
    long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
    Task storedTask2 = taskStore.get(task.id());
    while (storedTask2 != null
            && storedTask2.status().state() == TaskState.TASK_STATE_AUTH_REQUIRED
            && System.nanoTime() < deadline) {
        Thread.sleep(20);
        storedTask2 = taskStore.get(task.id());
    }
    assertNotNull(storedTask2, "Task should still be in TaskStore after agent resumes");
    // Note: WORKING might be skipped if processing is fast, so we accept either WORKING or COMPLETED
    TaskState state2 = storedTask2.status().state();
    assertTrue(state2 == TaskState.TASK_STATE_WORKING || state2 == TaskState.TASK_STATE_COMPLETED,
            "TaskStore should have WORKING or COMPLETED state");

    // Wait (bounded) for the final COMPLETED state
    deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
    Task storedTask3 = storedTask2;
    while (storedTask3 != null
            && storedTask3.status().state() != TaskState.TASK_STATE_COMPLETED
            && System.nanoTime() < deadline) {
        Thread.sleep(20);
        storedTask3 = taskStore.get(task.id());
    }
    assertNotNull(storedTask3, "Task should still be in TaskStore after agent finishes");
    assertEquals(TaskState.TASK_STATE_COMPLETED, storedTask3.status().state(),
            "TaskStore should have COMPLETED state after agent finishes");
}
/**
 * Test 4: the agent keeps emitting events after AUTH_REQUIRED.
 * Flow exercised:
 * - the handler returns a Task in AUTH_REQUIRED state
 * - the agent, once released, adds two artifacts and completes
 * - the two artifact events and the COMPLETED status update are expected, in that order,
 *   on a queue tapped after the AUTH_REQUIRED response
 * <p>
 * NOTE(review): despite the test name, this test calls the blocking onMessageSend and observes
 * the later events through a tapped queue; it does not call onMessageSendStream.
 */
@Test
void testAuthRequired_Streaming_ContinuesInBackground() throws Exception {
    CountDownLatch authSignal = new CountDownLatch(1);
    CountDownLatch resumeSignal = new CountDownLatch(1);

    agentExecutorExecute = (ctx, agentEmitter) -> {
        agentEmitter.requiresAuth();
        authSignal.countDown();

        // Block until released (stands in for auth happening out-of-band).
        try {
            resumeSignal.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }

        // Events emitted after the AUTH_REQUIRED response.
        agentEmitter.addArtifact(List.of(new TextPart("Artifact 1")));
        agentEmitter.addArtifact(List.of(new TextPart("Artifact 2")));
        agentEmitter.complete();
    };

    MessageSendParams sendParams = MessageSendParams.builder()
            .message(MESSAGE)
            .configuration(DEFAULT_CONFIG)
            .build();

    EventKind response = requestHandler.onMessageSend(sendParams, NULL_CONTEXT);
    Task result = assertInstanceOf(Task.class, response);
    assertTrue(authSignal.await(2, TimeUnit.SECONDS),
            "AUTH_REQUIRED should be emitted");
    assertEquals(TaskState.TASK_STATE_AUTH_REQUIRED, result.status().state(),
            "Should receive AUTH_REQUIRED state");

    // Tap before releasing the agent so the subsequent events are observed.
    EventQueue tap = queueManager.tap(result.id());
    resumeSignal.countDown();

    // Two artifact events, in emission order.
    EventQueueItem firstArtifact = tap.dequeueEventItem(5000);
    assertNotNull(firstArtifact, "Should receive first artifact");
    assertInstanceOf(TaskArtifactUpdateEvent.class, firstArtifact.getEvent());

    EventQueueItem secondArtifact = tap.dequeueEventItem(5000);
    assertNotNull(secondArtifact, "Should receive second artifact");
    assertInstanceOf(TaskArtifactUpdateEvent.class, secondArtifact.getEvent());

    // Then the COMPLETED status update.
    EventQueueItem completionItem = tap.dequeueEventItem(5000);
    assertNotNull(completionItem, "Should receive completion");
    TaskStatusUpdateEvent completion =
            assertInstanceOf(TaskStatusUpdateEvent.class, completionItem.getEvent());
    assertEquals(TaskState.TASK_STATE_COMPLETED, completion.status().state());
}
/**
 * Test 5: tapping the queue some time after AUTH_REQUIRED still delivers later events.
 * Flow exercised:
 * - the handler returns a Task in AUTH_REQUIRED state
 * - the test waits briefly (standing in for a client disconnect), then taps the task's queue
 * - the agent, once released, adds an artifact and completes
 * - the artifact event and the COMPLETED status update are expected on the tapped queue
 */
@Test
void testAuthRequired_Resubscription() throws Exception {
    CountDownLatch authSignal = new CountDownLatch(1);
    CountDownLatch resumeSignal = new CountDownLatch(1);

    agentExecutorExecute = (ctx, agentEmitter) -> {
        agentEmitter.requiresAuth();
        authSignal.countDown();

        // Block while the test simulates the disconnect and the resubscription.
        try {
            resumeSignal.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }

        // Events emitted after the "client reconnect".
        agentEmitter.addArtifact(List.of(new TextPart("Event after reconnect")));
        agentEmitter.complete();
    };

    MessageSendParams sendParams = MessageSendParams.builder()
            .message(MESSAGE)
            .configuration(DEFAULT_CONFIG)
            .build();

    EventKind response = requestHandler.onMessageSend(sendParams, NULL_CONTEXT);
    Task task = assertInstanceOf(Task.class, response);
    assertTrue(authSignal.await(2, TimeUnit.SECONDS),
            "AUTH_REQUIRED should be emitted");
    assertEquals(TaskState.TASK_STATE_AUTH_REQUIRED, task.status().state(),
            "Should receive AUTH_REQUIRED state");

    // The "disconnect" is simulated purely by letting some time pass.
    Thread.sleep(100);

    // The "reconnect": tap the queue of the existing task.
    EventQueue resubscribed = queueManager.tap(task.id());
    assertNotNull(resubscribed,
            "Should be able to resubscribe after AUTH_REQUIRED");

    resumeSignal.countDown();

    // Artifact first...
    EventQueueItem artifactItem = resubscribed.dequeueEventItem(5000);
    assertNotNull(artifactItem, "Should receive late artifact on resubscribed queue");
    assertInstanceOf(TaskArtifactUpdateEvent.class, artifactItem.getEvent(),
            "Should receive artifact update event");

    // ...then the COMPLETED status update.
    EventQueueItem completionItem = resubscribed.dequeueEventItem(5000);
    assertNotNull(completionItem, "Should receive completion event");
    TaskStatusUpdateEvent completion = assertInstanceOf(
            TaskStatusUpdateEvent.class, completionItem.getEvent(),
            "Should receive status update event");
    assertEquals(TaskState.TASK_STATE_COMPLETED, completion.status().state(),
            "Task should be completed");
}
/**
 * Test: Reject SendMessage with mismatching contextId and taskId.
 * When a message references an existing task but provides a different contextId,
 * the request must be rejected with an InvalidParamsError.
 * The task must not be in a terminal state, or the terminal-state guard fires first.
 * <p>
 * The blocked agent is released in a finally block that covers every assertion made after the
 * agent may have been started, so a failing assertion never leaves the agent thread parked.
 */
@Test
void testRejectMismatchingContextId() throws Exception {
    // Arrange: Create an initial task – agent stays active (working) so the task is NOT terminal
    CountDownLatch agentStarted = new CountDownLatch(1);
    CountDownLatch agentRelease = new CountDownLatch(1);
    agentExecutorExecute = (context, emitter) -> {
        emitter.startWork();
        agentStarted.countDown();
        try {
            agentRelease.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        emitter.complete();
    };
    Message initialMessage = Message.builder()
            .messageId("msg-1")
            .role(Message.Role.ROLE_USER)
            .contextId("original-context")
            .parts(new TextPart("initial message"))
            .build();
    // returnImmediately=true so onMessageSend returns quickly (on first event)
    MessageSendParams initialParams = MessageSendParams.builder()
            .message(initialMessage)
            .configuration(DEFAULT_CONFIG)
            .build();

    try {
        EventKind result = requestHandler.onMessageSend(initialParams, NULL_CONTEXT);
        assertInstanceOf(Task.class, result);
        Task task = (Task) result;
        // Wait until agent has started (it has called startWork and is now blocked, not finished)
        assertTrue(agentStarted.await(5, TimeUnit.SECONDS), "Agent should have started");

        // Act & Assert: Send a follow-up message with matching taskId but wrong contextId
        // The task is still WORKING, so the terminal guard does NOT fire.
        // The contextId mismatch guard should fire instead.
        Message mismatchedMessage = Message.builder()
                .messageId("msg-2")
                .role(Message.Role.ROLE_USER)
                .taskId(task.id())
                .contextId("wrong-context-does-not-exist")
                .parts(new TextPart("follow-up message"))
                .build();
        MessageSendParams mismatchedParams = MessageSendParams.builder()
                .message(mismatchedMessage)
                .configuration(DEFAULT_CONFIG)
                .build();
        InvalidParamsError error = assertThrows(InvalidParamsError.class,
                () -> requestHandler.onMessageSend(mismatchedParams, NULL_CONTEXT),
                "Expected InvalidParamsError for a follow-up with a mismatching contextId");
        // Guard against a null message so a failure reads as an assertion, not an NPE
        assertNotNull(error.getMessage(), "Error message should not be null");
        assertTrue(error.getMessage().contains(task.id()),
                "Error message should reference the task id");
    } finally {
        // Release agent so it completes and doesn't leak
        agentRelease.countDown();
    }
}
/**
 * Helper: creates a task, drives it to the given terminal state, then asserts that a
 * follow-up SendMessage to that task throws UnsupportedOperationError (CORE-SEND-002).
 * <p>
 * After the agent has finished, the TaskStore is polled with a bounded timeout until it reports
 * the expected terminal state, rather than being read once after a fixed sleep.
 *
 * @param terminalState one of TASK_STATE_COMPLETED, TASK_STATE_CANCELED, TASK_STATE_REJECTED or
 *        TASK_STATE_FAILED; any other value makes the test agent throw IllegalArgumentException
 * @throws Exception if waiting is interrupted or the handler fails unexpectedly
 */
private void assertSendMessageToTerminalStateThrows(TaskState terminalState) throws Exception {
    CountDownLatch agentCompleted = new CountDownLatch(1);
    agentExecutorExecute = (context, emitter) -> {
        switch (terminalState) {
            case TASK_STATE_COMPLETED -> emitter.complete();
            case TASK_STATE_CANCELED -> emitter.cancel();
            case TASK_STATE_REJECTED -> emitter.reject();
            // Use fail() (no-arg) which emits TaskStatusUpdateEvent(FAILED) via the normal path,
            // ensuring the task state is persisted to the store before we query it.
            case TASK_STATE_FAILED -> emitter.fail();
            default -> throw new IllegalArgumentException("Unexpected state: " + terminalState);
        }
        agentCompleted.countDown();
    };
    Message initialMessage = Message.builder()
            .messageId("msg-initial-" + terminalState)
            .role(Message.Role.ROLE_USER)
            .parts(new TextPart("create task"))
            .build();
    EventKind result = requestHandler.onMessageSend(
            MessageSendParams.builder().message(initialMessage).configuration(DEFAULT_CONFIG).build(),
            NULL_CONTEXT);
    assertInstanceOf(Task.class, result);
    Task task = (Task) result;
    final String finalTaskId = task.id();
    assertTrue(agentCompleted.await(5, TimeUnit.SECONDS), "Agent should complete");

    // Allow MainEventBusProcessor to persist the final state: poll until it shows up or we time out
    long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
    Task storedTask = taskStore.get(finalTaskId);
    while ((storedTask == null || storedTask.status().state() != terminalState)
            && System.nanoTime() < deadline) {
        Thread.sleep(20);
        storedTask = taskStore.get(finalTaskId);
    }
    assertNotNull(storedTask);
    assertEquals(terminalState, storedTask.status().state(),
            "Task should be in " + terminalState + " state");

    // Reset: agent executor must NOT be called on the follow-up
    agentExecutorExecute = (context, emitter) -> {
        throw new AssertionError("AgentExecutor must NOT be invoked for a terminal task");
    };
    Message followUpMessage = Message.builder()
            .messageId("msg-followup-" + terminalState)
            .role(Message.Role.ROLE_USER)
            .taskId(finalTaskId)
            .parts(new TextPart("follow-up to terminal task"))
            .build();
    MessageSendParams followUpParams = MessageSendParams.builder()
            .message(followUpMessage)
            .configuration(DEFAULT_CONFIG)
            .build();
    UnsupportedOperationError error = assertThrows(UnsupportedOperationError.class,
            () -> requestHandler.onMessageSend(followUpParams, NULL_CONTEXT),
            "Expected UnsupportedOperationError when sending message to a " + terminalState + " task");
    assertNotNull(error.getMessage(), "Error message should not be null");
    assertTrue(error.getMessage().contains(finalTaskId),
            "Error message should reference the task id");
}
/**
 * CORE-SEND-002, COMPLETED case: delegates to the shared terminal-state helper, which expects a
 * follow-up SendMessage to the completed task to fail with UnsupportedOperationError.
 */
@Test
void testSendMessage_ToCompletedTask_ThrowsUnsupportedOperationError() throws Exception {
    final TaskState terminalState = TaskState.TASK_STATE_COMPLETED;
    assertSendMessageToTerminalStateThrows(terminalState);
}
/**
 * CORE-SEND-002, CANCELED case: delegates to the shared terminal-state helper, which expects a
 * follow-up SendMessage to the canceled task to fail with UnsupportedOperationError.
 */
@Test
void testSendMessage_ToCanceledTask_ThrowsUnsupportedOperationError() throws Exception {
    final TaskState terminalState = TaskState.TASK_STATE_CANCELED;
    assertSendMessageToTerminalStateThrows(terminalState);
}
/**
 * CORE-SEND-002, REJECTED case: delegates to the shared terminal-state helper, which expects a
 * follow-up SendMessage to the rejected task to fail with UnsupportedOperationError.
 */
@Test
void testSendMessage_ToRejectedTask_ThrowsUnsupportedOperationError() throws Exception {
    final TaskState terminalState = TaskState.TASK_STATE_REJECTED;
    assertSendMessageToTerminalStateThrows(terminalState);
}
/**
 * CORE-SEND-002, FAILED case: delegates to the shared terminal-state helper, which expects a
 * follow-up SendMessage to the failed task to fail with UnsupportedOperationError.
 */
@Test
void testSendMessage_ToFailedTask_ThrowsUnsupportedOperationError() throws Exception {
    final TaskState terminalState = TaskState.TASK_STATE_FAILED;
    assertSendMessageToTerminalStateThrows(terminalState);
}
/**
 * Test: SendStreamingMessage to a task in a terminal state must also return UnsupportedOperationError
 * (CORE-SEND-002, streaming path).
 * <p>
 * The initial task is created through the blocking onMessageSend; only the follow-up goes through
 * onMessageSendStream. The TaskStore is polled with a bounded timeout until it reports COMPLETED,
 * rather than being read once after a fixed sleep.
 */
@Test
void testSendMessageStream_ToCompletedTask_ThrowsUnsupportedOperationError() throws Exception {
    // Arrange: Create and complete an initial task
    CountDownLatch agentCompleted = new CountDownLatch(1);
    agentExecutorExecute = (context, emitter) -> {
        emitter.complete();
        agentCompleted.countDown();
    };
    Message initialMessage = Message.builder()
            .messageId("msg-initial-stream")
            .role(Message.Role.ROLE_USER)
            .parts(new TextPart("create task for stream test"))
            .build();
    MessageSendParams initialParams = MessageSendParams.builder()
            .message(initialMessage)
            .configuration(DEFAULT_CONFIG)
            .build();
    EventKind result = requestHandler.onMessageSend(initialParams, NULL_CONTEXT);
    assertInstanceOf(Task.class, result);
    Task task = (Task) result;
    assertTrue(agentCompleted.await(5, TimeUnit.SECONDS), "Agent should complete");

    // Allow MainEventBusProcessor to persist: poll until COMPLETED shows up or we time out
    long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
    Task storedTask = taskStore.get(task.id());
    while ((storedTask == null || storedTask.status().state() != TaskState.TASK_STATE_COMPLETED)
            && System.nanoTime() < deadline) {
        Thread.sleep(20);
        storedTask = taskStore.get(task.id());
    }
    // Verify task is in terminal state
    assertNotNull(storedTask);
    assertEquals(TaskState.TASK_STATE_COMPLETED, storedTask.status().state());

    // Reset: agent executor must NOT be called
    agentExecutorExecute = (context, emitter) -> {
        throw new AssertionError("AgentExecutor must NOT be invoked for a terminal task");
    };

    // Act & Assert: streaming follow-up to a terminal task must also be rejected
    Message followUpMessage = Message.builder()
            .messageId("msg-followup-stream")
            .role(Message.Role.ROLE_USER)
            .taskId(task.id())
            .parts(new TextPart("streaming follow-up to completed task"))
            .build();
    MessageSendParams followUpParams = MessageSendParams.builder()
            .message(followUpMessage)
            .configuration(DEFAULT_CONFIG)
            .build();
    assertThrows(UnsupportedOperationError.class,
            () -> requestHandler.onMessageSendStream(followUpParams, NULL_CONTEXT),
            "Expected UnsupportedOperationError when streaming message to a completed task");
}
}