Skip to content

Fix reciprocal rank fusion, streaming deltas, and error handling#600

Merged
sroussey merged 6 commits into
mainfrom
claude/upbeat-faraday-w55gni
Jun 25, 2026
Merged

Fix reciprocal rank fusion, streaming deltas, and error handling#600
sroussey merged 6 commits into
mainfrom
claude/upbeat-faraday-w55gni

Conversation

@sroussey

Copy link
Copy Markdown
Collaborator

Summary

This PR fixes critical bugs in reciprocal rank fusion (RRF), streaming event accumulation, and error handling across multiple AI tasks and providers. It also adds Ollama structured generation support and improves robustness in knowledge base search and tool calling.

Key Changes

Reciprocal Rank Fusion (RerankerTask)

  • Fixed RRF implementation: Now correctly fuses two independent rankings (retrieval scores + lexical keyword overlap) instead of just reranking by position. The query is now passed to reciprocalRankFusion() to enable keyword-based ranking.
  • Extracted keywordMatchCount() helper: Shared by both simpleRerank() and reciprocalRankFusion() to avoid duplication and ensure consistent keyword matching logic.
  • Improved RRF algorithm: Properly computes scoreRank and keywordRank separately, then combines them via 1/(k+rank_a) + 1/(k+rank_b) so chunks ranked highly by both signals float to the top.

Streaming Event Accumulation (StreamEventAccumulator)

  • Removed mixed-mode rejection: Text and object deltas on distinct ports now coexist and compose into a single output object (e.g., tool-calling streams text on text and tool calls on toolCalls).
  • Fixed delta precedence: Accumulated deltas now take precedence over the finish payload, preventing structural default scaffolds (e.g., { text: "", toolCalls: [] }) from clobbering streamed content.
  • Mirrors task-graph behavior: Ensures .run() and streaming produce identical output.

Tool Calling (Ollama & ToolCallingTask)

  • Ollama tool calling: Moved tool call validation to the delta emission point (not finish), so hallucinated tool names are filtered before streaming to consumers.
  • Structural finish scaffold: Ollama now emits only { text: "", toolCalls: [] } on finish, letting the accumulator's delta precedence rule preserve streamed calls.
  • Session disposal timing: ToolCallingTask.execute() and executeStream() now register the session disposer before calling super.execute() / super.executeStream(), ensuring cleanup fires even if the provider throws or the stream aborts mid-iteration.

Knowledge Base Search Error Handling (AiChatWithKbTask)

  • Wrapped KB search in try-catch: A single KB's embedding/search failure no longer aborts the entire turn. Errors are logged and that KB returns empty results, allowing other KBs to still contribute.

Ollama Structured Generation

  • New Ollama_StructuredGeneration.ts: Implements JSON-mode structured generation for Ollama, passing the schema as the format parameter and emitting object-delta events with progressively parsed partial JSON.

Gemini Text Generation

  • Extracted buildGenerationConfig() helper: Maps canonical sampling params (maxTokens, temperature, topP, frequencyPenalty, presencePenalty) onto Gemini's generationConfig, only setting defined fields so callers that omit a param keep the provider's default.

AiChatTask

  • Empty assistant message handling: A turn that produces no assistant content (provider emitted only non-text events, an immediate finish, or an empty/aborted response) no longer appends an empty message to history nor counts as a completed turn, preventing context poisoning.

Error Classification (AiJob)

  • Improved HTTP status detection: The regex for extracting HTTP status codes from error messages now requires HTTP-shaped context (e.g., "HTTP 503", "status: 429") to avoid misclassifying bare numbers like "512" in "Sequence length 512 exceeds limit" or model IDs.

Tool Call Parsing (ToolCallParsers)

  • Fixed Llama parser offset: Now tracks the actual block start position instead of reconstructing the offset from a literal {"name": " prefix, which breaks when the model's output format varies.

Provider Registration (AiProvider)

  • **Worker cleanup on re-

https://claude.ai/code/session_015j321YUsXJRVEVkBmEavtg

claude added 5 commits June 24, 2026 22:39
… review

Blocker + Major fixes from the AI-subsystem review:

- AiProvider: terminate the WorkerManager worker (and idle timer) on
  re-registration teardown and on afterRegister rollback, so re-registering
  a worker-backed provider no longer throws "already registered" and a failed
  registration doesn't leak the worker.
- AiJob.classifyProviderError: only scavenge a 4xx/5xx HTTP status from the
  error message in an HTTP-shaped context, so numbers like "512" in a model
  message no longer misclassify the error as a retryable server error.
- ToolCallParsers.parseLlama: slice leading content to the matched block's
  start offset instead of fragile literal-prefix length math.
- AiChatTask: do not append an empty assistant turn to history or count it as
  a completed turn when the provider streamed no text.
- AiChatWithKbTask: isolate per-KB search failures so one failing KB degrades
  to empty results instead of aborting the whole conversation turn.
- DocumentEnricherTask: require doc_id and documentTree in the input schema
  and drop the unsafe casts that passed undefined through.
- ToolCallingTask: register the session disposer before running so it still
  fires when execute()/the stream throws or aborts mid-iteration.

Verified: tsgo type-check (36/36 packages) and vitest ai/task/rag/provider/util
suites all pass.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_015j321YUsXJRVEVkBmEavtg
…behavior

Design-decision fixes from the AI-subsystem review:

- AiTask.gateOrThrow now shares modelMeetsRequires with validateInput/
  narrowInput so all three gates apply one policy: a model that declares
  capabilities must include every required one, while a model with no declared
  capabilities passes (unverified-allow). Previously gateOrThrow rejected a
  capability-less model that the other two gates accepted.
- RerankerTask reciprocal-rank-fusion now performs real RRF: it fuses the
  retrieval-score ranking (or input order when no scores are given) with a
  lexical keyword-overlap ranking derived from the query, instead of the prior
  identity re-score that ignored scores and preserved input order. Shared the
  keyword-match loop between simple and RRF paths.
- README: document that the built-in direct/concurrency-limited strategies
  invoke the provider once and surface failures without automatic retry; the
  retryable/permanent classification is for diagnostics and queue-backed
  execution, not the default in-process strategies.

Verified: ai (238) and rag (225) vitest suites pass; ai package builds (JS+types).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_015j321YUsXJRVEVkBmEavtg
Provider Major fixes from the AI-subsystem review:

- Gemini (Gemini_TextGeneration): map frequencyPenalty and presencePenalty
  onto generationConfig (the SDK supports both), forward topP on the chat path
  (previously only the prompt path forwarded it), and gate every optional
  sampling param on `!== undefined` so omitted params keep the provider default
  — matching the OpenAI/Anthropic adapters. Extracted a shared
  buildGenerationConfig helper used by both the chat and prompt paths.
- Ollama: add structured-generation support so StructuredGenerationTask
  (requires ["json-mode"]) can run on Ollama. New Ollama_StructuredGeneration
  run-fn passes the output schema as Ollama's chat `format` parameter, emits
  progressive object-delta on the `object` port, and a terminal finish whose
  data.object carries the parsed object (per the structured-generation
  streaming convention). Registered OLLAMA_JSON_MODE = ["text.generation",
  "json-mode"] in both node and browser run-fn lists, and inferOllamaCapabilities
  now advertises json-mode for generative models. Updated the Ollama provider
  unit test that pinned the prior no-json-mode behavior.

Verified: provider vitest (163) and OllamaProvider unit test (17) pass; both
provider packages build (JS+types).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_015j321YUsXJRVEVkBmEavtg
…bber

B1/M9 from the review: the two terminal stream consumers disagreed. The
task-graph StreamProcessor (streaming path) merges accumulated deltas ON TOP of
the finish payload (delta-wins) and never rejects text+object streams, relying
on the run-fn's structural default scaffold to supply absent ports. The
StreamEventAccumulator (the .run()/AiTask.execute path) did the opposite
(finish-wins) and threw on any text+object mix. So a tool-calling run-fn's
finish scaffold { text: "", toolCalls: [] } clobbered the streamed tool calls
(B1), and a model streaming both reasoning text and tool calls threw a
mixed-mode error (M9).

- StreamEventAccumulator: accumulated deltas now take precedence over the
  finish payload, and text/object deltas on distinct ports compose into one
  object (same-port collisions resolve object-last). This mirrors the
  StreamProcessor so .run() and streaming produce identical output, and a
  streaming finish (empty {} or a default scaffold) can never clobber streamed
  content.
- Ollama tool-calling: stop accumulating into the finish event (emit a static
  { text: "", toolCalls: [] } scaffold) and filter hallucinated tool names at
  the object-delta, not at finish — required now that accumulated deltas win.
  OpenAI and Gemini already emit a static scaffold and filter their deltas, so
  they were correct once the consumers agreed; no change needed there.

Verified: ai (239), task (1117), provider (163) vitest pass; new accumulator
tests cover the no-clobber and distinct-port compose cases.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_015j321YUsXJRVEVkBmEavtg
The B1 regression test cast the observeFinish argument as StreamEvent<Out>
(the full union, which includes StreamError and has no `data`), tripping
tsgo (TS2345) in @workglow/test#build-types even though vitest transpiled it
fine. Pass the finish literal uncast like the sibling tests so it narrows to
StreamFinish<Out>.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_015j321YUsXJRVEVkBmEavtg
@github-actions

github-actions Bot commented Jun 25, 2026

Copy link
Copy Markdown

Coverage Report

Status Category Percentage Covered / Total
🔵 Lines 62.5% 25322 / 40512
🔵 Statements 62.34% 26206 / 42035
🔵 Functions 63.75% 4814 / 7551
🔵 Branches 51.08% 12366 / 24206
File CoverageNo changed files found.
Generated in workflow #2622 for commit cc880f5 by the Vitest Coverage Report Action

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR fixes several correctness issues and edge cases across the AI stack—most notably reciprocal-rank-fusion reranking, streaming output accumulation semantics, and provider/session cleanup—while adding Ollama JSON-mode (structured generation) support and improving robustness around KB search and error classification.

Changes:

  • Correct RRF reranking by fusing independent retrieval-score and keyword-overlap rankings, with shared keyword matching logic.
  • Align streaming accumulation behavior with task-graph semantics (compose deltas across ports; deltas take precedence over structural finish scaffolds) and update tests accordingly.
  • Extend provider capabilities (Ollama json-mode), improve provider/task cleanup and error-handling (worker termination on re-register, KB search isolation, safer HTTP status extraction, parser offset fix).

Reviewed changes

Copilot reviewed 21 out of 21 changed files in this pull request and generated 1 comment.

Show a summary per file
File Description
providers/ollama/src/ai/common/Ollama_ToolCalling.ts Emits validated tool-call deltas during streaming and uses a structural finish scaffold.
providers/ollama/src/ai/common/Ollama_StructuredGeneration.ts Adds Ollama JSON-mode structured generation streaming run-fn with partial JSON parsing + final object finish payload.
providers/ollama/src/ai/common/Ollama_JobRunFns.ts Registers the new Ollama json-mode run-fn in the worker runtime.
providers/ollama/src/ai/common/Ollama_JobRunFns.browser.ts Registers the new Ollama json-mode run-fn in the browser runtime.
providers/ollama/src/ai/common/Ollama_CapabilitySets.ts Adds ["text.generation","json-mode"] capability set.
providers/ollama/src/ai/common/Ollama_Capabilities.ts Includes json-mode in inferred capability lists.
providers/google-gemini/src/ai/common/Gemini_TextGeneration.ts Introduces buildGenerationConfig() to map canonical sampling params without forcing undefined values.
packages/test/src/test/ai/StreamEventAccumulator.test.ts Updates/expands tests for composing deltas across distinct ports and finish-scaffold precedence.
packages/test/src/test/ai/collectStream.test.ts Updates collectStream expectations for mixed text/object deltas across ports.
packages/test/src/test/ai-provider-api/OllamaProvider.test.ts Updates provider capability/run-fn assertions to include json-mode registration.
packages/ai/src/task/ToolCallingTask.ts Registers session disposal earlier to ensure cleanup even on throw/abort mid-execution/stream.
packages/ai/src/task/RerankerTask.ts Fixes RRF algorithm and factors out shared keyword match counting.
packages/ai/src/task/DocumentEnricherTask.ts Tightens schema/types for required inputs (doc_id, documentTree) and removes unnecessary casts.
packages/ai/src/task/base/AiTask.ts Clarifies and centralizes capability gating semantics via modelMeetsRequires.
packages/ai/src/task/AiChatWithKbTask.ts Wraps per-KB search in try/catch so one KB failure degrades to empty results instead of aborting the turn.
packages/ai/src/task/AiChatTask.ts Prevents appending an empty assistant message when a turn yields no assistant text.
packages/ai/src/provider/AiProvider.ts Ensures worker-backed providers terminate existing workers on re-register and on partial-registration failure cleanup.
packages/ai/src/provider-utils/ToolCallParsers.ts Fixes Llama parser content slicing by tracking the true balanced-block start offset.
packages/ai/src/job/AiJob.ts Tightens HTTP-status extraction regex to avoid misclassifying bare numbers in error messages.
packages/ai/src/capability/StreamEventAccumulator.ts Allows composing text+object deltas across ports and ensures accumulated deltas take precedence over finish scaffolds.
packages/ai/README.md Documents execution/retry behavior (no automatic retries for default in-process strategies).

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +29 to +36
export function createOllamaStructuredGenerationStream(
getClient: GetClient
): AiProviderRunFn<
StructuredGenerationTaskInput,
StructuredGenerationTaskOutput,
OllamaModelConfig
> {
return async (input, model, signal, emit, outputSchema) => {

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added in cc880f5packages/test/src/test/ai-provider-api/Ollama_StructuredGenerationStream.test.ts, a fake-stream unit test mirroring Ollama_TextGenerationStream.test.ts. It covers: the output schema passed through as Ollama's format, progressive object-delta emission on the object port from partial JSON, finish.data.object carrying the fully parsed object, partial-JSON fallback on truncated output, pre-aborted-signal rejection (before any client call/emit), and a single stream.abort() on mid-iteration abort. The factory is now exported from @workglow/ollama/ai-runtime so the test can import it.


Generated by Claude Code

Addresses the Copilot review note that the new Ollama json-mode adapter had no
unit coverage. Adds a fake-stream test (mirroring Ollama_TextGenerationStream)
covering: the output schema passed as Ollama's `format`, progressive
object-delta emission on the `object` port from partial JSON, finish.data.object
carrying the fully parsed object, partial-JSON fallback on truncated output,
pre-aborted-signal rejection before any client call, and a single stream.abort()
on mid-iteration abort.

Exports createOllamaStructuredGenerationStream from @workglow/ollama/ai-runtime
(alongside the text-generation factory) so the test can import it.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_015j321YUsXJRVEVkBmEavtg
@sroussey sroussey merged commit 6082fa0 into main Jun 25, 2026
14 checks passed
@sroussey sroussey deleted the claude/upbeat-faraday-w55gni branch June 25, 2026 01:25
sroussey pushed a commit that referenced this pull request Jun 25, 2026
Mirror of #600's StreamEventAccumulator "no clobber" test on the streaming
(.run via executeStream) path: an object-mode toolCalls delta must survive a
static {text:"",toolCalls:[]} finish scaffold, and the absent text port falls
back to the scaffold default. Guards the B1 class against drift in the
rewritten StreamProcessor finish-enrichment.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_013fSp8GisRRDbtTvbUrS1c5
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants