Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions packages/ai/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,18 @@ try {
}
```

### Execution and retries

By default an AI task runs the provider call through a **direct** or
**concurrency-limited** execution strategy that invokes the provider **once**
and surfaces any failure to the caller — the built-in strategies do **not**
retry automatically. Provider errors are still classified (retryable vs.
permanent, with a rate-limit `retry-after` when the provider supplies one) for
diagnostics and for queue-backed execution, but that classification only drives
automatic retries when a task is run through a persistent job queue, not through
the default in-process strategies. If you need provider-level retries, run the
task on a queue-backed strategy or implement retry/backoff in your own caller.

## Advanced Configuration

### Model Input Resolution
Expand Down
43 changes: 22 additions & 21 deletions packages/ai/src/capability/StreamEventAccumulator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ const isNonEmptyObject = (v: unknown): v is Record<string, unknown> =>
* - `finish` → captured separately via {@link observeFinish}; mandatory before
* {@link materialize}.
*
* Mixed-mode (both text-delta and object-delta on the same stream) is
* rejected at materialise time.
* Text and object deltas on DISTINCT ports coexist and compose into one object
* (e.g. tool-calling streams text on `text` and tool calls on `toolCalls`);
* accumulated deltas take precedence over the finish payload, mirroring the
* task-graph StreamProcessor so `.run()` and streaming produce identical output.
*
* This accumulator is **only** instantiated at explicit terminal-consumer
* sites (AiTask.execute, StreamProcessor `ctx.shouldAccumulate` branch). Do
Expand Down Expand Up @@ -125,13 +127,6 @@ export class StreamEventAccumulator<T extends TaskOutput = TaskOutput> {
err.lastEventType = lastEventType;
throw err;
}
if (this.hasTextDeltas && this.hasObjectDeltas) {
throw new Error(
"StreamEventAccumulator: stream mixed text-delta and object-delta events. " +
"Mixed-mode streams are not supported."
);
}

// One-shot: finish carries the complete payload.
if (!this.hasTextDeltas && !this.hasObjectDeltas && !this.hasSnapshots) {
if (isNonEmptyObject(this.finishData)) return this.finishData;
Expand All @@ -146,18 +141,24 @@ export class StreamEventAccumulator<T extends TaskOutput = TaskOutput> {
return this.snapshotAccumulator as T;
}

// Text-delta mode — per-port map → object.
if (this.hasTextDeltas && !this.hasObjectDeltas) {
const result: Record<string, unknown> = {};
for (const [port, text] of this.textAccumulator) result[port] = text;
if (isNonEmptyObject(this.finishData)) Object.assign(result, this.finishData);
return result as unknown as T;
// Delta mode — text and/or object deltas. Accumulated deltas take precedence
// over the finish payload, so a streaming finish (empty `{}` or a structural
// default scaffold like `{ text: "", toolCalls: [] }`) can never clobber
// streamed content. Text and object deltas on DISTINCT ports compose (e.g.
// tool-calling streams text on `text` and tool calls on `toolCalls`); a
// same-port collision resolves object-last. Mirrors the task-graph
// StreamProcessor so `.run()` and streaming produce identical output.
const fd = this.finishData;
const result: Record<string, unknown> =
fd !== null && typeof fd === "object" && !Array.isArray(fd)
? { ...(fd as Record<string, unknown>) }
: {};
for (const [port, text] of this.textAccumulator) {
if (text.length > 0) result[port] = text;
}

// Object-delta mode — per-port object → output, then merge finish.
const merged: Record<string, unknown> = {};
for (const [port, obj] of this.objectAccumulator) merged[port] = obj;
if (isNonEmptyObject(this.finishData)) Object.assign(merged, this.finishData as object);
return merged as unknown as T;
for (const [port, obj] of this.objectAccumulator) {
result[port] = obj;
}
return result as unknown as T;
}
}
8 changes: 7 additions & 1 deletion packages/ai/src/job/AiJob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,13 @@ export function classifyProviderError(err: unknown, taskType: string, provider:
: typeof (err as any)?.statusCode === "number"
? (err as any).statusCode
: (() => {
const m = message.match(/\b([45]\d{2})\b/);
// Only treat a 4xx/5xx number as an HTTP status when it appears in
// an HTTP-shaped context (e.g. "HTTP 503", "status: 429"). A bare
// number like the "512" in "Sequence length 512 exceeds limit" or a
// model id must not be scavenged as a status, or it misclassifies.
const m = message.match(
/\b(?:HTTP\/?\d?\.?\d?\s*|status(?:\s*code)?\s*(?:[:=]\s*)?)([45]\d{2})\b/i
);
return m ? parseInt(m[1], 10) : undefined;
})();

Expand Down
11 changes: 9 additions & 2 deletions packages/ai/src/provider-utils/ToolCallParsers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -322,9 +322,13 @@ export const parseLlama: ParserFn = (text) => {
// Uses balanced-brace scanning instead of regex to avoid ReDoS
if (calls.length === 0) {
const blocks = findBalancedBlocks(text, "{", "}");
let firstBlockStart: number | undefined;
for (const block of blocks) {
const parsed = tryParseJson(block.text) as Record<string, unknown> | undefined;
if (parsed?.name && (parsed.parameters !== undefined || parsed.arguments !== undefined)) {
if (firstBlockStart === undefined) {
firstBlockStart = block.start;
}
calls.push(
makeToolCall(
parsed.name as string,
Expand All @@ -334,8 +338,11 @@ export const parseLlama: ParserFn = (text) => {
);
}
}
if (calls.length > 0) {
content = text.slice(0, text.indexOf(calls[0].name) - '{"name": "'.length).trim();
if (calls.length > 0 && firstBlockStart !== undefined) {
// Slice the leading content to the actual block start rather than
// reconstructing the offset from a literal `{"name": "` prefix, which
// breaks when the model emits a different key order or no space.
content = text.slice(0, firstBlockStart).trim();
}
}

Expand Down
10 changes: 10 additions & 0 deletions packages/ai/src/provider/AiProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,11 @@ export abstract class AiProvider<TModelConfig extends ModelConfig = ModelConfig>
const registry = getAiProviderRegistry();
if (registry.getProvider(this.name)) {
registry.unregisterProvider(this.name);
// unregisterProvider only clears registry maps; a worker-backed provider
// also has a worker (and, for factory workers, an idle timer) registered
// on the WorkerManager. Without removing it here, the re-registration
// below throws "Worker <name> is already registered."
await globalServiceRegistry.get(WORKER_MANAGER).terminateWorker(this.name);
}

await this.onInitialize(context);
Expand Down Expand Up @@ -254,7 +259,12 @@ export abstract class AiProvider<TModelConfig extends ModelConfig = ModelConfig>
} catch (err) {
// Clean up the partially-registered provider so the registry isn't left
// in an inconsistent state (e.g., functions registered but no queue).
// For worker-backed registration the worker (and its idle timer) was
// already registered above, so remove it too or it leaks under this name.
registry.unregisterProvider(this.name);
if (!isInline && options.worker) {
await globalServiceRegistry.get(WORKER_MANAGER).terminateWorker(this.name);
}
throw err;
}
}
Expand Down
9 changes: 9 additions & 0 deletions packages/ai/src/task/AiChatTask.ts
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,15 @@ export class AiChatTask extends StreamingAiTask<AiChatTaskInput, AiChatTaskOutpu
yield event;
}

// A turn that produced no assistant content (provider emitted only
// non-text events, an immediate finish, or an empty/aborted response)
// must not append an empty assistant message to history nor count as a
// completed turn — doing so poisons multi-turn context and burns a turn.
// Treat it as end-of-conversation instead.
if (assistantText.length === 0) {
break;
}

const assistantMsg: ChatMessage = {
role: "assistant",
content: [{ type: "text", text: assistantText }],
Expand Down
46 changes: 29 additions & 17 deletions packages/ai/src/task/AiChatWithKbTask.ts
Original file line number Diff line number Diff line change
Expand Up @@ -460,24 +460,36 @@ export class AiChatWithKbTask extends StreamingAiTask<
console.warn(`[AiChatWithKbTask] knowledge base "${kbId}" not registered`);
return { kbId, kbLabel: kbId, kb: undefined, results: [] as ChunkSearchResult[] };
}
let results: ChunkSearchResult[];
if (queryVector) {
results = await (kb as KnowledgeBase).similaritySearch(queryVector, { topK });
} else {
const search = context.own(new KbSearchTask());
const out = await search.run({
knowledgeBase: kb as KnowledgeBase,
query: lastUserText,
topK,
});
results = out.results;
try {
let results: ChunkSearchResult[];
if (queryVector) {
results = await (kb as KnowledgeBase).similaritySearch(queryVector, { topK });
} else {
const search = context.own(new KbSearchTask());
const out = await search.run({
knowledgeBase: kb as KnowledgeBase,
query: lastUserText,
topK,
});
results = out.results;
}
return {
kbId,
kbLabel: kb.title || kbId,
kb: kb as KnowledgeBase,
results,
};
} catch (error) {
// A single KB's embedding/search failure must not abort the turn;
// degrade to empty results for that KB so other KBs still contribute.
console.warn(`[AiChatWithKbTask] knowledge base "${kbId}" search failed`, error);
return {
kbId,
kbLabel: kb.title || kbId,
kb: kb as KnowledgeBase,
results: [] as ChunkSearchResult[],
};
}
return {
kbId,
kbLabel: kb.title || kbId,
kb: kb as KnowledgeBase,
results,
};
})
);
}
Expand Down
8 changes: 4 additions & 4 deletions packages/ai/src/task/DocumentEnricherTask.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ const inputSchema = {
description: "Model to use for named entity recognition (optional)",
}),
},
required: [],
required: ["doc_id", "documentTree"],
additionalProperties: false,
} as const satisfies DataPortSchema;

Expand Down Expand Up @@ -90,7 +90,7 @@ const outputSchema = {

export type DocumentEnricherTaskInput = Omit<
{
doc_id?: string | undefined;
doc_id: string;
documentTree?: { [x: string]: unknown } | undefined;
generateSummaries?: boolean | undefined;
extractEntities?: boolean | undefined;
Expand Down Expand Up @@ -147,7 +147,7 @@ export class DocumentEnricherTask extends Task<
nerModel: nerModelConfig,
} = input;

const root = documentTree as DocumentNode;
const root: DocumentNode = documentTree;
const summaryModel = summaryModelConfig ? (summaryModelConfig as ModelConfig) : undefined;
const nerModel = nerModelConfig ? (nerModelConfig as ModelConfig) : undefined;
let summaryCount = 0;
Expand Down Expand Up @@ -181,7 +181,7 @@ export class DocumentEnricherTask extends Task<
);

return {
doc_id: doc_id as string,
doc_id,
documentTree: enrichedRoot,
summaryCount,
entityCount,
Expand Down
84 changes: 65 additions & 19 deletions packages/ai/src/task/RerankerTask.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ export class RerankerTask extends Task<RerankerTaskInput, RerankerTaskOutput, Re
let rankedItems: RankedItem[];
switch (method) {
case "reciprocal-rank-fusion":
rankedItems = this.reciprocalRankFusion(chunks, scores, metadata);
rankedItems = this.reciprocalRankFusion(query, chunks, scores, metadata);
break;
case "simple":
default:
Expand Down Expand Up @@ -213,21 +213,7 @@ export class RerankerTask extends Task<RerankerTaskInput, RerankerTaskOutput, Re
const chunkLower = chunk.toLowerCase();
const initialScore = scores[index] || 0;

let keywordScore = 0;
for (const word of queryWords) {
// Skip pure-whitespace / empty tokens (defensive: split(/\s+/) +
// .filter(w => w.length > 0) above should already exclude them, but
// make the contract local).
if (word.length === 0 || /^\s+$/.test(word)) continue;
// Escape every regex metacharacter so user input like `foo(`, `\\`,
// `*abc`, or `[` is treated as a literal token rather than being
// parsed as a regex (which would throw `SyntaxError`).
const regex = new RegExp(escapeRegExp(word), "gi");
const matches = chunkLower.match(regex);
if (matches) {
keywordScore += matches.length;
}
}
const keywordScore = this.keywordMatchCount(queryWords, chunkLower);

const exactMatchBonus = hasQueryWords && chunkLower.includes(queryLower) ? 0.5 : 0;
const normalizedKeywordScore = hasQueryWords
Expand All @@ -245,12 +231,72 @@ export class RerankerTask extends Task<RerankerTaskInput, RerankerTaskOutput, Re
return items;
}

/** Reciprocal Rank Fusion: 1 / (k + rank) — useful when combining multiple rankings */
private reciprocalRankFusion(chunks: string[], scores: number[], metadata: any[]): RankedItem[] {
/**
 * Tally the total number of occurrences of every token in `queryWords`
 * inside `chunkLower` (the caller passes the chunk already lower-cased).
 * Used by both {@link simpleRerank} and {@link reciprocalRankFusion}.
 *
 * @param queryWords - Query tokens to look for; empty or whitespace-only
 *   tokens contribute nothing.
 * @param chunkLower - Lower-cased chunk text to search.
 * @returns The sum, over all tokens, of the number of matches in the chunk.
 */
private keywordMatchCount(queryWords: string[], chunkLower: string): number {
  return queryWords.reduce((total, token) => {
    // Guard locally against blank tokens even though call sites split on
    // whitespace and filter empties before calling.
    if (token.length === 0 || /^\s+$/.test(token)) return total;
    // The token is escaped before compilation so user input such as `foo(`,
    // `\\`, `*abc`, or `[` is matched literally instead of being parsed as a
    // pattern (which would throw `SyntaxError`).
    const pattern = new RegExp(escapeRegExp(token), "gi");
    const hits = chunkLower.match(pattern);
    return hits ? total + hits.length : total;
  }, 0);
}

/**
* Reciprocal Rank Fusion. Fuses two rankings of the same chunks — the
* retrieval ranking (the provided initial `scores`, or the input order when
* no scores are given) and a lexical keyword-overlap ranking derived from the
* `query` — via `RRF(d) = Σ 1 / (k + rank_r(d))`. This is the point of RRF:
* a chunk ranked highly by both similarity and keyword overlap floats to the
* top even if neither signal alone put it first. Unlike a single-signal sort,
* it actually reads `scores` and `query`.
*/
private reciprocalRankFusion(
query: string,
chunks: string[],
scores: number[],
metadata: any[]
): RankedItem[] {
const k = 60;
const n = chunks.length;
if (n === 0) return [];

// Ranking A — retrieval order. Prefer the provided initial scores; fall
// back to the input order (already retrieval order) when none are given.
const scoreOrder = chunks.map((_, index) => index);
if (scores.some((s) => typeof s === "number")) {
scoreOrder.sort((a, b) => (scores[b] ?? -Infinity) - (scores[a] ?? -Infinity));
}
const scoreRank = new Array<number>(n);
scoreOrder.forEach((originalIndex, rank) => (scoreRank[originalIndex] = rank));

// Ranking B — lexical keyword overlap with the query (plus an exact-phrase
// nudge), so RRF fuses a keyword signal with the retrieval signal.
const queryLower = query.toLowerCase();
const queryWords = queryLower.split(/\s+/).filter((w) => w.length > 0);
const keywordScores = chunks.map((chunk) => {
const chunkLower = chunk.toLowerCase();
const base = this.keywordMatchCount(queryWords, chunkLower);
return base + (queryWords.length > 0 && chunkLower.includes(queryLower) ? 0.5 : 0);
});
const keywordOrder = chunks.map((_, index) => index);
keywordOrder.sort((a, b) => keywordScores[b] - keywordScores[a]);
const keywordRank = new Array<number>(n);
keywordOrder.forEach((originalIndex, rank) => (keywordRank[originalIndex] = rank));

const items: RankedItem[] = chunks.map((chunk, index) => ({
chunk,
score: 1 / (k + index + 1),
score: 1 / (k + scoreRank[index] + 1) + 1 / (k + keywordRank[index] + 1),
metadata: metadata[index],
originalIndex: index,
}));
Expand Down
15 changes: 12 additions & 3 deletions packages/ai/src/task/ToolCallingTask.ts
Original file line number Diff line number Diff line change
Expand Up @@ -346,17 +346,26 @@ export class ToolCallingTask extends StreamingAiTask<
input: ToolCallingTaskInput,
executeContext: IExecuteContext
): Promise<ToolCallingTaskOutput | undefined> {
const result = await super.execute(input, executeContext);
// Register the session disposer BEFORE running so it still fires if
// super.execute() throws or the stream aborts mid-iteration — the provider
// may already have allocated the session on the first run-fn invocation.
// The resourceScope is first-registration-wins and disposes via allSettled,
// so computing the session id up front and registering early is safe.
await this.getJobInput(input);
this.registerSessionDispose(input, executeContext);
return result;
return super.execute(input, executeContext);
}

override async *executeStream(
input: ToolCallingTaskInput,
context: IExecuteContext
): AsyncIterable<StreamEvent<ToolCallingTaskOutput>> {
yield* super.executeStream(input, context);
// Register the session disposer BEFORE streaming for the same reason as
// execute(): an abort or throw mid-stream must still leave the disposer
// registered so disposeSession runs on scope teardown.
await this.getJobInput(input);
this.registerSessionDispose(input, context);
yield* super.executeStream(input, context);
}
}

Expand Down
Loading
Loading