diff --git a/package.json b/package.json
index 08dcb89..19a4775 100644
--- a/package.json
+++ b/package.json
@@ -1,6 +1,6 @@
 {
   "name": "@tangle-network/agent-runtime",
-  "version": "0.12.0",
+  "version": "0.12.1",
   "description": "Reusable runtime lifecycle for domain-specific agents.",
   "homepage": "https://github.com/tangle-network/agent-runtime#readme",
   "repository": {
diff --git a/src/backends.ts b/src/backends.ts
index c1a0d0d..160fc0e 100644
--- a/src/backends.ts
+++ b/src/backends.ts
@@ -70,6 +70,70 @@ export function createSandboxPromptBackend<
 }
 
+/**
+ * Retry policy for transient transport errors (rate limits, upstream
+ * timeouts). Defaults to 5 attempts with exponential backoff starting at
+ * 1s, ±25% jitter, capped at 30s. Set `maxAttempts: 1` to disable retries.
+ *
+ * Retried status codes:
+ * - 408 Request Timeout
+ * - 425 Too Early
+ * - 429 Too Many Requests
+ * - 500 / 502 / 503 / 504 — upstream transient failures
+ *
+ * Hard failures (401, 403, 4xx other than the above) propagate immediately.
+ */
+export interface BackendRetryPolicy {
+  /** Total attempts including the first try. Default 5. Fractions are truncated; values below 1 (or NaN) are treated as 1. */
+  maxAttempts?: number
+  /** Initial backoff in ms before the second attempt. Default 1000. */
+  initialBackoffMs?: number
+  /** Hard ceiling on backoff in ms. Default 30000. */
+  maxBackoffMs?: number
+  /** Jitter fraction in [0, 1]. Default 0.25 (±25%). */
+  jitter?: number
+  /** Status codes that trigger a retry. Default: 408, 425, 429, 500, 502, 503, 504. */
+  retryStatuses?: ReadonlyArray<number>
+}
+
+const DEFAULT_RETRY_STATUSES = [408, 425, 429, 500, 502, 503, 504] as const
+
+/**
+ * Computes the wait before the next attempt: exponential backoff from
+ * `initialBackoffMs`, capped at `maxBackoffMs`, then spread by ±`jitter`.
+ *
+ * @param attempt 1-based number of the attempt that just failed.
+ * @returns Non-negative delay in whole milliseconds.
+ */
+function pickRetryDelayMs(attempt: number, policy: Required<BackendRetryPolicy>): number {
+  const exp = policy.initialBackoffMs * 2 ** (attempt - 1)
+  const capped = Math.min(exp, policy.maxBackoffMs)
+  const jitter = capped * policy.jitter * (Math.random() * 2 - 1)
+  return Math.max(0, Math.round(capped + jitter))
+}
+
+/**
+ * Resolves after `ms` milliseconds, or rejects as soon as `signal` aborts
+ * (with `signal.reason` when set) so a cancelled run never waits out a backoff.
+ */
+function sleep(ms: number, signal?: AbortSignal): Promise<void> {
+  return new Promise<void>((resolve, reject) => {
+    if (signal?.aborted) {
+      reject(signal.reason ?? new Error('aborted'))
+      return
+    }
+    const t = setTimeout(() => {
+      signal?.removeEventListener('abort', onAbort)
+      resolve()
+    }, ms)
+    const onAbort = () => {
+      clearTimeout(t)
+      reject(signal?.reason ?? new Error('aborted'))
+    }
+    signal?.addEventListener('abort', onAbort, { once: true })
+  })
+}
+
 /** @stable */
 export function createOpenAICompatibleBackend<
   TInput extends AgentBackendInput = AgentBackendInput,
 >(options: {
@@ -78,33 +142,59 @@ export function createOpenAICompatibleBackend<
   model: string
   kind?: string
   fetchImpl?: typeof fetch
+  retry?: BackendRetryPolicy
 }): AgentExecutionBackend<TInput> {
   const fetcher = options.fetchImpl ?? fetch
   const kind = options.kind ?? 'tcloud'
+  const retryPolicy: Required<BackendRetryPolicy> = {
+    // Always issue at least one request: 0, a negative value or NaN would
+    // otherwise skip the loop and raise "returned unknown" without any fetch.
+    maxAttempts: Math.max(1, Math.trunc(options.retry?.maxAttempts ?? 5) || 1),
+    initialBackoffMs: options.retry?.initialBackoffMs ?? 1000,
+    maxBackoffMs: options.retry?.maxBackoffMs ?? 30000,
+    jitter: options.retry?.jitter ?? 0.25,
+    retryStatuses: options.retry?.retryStatuses ?? DEFAULT_RETRY_STATUSES,
+  }
   return {
     kind,
     start(_input, context) {
       return newRuntimeSession(kind, context.requestedSessionId)
     },
     async *stream(input, context) {
-      const response = await fetcher(`${options.baseUrl.replace(/\/$/, '')}/chat/completions`, {
-        method: 'POST',
-        headers: {
-          Authorization: `Bearer ${options.apiKey}`,
-          'Content-Type': 'application/json',
-        },
-        body: JSON.stringify({
-          model: options.model,
-          stream: true,
-          messages: input.messages ?? [
-            { role: 'user', content: input.message ?? context.task.intent },
-          ],
-        }),
-        signal: context.signal,
-      })
-      if (!response.ok) {
-        throw new BackendTransportError(kind, `chat backend returned ${response.status}`, {
-          status: response.status,
+      let response: Response | undefined
+      let lastStatus = 0
+      for (let attempt = 1; attempt <= retryPolicy.maxAttempts; attempt++) {
+        response = await fetcher(`${options.baseUrl.replace(/\/$/, '')}/chat/completions`, {
+          method: 'POST',
+          headers: {
+            Authorization: `Bearer ${options.apiKey}`,
+            'Content-Type': 'application/json',
+          },
+          body: JSON.stringify({
+            model: options.model,
+            stream: true,
+            messages: input.messages ?? [
+              { role: 'user', content: input.message ?? context.task.intent },
+            ],
+          }),
+          signal: context.signal,
+        })
+        if (response.ok) break
+        lastStatus = response.status
+        if (!retryPolicy.retryStatuses.includes(response.status)) break
+        if (attempt === retryPolicy.maxAttempts) break
+        // Drain the failed body so the connection can be reused.
+        try {
+          await response.body?.cancel()
+        } catch {
+          // Best-effort — some runtimes don't expose cancel.
+        }
+        const delayMs = pickRetryDelayMs(attempt, retryPolicy)
+        await sleep(delayMs, context.signal)
+      }
+      if (!response || !response.ok) {
+        throw new BackendTransportError(kind, `chat backend returned ${lastStatus || 'unknown'}`, {
+          status: lastStatus || 0,
         })
       }
       yield* streamResponseEvents(response, context)