From 33b51b3fb9f0beb9e6f18d4ff5af7527cac79c29 Mon Sep 17 00:00:00 2001 From: Timothy Zelinsky Date: Mon, 23 Mar 2026 15:30:30 +1100 Subject: [PATCH 1/2] Add streamed structured output and split messaging internals --- .../AgentDemoViewModel+StructuredOutput.swift | 94 +++++ .../Shared/AgentDemoViewModel.swift | 13 + .../Shared/DemoStructuredOutputExamples.swift | 27 ++ .../Shared/StructuredOutputDemoView.swift | 213 ++++++++++ .../Shared/ThreadDetailView.swift | 64 ++- README.md | 30 ++ Sources/CodexKit/Runtime/AgentBackend.swift | 4 + Sources/CodexKit/Runtime/AgentModels.swift | 27 ++ .../Runtime/AgentRuntime+Memory.swift | 1 + .../AgentRuntime+MessageCollection.swift | 76 ++++ .../Runtime/AgentRuntime+Messaging.swift | 315 +++++---------- .../AgentRuntime+TurnConsumption.swift | 376 ++++++++++++++++++ .../Runtime/AgentStructuredStreaming.swift | 78 ++++ .../CodexResponsesBackend+Models.swift | 3 + .../CodexResponsesBackend+Streaming.swift | 14 +- ...dexResponsesBackend+StructuredOutput.swift | 246 ++++++++++++ .../Runtime/CodexResponsesBackend.swift | 123 +++++- Sources/CodexKit/Support/JSONValue.swift | 9 + .../AgentRuntimeMessageTests.swift | 317 +++++++++++++++ Tests/CodexKitTests/AgentRuntimeTests.swift | 39 +- .../CodexResponsesBackendTests.swift | 103 +++++ .../Support/InMemoryAgentBackend.swift | 105 ++++- 22 files changed, 2044 insertions(+), 233 deletions(-) create mode 100644 Sources/CodexKit/Runtime/AgentRuntime+MessageCollection.swift create mode 100644 Sources/CodexKit/Runtime/AgentRuntime+TurnConsumption.swift create mode 100644 Sources/CodexKit/Runtime/AgentStructuredStreaming.swift create mode 100644 Sources/CodexKit/Runtime/CodexResponsesBackend+StructuredOutput.swift diff --git a/DemoApp/AssistantRuntimeDemoApp/Shared/AgentDemoViewModel+StructuredOutput.swift b/DemoApp/AssistantRuntimeDemoApp/Shared/AgentDemoViewModel+StructuredOutput.swift index fea9a70..be7262b 100644 --- a/DemoApp/AssistantRuntimeDemoApp/Shared/AgentDemoViewModel+StructuredOutput.swift +++ b/DemoApp/AssistantRuntimeDemoApp/Shared/AgentDemoViewModel+StructuredOutput.swift @@ -99,4 +99,98 @@ extension AgentDemoViewModel { lastError = error.localizedDescription } } + + func runStreamedStructuredOutputDemo() async { + guard session != nil else { + lastError = "Sign in before running the streamed structured output demo." + return + } + guard !isRunningStructuredStreamingDemo else { + return + } + + isRunningStructuredStreamingDemo = true + lastError = nil + structuredStreamingResult = nil + structuredStreamingError = nil + defer { + isRunningStructuredStreamingDemo = false + } + + do { + let thread = try await runtime.createThread( + title: "Structured Output: Streamed Delivery Update", + personaStack: Self.supportPersona + ) + let request = DemoStructuredOutputExamples.streamedStructuredRequest() + if showResolvedInstructionsDebug { + lastResolvedInstructions = try await runtime.resolvedInstructionsPreview( + for: thread.id, + request: request + ) + lastResolvedInstructionsThreadTitle = thread.title ?? "Structured Output: Streamed Delivery Update" + } + + let stream = try await runtime.streamMessage( + request, + in: thread.id, + expecting: StreamedStructuredDeliveryUpdate.self + ) + + var visibleText = "" + var partialSnapshots: [StreamedStructuredDeliveryUpdate] = [] + var committedPayload: StreamedStructuredDeliveryUpdate? + + for try await event in stream { + switch event { + case let .assistantMessageDelta(_, _, delta): + visibleText += delta + + case let .messageCommitted(message): + if message.role == .assistant { + visibleText = message.displayText + } + + case let .structuredOutputPartial(partial): + if partialSnapshots.last != partial { + partialSnapshots.append(partial) + } + + case let .structuredOutputCommitted(payload): + committedPayload = payload + + case let .turnFailed(error): + throw error + + default: + break + } + } + + let messages = await runtime.messages(for: thread.id) + let persistedMetadata = messages.last(where: { $0.role == .assistant })?.structuredOutput + + guard let committedPayload else { + throw AgentRuntimeError.structuredOutputMissing( + formatName: StreamedStructuredDeliveryUpdate.responseFormat.name + ) + } + + structuredStreamingResult = StructuredStreamingDemoResult( + threadID: thread.id, + threadTitle: thread.title ?? "Structured Output: Streamed Delivery Update", + prompt: DemoStructuredOutputExamples.streamedStructuredPrompt, + visibleText: visibleText.trimmingCharacters(in: .whitespacesAndNewlines), + partialSnapshots: partialSnapshots, + committedPayload: committedPayload, + persistedMetadata: persistedMetadata + ) + threads = await runtime.threads() + activeThreadID = thread.id + setMessages(messages) + } catch { + structuredStreamingError = error.localizedDescription + lastError = error.localizedDescription + } + } } diff --git a/DemoApp/AssistantRuntimeDemoApp/Shared/AgentDemoViewModel.swift b/DemoApp/AssistantRuntimeDemoApp/Shared/AgentDemoViewModel.swift index 4a6094d..2c63019 100644 --- a/DemoApp/AssistantRuntimeDemoApp/Shared/AgentDemoViewModel.swift +++ b/DemoApp/AssistantRuntimeDemoApp/Shared/AgentDemoViewModel.swift @@ -39,6 +39,16 @@ struct StructuredOutputDemoImportResult: Sendable { let summary: StructuredImportedContentSummary } +struct StructuredStreamingDemoResult: Sendable { + let threadID: String + let threadTitle: String + let prompt: String + let visibleText: String + let partialSnapshots: [StreamedStructuredDeliveryUpdate] + let committedPayload: StreamedStructuredDeliveryUpdate + let persistedMetadata: AgentStructuredOutputMetadata? +} + struct GuidedMemoryDemoResult: Sendable { let record: MemoryRecord let diagnostics: MemoryStoreDiagnostics @@ -135,8 +145,11 @@ final class AgentDemoViewModel: @unchecked Sendable { var pendingComposerImages: [AgentImageAttachment] = [] var composerText = "" var isRunningStructuredOutputDemo = false + var isRunningStructuredStreamingDemo = false var structuredShippingReplyResult: StructuredOutputDemoDraftResult? var structuredImportedSummaryResult: StructuredOutputDemoImportResult? + var structuredStreamingResult: StructuredStreamingDemoResult? + var structuredStreamingError: String? var isRunningMemoryDemo = false var automaticMemoryResult: AutomaticMemoryDemoResult? var automaticPolicyMemoryResult: AutomaticPolicyMemoryDemoResult? diff --git a/DemoApp/AssistantRuntimeDemoApp/Shared/DemoStructuredOutputExamples.swift b/DemoApp/AssistantRuntimeDemoApp/Shared/DemoStructuredOutputExamples.swift index 3b79e35..4800671 100644 --- a/DemoApp/AssistantRuntimeDemoApp/Shared/DemoStructuredOutputExamples.swift +++ b/DemoApp/AssistantRuntimeDemoApp/Shared/DemoStructuredOutputExamples.swift @@ -47,6 +47,26 @@ struct StructuredImportedContentSummary: AgentStructuredOutput, Sendable { ) } +struct StreamedStructuredDeliveryUpdate: AgentStructuredOutput, Sendable, Hashable { + let statusHeadline: String + let customerPromise: String + let nextAction: String + + static let responseFormat = AgentStructuredOutputFormat( + name: "streamed_delivery_update", + description: "A structured operational delivery update produced alongside visible assistant narration.", + schema: .object( + properties: [ + "statusHeadline": .string(), + "customerPromise": .string(), + "nextAction": .string(), + ], + required: ["statusHeadline", "customerPromise", "nextAction"], + additionalProperties: false + ) + ) +} + enum DemoStructuredOutputExamples { static let shippingCustomerMessage = """ My package was supposed to arrive yesterday for a birthday on Saturday. Tracking has not moved in two days and I need to know whether it will make it in time. @@ -57,6 +77,9 @@ enum DemoStructuredOutputExamples { """ static let importedArticleURL = URL(string: "https://github.com/timazed/CodexKit")! + static let streamedStructuredPrompt = """ + The package is delayed ahead of a birthday delivery. Talk to the customer like an in-app support assistant while you work through the situation. Stream a short human-readable response only. Do not restate the final structured delivery fields in prose because the app receives those separately. Then provide the final typed delivery update for the app. + """ static func shippingReplyRequest() -> UserMessageRequest { UserMessageRequest( @@ -78,4 +101,8 @@ enum DemoStructuredOutputExamples { ) ) } + + static func streamedStructuredRequest() -> UserMessageRequest { + UserMessageRequest(text: streamedStructuredPrompt) + } } diff --git a/DemoApp/AssistantRuntimeDemoApp/Shared/StructuredOutputDemoView.swift b/DemoApp/AssistantRuntimeDemoApp/Shared/StructuredOutputDemoView.swift index f2eb763..7934bdd 100644 --- a/DemoApp/AssistantRuntimeDemoApp/Shared/StructuredOutputDemoView.swift +++ b/DemoApp/AssistantRuntimeDemoApp/Shared/StructuredOutputDemoView.swift @@ -1,3 +1,4 @@ +import CodexKit import Foundation import SwiftUI @@ -15,6 +16,7 @@ struct StructuredOutputDemoView: View { ScrollView { VStack(alignment: .leading, spacing: 18) { overviewCard + streamedStructuredCard shippingDraftCard importedContentCard } @@ -34,6 +36,10 @@ private extension StructuredOutputDemoView { .font(.subheadline) .foregroundStyle(.secondary) + Text("The streamed demo goes one step further: it shows live assistant narration, best-effort typed partials, and the final persisted payload metadata without asking the app to parse hidden text markers.") + .font(.subheadline) + .foregroundStyle(.secondary) + if let session = viewModel.session { Label("Signed in as \(session.account.email)", systemImage: "checkmark.seal.fill") .font(.subheadline.weight(.medium)) @@ -51,6 +57,176 @@ private extension StructuredOutputDemoView { } } + var streamedStructuredCard: some View { + DemoSectionCard { + Text("Streamed Text + Typed Payload") + .font(.headline) + + Text("Streams customer-facing prose and a typed delivery update in the same turn. The final payload is also persisted on the assistant message as metadata.") + .font(.subheadline) + .foregroundStyle(.secondary) + + sampleInputCard( + title: "Sample mixed-mode prompt", + body: DemoStructuredOutputExamples.streamedStructuredPrompt + ) + + DemoActionTile( + title: viewModel.isRunningStructuredStreamingDemo ? "Streaming Structured Turn..." : "Run Streamed Structured Demo", + subtitle: "Uses `streamMessage(..., expecting:)` to yield prose deltas, typed partials, and a committed payload.", + systemImage: "bubble.left.and.text.bubble.right.fill", + isProminent: true, + isDisabled: viewModel.session == nil || viewModel.isRunningStructuredStreamingDemo + ) { + Task { + await viewModel.runStreamedStructuredOutputDemo() + } + } + + streamedStatusPanel + + if let result = viewModel.structuredStreamingResult { + VStack(alignment: .leading, spacing: 14) { + VStack(alignment: .leading, spacing: 8) { + Text("Run checks") + .font(.subheadline.weight(.semibold)) + + statusRow( + title: "Committed typed payload", + detail: "A final `StreamedStructuredDeliveryUpdate` was emitted before turn completion.", + passed: true + ) + statusRow( + title: "Persisted metadata", + detail: result.persistedMetadata == nil + ? "The final assistant message did not keep structured metadata." + : "The final assistant message now includes `structuredOutput` metadata for restore and inspection.", + passed: result.persistedMetadata != nil + ) + statusRow( + title: "Partial snapshots", + detail: result.partialSnapshots.isEmpty + ? "This run did not produce a decodable partial before commit, which is still valid." + : "Received \(result.partialSnapshots.count) best-effort typed partial snapshot\(result.partialSnapshots.count == 1 ? "" : "s").", + passed: true + ) + } + .padding(14) + .background( + RoundedRectangle(cornerRadius: 16, style: .continuous) + .fill(Color.accentColor.opacity(0.10)) + ) + + Text("Visible assistant text") + .font(.subheadline.weight(.semibold)) + + Text(result.visibleText.isEmpty ? "No visible text captured." : result.visibleText) + .font(.body) + .frame(maxWidth: .infinity, alignment: .leading) + .padding(14) + .background( + RoundedRectangle(cornerRadius: 16, style: .continuous) + .fill(Color.primary.opacity(0.04)) + ) + + VStack(alignment: .leading, spacing: 8) { + Text("Typed partial snapshots") + .font(.subheadline.weight(.semibold)) + + if result.partialSnapshots.isEmpty { + Text("No decodable partial arrived before commit on this run.") + .font(.callout) + .foregroundStyle(.secondary) + } else { + ForEach(Array(result.partialSnapshots.enumerated()), id: \.offset) { index, partial in + VStack(alignment: .leading, spacing: 6) { + Text("Partial \(index + 1)") + .font(.caption.weight(.semibold)) + .foregroundStyle(.secondary) + resultRow(label: "Headline", value: partial.statusHeadline) + resultRow(label: "Promise", value: partial.customerPromise) + resultRow(label: "Next Action", value: partial.nextAction) + } + .padding(12) + .background( + RoundedRectangle(cornerRadius: 14, style: .continuous) + .fill(Color.accentColor.opacity(0.08)) + ) + } + } + } + + VStack(alignment: .leading, spacing: 8) { + Text("Committed payload") + .font(.subheadline.weight(.semibold)) + resultRow(label: "Headline", value: result.committedPayload.statusHeadline) + resultRow(label: "Promise", value: result.committedPayload.customerPromise) + resultRow(label: "Next Action", value: result.committedPayload.nextAction) + } + .padding(14) + .background( + RoundedRectangle(cornerRadius: 16, style: .continuous) + .fill(Color.accentColor.opacity(0.10)) + ) + + VStack(alignment: .leading, spacing: 8) { + Text("Persisted metadata") + .font(.subheadline.weight(.semibold)) + + if let metadata = result.persistedMetadata { + resultRow(label: "Format", value: metadata.formatName) + Text(metadata.payload.prettyJSONString) + .font(.system(.footnote, design: .monospaced)) + .frame(maxWidth: .infinity, alignment: .leading) + .padding(12) + .background( + RoundedRectangle(cornerRadius: 14, style: .continuous) + .fill(Color.primary.opacity(0.04)) + ) + .textSelection(.enabled) + } else { + Text("No structured metadata was persisted with the final assistant message.") + .font(.callout) + .foregroundStyle(.secondary) + } + } + + Button("Open Thread In Assistant") { + Task { + await viewModel.activateThread(id: result.threadID) + selectedTab = .assistant + } + } + .buttonStyle(.bordered) + } + } + } + } + + @ViewBuilder + var streamedStatusPanel: some View { + if viewModel.isRunningStructuredStreamingDemo { + statusBanner( + title: "Streaming in progress", + detail: "Watching for live prose, typed partials, and the final persisted payload metadata.", + tint: .orange + ) + } else if let result = viewModel.structuredStreamingResult { + let metadataState = result.persistedMetadata == nil ? "missing" : "saved" + statusBanner( + title: "Streamed structured demo passed", + detail: "Committed payload received and metadata \(metadataState) on thread `\(result.threadTitle)`.", + tint: .green + ) + } else if let error = viewModel.structuredStreamingError { + statusBanner( + title: "Streamed structured demo failed", + detail: error, + tint: .red + ) + } + } + var shippingDraftCard: some View { DemoSectionCard { Text("Shipping Reply Draft") @@ -202,4 +378,41 @@ private extension StructuredOutputDemoView { .frame(maxWidth: .infinity, alignment: .leading) } } + + func statusBanner(title: String, detail: String, tint: Color) -> some View { + VStack(alignment: .leading, spacing: 6) { + Text(title) + .font(.subheadline.weight(.semibold)) + Text(detail) + .font(.callout) + .foregroundStyle(.secondary) + } + .frame(maxWidth: .infinity, alignment: .leading) + .padding(14) + .background( + RoundedRectangle(cornerRadius: 16, style: .continuous) + .fill(tint.opacity(0.12)) + ) + .overlay( + RoundedRectangle(cornerRadius: 16, style: .continuous) + .strokeBorder(tint.opacity(0.25), lineWidth: 1) + ) + } + + func statusRow(title: String, detail: String, passed: Bool) -> some View { + HStack(alignment: .top, spacing: 10) { + Image(systemName: passed ? "checkmark.circle.fill" : "xmark.circle.fill") + .foregroundStyle(passed ? .green : .red) + .font(.subheadline) + + VStack(alignment: .leading, spacing: 2) { + Text(title) + .font(.subheadline.weight(.medium)) + Text(detail) + .font(.caption) + .foregroundStyle(.secondary) + } + } + .frame(maxWidth: .infinity, alignment: .leading) + } } diff --git a/DemoApp/AssistantRuntimeDemoApp/Shared/ThreadDetailView.swift b/DemoApp/AssistantRuntimeDemoApp/Shared/ThreadDetailView.swift index a37aa06..d77e17f 100644 --- a/DemoApp/AssistantRuntimeDemoApp/Shared/ThreadDetailView.swift +++ b/DemoApp/AssistantRuntimeDemoApp/Shared/ThreadDetailView.swift @@ -119,8 +119,14 @@ private extension ThreadDetailView { .font(.caption.weight(.semibold)) .foregroundStyle(.secondary) - Text(message.displayText) - .frame(maxWidth: .infinity, alignment: .leading) + if shouldShowVisibleText(for: message) { + Text(message.displayText) + .frame(maxWidth: .infinity, alignment: .leading) + } + + if let structuredOutput = message.structuredOutput { + structuredOutputCard(structuredOutput, for: message) + } if !message.images.isEmpty { attachmentGallery(for: message.images) @@ -289,6 +295,60 @@ private extension ThreadDetailView { } } + func structuredOutputCard( + _ structuredOutput: AgentStructuredOutputMetadata, + for message: AgentMessage + ) -> some View { + VStack(alignment: .leading, spacing: 8) { + Text("Structured Payload") + .font(.caption.weight(.semibold)) + .foregroundStyle(.secondary) + + if isPureStructuredPayloadMessage(message) { + Text("This assistant turn resolved into a typed structured payload.") + .font(.callout) + .foregroundStyle(.secondary) + } + + Label(structuredOutput.formatName, systemImage: "square.stack.3d.up.fill") + .font(.caption.weight(.medium)) + .foregroundStyle(.secondary) + + Text(structuredOutput.payload.prettyJSONString) + .font(.system(.footnote, design: .monospaced)) + .frame(maxWidth: .infinity, alignment: .leading) + .padding(12) + .background( + RoundedRectangle(cornerRadius: 14, style: .continuous) + .fill(Color.primary.opacity(0.04)) + ) + .textSelection(.enabled) + } + .padding(.top, shouldShowVisibleText(for: message) ? 4 : 0) + } + + func shouldShowVisibleText(for message: AgentMessage) -> Bool { + guard !isPureStructuredPayloadMessage(message) else { + return false + } + return !message.displayText.trimmingCharacters(in: .whitespacesAndNewlines).isEmpty + } + + func isPureStructuredPayloadMessage(_ message: AgentMessage) -> Bool { + guard let structuredOutput = message.structuredOutput else { + return false + } + + let rawText = message.text.trimmingCharacters(in: .whitespacesAndNewlines) + guard !rawText.isEmpty, + let data = rawText.data(using: .utf8), + let parsed = try? JSONDecoder().decode(JSONValue.self, from: data) else { + return false + } + + return parsed == structuredOutput.payload + } + func importPhoto(from item: PhotosPickerItem) async { isImportingPhoto = true diff --git a/README.md b/README.md index 0510901..7826c58 100644 --- a/README.md +++ b/README.md @@ -121,6 +121,7 @@ let stream = try await runtime.streamMessage( | Structured local memory layer | Yes | | Text + image input | Yes | | Typed structured output (`Decodable`) | Yes | +| Mixed streamed text + typed structured output | Yes | | Share/import helper (`AgentImportedContent`) | Yes | | App Intents / Shortcuts example | Yes | | Assistant image attachment rendering | Yes | @@ -216,6 +217,8 @@ For most apps, there are now three common send paths: - `streamMessage(...)` Stream deltas, tool events, approvals, and final turn completion. +- `streamMessage(..., expecting:)` + Stream normal turn events plus typed structured-output events in the same turn. - `sendMessage(...)` Return the assistant's final text as a `String`. - `sendMessage(..., expecting:)` @@ -260,6 +263,32 @@ let draft = try await runtime.sendMessage( ) ``` +If you want streamed prose and typed machine output in the same turn, use the streaming overload: + +```swift +let stream = try await runtime.streamMessage( + UserMessageRequest(text: "Draft a response for the delayed package."), + in: thread.id, + expecting: ShippingReplyDraft.self, + options: .init(required: true) +) + +for try await event in stream { + switch event { + case let .assistantMessageDelta(_, _, delta): + print("visible:", delta) + case let .structuredOutputPartial(snapshot): + print("partial:", snapshot) + case let .structuredOutputCommitted(snapshot): + print("final:", snapshot) + default: + break + } +} +``` + +The structured payload is delivered out-of-band from assistant prose. CodexKit strips its internal framing before emitting text deltas or committed assistant messages, and persists the final committed payload metadata with the assistant message for later restore/inspection. + `CodexKit` sends that through the OpenAI Responses structured-output path and stores the assistant's final JSON reply in thread history like any other assistant turn. If you need something more specialized, `AgentStructuredOutputFormat` still supports a raw-schema escape hatch via `rawSchema: JSONValue`. @@ -761,6 +790,7 @@ print(preview) The 2.0 line standardizes runtime sends around: - `streamMessage(...)` for streaming turn events +- `streamMessage(..., expecting:)` for mixed prose + typed structured stream events - `sendMessage(...)` for final text - `sendMessage(..., expecting:)` for typed structured replies diff --git a/Sources/CodexKit/Runtime/AgentBackend.swift b/Sources/CodexKit/Runtime/AgentBackend.swift index b0bf3f7..a27ec9e 100644 --- a/Sources/CodexKit/Runtime/AgentBackend.swift +++ b/Sources/CodexKit/Runtime/AgentBackend.swift @@ -4,6 +4,9 @@ public enum AgentBackendEvent: Sendable { case turnStarted(AgentTurn) case assistantMessageDelta(threadID: String, turnID: String, delta: String) case assistantMessageCompleted(AgentMessage) + case structuredOutputPartial(JSONValue) + case structuredOutputCommitted(JSONValue) + case structuredOutputValidationFailed(AgentStructuredOutputValidationFailure) case toolCallRequested(ToolInvocation) case turnCompleted(AgentTurnSummary) } @@ -23,6 +26,7 @@ public protocol AgentBackend: Sendable { message: UserMessageRequest, instructions: String, responseFormat: AgentStructuredOutputFormat?, + streamedStructuredOutput: AgentStreamedStructuredOutputRequest?, tools: [ToolDefinition], session: ChatGPTSession ) async throws -> any AgentTurnStreaming diff --git a/Sources/CodexKit/Runtime/AgentModels.swift b/Sources/CodexKit/Runtime/AgentModels.swift index a7dcea3..56da2a0 100644 --- a/Sources/CodexKit/Runtime/AgentModels.swift +++ b/Sources/CodexKit/Runtime/AgentModels.swift @@ -59,6 +59,25 @@ public struct AgentRuntimeError: Error, LocalizedError, Equatable, Sendable { ) } + public static func structuredOutputMissing( + formatName: String + ) -> AgentRuntimeError { + AgentRuntimeError( + code: "structured_output_missing", + message: "The assistant turn completed without returning structured output for \(formatName)." + ) + } + + public static func structuredOutputInvalid( + stage: AgentStructuredOutputValidationStage, + underlyingMessage: String + ) -> AgentRuntimeError { + AgentRuntimeError( + code: "structured_output_invalid", + message: "The assistant returned invalid \(stage.rawValue) structured output: \(underlyingMessage)" + ) + } + public static func invalidSkillID(_ skillID: String) -> AgentRuntimeError { AgentRuntimeError( code: "invalid_skill_id", @@ -367,6 +386,7 @@ public struct AgentMessage: Identifiable, Codable, Hashable, Sendable { public var role: AgentRole public var text: String public var images: [AgentImageAttachment] + public var structuredOutput: AgentStructuredOutputMetadata? public var createdAt: Date public init( @@ -375,6 +395,7 @@ public struct AgentMessage: Identifiable, Codable, Hashable, Sendable { role: AgentRole, text: String, images: [AgentImageAttachment] = [], + structuredOutput: AgentStructuredOutputMetadata? = nil, createdAt: Date = Date() ) { self.id = id @@ -382,6 +403,7 @@ public struct AgentMessage: Identifiable, Codable, Hashable, Sendable { self.role = role self.text = text self.images = images + self.structuredOutput = structuredOutput self.createdAt = createdAt } @@ -407,6 +429,7 @@ public struct AgentMessage: Identifiable, Codable, Hashable, Sendable { case role case text case images + case structuredOutput case createdAt } @@ -417,6 +440,10 @@ public struct AgentMessage: Identifiable, Codable, Hashable, Sendable { role = try container.decode(AgentRole.self, forKey: .role) text = try container.decode(String.self, forKey: .text) images = try container.decodeIfPresent([AgentImageAttachment].self, forKey: .images) ?? [] + structuredOutput = try container.decodeIfPresent( + AgentStructuredOutputMetadata.self, + forKey: .structuredOutput + ) createdAt = try container.decodeIfPresent(Date.self, forKey: .createdAt) ?? Date() } } diff --git a/Sources/CodexKit/Runtime/AgentRuntime+Memory.swift b/Sources/CodexKit/Runtime/AgentRuntime+Memory.swift index 484dc86..cc6eda6 100644 --- a/Sources/CodexKit/Runtime/AgentRuntime+Memory.swift +++ b/Sources/CodexKit/Runtime/AgentRuntime+Memory.swift @@ -178,6 +178,7 @@ extension AgentRuntime { responseFormat: MemoryExtractionDraftResponse.responseFormat( maxMemories: max(1, options.maxMemories) ), + streamedStructuredOutput: nil, tools: [], session: session ) diff --git a/Sources/CodexKit/Runtime/AgentRuntime+MessageCollection.swift b/Sources/CodexKit/Runtime/AgentRuntime+MessageCollection.swift new file mode 100644 index 0000000..951650b --- /dev/null +++ b/Sources/CodexKit/Runtime/AgentRuntime+MessageCollection.swift @@ -0,0 +1,76 @@ +import Foundation + +extension AgentRuntime { + // MARK: - Message Collection + + func collectFinalAssistantMessage( + from stream: AsyncThrowingStream + ) async throws -> AgentMessage { + var latestAssistantMessage: AgentMessage? + + for try await event in stream { + guard case let .messageCommitted(message) = event, + message.role == .assistant + else { + continue + } + + latestAssistantMessage = message + } + + guard let latestAssistantMessage else { + throw AgentRuntimeError.assistantResponseMissing() + } + + return latestAssistantMessage + } + + func decodeStructuredValue( + _ value: JSONValue, + as outputType: Output.Type, + decoder: JSONDecoder + ) throws -> Output { + let payload = try JSONEncoder().encode(value) + do { + return try decoder.decode(outputType, from: payload) + } catch { + throw AgentRuntimeError.structuredOutputDecodingFailed( + typeName: String(describing: outputType), + underlyingMessage: error.localizedDescription + ) + } + } + + func collectFinalAssistantMessage( + from turnStream: any AgentTurnStreaming + ) async throws -> AgentMessage { + var latestAssistantMessage: AgentMessage? + + for try await event in turnStream.events { + switch event { + case let .assistantMessageCompleted(message): + if message.role == .assistant { + latestAssistantMessage = message + } + + case let .toolCallRequested(invocation): + try await turnStream.submitToolResult( + .failure( + invocation: invocation, + message: "Automatic memory capture does not allow tool calls." + ), + for: invocation.id + ) + + default: + break + } + } + + guard let latestAssistantMessage else { + throw AgentRuntimeError.assistantResponseMissing() + } + + return latestAssistantMessage + } +} diff --git a/Sources/CodexKit/Runtime/AgentRuntime+Messaging.swift b/Sources/CodexKit/Runtime/AgentRuntime+Messaging.swift index a90ae86..791e48d 100644 --- a/Sources/CodexKit/Runtime/AgentRuntime+Messaging.swift +++ b/Sources/CodexKit/Runtime/AgentRuntime+Messaging.swift @@ -10,10 +10,103 @@ extension AgentRuntime { try await streamMessage( request, in: threadID, - responseFormat: nil + responseFormat: nil, + streamedStructuredOutput: nil ) } + public func streamMessage( + _ request: UserMessageRequest, + in threadID: String, + expecting outputType: Output.Type = Output.self, + options: AgentStructuredStreamingOptions = AgentStructuredStreamingOptions(), + decoder: JSONDecoder = JSONDecoder() + ) async throws -> AsyncThrowingStream, Error> { + try await streamMessage( + request, + in: threadID, + expecting: outputType, + responseFormat: outputType.responseFormat, + options: options, + decoder: decoder + ) + } + + public func streamMessage( + _ request: UserMessageRequest, + in threadID: String, + expecting outputType: Output.Type, + responseFormat: AgentStructuredOutputFormat, + options: AgentStructuredStreamingOptions = AgentStructuredStreamingOptions(), + decoder: JSONDecoder = JSONDecoder() + ) async throws -> AsyncThrowingStream, Error> { + guard request.hasContent else { + throw AgentRuntimeError.invalidMessageContent() + } + + guard let thread = thread(for: threadID) else { + throw AgentRuntimeError.threadNotFound(threadID) + } + + let session = try await sessionManager.requireSession() + let userMessage = AgentMessage( + threadID: threadID, + role: .user, + text: request.text, + images: request.images + ) + let priorMessages = state.messagesByThread[threadID] ?? [] + let resolvedTurnSkills = try resolveTurnSkills( + thread: thread, + message: request + ) + let resolvedInstructions = await resolveInstructions( + thread: thread, + message: request, + resolvedTurnSkills: resolvedTurnSkills + ) + + try await appendMessage(userMessage) + try await setThreadStatus(.streaming, for: threadID) + + let tools = await toolRegistry.allDefinitions() + let turnStart = try await beginTurnWithUnauthorizedRecovery( + thread: thread, + history: priorMessages, + message: request, + instructions: resolvedInstructions, + responseFormat: nil, + streamedStructuredOutput: AgentStreamedStructuredOutputRequest( + responseFormat: responseFormat, + options: options + ), + tools: tools, + session: session + ) + let turnStream = turnStart.turnStream + let turnSession = turnStart.session + + return AsyncThrowingStream { continuation in + continuation.yield(.messageCommitted(userMessage)) + continuation.yield(.threadStatusChanged(threadID: threadID, status: .streaming)) + + Task { + await self.consumeStructuredTurnStream( + turnStream, + for: threadID, + userMessage: userMessage, + session: turnSession, + resolvedTurnSkills: resolvedTurnSkills, + responseFormat: responseFormat, + options: options, + decoder: decoder, + outputType: outputType, + continuation: continuation + ) + } + } + } + public func sendMessage( _ request: UserMessageRequest, in threadID: String @@ -21,7 +114,8 @@ extension AgentRuntime { let stream = try await streamMessage( request, in: threadID, - responseFormat: nil + responseFormat: nil, + streamedStructuredOutput: nil ) let message = try await collectFinalAssistantMessage(from: stream) return message.displayText @@ -52,7 +146,8 @@ extension AgentRuntime { let stream = try await streamMessage( request, in: threadID, - responseFormat: responseFormat + responseFormat: responseFormat, + streamedStructuredOutput: nil ) let message = try await collectFinalAssistantMessage(from: stream) let payload = Data(message.text.trimmingCharacters(in: .whitespacesAndNewlines).utf8) @@ -70,7 +165,8 @@ extension AgentRuntime { func streamMessage( _ request: UserMessageRequest, in threadID: String, - responseFormat: AgentStructuredOutputFormat? + responseFormat: AgentStructuredOutputFormat?, + streamedStructuredOutput: AgentStreamedStructuredOutputRequest? ) async throws -> AsyncThrowingStream { guard request.hasContent else { throw AgentRuntimeError.invalidMessageContent() @@ -108,6 +204,7 @@ extension AgentRuntime { message: request, instructions: resolvedInstructions, responseFormat: responseFormat, + streamedStructuredOutput: streamedStructuredOutput, tools: tools, session: session ) @@ -137,6 +234,7 @@ extension AgentRuntime { message: UserMessageRequest, instructions: String, responseFormat: AgentStructuredOutputFormat?, + streamedStructuredOutput: AgentStreamedStructuredOutputRequest?, tools: [ToolDefinition], session: ChatGPTSession ) async throws -> ( @@ -152,6 +250,7 @@ extension AgentRuntime { message: message, instructions: instructions, responseFormat: responseFormat, + streamedStructuredOutput: streamedStructuredOutput, tools: tools, session: session ) @@ -180,212 +279,4 @@ extension AgentRuntime { resolvedTurnSkills: resolvedTurnSkills ) } - - // MARK: - Turn Consumption - - private func consumeTurnStream( - _ turnStream: any AgentTurnStreaming, - for threadID: String, - userMessage: AgentMessage, - session: ChatGPTSession, - resolvedTurnSkills: ResolvedTurnSkills, - continuation: AsyncThrowingStream.Continuation - ) async { - let policyTracker: TurnSkillPolicyTracker? = if resolvedTurnSkills.compiledToolPolicy.hasConstraints { - TurnSkillPolicyTracker(policy: resolvedTurnSkills.compiledToolPolicy) - } else { - nil - } - var assistantMessages: [AgentMessage] = [] - - do { - for try await backendEvent in turnStream.events { - switch backendEvent { - case let .turnStarted(turn): - continuation.yield(.turnStarted(turn)) - - case let .assistantMessageDelta(threadID, turnID, delta): - continuation.yield( - .assistantMessageDelta( - threadID: threadID, - turnID: turnID, - delta: delta - ) - ) - - case let .assistantMessageCompleted(message): - try await appendMessage(message) - if message.role == .assistant { - assistantMessages.append(message) - } - continuation.yield(.messageCommitted(message)) - - case let .toolCallRequested(invocation): - continuation.yield(.toolCallStarted(invocation)) - - let result: ToolResultEnvelope - if let policyTracker, - let validationError = policyTracker.validate(toolName: invocation.toolName) { - result = .failure( - invocation: invocation, - message: validationError.message - ) - } else { - let resolvedResult = try await resolveToolInvocation( - invocation, - session: session, - continuation: continuation - ) - result = resolvedResult - policyTracker?.recordAccepted(toolName: invocation.toolName) - } - - try await turnStream.submitToolResult(result, for: invocation.id) - continuation.yield(.toolCallFinished(result)) - try await setThreadStatus(.streaming, for: threadID) - continuation.yield(.threadStatusChanged(threadID: threadID, status: .streaming)) - - case let .turnCompleted(summary): - if let completionError = policyTracker?.completionError() { - try await setThreadStatus(.failed, for: threadID) - continuation.yield(.threadStatusChanged(threadID: threadID, status: .failed)) - continuation.yield(.turnFailed(completionError)) - continuation.finish(throwing: completionError) - return - } - - try await setThreadStatus(.idle, for: threadID) - await automaticallyCaptureMemoriesIfConfigured( - for: threadID, - userMessage: userMessage, - assistantMessages: assistantMessages - ) - continuation.yield(.threadStatusChanged(threadID: threadID, status: .idle)) - continuation.yield(.turnCompleted(summary)) - } - } - - continuation.finish() - } catch { - let runtimeError = (error as? AgentRuntimeError) - ?? AgentRuntimeError( - code: "turn_failed", - message: error.localizedDescription - ) - try? await setThreadStatus(.failed, for: threadID) - continuation.yield(.threadStatusChanged(threadID: threadID, status: .failed)) - continuation.yield(.turnFailed(runtimeError)) - continuation.finish(throwing: error) - } - } - - private func resolveToolInvocation( - _ invocation: ToolInvocation, - session: ChatGPTSession, - continuation: AsyncThrowingStream.Continuation - ) async throws -> ToolResultEnvelope { - if let definition = await toolRegistry.definition(named: invocation.toolName), - definition.approvalPolicy == .requiresApproval { - let approval = ApprovalRequest( - threadID: invocation.threadID, - turnID: invocation.turnID, - toolInvocation: invocation, - title: "Approve \(invocation.toolName)?", - message: definition.approvalMessage - ?? "This tool requires explicit approval before it can run." - ) - - try await setThreadStatus(.waitingForApproval, for: invocation.threadID) - continuation.yield( - .threadStatusChanged( - threadID: invocation.threadID, - status: .waitingForApproval - ) - ) - continuation.yield(.approvalRequested(approval)) - - let decision = try await approvalCoordinator.requestApproval(approval) - continuation.yield( - .approvalResolved( - ApprovalResolution( - requestID: approval.id, - threadID: approval.threadID, - turnID: approval.turnID, - decision: decision - ) - ) - ) - - guard decision == .approved else { - return .denied(invocation: invocation) - } - } - - try await setThreadStatus(.waitingForToolResult, for: invocation.threadID) - continuation.yield( - .threadStatusChanged( - threadID: invocation.threadID, - status: .waitingForToolResult - ) - ) - - return await toolRegistry.execute(invocation, session: session) - } - - // MARK: - Message Collection - - func collectFinalAssistantMessage( - from stream: AsyncThrowingStream - ) async throws -> AgentMessage { - var latestAssistantMessage: AgentMessage? - - for try await event in stream { - guard case let .messageCommitted(message) = event, - message.role == .assistant - else { - continue - } - - latestAssistantMessage = message - } - - guard let latestAssistantMessage else { - throw AgentRuntimeError.assistantResponseMissing() - } - - return latestAssistantMessage - } - - func collectFinalAssistantMessage( - from turnStream: any AgentTurnStreaming - ) async throws -> AgentMessage { - var latestAssistantMessage: AgentMessage? - - for try await event in turnStream.events { - switch event { - case let .assistantMessageCompleted(message): - if message.role == .assistant { - latestAssistantMessage = message - } - - case let .toolCallRequested(invocation): - try await turnStream.submitToolResult( - .failure( - invocation: invocation, - message: "Automatic memory capture does not allow tool calls." - ), - for: invocation.id - ) - - default: - break - } - } - - guard let latestAssistantMessage else { - throw AgentRuntimeError.assistantResponseMissing() - } - - return latestAssistantMessage - } } diff --git a/Sources/CodexKit/Runtime/AgentRuntime+TurnConsumption.swift b/Sources/CodexKit/Runtime/AgentRuntime+TurnConsumption.swift new file mode 100644 index 0000000..2580619 --- /dev/null +++ b/Sources/CodexKit/Runtime/AgentRuntime+TurnConsumption.swift @@ -0,0 +1,376 @@ +import Foundation + +extension AgentRuntime { + // MARK: - Turn Consumption + + func consumeTurnStream( + _ turnStream: any AgentTurnStreaming, + for threadID: String, + userMessage: AgentMessage, + session: ChatGPTSession, + resolvedTurnSkills: ResolvedTurnSkills, + continuation: AsyncThrowingStream.Continuation + ) async { + let policyTracker: TurnSkillPolicyTracker? = if resolvedTurnSkills.compiledToolPolicy.hasConstraints { + TurnSkillPolicyTracker(policy: resolvedTurnSkills.compiledToolPolicy) + } else { + nil + } + var assistantMessages: [AgentMessage] = [] + + do { + for try await backendEvent in turnStream.events { + switch backendEvent { + case let .turnStarted(turn): + continuation.yield(.turnStarted(turn)) + + case let .assistantMessageDelta(threadID, turnID, delta): + continuation.yield( + .assistantMessageDelta( + threadID: threadID, + turnID: turnID, + delta: delta + ) + ) + + case let .assistantMessageCompleted(message): + try await appendMessage(message) + if message.role == .assistant { + assistantMessages.append(message) + } + continuation.yield(.messageCommitted(message)) + + case .structuredOutputPartial, + .structuredOutputCommitted, + .structuredOutputValidationFailed: + break + + case let .toolCallRequested(invocation): + continuation.yield(.toolCallStarted(invocation)) + + let result: ToolResultEnvelope + if let policyTracker, + let validationError = policyTracker.validate(toolName: invocation.toolName) { + result = .failure( + invocation: invocation, + message: validationError.message + ) + } else { + let resolvedResult = try await resolveToolInvocation( + invocation, + session: session, + continuation: continuation + ) + result = resolvedResult + policyTracker?.recordAccepted(toolName: invocation.toolName) + } + + try await turnStream.submitToolResult(result, for: invocation.id) + continuation.yield(.toolCallFinished(result)) + try await setThreadStatus(.streaming, for: threadID) + continuation.yield(.threadStatusChanged(threadID: threadID, status: .streaming)) + + case let .turnCompleted(summary): + if let completionError = policyTracker?.completionError() { + try await setThreadStatus(.failed, for: threadID) + continuation.yield(.threadStatusChanged(threadID: threadID, status: .failed)) + continuation.yield(.turnFailed(completionError)) + continuation.finish(throwing: completionError) + return + } + + try await setThreadStatus(.idle, for: threadID) + await automaticallyCaptureMemoriesIfConfigured( + for: threadID, + userMessage: userMessage, + assistantMessages: assistantMessages + ) + continuation.yield(.threadStatusChanged(threadID: threadID, status: .idle)) + continuation.yield(.turnCompleted(summary)) + } + } + + continuation.finish() + } catch { + let runtimeError = (error as? AgentRuntimeError) + ?? AgentRuntimeError( + code: "turn_failed", + message: error.localizedDescription + ) + try? await setThreadStatus(.failed, for: threadID) + continuation.yield(.threadStatusChanged(threadID: threadID, status: .failed)) + continuation.yield(.turnFailed(runtimeError)) + continuation.finish(throwing: error) + } + } + + func consumeStructuredTurnStream( + _ turnStream: any AgentTurnStreaming, + for threadID: String, + userMessage: AgentMessage, + session: ChatGPTSession, + resolvedTurnSkills: ResolvedTurnSkills, + responseFormat: AgentStructuredOutputFormat, + options: AgentStructuredStreamingOptions, + decoder: JSONDecoder, + outputType: Output.Type, + continuation: AsyncThrowingStream, Error>.Continuation + ) async { + let policyTracker: TurnSkillPolicyTracker? = if resolvedTurnSkills.compiledToolPolicy.hasConstraints { + TurnSkillPolicyTracker(policy: resolvedTurnSkills.compiledToolPolicy) + } else { + nil + } + var assistantMessages: [AgentMessage] = [] + var sawStructuredCommit = false + + do { + for try await backendEvent in turnStream.events { + switch backendEvent { + case let .turnStarted(turn): + continuation.yield(.turnStarted(turn)) + + case let .assistantMessageDelta(threadID, turnID, delta): + continuation.yield( + .assistantMessageDelta( + threadID: threadID, + turnID: turnID, + delta: delta + ) + ) + + case let .assistantMessageCompleted(message): + try await appendMessage(message) + if message.role == .assistant { + assistantMessages.append(message) + } + continuation.yield(.messageCommitted(message)) + + case let .structuredOutputPartial(value): + do { + let decoded = try decodeStructuredValue( + value, + as: outputType, + decoder: decoder + ) + if options.emitPartials { + continuation.yield(.structuredOutputPartial(decoded)) + } + } catch { + continuation.yield( + .structuredOutputValidationFailed( + AgentStructuredOutputValidationFailure( + stage: .partial, + message: error.localizedDescription, + rawPayload: value.prettyJSONString + ) + ) + ) + } + + case let .structuredOutputCommitted(value): + do { + let decoded = try decodeStructuredValue( + value, + as: outputType, + decoder: decoder + ) + sawStructuredCommit = true + continuation.yield(.structuredOutputCommitted(decoded)) + } catch { + let validationFailure = AgentStructuredOutputValidationFailure( + stage: .committed, + message: error.localizedDescription, + rawPayload: value.prettyJSONString + ) + let runtimeError = AgentRuntimeError.structuredOutputInvalid( + stage: validationFailure.stage, + underlyingMessage: validationFailure.message + ) + try await setThreadStatus(.failed, for: threadID) + continuation.yield(.threadStatusChanged(threadID: threadID, status: .failed)) + continuation.yield(.structuredOutputValidationFailed(validationFailure)) + continuation.yield(.turnFailed(runtimeError)) + continuation.finish(throwing: runtimeError) + return + } + + case let .structuredOutputValidationFailed(validationFailure): + continuation.yield(.structuredOutputValidationFailed(validationFailure)) + + case let .toolCallRequested(invocation): + continuation.yield(.toolCallStarted(invocation)) + + let result: ToolResultEnvelope + if let policyTracker, + let validationError = policyTracker.validate(toolName: invocation.toolName) { + result = .failure( + invocation: invocation, + message: validationError.message + ) + } else { + let resolvedResult = try await resolveToolInvocation( + invocation, + session: session, + continuation: continuation + ) + result = resolvedResult + policyTracker?.recordAccepted(toolName: invocation.toolName) + } + + try await turnStream.submitToolResult(result, for: invocation.id) + continuation.yield(.toolCallFinished(result)) + try await setThreadStatus(.streaming, for: threadID) + continuation.yield(.threadStatusChanged(threadID: threadID, status: .streaming)) + + case let .turnCompleted(summary): + if let completionError = policyTracker?.completionError() { + try await setThreadStatus(.failed, for: threadID) + continuation.yield(.threadStatusChanged(threadID: threadID, status: .failed)) + continuation.yield(.turnFailed(completionError)) + continuation.finish(throwing: completionError) + return + } + + if options.required, !sawStructuredCommit { + let runtimeError = AgentRuntimeError.structuredOutputMissing( + formatName: responseFormat.name + ) + try await setThreadStatus(.failed, for: threadID) + continuation.yield(.threadStatusChanged(threadID: threadID, status: .failed)) + continuation.yield(.turnFailed(runtimeError)) + continuation.finish(throwing: runtimeError) + return + } + + try await setThreadStatus(.idle, for: threadID) + await automaticallyCaptureMemoriesIfConfigured( + for: threadID, + userMessage: userMessage, + assistantMessages: assistantMessages + ) + continuation.yield(.threadStatusChanged(threadID: threadID, status: .idle)) + continuation.yield(.turnCompleted(summary)) + } + } + + continuation.finish() + } catch { + let runtimeError = (error as? AgentRuntimeError) + ?? AgentRuntimeError( + code: "turn_failed", + message: error.localizedDescription + ) + try? await setThreadStatus(.failed, for: threadID) + continuation.yield(.threadStatusChanged(threadID: threadID, status: .failed)) + continuation.yield(.turnFailed(runtimeError)) + continuation.finish(throwing: error) + } + } + + func resolveToolInvocation( + _ invocation: ToolInvocation, + session: ChatGPTSession, + continuation: AsyncThrowingStream.Continuation + ) async throws -> ToolResultEnvelope { + if let definition = await toolRegistry.definition(named: invocation.toolName), + definition.approvalPolicy == .requiresApproval { + let approval = ApprovalRequest( + threadID: invocation.threadID, + turnID: invocation.turnID, + toolInvocation: invocation, + title: "Approve \(invocation.toolName)?", + message: definition.approvalMessage + ?? "This tool requires explicit approval before it can run." + ) + + try await setThreadStatus(.waitingForApproval, for: invocation.threadID) + continuation.yield( + .threadStatusChanged( + threadID: invocation.threadID, + status: .waitingForApproval + ) + ) + continuation.yield(.approvalRequested(approval)) + + let decision = try await approvalCoordinator.requestApproval(approval) + continuation.yield( + .approvalResolved( + ApprovalResolution( + requestID: approval.id, + threadID: approval.threadID, + turnID: approval.turnID, + decision: decision + ) + ) + ) + + guard decision == .approved else { + return .denied(invocation: invocation) + } + } + + try await setThreadStatus(.waitingForToolResult, for: invocation.threadID) + continuation.yield( + .threadStatusChanged( + threadID: invocation.threadID, + status: .waitingForToolResult + ) + ) + + return await toolRegistry.execute(invocation, session: session) + } + + func resolveToolInvocation( + _ invocation: ToolInvocation, + session: ChatGPTSession, + continuation: AsyncThrowingStream, Error>.Continuation + ) async throws -> ToolResultEnvelope { + if let definition = await toolRegistry.definition(named: invocation.toolName), + definition.approvalPolicy == .requiresApproval { + let approval = ApprovalRequest( + threadID: invocation.threadID, + turnID: invocation.turnID, + toolInvocation: invocation, + title: "Approve \(invocation.toolName)?", + message: definition.approvalMessage + ?? "This tool requires explicit approval before it can run." + ) + + try await setThreadStatus(.waitingForApproval, for: invocation.threadID) + continuation.yield( + .threadStatusChanged( + threadID: invocation.threadID, + status: .waitingForApproval + ) + ) + continuation.yield(.approvalRequested(approval)) + + let decision = try await approvalCoordinator.requestApproval(approval) + continuation.yield( + .approvalResolved( + ApprovalResolution( + requestID: approval.id, + threadID: approval.threadID, + turnID: approval.turnID, + decision: decision + ) + ) + ) + + guard decision == .approved else { + return .denied(invocation: invocation) + } + } + + try await setThreadStatus(.waitingForToolResult, for: invocation.threadID) + continuation.yield( + .threadStatusChanged( + threadID: invocation.threadID, + status: .waitingForToolResult + ) + ) + + return await toolRegistry.execute(invocation, session: session) + } +} diff --git a/Sources/CodexKit/Runtime/AgentStructuredStreaming.swift b/Sources/CodexKit/Runtime/AgentStructuredStreaming.swift new file mode 100644 index 0000000..9d5e9ff --- /dev/null +++ b/Sources/CodexKit/Runtime/AgentStructuredStreaming.swift @@ -0,0 +1,78 @@ +import Foundation + +public struct AgentStructuredStreamingOptions: Sendable, Hashable { + public var required: Bool + public var emitPartials: Bool + + public init( + required: Bool = false, + emitPartials: Bool = true + ) { + self.required = required + self.emitPartials = emitPartials + } +} + +public struct AgentStructuredOutputMetadata: Codable, Hashable, Sendable { + public let formatName: String + public let payload: JSONValue + + public init( + formatName: String, + payload: JSONValue + ) { + self.formatName = formatName + self.payload = payload + } +} + +public enum AgentStructuredOutputValidationStage: String, Codable, Hashable, Sendable { + case partial + case committed +} + +public struct AgentStructuredOutputValidationFailure: Error, Hashable, Sendable { + public let stage: AgentStructuredOutputValidationStage + public let message: String + public let rawPayload: String? + + public init( + stage: AgentStructuredOutputValidationStage, + message: String, + rawPayload: String? = nil + ) { + self.stage = stage + self.message = message + self.rawPayload = rawPayload + } +} + +public enum AgentStructuredStreamEvent: Sendable { + case threadStarted(AgentThread) + case threadStatusChanged(threadID: String, status: AgentThreadStatus) + case turnStarted(AgentTurn) + case assistantMessageDelta(threadID: String, turnID: String, delta: String) + case messageCommitted(AgentMessage) + case approvalRequested(ApprovalRequest) + case approvalResolved(ApprovalResolution) + case toolCallStarted(ToolInvocation) + case toolCallFinished(ToolResultEnvelope) + case structuredOutputPartial(Output) + case structuredOutputCommitted(Output) + case structuredOutputValidationFailed(AgentStructuredOutputValidationFailure) + case turnCompleted(AgentTurnSummary) + case turnFailed(AgentRuntimeError) +} + +public struct AgentStreamedStructuredOutputRequest: Sendable, Hashable { + public let responseFormat: AgentStructuredOutputFormat + public let options: AgentStructuredStreamingOptions + + public init( + responseFormat: AgentStructuredOutputFormat, + options: AgentStructuredStreamingOptions + ) { + self.responseFormat = responseFormat + self.options = options + } +} diff --git a/Sources/CodexKit/Runtime/CodexResponsesBackend+Models.swift b/Sources/CodexKit/Runtime/CodexResponsesBackend+Models.swift index 23dc289..5f098ee 100644 --- a/Sources/CodexKit/Runtime/CodexResponsesBackend+Models.swift +++ b/Sources/CodexKit/Runtime/CodexResponsesBackend+Models.swift @@ -170,6 +170,9 @@ struct FunctionCallRecord: Sendable { enum CodexResponsesStreamEvent: Sendable { case assistantTextDelta(String) case assistantMessage(AgentMessage) + case structuredOutputPartial(JSONValue) + case structuredOutputCommitted(JSONValue) + case structuredOutputValidationFailed(AgentStructuredOutputValidationFailure) case functionCall(FunctionCallRecord) case completed(AgentUsage) } diff --git a/Sources/CodexKit/Runtime/CodexResponsesBackend+Streaming.swift b/Sources/CodexKit/Runtime/CodexResponsesBackend+Streaming.swift index 97bb364..331df3d 100644 --- a/Sources/CodexKit/Runtime/CodexResponsesBackend+Streaming.swift +++ b/Sources/CodexKit/Runtime/CodexResponsesBackend+Streaming.swift @@ -183,17 +183,27 @@ extension CodexResponsesTurnSession { configuration: CodexResponsesBackendConfiguration, instructions: String, responseFormat: AgentStructuredOutputFormat?, + streamedStructuredOutput: AgentStreamedStructuredOutputRequest?, threadID: String, items: [WorkingHistoryItem], tools: [ToolDefinition], session: ChatGPTSession, encoder: JSONEncoder ) throws -> URLRequest { + let resolvedInstructions = if let streamedStructuredOutput { + instructions + "\n\n" + streamedStructuredOutputInstructions(for: streamedStructuredOutput) + } else { + instructions + } let requestBody = ResponsesRequestBody( model: configuration.model, reasoning: .init(effort: configuration.reasoningEffort), - instructions: instructions, - text: .init(format: .init(responseFormat: responseFormat)), + instructions: resolvedInstructions, + text: .init( + format: .init( + responseFormat: streamedStructuredOutput == nil ? responseFormat : nil + ) + ), input: items.map(\.jsonValue), tools: responsesTools( from: tools, diff --git a/Sources/CodexKit/Runtime/CodexResponsesBackend+StructuredOutput.swift b/Sources/CodexKit/Runtime/CodexResponsesBackend+StructuredOutput.swift new file mode 100644 index 0000000..f30926d --- /dev/null +++ b/Sources/CodexKit/Runtime/CodexResponsesBackend+StructuredOutput.swift @@ -0,0 +1,246 @@ +import Foundation + +enum StructuredStreamParserMode { + case visible + case structured +} + +enum StructuredStreamParsingEvent { + case visibleText(String) + case structuredOutputPartial(JSONValue) + case structuredOutputValidationFailed(AgentStructuredOutputValidationFailure) +} + +enum StructuredStreamFinalResult { + case none + case committed(JSONValue) + case invalid(AgentStructuredOutputValidationFailure) +} + +struct StructuredStreamExtraction { + let visibleText: String + let finalResult: StructuredStreamFinalResult +} + +struct CodexResponsesStructuredStreamParser { + static let openTag = "" + static let closeTag = "" + + private var mode: StructuredStreamParserMode = .visible + private var pending = "" + private var structuredBuffer = "" + private var lastPartial: JSONValue? + + mutating func consume(delta: String) -> [StructuredStreamParsingEvent] { + pending.append(delta) + var events: [StructuredStreamParsingEvent] = [] + + processing: while true { + switch mode { + case .visible: + if let range = pending.range(of: Self.openTag) { + let visible = String(pending[.. 0 else { + break processing + } + + let index = pending.index(pending.startIndex, offsetBy: emitCount) + let visible = String(pending[.. 0 else { + break processing + } + + let index = pending.index(pending.startIndex, offsetBy: emitCount) + structuredBuffer.append(contentsOf: pending[.. StructuredStreamExtraction { + Self.extractFinal(from: rawMessage) + } + + private mutating func snapshotEvents( + stage: AgentStructuredOutputValidationStage + ) -> [StructuredStreamParsingEvent] { + guard let data = structuredBuffer.data(using: .utf8) else { + return [] + } + + do { + let value = try JSONDecoder().decode(JSONValue.self, from: data) + guard value != lastPartial else { + return [] + } + lastPartial = value + return [.structuredOutputPartial(value)] + } catch { + if stage == .committed { + return [ + .structuredOutputValidationFailed( + AgentStructuredOutputValidationFailure( + stage: .committed, + message: error.localizedDescription, + rawPayload: structuredBuffer + ) + ), + ] + } + return [] + } + } + + private static func trailingMatchLength( + in buffer: String, + against marker: String + ) -> Int { + let maxLength = min(buffer.count, marker.count - 1) + guard maxLength > 0 else { + return 0 + } + + for length in stride(from: maxLength, through: 1, by: -1) { + let suffix = buffer.suffix(length) + if marker.hasPrefix(String(suffix)) { + return length + } + } + + return 0 + } + + private static func extractFinal(from rawMessage: String) -> StructuredStreamExtraction { + guard let openRange = rawMessage.range(of: openTag) else { + return StructuredStreamExtraction( + visibleText: rawMessage.trimmingCharacters(in: .whitespacesAndNewlines), + finalResult: .none + ) + } + + let remaining = rawMessage[openRange.upperBound...] + guard let closeRange = remaining.range(of: closeTag) else { + return StructuredStreamExtraction( + visibleText: rawMessage[.. String { + let schemaData = (try? JSONEncoder().encode(request.responseFormat.schema.jsonValue)) + ?? Data("{}".utf8) + let schema = String(decoding: schemaData, as: UTF8.self) + let description = request.responseFormat.description + .map { "Description: \($0)\n" } + ?? "" + + let requirementLine = request.options.required + ? "You must emit exactly one hidden structured output block." + : "Emit the hidden structured output block only when it is useful and you can satisfy the schema." + + return """ + CodexKit private streaming contract: + - Respond with normal user-facing assistant text first. + - Do not mention any hidden framing or transport markers in the visible text. + - After the visible text, optionally append one hidden structured output block using the exact tags below. + - Hidden block opening tag: \(CodexResponsesStructuredStreamParser.openTag) + - Hidden block closing tag: \(CodexResponsesStructuredStreamParser.closeTag) + - The hidden block contents must be valid JSON matching the declared schema. + - \(requirementLine) + \(description)Schema name: \(request.responseFormat.name) + Schema JSON: + \(schema) + """ + } +} diff --git a/Sources/CodexKit/Runtime/CodexResponsesBackend.swift b/Sources/CodexKit/Runtime/CodexResponsesBackend.swift index 8f045ca..54e6519 100644 --- a/Sources/CodexKit/Runtime/CodexResponsesBackend.swift +++ b/Sources/CodexKit/Runtime/CodexResponsesBackend.swift @@ -67,6 +67,7 @@ public actor CodexResponsesBackend: AgentBackend { message: UserMessageRequest, instructions: String, responseFormat: AgentStructuredOutputFormat?, + streamedStructuredOutput: AgentStreamedStructuredOutputRequest?, tools: [ToolDefinition], session: ChatGPTSession ) async throws -> any AgentTurnStreaming { @@ -74,6 +75,7 @@ public actor CodexResponsesBackend: AgentBackend { configuration: configuration, instructions: instructions, responseFormat: responseFormat, + streamedStructuredOutput: streamedStructuredOutput, urlSession: urlSession, encoder: encoder, decoder: decoder, @@ -86,6 +88,28 @@ public actor CodexResponsesBackend: AgentBackend { } } +private extension CodexResponsesBackend { + static func structuredMetadata( + from text: String, + responseFormat: AgentStructuredOutputFormat? + ) -> AgentStructuredOutputMetadata? { + guard let responseFormat else { + return nil + } + + let payloadText = text.trimmingCharacters(in: .whitespacesAndNewlines) + guard let data = payloadText.data(using: .utf8), + let payload = try? JSONDecoder().decode(JSONValue.self, from: data) else { + return nil + } + + return AgentStructuredOutputMetadata( + formatName: responseFormat.name, + payload: payload + ) + } +} + final class CodexResponsesTurnSession: AgentTurnStreaming, @unchecked Sendable { let events: AsyncThrowingStream @@ -95,6 +119,7 @@ final class CodexResponsesTurnSession: AgentTurnStreaming, @unchecked Sendable { configuration: CodexResponsesBackendConfiguration, instructions: String, responseFormat: AgentStructuredOutputFormat?, + streamedStructuredOutput: AgentStreamedStructuredOutputRequest?, urlSession: URLSession, encoder: JSONEncoder, decoder: JSONDecoder, @@ -117,6 +142,7 @@ final class CodexResponsesTurnSession: AgentTurnStreaming, @unchecked Sendable { configuration: configuration, instructions: instructions, responseFormat: responseFormat, + streamedStructuredOutput: streamedStructuredOutput, urlSession: urlSession, encoder: encoder, decoder: decoder, @@ -158,6 +184,7 @@ final class CodexResponsesTurnSession: AgentTurnStreaming, @unchecked Sendable { configuration: CodexResponsesBackendConfiguration, instructions: String, responseFormat: AgentStructuredOutputFormat?, + streamedStructuredOutput: AgentStreamedStructuredOutputRequest?, urlSession: URLSession, encoder: JSONEncoder, decoder: JSONDecoder, @@ -186,6 +213,8 @@ final class CodexResponsesTurnSession: AgentTurnStreaming, @unchecked Sendable { var shouldContinue = true var pendingToolImages: [AgentImageAttachment] = [] var pendingToolFallbackTexts: [String] = [] + var structuredParser = CodexResponsesStructuredStreamParser() + var pendingStructuredOutputMetadata: AgentStructuredOutputMetadata? while shouldContinue { shouldContinue = false @@ -201,6 +230,7 @@ final class CodexResponsesTurnSession: AgentTurnStreaming, @unchecked Sendable { configuration: configuration, instructions: instructions, responseFormat: responseFormat, + streamedStructuredOutput: streamedStructuredOutput, threadID: threadID, items: workingHistory, tools: tools, @@ -218,36 +248,109 @@ final class CodexResponsesTurnSession: AgentTurnStreaming, @unchecked Sendable { switch event { case let .assistantTextDelta(delta): emittedRetryUnsafeOutput = true - continuation.yield( - .assistantMessageDelta( - threadID: threadID, - turnID: turnID, - delta: delta + if streamedStructuredOutput != nil { + for parsedEvent in structuredParser.consume(delta: delta) { + switch parsedEvent { + case let .visibleText(visibleDelta): + guard !visibleDelta.isEmpty else { + continue + } + continuation.yield( + .assistantMessageDelta( + threadID: threadID, + turnID: turnID, + delta: visibleDelta + ) + ) + case let .structuredOutputPartial(value): + continuation.yield(.structuredOutputPartial(value)) + case let .structuredOutputValidationFailed(validationFailure): + continuation.yield(.structuredOutputValidationFailed(validationFailure)) + } + } + } else { + continuation.yield( + .assistantMessageDelta( + threadID: threadID, + turnID: turnID, + delta: delta + ) ) - ) + } case let .assistantMessage(messageTemplate): emittedRetryUnsafeOutput = true + let normalizedMessage: AgentMessage + if let streamedStructuredOutput { + let extraction = structuredParser.finalize(rawMessage: messageTemplate.text) + + switch extraction.finalResult { + case .none: + break + case let .committed(value): + pendingStructuredOutputMetadata = AgentStructuredOutputMetadata( + formatName: streamedStructuredOutput.responseFormat.name, + payload: value + ) + continuation.yield(.structuredOutputCommitted(value)) + case let .invalid(validationFailure): + continuation.yield(.structuredOutputValidationFailed(validationFailure)) + throw AgentRuntimeError.structuredOutputInvalid( + stage: validationFailure.stage, + underlyingMessage: validationFailure.message + ) + } + + normalizedMessage = AgentMessage( + threadID: threadID, + role: .assistant, + text: extraction.visibleText, + images: messageTemplate.images + ) + } else { + normalizedMessage = AgentMessage( + threadID: threadID, + role: .assistant, + text: messageTemplate.text, + images: messageTemplate.images + ) + } + let assistantText: String - if messageTemplate.text.trimmingCharacters(in: .whitespacesAndNewlines).isEmpty, + if normalizedMessage.text.trimmingCharacters(in: .whitespacesAndNewlines).isEmpty, !pendingToolFallbackTexts.isEmpty { assistantText = pendingToolFallbackTexts.joined(separator: "\n\n") } else { - assistantText = messageTemplate.text + assistantText = normalizedMessage.text } - let mergedImages = (messageTemplate.images + pendingToolImages).uniqued() + let mergedImages = (normalizedMessage.images + pendingToolImages).uniqued() let message = AgentMessage( threadID: threadID, role: .assistant, text: assistantText, - images: mergedImages + images: mergedImages, + structuredOutput: pendingStructuredOutputMetadata + ?? CodexResponsesBackend.structuredMetadata( + from: assistantText, + responseFormat: responseFormat + ) ) workingHistory.append(.assistantMessage(message)) continuation.yield(.assistantMessageCompleted(message)) pendingToolImages.removeAll(keepingCapacity: true) pendingToolFallbackTexts.removeAll(keepingCapacity: true) + pendingStructuredOutputMetadata = nil + + case let .structuredOutputPartial(value): + continuation.yield(.structuredOutputPartial(value)) + + case let .structuredOutputCommitted(value): + continuation.yield(.structuredOutputCommitted(value)) + + case let .structuredOutputValidationFailed(validationFailure): + continuation.yield(.structuredOutputValidationFailed(validationFailure)) case let .functionCall(functionCall): emittedRetryUnsafeOutput = true diff --git a/Sources/CodexKit/Support/JSONValue.swift b/Sources/CodexKit/Support/JSONValue.swift index d13585a..a3f25ba 100644 --- a/Sources/CodexKit/Support/JSONValue.swift +++ b/Sources/CodexKit/Support/JSONValue.swift @@ -70,4 +70,13 @@ public enum JSONValue: Codable, Hashable, Sendable { } return value } + + public var prettyJSONString: String { + guard let data = try? JSONEncoder().encode(self), + let string = String(data: data, encoding: .utf8) + else { + return "" + } + return string + } } diff --git a/Tests/CodexKitTests/AgentRuntimeMessageTests.swift b/Tests/CodexKitTests/AgentRuntimeMessageTests.swift index ce2c4a8..dfe1f23 100644 --- a/Tests/CodexKitTests/AgentRuntimeMessageTests.swift +++ b/Tests/CodexKitTests/AgentRuntimeMessageTests.swift @@ -63,6 +63,213 @@ extension AgentRuntimeTests { let formats = await backend.receivedResponseFormats() XCTAssertEqual(formats.last??.name, "shipping_reply_draft") + + let messages = await runtime.messages(for: thread.id) + XCTAssertEqual( + messages.last?.structuredOutput, + AgentStructuredOutputMetadata( + formatName: "shipping_reply_draft", + payload: .object([ + "reply": .string("Your order is already in transit."), + "priority": .string("high"), + ]) + ) + ) + } + + func testStructuredStreamYieldsVisibleTextAndCommittedPayload() async throws { + let backend = InMemoryAgentBackend( + structuredResponseText: #"{"reply":"Your order is already in transit.","priority":"high"}"# + ) + let runtime = try AgentRuntime(configuration: .init( + authProvider: DemoChatGPTAuthProvider(), + secureStore: KeychainSessionSecureStore( + service: "CodexKitTests.ChatGPTSession", + account: UUID().uuidString + ), + backend: backend, + approvalPresenter: AutoApprovalPresenter(), + stateStore: InMemoryRuntimeStateStore() + )) + + _ = try await runtime.restore() + _ = try await runtime.signIn() + + let thread = try await runtime.createThread(title: "Structured Stream") + let stream = try await runtime.streamMessage( + UserMessageRequest(text: "Draft a shipping reply."), + in: thread.id, + expecting: ShippingReplyDraft.self + ) + + var visibleText = "" + var partials: [ShippingReplyDraft] = [] + var committed: ShippingReplyDraft? + var sawTurnCompleted = false + + for try await event in stream { + switch event { + case let .assistantMessageDelta(_, _, delta): + visibleText += delta + case let .structuredOutputPartial(snapshot): + partials.append(snapshot) + case let .structuredOutputCommitted(snapshot): + committed = snapshot + case .turnCompleted: + sawTurnCompleted = true + XCTAssertNotNil(committed) + default: + break + } + } + + XCTAssertEqual(visibleText, "Echo: Draft a shipping reply.") + XCTAssertFalse(visibleText.contains("codexkit-structured-output")) + XCTAssertFalse(partials.isEmpty) + XCTAssertEqual( + committed, + ShippingReplyDraft( + reply: "Your order is already in transit.", + priority: "high" + ) + ) + XCTAssertTrue(sawTurnCompleted) + + let messages = await runtime.messages(for: thread.id) + XCTAssertEqual(messages.last?.text, "Echo: Draft a shipping reply.") + XCTAssertEqual( + messages.last?.structuredOutput, + AgentStructuredOutputMetadata( + formatName: "shipping_reply_draft", + payload: .object([ + "reply": .string("Your order is already in transit."), + "priority": .string("high"), + ]) + ) + ) + } + + func testStructuredStreamPersistsFinalPayloadMetadataAcrossRestore() async throws { + let backend = InMemoryAgentBackend( + structuredResponseText: #"{"reply":"Your order is already in transit.","priority":"high"}"# + ) + let stateStore = InMemoryRuntimeStateStore() + let runtime = try AgentRuntime(configuration: .init( + authProvider: DemoChatGPTAuthProvider(), + secureStore: KeychainSessionSecureStore( + service: "CodexKitTests.ChatGPTSession", + account: UUID().uuidString + ), + backend: backend, + approvalPresenter: AutoApprovalPresenter(), + stateStore: stateStore + )) + + _ = try await runtime.restore() + _ = try await runtime.signIn() + + let thread = try await runtime.createThread(title: "Structured Stream Restore") + let stream = try await runtime.streamMessage( + UserMessageRequest(text: "Draft a shipping reply."), + in: thread.id, + expecting: ShippingReplyDraft.self + ) + try await drainStructuredStream(stream) + + let restoredRuntime = try AgentRuntime(configuration: .init( + authProvider: DemoChatGPTAuthProvider(), + secureStore: KeychainSessionSecureStore( + service: "CodexKitTests.ChatGPTSession", + account: UUID().uuidString + ), + backend: backend, + approvalPresenter: AutoApprovalPresenter(), + stateStore: stateStore + )) + _ = try await restoredRuntime.restore() + + let messages = await restoredRuntime.messages(for: thread.id) + XCTAssertEqual( + messages.last?.structuredOutput, + AgentStructuredOutputMetadata( + formatName: "shipping_reply_draft", + payload: .object([ + "reply": .string("Your order is already in transit."), + "priority": .string("high"), + ]) + ) + ) + } + + func testStructuredStreamRequiredFailsWhenNoPayloadIsProduced() async throws { + let runtime = try AgentRuntime(configuration: .init( + authProvider: DemoChatGPTAuthProvider(), + secureStore: KeychainSessionSecureStore( + service: "CodexKitTests.ChatGPTSession", + account: UUID().uuidString + ), + backend: OptionalStructuredMissingBackend(), + approvalPresenter: AutoApprovalPresenter(), + stateStore: InMemoryRuntimeStateStore() + )) + + _ = try await runtime.restore() + _ = try await runtime.signIn() + + let thread = try await runtime.createThread(title: "Structured Required") + let stream = try await runtime.streamMessage( + UserMessageRequest(text: "Draft a shipping reply."), + in: thread.id, + expecting: ShippingReplyDraft.self, + options: .init(required: true) + ) + + await XCTAssertThrowsErrorAsync( + try await drainStructuredStream(stream) + ) { error in + let runtimeError = error as? AgentRuntimeError + XCTAssertEqual(runtimeError?.code, "structured_output_missing") + } + } + + func testStructuredStreamOptionalSucceedsWithoutPayload() async throws { + let runtime = try AgentRuntime(configuration: .init( + authProvider: DemoChatGPTAuthProvider(), + secureStore: KeychainSessionSecureStore( + service: "CodexKitTests.ChatGPTSession", + account: UUID().uuidString + ), + backend: OptionalStructuredMissingBackend(), + approvalPresenter: AutoApprovalPresenter(), + stateStore: InMemoryRuntimeStateStore() + )) + + _ = try await runtime.restore() + _ = try await runtime.signIn() + + let thread = try await runtime.createThread(title: "Structured Optional") + let stream = try await runtime.streamMessage( + UserMessageRequest(text: "Draft a shipping reply."), + in: thread.id, + expecting: ShippingReplyDraft.self + ) + + var sawCommitted = false + var visibleText = "" + + for try await event in stream { + switch event { + case let .assistantMessageDelta(_, _, delta): + visibleText += delta + case .structuredOutputCommitted: + sawCommitted = true + default: + break + } + } + + XCTAssertFalse(sawCommitted) + XCTAssertEqual(visibleText, "Echo: Draft a shipping reply.") } func testStructuredDecodeFailureThrowsRuntimeError() async throws { @@ -97,6 +304,46 @@ extension AgentRuntimeTests { } } + func testStructuredStreamValidationFailureSurfacesAndFailsTurn() async throws { + let runtime = try AgentRuntime(configuration: .init( + authProvider: DemoChatGPTAuthProvider(), + secureStore: KeychainSessionSecureStore( + service: "CodexKitTests.ChatGPTSession", + account: UUID().uuidString + ), + backend: InMemoryAgentBackend( + structuredResponseText: #"{"unexpected":"value"}"# + ), + approvalPresenter: AutoApprovalPresenter(), + stateStore: InMemoryRuntimeStateStore() + )) + + _ = try await runtime.restore() + _ = try await runtime.signIn() + + let thread = try await runtime.createThread(title: "Structured Stream Failure") + let stream = try await runtime.streamMessage( + UserMessageRequest(text: "Draft a shipping reply."), + in: thread.id, + expecting: ShippingReplyDraft.self + ) + + var validationFailures: [AgentStructuredOutputValidationFailure] = [] + + await XCTAssertThrowsErrorAsync( + try await collectStructuredStreamFailures( + from: stream, + into: &validationFailures + ) + ) { error in + let runtimeError = error as? AgentRuntimeError + XCTAssertEqual(runtimeError?.code, "structured_output_invalid") + } + + XCTAssertFalse(validationFailures.isEmpty) + XCTAssertEqual(validationFailures.last?.stage, .committed) + } + func testImportedContentInitializerBuildsMessageWithSharedURLs() async throws { let importedContent = AgentImportedContent( textSnippets: ["Customer says the package arrived damaged."], @@ -222,6 +469,59 @@ extension AgentRuntimeTests { XCTAssertTrue(sawToolResult) } + func testStructuredStreamWorksAlongsideToolCalls() async throws { + let runtime = try AgentRuntime(configuration: .init( + authProvider: DemoChatGPTAuthProvider(), + secureStore: KeychainSessionSecureStore( + service: "CodexKitTests.ChatGPTSession", + account: UUID().uuidString + ), + backend: InMemoryAgentBackend(), + approvalPresenter: AutoApprovalPresenter(), + stateStore: InMemoryRuntimeStateStore(), + tools: [ + .init( + definition: ToolDefinition( + name: "demo_lookup_profile", + description: "Lookup profile", + inputSchema: .object([:]), + approvalPolicy: .requiresApproval + ), + executor: AnyToolExecutor { invocation, _ in + .success(invocation: invocation, text: "demo-result") + } + ), + ] + )) + + _ = try await runtime.restore() + _ = try await runtime.signIn() + + let thread = try await runtime.createThread() + let stream = try await runtime.streamMessage( + UserMessageRequest(text: "please use the tool"), + in: thread.id, + expecting: ShippingReplyDraft.self + ) + + var sawToolResult = false + var sawCommitted = false + + for try await event in stream { + switch event { + case .toolCallFinished: + sawToolResult = true + case .structuredOutputCommitted: + sawCommitted = true + default: + break + } + } + + XCTAssertTrue(sawToolResult) + XCTAssertTrue(sawCommitted) + } + func testSendMessageRetriesUnauthorizedByRefreshingSession() async throws { let authProvider = RotatingDemoAuthProvider() let backend = UnauthorizedThenSuccessBackend() @@ -332,3 +632,20 @@ extension AgentRuntimeTests { XCTAssertTrue(sawToolResult) } } + +private func drainStructuredStream( + _ stream: AsyncThrowingStream, Error> +) async throws { + for try await _ in stream {} +} + +private func collectStructuredStreamFailures( + from stream: AsyncThrowingStream, Error>, + into failures: inout [AgentStructuredOutputValidationFailure] +) async throws { + for try await event in stream { + if case let .structuredOutputValidationFailed(validationFailure) = event { + failures.append(validationFailure) + } + } +} diff --git a/Tests/CodexKitTests/AgentRuntimeTests.swift b/Tests/CodexKitTests/AgentRuntimeTests.swift index 8e3b0b7..4bf02e1 100644 --- a/Tests/CodexKitTests/AgentRuntimeTests.swift +++ b/Tests/CodexKitTests/AgentRuntimeTests.swift @@ -65,6 +65,7 @@ final class AgentRuntimeTests: XCTestCase { XCTAssertEqual(state.threads.first?.personaStack, nil) XCTAssertEqual(state.threads.first?.memoryContext, nil) XCTAssertEqual(state.messagesByThread["thread-1"]?.first?.images, []) + XCTAssertEqual(state.messagesByThread["thread-1"]?.first?.structuredOutput, nil) XCTAssertEqual(state.messagesByThread["thread-1"]?.first?.text, "Hello from legacy state") } @@ -137,6 +138,7 @@ actor UnauthorizedThenSuccessBackend: AgentBackend { message: UserMessageRequest, instructions _: String, responseFormat _: AgentStructuredOutputFormat?, + streamedStructuredOutput _: AgentStreamedStructuredOutputRequest?, tools _: [ToolDefinition], session: ChatGPTSession ) async throws -> any AgentTurnStreaming { @@ -150,7 +152,8 @@ actor UnauthorizedThenSuccessBackend: AgentBackend { thread: thread, message: message, selectedTool: nil, - structuredResponseText: nil + structuredResponseText: nil, + streamedStructuredOutput: nil ) } @@ -183,6 +186,7 @@ actor UnauthorizedOnCreateThenSuccessBackend: AgentBackend { message _: UserMessageRequest, instructions _: String, responseFormat _: AgentStructuredOutputFormat?, + streamedStructuredOutput _: AgentStreamedStructuredOutputRequest?, tools _: [ToolDefinition], session _: ChatGPTSession ) async throws -> any AgentTurnStreaming { @@ -190,7 +194,8 @@ actor UnauthorizedOnCreateThenSuccessBackend: AgentBackend { thread: thread, message: .init(text: ""), selectedTool: nil, - structuredResponseText: nil + structuredResponseText: nil, + streamedStructuredOutput: nil ) } @@ -214,6 +219,7 @@ actor ImageReplyAgentBackend: AgentBackend { message _: UserMessageRequest, instructions _: String, responseFormat _: AgentStructuredOutputFormat?, + streamedStructuredOutput _: AgentStreamedStructuredOutputRequest?, tools _: [ToolDefinition], session _: ChatGPTSession ) async throws -> any AgentTurnStreaming { @@ -221,6 +227,35 @@ actor ImageReplyAgentBackend: AgentBackend { } } +actor OptionalStructuredMissingBackend: AgentBackend { + func createThread(session _: ChatGPTSession) async throws -> AgentThread { + AgentThread(id: UUID().uuidString) + } + + func resumeThread(id: String, session _: ChatGPTSession) async throws -> AgentThread { + AgentThread(id: id) + } + + func beginTurn( + thread: AgentThread, + history _: [AgentMessage], + message: UserMessageRequest, + instructions _: String, + responseFormat _: AgentStructuredOutputFormat?, + streamedStructuredOutput _: AgentStreamedStructuredOutputRequest?, + tools _: [ToolDefinition], + session _: ChatGPTSession + ) async throws -> any AgentTurnStreaming { + MockAgentTurnSession( + thread: thread, + message: message, + selectedTool: nil, + structuredResponseText: nil, + streamedStructuredOutput: nil + ) + } +} + final class ImageReplyTurn: AgentTurnStreaming, @unchecked Sendable { let events: AsyncThrowingStream diff --git a/Tests/CodexKitTests/CodexResponsesBackendTests.swift b/Tests/CodexKitTests/CodexResponsesBackendTests.swift index 56df7f2..ff8d789 100644 --- a/Tests/CodexKitTests/CodexResponsesBackendTests.swift +++ b/Tests/CodexKitTests/CodexResponsesBackendTests.swift @@ -57,6 +57,7 @@ final class CodexResponsesBackendTests: XCTestCase { message: UserMessageRequest(text: "Hi there"), instructions: "Resolved instructions", responseFormat: nil, + streamedStructuredOutput: nil, tools: [], session: session ) @@ -127,6 +128,7 @@ final class CodexResponsesBackendTests: XCTestCase { message: UserMessageRequest(text: "Think hard"), instructions: "Resolved instructions", responseFormat: nil, + streamedStructuredOutput: nil, tools: [], session: session ) @@ -212,6 +214,7 @@ final class CodexResponsesBackendTests: XCTestCase { message: UserMessageRequest(text: "Find the profile"), instructions: "Resolved instructions", responseFormat: nil, + streamedStructuredOutput: nil, tools: [tool], session: session ) @@ -280,6 +283,7 @@ final class CodexResponsesBackendTests: XCTestCase { message: UserMessageRequest(text: "Search the web"), instructions: "Resolved instructions", responseFormat: nil, + streamedStructuredOutput: nil, tools: [], session: session ) @@ -342,6 +346,7 @@ final class CodexResponsesBackendTests: XCTestCase { ), instructions: "Resolved instructions", responseFormat: nil, + streamedStructuredOutput: nil, tools: [], session: session ) @@ -422,6 +427,7 @@ final class CodexResponsesBackendTests: XCTestCase { message: UserMessageRequest(text: "Make me an image"), instructions: "Resolved instructions", responseFormat: nil, + streamedStructuredOutput: nil, tools: [tool], session: session ) @@ -504,6 +510,7 @@ final class CodexResponsesBackendTests: XCTestCase { message: UserMessageRequest(text: "Hi"), instructions: "Resolved instructions", responseFormat: nil, + streamedStructuredOutput: nil, tools: [], session: session ) @@ -559,6 +566,7 @@ final class CodexResponsesBackendTests: XCTestCase { message: UserMessageRequest(text: "Hi"), instructions: "Resolved instructions", responseFormat: nil, + streamedStructuredOutput: nil, tools: [], session: session ) @@ -621,6 +629,7 @@ final class CodexResponsesBackendTests: XCTestCase { message: UserMessageRequest(text: "Retry me"), instructions: "Resolved instructions", responseFormat: nil, + streamedStructuredOutput: nil, tools: [], session: session ) @@ -688,6 +697,7 @@ final class CodexResponsesBackendTests: XCTestCase { message: UserMessageRequest(text: "Draft a reply."), instructions: "Resolved instructions", responseFormat: responseFormat, + streamedStructuredOutput: nil, tools: [], session: session ) @@ -695,6 +705,99 @@ final class CodexResponsesBackendTests: XCTestCase { for try await _ in turnStream.events {} } + func testBackendStripsStructuredStreamFramingFromVisibleAssistantText() async throws { + let backend = CodexResponsesBackend(urlSession: makeTestURLSession()) + let session = ChatGPTSession( + accessToken: "access-token", + refreshToken: "refresh-token", + account: ChatGPTAccount(id: "workspace-123", email: "taylor@example.com", plan: .plus) + ) + let streamedStructuredOutput = AgentStreamedStructuredOutputRequest( + responseFormat: AgentStructuredOutputFormat( + name: "shipping_reply_draft", + description: "A concise shipping support reply draft.", + schema: .object( + properties: [ + "reply": .string(), + "priority": .string(), + ], + required: ["reply", "priority"], + additionalProperties: false + ) + ), + options: .init(required: true) + ) + + await TestURLProtocol.enqueue( + .init( + headers: ["Content-Type": "text/event-stream"], + body: Data( + """ + event: response.output_text.delta + data: {"type":"response.output_text.delta","delta":"Hello "} + + event: response.output_text.delta + data: {"type":"response.output_text.delta","delta":"{\\"reply\\":\\"Done\\",\\"priority\\":\\"high\\"}"} + + event: response.output_item.done + data: {"type":"response.output_item.done","item":{"type":"message","role":"assistant","content":[{"type":"output_text","text":"Hello {\\"reply\\":\\"Done\\",\\"priority\\":\\"high\\"}"}]}} + + event: response.completed + data: {"type":"response.completed","response":{"id":"resp_streamed","usage":{"input_tokens":4,"input_tokens_details":{"cached_tokens":0},"output_tokens":1}}} + + """.utf8 + ), + inspect: { request in + let body = try XCTUnwrap(requestBodyData(for: request)) + let json = try JSONSerialization.jsonObject(with: body) as? [String: Any] + let instructions = try XCTUnwrap(json?["instructions"] as? String) + XCTAssertTrue(instructions.contains("CodexKit private streaming contract")) + let text = json?["text"] as? [String: Any] + let format = try XCTUnwrap(text?["format"] as? [String: Any]) + XCTAssertEqual(format["type"] as? String, "text") + } + ) + ) + + let turnStream = try await backend.beginTurn( + thread: AgentThread(id: "thread-streamed-structured"), + history: [], + message: UserMessageRequest(text: "Draft a reply."), + instructions: "Resolved instructions", + responseFormat: nil, + streamedStructuredOutput: streamedStructuredOutput, + tools: [], + session: session + ) + + var deltas: [String] = [] + var finalAssistantMessage: AgentMessage? + var committedValue: JSONValue? + + for try await event in turnStream.events { + switch event { + case let .assistantMessageDelta(_, _, delta): + deltas.append(delta) + case let .assistantMessageCompleted(message): + finalAssistantMessage = message + case let .structuredOutputCommitted(value): + committedValue = value + default: + break + } + } + + XCTAssertEqual(deltas.joined(), "Hello ") + XCTAssertEqual(finalAssistantMessage?.text, "Hello") + XCTAssertEqual( + committedValue, + .object([ + "reply": .string("Done"), + "priority": .string("high"), + ]) + ) + } + } private func drainEvents(_ events: AsyncThrowingStream) async throws { diff --git a/Tests/CodexKitTests/Support/InMemoryAgentBackend.swift b/Tests/CodexKitTests/Support/InMemoryAgentBackend.swift index 59d548c..9c3206e 100644 --- a/Tests/CodexKitTests/Support/InMemoryAgentBackend.swift +++ b/Tests/CodexKitTests/Support/InMemoryAgentBackend.swift @@ -39,6 +39,7 @@ public actor InMemoryAgentBackend: AgentBackend { message: UserMessageRequest, instructions: String, responseFormat: AgentStructuredOutputFormat?, + streamedStructuredOutput: AgentStreamedStructuredOutputRequest?, tools: [ToolDefinition], session _: ChatGPTSession ) async throws -> any AgentTurnStreaming { @@ -65,7 +66,11 @@ public actor InMemoryAgentBackend: AgentBackend { thread: updatedThread, message: message, selectedTool: selectedTool, - structuredResponseText: responseFormat == nil ? nil : structuredResponseText + structuredResponseText: (responseFormat != nil || streamedStructuredOutput != nil) + ? structuredResponseText + : nil, + responseFormat: responseFormat, + streamedStructuredOutput: streamedStructuredOutput ) } @@ -86,7 +91,9 @@ public final class MockAgentTurnSession: AgentTurnStreaming, @unchecked Sendable thread: AgentThread, message: UserMessageRequest, selectedTool: ToolDefinition?, - structuredResponseText: String? + structuredResponseText: String?, + responseFormat: AgentStructuredOutputFormat? = nil, + streamedStructuredOutput: AgentStreamedStructuredOutputRequest? ) { let pendingResults = PendingToolResults() self.pendingResults = pendingResults @@ -125,16 +132,46 @@ public final class MockAgentTurnSession: AgentTurnStreaming, @unchecked Sendable let result = try await pendingResults.wait(for: invocation.id) let responseText = result.primaryText ?? "The tool completed without returning display text." + let visibleText = "Tool result from \(selectedTool.name): \(responseText)" + + for chunk in MockAgentTurnSession.chunks(for: visibleText) { + continuation.yield( + .assistantMessageDelta( + threadID: thread.id, + turnID: turn.id, + delta: chunk + ) + ) + try await Task.sleep(for: .milliseconds(30)) + } + + if let streamedStructuredOutput { + try MockAgentTurnSession.emitStructuredEvents( + responseText: structuredResponseText ?? #"{"reply":"Structured echo","priority":"normal"}"#, + request: streamedStructuredOutput, + into: continuation + ) + } + + let structuredMetadata = try MockAgentTurnSession.structuredMetadata( + responseText: structuredResponseText, + request: streamedStructuredOutput, + responseFormat: responseFormat + ) let fullMessage = AgentMessage( threadID: thread.id, role: .assistant, - text: "Tool result from \(selectedTool.name): \(responseText)" + text: visibleText, + structuredOutput: structuredMetadata ) continuation.yield(.assistantMessageCompleted(fullMessage)) } else { - let response = structuredResponseText ?? "Echo: \(message.text)" + let visibleText = "Echo: \(message.text)" + let response = streamedStructuredOutput == nil + ? (structuredResponseText ?? visibleText) + : visibleText for chunk in MockAgentTurnSession.chunks(for: response) { continuation.yield( .assistantMessageDelta( @@ -146,12 +183,27 @@ public final class MockAgentTurnSession: AgentTurnStreaming, @unchecked Sendable try await Task.sleep(for: .milliseconds(35)) } + if let streamedStructuredOutput { + try MockAgentTurnSession.emitStructuredEvents( + responseText: structuredResponseText ?? #"{"reply":"Structured echo","priority":"normal"}"#, + request: streamedStructuredOutput, + into: continuation + ) + } + + let structuredMetadata = try MockAgentTurnSession.structuredMetadata( + responseText: structuredResponseText, + request: streamedStructuredOutput, + responseFormat: responseFormat + ) + continuation.yield( .assistantMessageCompleted( AgentMessage( threadID: thread.id, role: .assistant, - text: response + text: response, + structuredOutput: structuredMetadata ) ) ) @@ -200,6 +252,49 @@ public final class MockAgentTurnSession: AgentTurnStreaming, @unchecked Sendable return chunks } + + private static func emitStructuredEvents( + responseText: String, + request: AgentStreamedStructuredOutputRequest, + into continuation: AsyncThrowingStream.Continuation + ) throws { + guard let data = responseText.data(using: .utf8) else { + continuation.yield( + .structuredOutputValidationFailed( + AgentStructuredOutputValidationFailure( + stage: .committed, + message: "The in-memory structured output was not UTF-8.", + rawPayload: responseText + ) + ) + ) + return + } + + let value = try JSONDecoder().decode(JSONValue.self, from: data) + if request.options.emitPartials { + continuation.yield(.structuredOutputPartial(value)) + } + continuation.yield(.structuredOutputCommitted(value)) + } + + private static func structuredMetadata( + responseText: String?, + request: AgentStreamedStructuredOutputRequest?, + responseFormat: AgentStructuredOutputFormat? + ) throws -> AgentStructuredOutputMetadata? { + guard let responseText, + let data = responseText.data(using: .utf8) + else { + return nil + } + + let value = try JSONDecoder().decode(JSONValue.self, from: data) + return AgentStructuredOutputMetadata( + formatName: request?.responseFormat.name ?? responseFormat?.name ?? "structured_output", + payload: value + ) + } } actor PendingToolResults { From e28f95eb8cacf8d4962555270b68213d7da4073f Mon Sep 17 00:00:00 2001 From: Timothy Zelinsky Date: Mon, 23 Mar 2026 16:15:54 +1100 Subject: [PATCH 2/2] Refactor structured stream parser loop --- ...dexResponsesBackend+StructuredOutput.swift | 110 ++++++++++-------- 1 file changed, 64 insertions(+), 46 deletions(-) diff --git a/Sources/CodexKit/Runtime/CodexResponsesBackend+StructuredOutput.swift b/Sources/CodexKit/Runtime/CodexResponsesBackend+StructuredOutput.swift index f30926d..edc6a88 100644 --- a/Sources/CodexKit/Runtime/CodexResponsesBackend+StructuredOutput.swift +++ b/Sources/CodexKit/Runtime/CodexResponsesBackend+StructuredOutput.swift @@ -35,52 +35,7 @@ struct CodexResponsesStructuredStreamParser { pending.append(delta) var events: [StructuredStreamParsingEvent] = [] - processing: while true { - switch mode { - case .visible: - if let range = pending.range(of: Self.openTag) { - let visible = String(pending[.. 0 else { - break processing - } - - let index = pending.index(pending.startIndex, offsetBy: emitCount) - let visible = String(pending[.. 0 else { - break processing - } - - let index = pending.index(pending.startIndex, offsetBy: emitCount) - structuredBuffer.append(contentsOf: pending[.. Bool { + switch mode { + case .visible: + return consumeVisibleContent(into: &events) + case .structured: + return consumeStructuredContent(into: &events) + } + } + + private mutating func consumeVisibleContent( + into events: inout [StructuredStreamParsingEvent] + ) -> Bool { + if let range = pending.range(of: Self.openTag) { + let visible = String(pending[.. 0 else { + return false + } + + let index = pending.index(pending.startIndex, offsetBy: emitCount) + let visible = String(pending[.. Bool { + if let range = pending.range(of: Self.closeTag) { + structuredBuffer.append(contentsOf: pending[.. 0 else { + return false + } + + let index = pending.index(pending.startIndex, offsetBy: emitCount) + structuredBuffer.append(contentsOf: pending[..