diff --git a/.changeset/external-call-resilience.md b/.changeset/external-call-resilience.md new file mode 100644 index 000000000..e7037c562 --- /dev/null +++ b/.changeset/external-call-resilience.md @@ -0,0 +1,26 @@ +--- +'@objectstack/spec': minor +'@objectstack/connector-rest': patch +'@objectstack/connector-slack': patch +'@objectstack/embedder-openai': patch +'@objectstack/connector-mcp': patch +--- + +feat(spec): resilientFetch — timeout + backoff for outbound HTTP (P1-1) + +Outbound calls in the connectors/embedder were naked `fetch` with no timeout or +retry, so a slow or rate-limited external API could hang an agent turn with no +recovery. + +New shared `resilientFetch` (`@objectstack/spec/shared`): +- per-attempt timeout via `AbortController` (default 30s); +- exponential backoff with jitter, up to 3 attempts, on network errors / 429 / 5xx; +- honours a `Retry-After` header on 429; +- never retries a caller-initiated abort (intentional cancellation). + +Wired into `connector-rest`, `connector-slack`, and `embedder-openai`. +`connector-mcp` talks through the MCP SDK transport, so it gets a 30s per-request +`timeout` on `callTool` / `listTools` instead. + +A stateful per-host **circuit breaker** is deliberately left as a follow-up: +timeout + backoff already removes the hang/no-recovery risk. diff --git a/docs/launch-readiness.md b/docs/launch-readiness.md index 04827cb97..ea5146bbc 100644 --- a/docs/launch-readiness.md +++ b/docs/launch-readiness.md @@ -110,7 +110,18 @@ fix or acceptance.** rate-limited external API hangs the entire agent turn with no recovery. - **Action:** Add a default request timeout (e.g. 30s, configurable) + exponential backoff (3 tries) + a circuit breaker; tests for 429 / timeout paths. 
-- **Owner:** _______ · Verify ☐ · Sign-off ☐ · Notes: _______ +- **Fix:** New shared `resilientFetch` (`@objectstack/spec/shared`) — 30s per-attempt + timeout (AbortController) + exponential backoff with jitter (3 tries) on + network errors / 429 / 5xx, honouring `Retry-After`; never retries a + caller-initiated abort. Wired into `connector-rest`, `connector-slack`, + `embedder-openai`. `connector-mcp` uses the MCP SDK transport, so it gets a 30s + per-request `timeout` on `callTool` / `listTools` instead. +13 tests (helper 9, + connector retry 1, plus existing suites green). +- **Deferred (follow-up, not blocking):** a **circuit breaker** — it's stateful + and per-host; timeout + backoff already removes the "hangs the agent turn / no + recovery" risk. Making timeout/retry **per-call configurable** (currently + sensible defaults) is a small follow-up. +- **Owner:** _______ · Verify ✅ (confirmed real @ `main`) · Sign-off ☐ · Notes: Timeout shipped across all 4 paths; retry/backoff on the 3 `resilientFetch` paths (`connector-mcp` is timeout-only); circuit breaker deferred (rationale above). Awaiting human sign-off. ### P1-2 — Unbounded growth: execution logs, job runs, event log - **Area:** `service-automation` (in-memory exec logs, hard 1000 cap), diff --git a/packages/connectors/connector-mcp/src/mcp-connector.ts b/packages/connectors/connector-mcp/src/mcp-connector.ts index 9f9cde357..e0d508510 100644 --- a/packages/connectors/connector-mcp/src/mcp-connector.ts +++ b/packages/connectors/connector-mcp/src/mcp-connector.ts @@ -149,6 +149,13 @@ function normalizeResult(raw: unknown): Record { return out; } +/** + * Default per-request timeout (ms) for MCP calls (P1-1). Without it, a hung or + * unresponsive MCP server stalls the agent turn indefinitely. The SDK aborts the + * request once this elapses. + */ +const MCP_REQUEST_TIMEOUT_MS = 30_000; + /** * The default {@link McpClientLike} — lazily imports the official MCP SDK so it * is only loaded when a real connection is made (tests inject their own client). 
@@ -182,11 +189,11 @@ async function defaultClientFactory( return { async listTools() { - const res = await client.listTools(); + const res = await client.listTools(undefined, { timeout: MCP_REQUEST_TIMEOUT_MS }); return (res.tools ?? []) as McpToolDescriptor[]; }, async callTool(name, args) { - return client.callTool({ name, arguments: args }); + return client.callTool({ name, arguments: args }, undefined, { timeout: MCP_REQUEST_TIMEOUT_MS }); }, async close() { await client.close(); diff --git a/packages/connectors/connector-rest/src/rest-connector.test.ts b/packages/connectors/connector-rest/src/rest-connector.test.ts index 050a33275..ccb1fcf3b 100644 --- a/packages/connectors/connector-rest/src/rest-connector.test.ts +++ b/packages/connectors/connector-rest/src/rest-connector.test.ts @@ -48,6 +48,27 @@ describe('createRestConnector — request action', () => { expect(out).toEqual({ status: 200, ok: true, body: { id: 1, name: 'Ada' } }); }); + it('retries a transient 503 then returns the success (P1-1)', async () => { + let n = 0; + const calls: number[] = []; + const impl = (async () => { + calls.push(1); + const status = n++ === 0 ? 503 : 200; + return { + status, + ok: status >= 200 && status < 300, + headers: { get: (h: string) => (h.toLowerCase() === 'content-type' ? 
'application/json' : null) }, + json: async () => ({ ok: status === 200 }), + text: async () => '', + }; + }) as unknown as typeof fetch; + const { handlers } = createRestConnector({ baseUrl: 'https://api.example.com', fetchImpl: impl }); + + const out = await handlers.request({ path: '/x' }, {}); + expect(calls.length).toBe(2); // retried the 503 once + expect(out.status).toBe(200); + }); + it('JSON-encodes the body and sets Content-Type for non-GET', async () => { const { impl, calls } = stubFetch(); const { handlers } = createRestConnector({ baseUrl: 'https://api.example.com', fetchImpl: impl }); diff --git a/packages/connectors/connector-rest/src/rest-connector.ts b/packages/connectors/connector-rest/src/rest-connector.ts index a98104434..9044f6735 100644 --- a/packages/connectors/connector-rest/src/rest-connector.ts +++ b/packages/connectors/connector-rest/src/rest-connector.ts @@ -1,6 +1,7 @@ // Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. import type { Connector } from '@objectstack/spec/integration'; +import { resilientFetch } from '@objectstack/spec/shared'; /** * Generic REST connector — the reference *concrete* connector (ADR-0018 @@ -97,7 +98,6 @@ function applyAuth( export function createRestConnector(opts: RestConnectorOptions): RestConnectorBundle { const name = opts.name ?? 'rest'; const auth: RestAuth = opts.auth ?? { type: 'none' }; - const doFetch = opts.fetchImpl ?? fetch; const def: Connector = { name, @@ -154,11 +154,11 @@ export function createRestConnector(opts: RestConnectorOptions): RestConnectorBu headers['Content-Type'] = 'application/json'; } - const response = await doFetch(url, { + const response = await resilientFetch(url, { method, headers, body: hasBody ? JSON.stringify(req.body) : undefined, - }); + }, { fetchImpl: opts.fetchImpl }); // Parse JSON when advertised; fall back to text so non-JSON endpoints // don't throw. 
diff --git a/packages/connectors/connector-slack/src/slack-connector.ts b/packages/connectors/connector-slack/src/slack-connector.ts index 7298cec46..a682c7f1c 100644 --- a/packages/connectors/connector-slack/src/slack-connector.ts +++ b/packages/connectors/connector-slack/src/slack-connector.ts @@ -1,6 +1,7 @@ // Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. import type { Connector } from '@objectstack/spec/integration'; +import { resilientFetch } from '@objectstack/spec/shared'; /** * Slack connector — a *concrete* connector (ADR-0018 §Addendum) and the second @@ -78,7 +79,6 @@ export interface SlackConnectorBundle { export function createSlackConnector(opts: SlackConnectorOptions): SlackConnectorBundle { const name = opts.name ?? 'slack'; const baseUrl = (opts.baseUrl ?? 'https://slack.com/api').replace(/\/+$/, ''); - const doFetch = opts.fetchImpl ?? fetch; const def: Connector = { name, @@ -151,11 +151,11 @@ export function createSlackConnector(opts: SlackConnectorOptions): SlackConnecto Authorization: `Bearer ${opts.token}`, }; - const response = await doFetch(`${baseUrl}/${method}`, { + const response = await resilientFetch(`${baseUrl}/${method}`, { method: 'POST', headers, body: JSON.stringify(params), - }); + }, { fetchImpl: opts.fetchImpl }); // The Slack Web API always answers with JSON; `ok` is the real outcome. const body = (await response.json()) as Record; diff --git a/packages/plugins/embedder-openai/src/index.ts b/packages/plugins/embedder-openai/src/index.ts index 197501e18..4027b5cd0 100644 --- a/packages/plugins/embedder-openai/src/index.ts +++ b/packages/plugins/embedder-openai/src/index.ts @@ -20,6 +20,7 @@ */ import type { IEmbedder } from '@objectstack/spec/contracts'; +import { resilientFetch } from '@objectstack/spec/shared'; export interface OpenAIEmbedderOptions { /** Bearer token sent as `Authorization: Bearer `. Required. 
*/ @@ -154,7 +155,7 @@ export class OpenAIEmbedder implements IEmbedder { if (texts.length === 0) return []; const body: Record = { model: this.model, input: texts }; if (this.requestedDims) body.dimensions = this.requestedDims; - const res = await this.fetchImpl(`${this.baseUrl}/embeddings`, { + const res = await resilientFetch(`${this.baseUrl}/embeddings`, { method: 'POST', headers: { 'content-type': 'application/json', @@ -162,7 +163,7 @@ export class OpenAIEmbedder implements IEmbedder { ...this.extraHeaders, }, body: JSON.stringify(body), - }); + }, { fetchImpl: this.fetchImpl }); if (!res.ok) { const text = await res.text().catch(() => ''); throw new Error( diff --git a/packages/plugins/embedder-openai/vitest.config.ts b/packages/plugins/embedder-openai/vitest.config.ts index 9f37e2fa4..1ca74cee6 100644 --- a/packages/plugins/embedder-openai/vitest.config.ts +++ b/packages/plugins/embedder-openai/vitest.config.ts @@ -11,6 +11,7 @@ export default defineConfig({ resolve: { alias: { '@objectstack/spec/contracts': path.resolve(__dirname, '../../spec/src/contracts/index.ts'), + '@objectstack/spec/shared': path.resolve(__dirname, '../../spec/src/shared/index.ts'), '@objectstack/spec': path.resolve(__dirname, '../../spec/src/index.ts'), }, }, diff --git a/packages/spec/src/shared/index.ts b/packages/spec/src/shared/index.ts index 8f01f57da..0a517239a 100644 --- a/packages/spec/src/shared/index.ts +++ b/packages/spec/src/shared/index.ts @@ -18,3 +18,4 @@ export * from './metadata-collection.zod'; export * from './lazy-schema'; export * from './expression.zod'; export * from './protection.zod'; +export * from './resilient-fetch'; diff --git a/packages/spec/src/shared/resilient-fetch.test.ts b/packages/spec/src/shared/resilient-fetch.test.ts new file mode 100644 index 000000000..60c0573ac --- /dev/null +++ b/packages/spec/src/shared/resilient-fetch.test.ts @@ -0,0 +1,106 @@ +// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. 
import { describe, it, expect, vi } from 'vitest';
import { resilientFetch } from './resilient-fetch';

/** Lower-cased header name → value, as served by the stub responses. */
type HeaderMap = Record<string, string>;

/** One scripted reply: a bare status, or a `[status, headers]` pair. */
type ScriptedReply = number | [status: number, headers: HeaderMap];

/**
 * Minimal Response stand-in — resilientFetch only reads `.status` and
 * `.headers.get`. `ok` follows the fetch definition (2xx only) so the stub does
 * not report redirects as successes.
 */
function resp(status: number, headers: HeaderMap = {}): Response {
  return {
    status,
    ok: status >= 200 && status < 300,
    headers: { get: (k: string) => headers[k.toLowerCase()] ?? null },
  } as unknown as Response;
}

/** A fetch that returns the scripted replies in order (the last one repeats). */
function scripted(replies: ScriptedReply[]) {
  let i = 0;
  return vi.fn(async () => {
    const reply = replies[Math.min(i++, replies.length - 1)];
    return Array.isArray(reply) ? resp(reply[0], reply[1]) : resp(reply as number);
  });
}

/** Sleep stub: resolves immediately so backoff never slows the suite down. */
const noSleep = async (): Promise<void> => {};

describe('resilientFetch', () => {
  it('returns a successful response without retrying', async () => {
    const fetchImpl = scripted([200]);
    const res = await resilientFetch('http://x', {}, { fetchImpl, sleep: noSleep });
    expect(res.status).toBe(200);
    expect(fetchImpl).toHaveBeenCalledTimes(1);
  });

  it('retries a 429 then returns the success', async () => {
    const fetchImpl = scripted([429, 200]);
    const res = await resilientFetch('http://x', {}, { fetchImpl, sleep: noSleep, retries: 3 });
    expect(res.status).toBe(200);
    expect(fetchImpl).toHaveBeenCalledTimes(2);
  });

  it('retries a 5xx then returns the success', async () => {
    const fetchImpl = scripted([503, 500, 200]);
    const res = await resilientFetch('http://x', {}, { fetchImpl, sleep: noSleep, retries: 3 });
    expect(res.status).toBe(200);
    expect(fetchImpl).toHaveBeenCalledTimes(3);
  });

  it('gives up after `retries` attempts and returns the last response', async () => {
    const fetchImpl = scripted([500, 500, 500]);
    const res = await resilientFetch('http://x', {}, { fetchImpl, sleep: noSleep, retries: 3 });
    expect(res.status).toBe(500);
    expect(fetchImpl).toHaveBeenCalledTimes(3);
  });

  it('does NOT retry a non-retryable status (4xx other than 429)', async () => {
    const fetchImpl = scripted([404, 200]);
    const res = await resilientFetch('http://x', {}, { fetchImpl, sleep: noSleep, retries: 3 });
    expect(res.status).toBe(404);
    expect(fetchImpl).toHaveBeenCalledTimes(1);
  });

  it('honours a numeric Retry-After header on a 429', async () => {
    const fetchImpl = scripted([[429, { 'retry-after': '2' }], 200]);
    const sleep = vi.fn(noSleep);
    await resilientFetch('http://x', {}, { fetchImpl, sleep, retries: 3 });
    expect(sleep).toHaveBeenCalledWith(2000);
  });

  it('times out a hung request and surfaces the error', async () => {
    // Never resolves on its own; rejects with the abort *reason* (as a real
    // fetch does) so the assertion can tell a timeout from any other failure.
    const fetchImpl = vi.fn(
      (_url: unknown, init?: RequestInit) =>
        new Promise<Response>((_resolve, reject) => {
          const signal = init?.signal;
          signal?.addEventListener('abort', () => reject(signal.reason), { once: true });
        }),
    );
    await expect(
      resilientFetch('http://x', {}, { fetchImpl, sleep: noSleep, retries: 1, timeoutMs: 10 }),
    ).rejects.toThrow(/timed out after 10ms/);
    expect(fetchImpl).toHaveBeenCalledTimes(1);
  });

  it('retries a network error before succeeding', async () => {
    let n = 0;
    const fetchImpl = vi.fn(async () => {
      if (n++ === 0) throw new Error('ECONNRESET');
      return resp(200);
    });
    const res = await resilientFetch('http://x', {}, { fetchImpl, sleep: noSleep, retries: 3 });
    expect(res.status).toBe(200);
    expect(fetchImpl).toHaveBeenCalledTimes(2);
  });

  it('does not retry when the caller aborts', async () => {
    const ac = new AbortController();
    ac.abort();
    const fetchImpl = vi.fn(async (_url: unknown, init?: RequestInit) => {
      if (init?.signal?.aborted) throw new Error('aborted by caller');
      return resp(200);
    });
    await expect(
      resilientFetch('http://x', { signal: ac.signal }, { fetchImpl, sleep: noSleep, retries: 3 }),
    ).rejects.toThrow(/aborted by caller/);
    expect(fetchImpl).toHaveBeenCalledTimes(1);
  });
});
// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license.

/**
 * `resilientFetch` — a thin, dependency-free wrapper around `fetch` that gives
 * outbound HTTP calls a **per-attempt timeout** and **bounded exponential
 * backoff** so a slow or rate-limited external API can't hang the caller (e.g. an
 * agent turn) indefinitely with no recovery.
 *
 * Used by the HTTP-based connectors and embedders (`connector-rest`,
 * `connector-slack`, `embedder-openai`). It is intentionally NOT a circuit
 * breaker — that is stateful, per-host, and a separate concern; this fixes the
 * "naked fetch hangs / never retries a transient blip" gap.
 *
 * Behaviour:
 *  - aborts each attempt after `timeoutMs` (default 30s). The timer only covers
 *    the wait for `fetchImpl` to settle; it is cleared before the response is
 *    returned, so reading the body afterwards is not bounded by it;
 *  - retries on a network error or a retryable status (429 / 5xx) up to
 *    `retries` total attempts (default 3), with exponential backoff + jitter;
 *  - honours a `Retry-After` header (seconds or HTTP-date) on any retryable
 *    response, but never waits longer than `maxRetryDelayMs` between attempts —
 *    otherwise a server could re-introduce the very stall this helper removes;
 *  - cancels the body of a response it is about to discard, so the underlying
 *    connection is not held open by an unread stream;
 *  - never retries when the **caller's** own `signal` aborts (that's intentional
 *    cancellation, not a transient failure).
 *
 * Caveat: retries are applied regardless of HTTP method — `init.method` is not
 * inspected. Callers sending requests that are not safe to repeat should pass
 * `retries: 1` or make the request idempotent on the server side.
 */
export interface ResilientFetchOptions {
  /** fetch implementation (injectable for tests / non-global runtimes). */
  fetchImpl?: typeof fetch;
  /** Per-attempt timeout in ms. Default 30000. */
  timeoutMs?: number;
  /** Total attempts including the first. Default 3 (also used when NaN is passed). */
  retries?: number;
  /** Base backoff in ms; doubled each retry, plus jitter. Default 300. */
  backoffBaseMs?: number;
  /**
   * Upper bound in ms on any single wait between attempts, including a
   * server-supplied `Retry-After`. Default 30000.
   */
  maxRetryDelayMs?: number;
  /** Predicate for retryable HTTP statuses. Default: 429 or >= 500. */
  retryableStatus?: (status: number) => boolean;
  /** Sleep impl (injectable for deterministic tests). */
  sleep?: (ms: number) => Promise<void>;
}

const DEFAULT_TIMEOUT_MS = 30_000;
const DEFAULT_RETRIES = 3;
const DEFAULT_BACKOFF_BASE_MS = 300;
const DEFAULT_MAX_RETRY_DELAY_MS = 30_000;

const defaultRetryable = (status: number): boolean => status === 429 || status >= 500;
const defaultSleep = (ms: number): Promise<void> => new Promise((resolve) => setTimeout(resolve, ms));

/**
 * Fetch `input` with a per-attempt timeout and bounded retries.
 *
 * @param input - Request URL.
 * @param init - Standard fetch init. `init.signal`, if given, is treated as the
 *   caller's cancellation signal and is chained into every attempt.
 * @param opts - Timeout / retry tuning and injectable `fetchImpl` / `sleep`.
 * @returns The first non-retryable response, or the last response once the
 *   attempts are exhausted (which may itself carry a retryable status).
 * @throws The caller's abort error immediately; otherwise the last network or
 *   timeout error once the attempts are exhausted. Also throws if no fetch
 *   implementation is available.
 */
export async function resilientFetch(
  input: string | URL,
  init: RequestInit = {},
  opts: ResilientFetchOptions = {},
): Promise<Response> {
  const fetchImpl = opts.fetchImpl ?? (globalThis.fetch as typeof fetch | undefined);
  if (!fetchImpl) {
    throw new Error('resilientFetch: no fetch implementation (pass opts.fetchImpl or run on a fetch-capable runtime)');
  }
  const timeoutMs = opts.timeoutMs ?? DEFAULT_TIMEOUT_MS;
  // Math.max(1, NaN) is NaN, which would skip the loop entirely; fall back instead.
  const requestedAttempts = opts.retries ?? DEFAULT_RETRIES;
  const maxAttempts = Number.isNaN(requestedAttempts) ? DEFAULT_RETRIES : Math.max(1, requestedAttempts);
  const backoffBaseMs = opts.backoffBaseMs ?? DEFAULT_BACKOFF_BASE_MS;
  const maxRetryDelayMs = opts.maxRetryDelayMs ?? DEFAULT_MAX_RETRY_DELAY_MS;
  const isRetryable = opts.retryableStatus ?? defaultRetryable;
  const sleep = opts.sleep ?? defaultSleep;
  const callerSignal = init.signal ?? undefined;

  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    // Each attempt gets its own controller so a timeout on one attempt does not
    // poison the next; the caller's signal is forwarded into it.
    const controller = new AbortController();
    const onCallerAbort = (): void => controller.abort(callerSignal?.reason);
    if (callerSignal) {
      if (callerSignal.aborted) onCallerAbort();
      else callerSignal.addEventListener('abort', onCallerAbort, { once: true });
    }
    let timedOut = false;
    const timer = setTimeout(() => {
      timedOut = true;
      controller.abort(new Error(`resilientFetch: request timed out after ${timeoutMs}ms`));
    }, timeoutMs);

    let delayMs = 0;
    try {
      const res = await fetchImpl(input, { ...init, signal: controller.signal });
      if (!isRetryable(res.status) || attempt >= maxAttempts) return res;
      // This response is being thrown away: release its body before retrying.
      discardBody(res);
      delayMs = retryDelayMs(res, attempt, backoffBaseMs);
    } catch (err) {
      lastError = err;
      // The caller cancelled (not our timeout) → propagate, never retry.
      if (callerSignal?.aborted && !timedOut) throw err;
      if (attempt >= maxAttempts) break;
      delayMs = backoffMs(attempt, backoffBaseMs);
    } finally {
      clearTimeout(timer);
      callerSignal?.removeEventListener?.('abort', onCallerAbort);
    }
    // Sleep only after the attempt's timer and listener are torn down, so the
    // timeout cannot fire mid-backoff and a failing `sleep` is not mistaken for
    // a network error.
    await sleep(Math.min(delayMs, maxRetryDelayMs));
  }
  throw lastError instanceof Error ? lastError : new Error(String(lastError));
}

/** Best-effort release of an unread response body; never throws. */
function discardBody(res: Response): void {
  try {
    void res.body?.cancel().catch(() => undefined);
  } catch {
    // Stubbed or already-locked bodies: nothing useful to do.
  }
}

/** Exponential backoff with small jitter to avoid synchronized retries. */
function backoffMs(attempt: number, base: number): number {
  return base * 2 ** (attempt - 1) + Math.floor(Math.random() * 100);
}

/**
 * Retry delay for a response: honour `Retry-After` (delta-seconds or HTTP-date)
 * when present and parseable, else fall back to exponential backoff. The caller
 * is responsible for clamping the result.
 */
function retryDelayMs(res: Response, attempt: number, base: number): number {
  const retryAfter = res.headers?.get?.('retry-after')?.trim();
  if (retryAfter) {
    const secs = Number(retryAfter);
    if (Number.isFinite(secs)) return Math.max(0, secs * 1000);
    const dateMs = Date.parse(retryAfter);
    if (!Number.isNaN(dateMs)) return Math.max(0, dateMs - Date.now());
  }
  return backoffMs(attempt, base);
}