Fix reciprocal rank fusion, streaming deltas, and error handling#600
Conversation
… review Blocker + Major fixes from the AI-subsystem review: - AiProvider: terminate the WorkerManager worker (and idle timer) on re-registration teardown and on afterRegister rollback, so re-registering a worker-backed provider no longer throws "already registered" and a failed registration doesn't leak the worker. - AiJob.classifyProviderError: only scavenge a 4xx/5xx HTTP status from the error message in an HTTP-shaped context, so numbers like "512" in a model message no longer misclassify the error as a retryable server error. - ToolCallParsers.parseLlama: slice leading content to the matched block's start offset instead of fragile literal-prefix length math. - AiChatTask: do not append an empty assistant turn to history or count it as a completed turn when the provider streamed no text. - AiChatWithKbTask: isolate per-KB search failures so one failing KB degrades to empty results instead of aborting the whole conversation turn. - DocumentEnricherTask: require doc_id and documentTree in the input schema and drop the unsafe casts that passed undefined through. - ToolCallingTask: register the session disposer before running so it still fires when execute()/the stream throws or aborts mid-iteration. Verified: tsgo type-check (36/36 packages) and vitest ai/task/rag/provider/util suites all pass. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_015j321YUsXJRVEVkBmEavtg
…behavior Design-decision fixes from the AI-subsystem review: - AiTask.gateOrThrow now shares modelMeetsRequires with validateInput/ narrowInput so all three gates apply one policy: a model that declares capabilities must include every required one, while a model with no declared capabilities passes (unverified-allow). Previously gateOrThrow rejected a capability-less model that the other two gates accepted. - RerankerTask reciprocal-rank-fusion now performs real RRF: it fuses the retrieval-score ranking (or input order when no scores are given) with a lexical keyword-overlap ranking derived from the query, instead of the prior identity re-score that ignored scores and preserved input order. Shared the keyword-match loop between simple and RRF paths. - README: document that the built-in direct/concurrency-limited strategies invoke the provider once and surface failures without automatic retry; the retryable/permanent classification is for diagnostics and queue-backed execution, not the default in-process strategies. Verified: ai (238) and rag (225) vitest suites pass; ai package builds (JS+types). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_015j321YUsXJRVEVkBmEavtg
Provider Major fixes from the AI-subsystem review: - Gemini (Gemini_TextGeneration): map frequencyPenalty and presencePenalty onto generationConfig (the SDK supports both), forward topP on the chat path (previously only the prompt path forwarded it), and gate every optional sampling param on `!== undefined` so omitted params keep the provider default — matching the OpenAI/Anthropic adapters. Extracted a shared buildGenerationConfig helper used by both the chat and prompt paths. - Ollama: add structured-generation support so StructuredGenerationTask (requires ["json-mode"]) can run on Ollama. New Ollama_StructuredGeneration run-fn passes the output schema as Ollama's chat `format` parameter, emits progressive object-delta on the `object` port, and a terminal finish whose data.object carries the parsed object (per the structured-generation streaming convention). Registered OLLAMA_JSON_MODE = ["text.generation", "json-mode"] in both node and browser run-fn lists, and inferOllamaCapabilities now advertises json-mode for generative models. Updated the Ollama provider unit test that pinned the prior no-json-mode behavior. Verified: provider vitest (163) and OllamaProvider unit test (17) pass; both provider packages build (JS+types). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_015j321YUsXJRVEVkBmEavtg
…bber
B1/M9 from the review: the two terminal stream consumers disagreed. The
task-graph StreamProcessor (streaming path) merges accumulated deltas ON TOP of
the finish payload (delta-wins) and never rejects text+object streams, relying
on the run-fn's structural default scaffold to supply absent ports. The
StreamEventAccumulator (the .run()/AiTask.execute path) did the opposite
(finish-wins) and threw on any text+object mix. So a tool-calling run-fn's
finish scaffold { text: "", toolCalls: [] } clobbered the streamed tool calls
(B1), and a model streaming both reasoning text and tool calls threw a
mixed-mode error (M9).
- StreamEventAccumulator: accumulated deltas now take precedence over the
finish payload, and text/object deltas on distinct ports compose into one
object (same-port collisions resolve object-last). This mirrors the
StreamProcessor so .run() and streaming produce identical output, and a
streaming finish (empty {} or a default scaffold) can never clobber streamed
content.
- Ollama tool-calling: stop accumulating into the finish event (emit a static
{ text: "", toolCalls: [] } scaffold) and filter hallucinated tool names at
the object-delta, not at finish — required now that accumulated deltas win.
OpenAI and Gemini already emit a static scaffold and filter their deltas, so
they were correct once the consumers agreed; no change needed there.
Verified: ai (239), task (1117), provider (163) vitest pass; new accumulator
tests cover the no-clobber and distinct-port compose cases.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_015j321YUsXJRVEVkBmEavtg
The B1 regression test cast the observeFinish argument as StreamEvent<Out> (the full union, which includes StreamError and has no `data`), tripping tsgo (TS2345) in @workglow/test#build-types even though vitest transpiled it fine. Pass the finish literal uncast like the sibling tests so it narrows to StreamFinish<Out>. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_015j321YUsXJRVEVkBmEavtg
Coverage Report
File CoverageNo changed files found. |
There was a problem hiding this comment.
Pull request overview
This PR fixes several correctness issues and edge cases across the AI stack—most notably reciprocal-rank-fusion reranking, streaming output accumulation semantics, and provider/session cleanup—while adding Ollama JSON-mode (structured generation) support and improving robustness around KB search and error classification.
Changes:
- Correct RRF reranking by fusing independent retrieval-score and keyword-overlap rankings, with shared keyword matching logic.
- Align streaming accumulation behavior with task-graph semantics (compose deltas across ports; deltas take precedence over structural finish scaffolds) and update tests accordingly.
- Extend provider capabilities (Ollama json-mode), improve provider/task cleanup and error-handling (worker termination on re-register, KB search isolation, safer HTTP status extraction, parser offset fix).
Reviewed changes
Copilot reviewed 21 out of 21 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
| providers/ollama/src/ai/common/Ollama_ToolCalling.ts | Emits validated tool-call deltas during streaming and uses a structural finish scaffold. |
| providers/ollama/src/ai/common/Ollama_StructuredGeneration.ts | Adds Ollama JSON-mode structured generation streaming run-fn with partial JSON parsing + final object finish payload. |
| providers/ollama/src/ai/common/Ollama_JobRunFns.ts | Registers the new Ollama json-mode run-fn in the worker runtime. |
| providers/ollama/src/ai/common/Ollama_JobRunFns.browser.ts | Registers the new Ollama json-mode run-fn in the browser runtime. |
| providers/ollama/src/ai/common/Ollama_CapabilitySets.ts | Adds ["text.generation","json-mode"] capability set. |
| providers/ollama/src/ai/common/Ollama_Capabilities.ts | Includes json-mode in inferred capability lists. |
| providers/google-gemini/src/ai/common/Gemini_TextGeneration.ts | Introduces buildGenerationConfig() to map canonical sampling params without forcing undefined values. |
| packages/test/src/test/ai/StreamEventAccumulator.test.ts | Updates/expands tests for composing deltas across distinct ports and finish-scaffold precedence. |
| packages/test/src/test/ai/collectStream.test.ts | Updates collectStream expectations for mixed text/object deltas across ports. |
| packages/test/src/test/ai-provider-api/OllamaProvider.test.ts | Updates provider capability/run-fn assertions to include json-mode registration. |
| packages/ai/src/task/ToolCallingTask.ts | Registers session disposal earlier to ensure cleanup even on throw/abort mid-execution/stream. |
| packages/ai/src/task/RerankerTask.ts | Fixes RRF algorithm and factors out shared keyword match counting. |
| packages/ai/src/task/DocumentEnricherTask.ts | Tightens schema/types for required inputs (doc_id, documentTree) and removes unnecessary casts. |
| packages/ai/src/task/base/AiTask.ts | Clarifies and centralizes capability gating semantics via modelMeetsRequires. |
| packages/ai/src/task/AiChatWithKbTask.ts | Wraps per-KB search in try/catch so one KB failure degrades to empty results instead of aborting the turn. |
| packages/ai/src/task/AiChatTask.ts | Prevents appending an empty assistant message when a turn yields no assistant text. |
| packages/ai/src/provider/AiProvider.ts | Ensures worker-backed providers terminate existing workers on re-register and on partial-registration failure cleanup. |
| packages/ai/src/provider-utils/ToolCallParsers.ts | Fixes Llama parser content slicing by tracking the true balanced-block start offset. |
| packages/ai/src/job/AiJob.ts | Tightens HTTP-status extraction regex to avoid misclassifying bare numbers in error messages. |
| packages/ai/src/capability/StreamEventAccumulator.ts | Allows composing text+object deltas across ports and ensures accumulated deltas take precedence over finish scaffolds. |
| packages/ai/README.md | Documents execution/retry behavior (no automatic retries for default in-process strategies). |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| export function createOllamaStructuredGenerationStream( | ||
| getClient: GetClient | ||
| ): AiProviderRunFn< | ||
| StructuredGenerationTaskInput, | ||
| StructuredGenerationTaskOutput, | ||
| OllamaModelConfig | ||
| > { | ||
| return async (input, model, signal, emit, outputSchema) => { |
There was a problem hiding this comment.
Added in cc880f5 — packages/test/src/test/ai-provider-api/Ollama_StructuredGenerationStream.test.ts, a fake-stream unit test mirroring Ollama_TextGenerationStream.test.ts. It covers: the output schema passed through as Ollama's format, progressive object-delta emission on the object port from partial JSON, finish.data.object carrying the fully parsed object, partial-JSON fallback on truncated output, pre-aborted-signal rejection (before any client call/emit), and a single stream.abort() on mid-iteration abort. The factory is now exported from @workglow/ollama/ai-runtime so the test can import it.
Generated by Claude Code
Addresses the Copilot review note that the new Ollama json-mode adapter had no unit coverage. Adds a fake-stream test (mirroring Ollama_TextGenerationStream) covering: the output schema passed as Ollama's `format`, progressive object-delta emission on the `object` port from partial JSON, finish.data.object carrying the fully parsed object, partial-JSON fallback on truncated output, pre-aborted-signal rejection before any client call, and a single stream.abort() on mid-iteration abort. Exports createOllamaStructuredGenerationStream from @workglow/ollama/ai-runtime (alongside the text-generation factory) so the test can import it. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_015j321YUsXJRVEVkBmEavtg
Mirror of #600's StreamEventAccumulator "no clobber" test on the streaming (.run via executeStream) path: an object-mode toolCalls delta must survive a static {text:"",toolCalls:[]} finish scaffold, and the absent text port falls back to the scaffold default. Guards the B1 class against drift in the rewritten StreamProcessor finish-enrichment. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_013fSp8GisRRDbtTvbUrS1c5
Summary
This PR fixes critical bugs in reciprocal rank fusion (RRF), streaming event accumulation, and error handling across multiple AI tasks and providers. It also adds Ollama structured generation support and improves robustness in knowledge base search and tool calling.
Key Changes
Reciprocal Rank Fusion (RerankerTask)
reciprocalRankFusion()to enable keyword-based ranking.keywordMatchCount()helper: Shared by bothsimpleRerank()andreciprocalRankFusion()to avoid duplication and ensure consistent keyword matching logic.scoreRankandkeywordRankseparately, then combines them via1/(k+rank_a) + 1/(k+rank_b)so chunks ranked highly by both signals float to the top.Streaming Event Accumulation (StreamEventAccumulator)
textand tool calls ontoolCalls).{ text: "", toolCalls: [] }) from clobbering streamed content..run()and streaming produce identical output.Tool Calling (Ollama & ToolCallingTask)
{ text: "", toolCalls: [] }on finish, letting the accumulator's delta precedence rule preserve streamed calls.ToolCallingTask.execute()andexecuteStream()now register the session disposer before callingsuper.execute()/super.executeStream(), ensuring cleanup fires even if the provider throws or the stream aborts mid-iteration.Knowledge Base Search Error Handling (AiChatWithKbTask)
Ollama Structured Generation
Ollama_StructuredGeneration.ts: Implements JSON-mode structured generation for Ollama, passing the schema as theformatparameter and emittingobject-deltaevents with progressively parsed partial JSON.Gemini Text Generation
buildGenerationConfig()helper: Maps canonical sampling params (maxTokens,temperature,topP,frequencyPenalty,presencePenalty) onto Gemini'sgenerationConfig, only setting defined fields so callers that omit a param keep the provider's default.AiChatTask
Error Classification (AiJob)
Tool Call Parsing (ToolCallParsers)
{"name": "prefix, which breaks when the model's output format varies.Provider Registration (AiProvider)
https://claude.ai/code/session_015j321YUsXJRVEVkBmEavtg