Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions .changeset/external-call-resilience.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
---
'@objectstack/spec': minor
'@objectstack/connector-rest': patch
'@objectstack/connector-slack': patch
'@objectstack/embedder-openai': patch
'@objectstack/connector-mcp': patch
---

feat(spec): resilientFetch — timeout + backoff for outbound HTTP (P1-1)

Outbound calls in the connectors/embedder were naked `fetch` with no timeout or
retry, so a slow or rate-limited external API could hang an agent turn with no
recovery.

New shared `resilientFetch` (`@objectstack/spec/shared`):
- per-attempt timeout via `AbortController` (default 30s);
- exponential backoff with jitter, up to 3 attempts, on network errors / 429 / 5xx;
- honours a `Retry-After` header on 429;
- never retries a caller-initiated abort (intentional cancellation).

Wired into `connector-rest`, `connector-slack`, and `embedder-openai`.
`connector-mcp` talks through the MCP SDK transport, so it gets a 30s per-request
`timeout` on `callTool` / `listTools` instead.

A stateful per-host **circuit breaker** is deliberately left as a follow-up:
timeout + backoff already removes the hang/no-recovery risk.
13 changes: 12 additions & 1 deletion docs/launch-readiness.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,18 @@ fix or acceptance.**
rate-limited external API hangs the entire agent turn with no recovery.
- **Action:** Add a default request timeout (e.g. 30s, configurable) + exponential
backoff (3 tries) + a circuit breaker; tests for 429 / timeout paths.
- **Owner:** _______ · Verify ☐ · Sign-off ☐ · Notes: _______
- **Fix:** New shared `resilientFetch` (`@objectstack/spec/shared`) — 30s per-attempt
timeout (AbortController) + exponential backoff with jitter (3 tries) on
network errors / 429 / 5xx, honouring `Retry-After`; never retries a
caller-initiated abort. Wired into `connector-rest`, `connector-slack`,
`embedder-openai`. `connector-mcp` uses the MCP SDK transport, so it gets a 30s
per-request `timeout` on `callTool` / `listTools` instead. +13 tests (helper 9,
connector retry 1, plus existing suites green).
- **Deferred (follow-up, not blocking):** a **circuit breaker** — it's stateful
and per-host; timeout + backoff already removes the "hangs the agent turn / no
recovery" risk. Making timeout/retry **per-call configurable** (currently
sensible defaults) is a small follow-up.
- **Owner:** _______ · Verify ✅ (confirmed real @ `main`) · Sign-off ☐ · Notes: Timeout + backoff shipped across all 4 paths; circuit breaker deferred (rationale above). Awaiting human sign-off.

### P1-2 — Unbounded growth: execution logs, job runs, event log
- **Area:** `service-automation` (in-memory exec logs, hard 1000 cap),
Expand Down
11 changes: 9 additions & 2 deletions packages/connectors/connector-mcp/src/mcp-connector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,13 @@ function normalizeResult(raw: unknown): Record<string, unknown> {
return out;
}

/**
* Default per-request timeout (ms) for MCP calls (P1-1). Without it, a hung or
* unresponsive MCP server stalls the agent turn indefinitely. The SDK aborts the
* request once this elapses.
*/
const MCP_REQUEST_TIMEOUT_MS = 30_000;

/**
* The default {@link McpClientLike} — lazily imports the official MCP SDK so it
* is only loaded when a real connection is made (tests inject their own client).
Expand Down Expand Up @@ -182,11 +189,11 @@ async function defaultClientFactory(

return {
async listTools() {
const res = await client.listTools();
const res = await client.listTools(undefined, { timeout: MCP_REQUEST_TIMEOUT_MS });
return (res.tools ?? []) as McpToolDescriptor[];
},
async callTool(name, args) {
return client.callTool({ name, arguments: args });
return client.callTool({ name, arguments: args }, undefined, { timeout: MCP_REQUEST_TIMEOUT_MS });
},
async close() {
await client.close();
Expand Down
21 changes: 21 additions & 0 deletions packages/connectors/connector-rest/src/rest-connector.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,27 @@ describe('createRestConnector — request action', () => {
expect(out).toEqual({ status: 200, ok: true, body: { id: 1, name: 'Ada' } });
});

it('retries a transient 503 then returns the success (P1-1)', async () => {
let n = 0;
const calls: number[] = [];
const impl = (async () => {
calls.push(1);
const status = n++ === 0 ? 503 : 200;
return {
status,
ok: status >= 200 && status < 300,
headers: { get: (h: string) => (h.toLowerCase() === 'content-type' ? 'application/json' : null) },
json: async () => ({ ok: status === 200 }),
text: async () => '',
};
}) as unknown as typeof fetch;
const { handlers } = createRestConnector({ baseUrl: 'https://api.example.com', fetchImpl: impl });

const out = await handlers.request({ path: '/x' }, {});
expect(calls.length).toBe(2); // retried the 503 once
expect(out.status).toBe(200);
});

it('JSON-encodes the body and sets Content-Type for non-GET', async () => {
const { impl, calls } = stubFetch();
const { handlers } = createRestConnector({ baseUrl: 'https://api.example.com', fetchImpl: impl });
Expand Down
6 changes: 3 additions & 3 deletions packages/connectors/connector-rest/src/rest-connector.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license.

import type { Connector } from '@objectstack/spec/integration';
import { resilientFetch } from '@objectstack/spec/shared';

/**
* Generic REST connector — the reference *concrete* connector (ADR-0018
Expand Down Expand Up @@ -97,7 +98,6 @@ function applyAuth(
export function createRestConnector(opts: RestConnectorOptions): RestConnectorBundle {
const name = opts.name ?? 'rest';
const auth: RestAuth = opts.auth ?? { type: 'none' };
const doFetch = opts.fetchImpl ?? fetch;

const def: Connector = {
name,
Expand Down Expand Up @@ -154,11 +154,11 @@ export function createRestConnector(opts: RestConnectorOptions): RestConnectorBu
headers['Content-Type'] = 'application/json';
}

const response = await doFetch(url, {
const response = await resilientFetch(url, {
method,
headers,
body: hasBody ? JSON.stringify(req.body) : undefined,
});
}, { fetchImpl: opts.fetchImpl });

// Parse JSON when advertised; fall back to text so non-JSON endpoints
// don't throw.
Expand Down
6 changes: 3 additions & 3 deletions packages/connectors/connector-slack/src/slack-connector.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license.

import type { Connector } from '@objectstack/spec/integration';
import { resilientFetch } from '@objectstack/spec/shared';

/**
* Slack connector — a *concrete* connector (ADR-0018 §Addendum) and the second
Expand Down Expand Up @@ -78,7 +79,6 @@ export interface SlackConnectorBundle {
export function createSlackConnector(opts: SlackConnectorOptions): SlackConnectorBundle {
const name = opts.name ?? 'slack';
const baseUrl = (opts.baseUrl ?? 'https://slack.com/api').replace(/\/+$/, '');
const doFetch = opts.fetchImpl ?? fetch;

const def: Connector = {
name,
Expand Down Expand Up @@ -151,11 +151,11 @@ export function createSlackConnector(opts: SlackConnectorOptions): SlackConnecto
Authorization: `Bearer ${opts.token}`,
};

const response = await doFetch(`${baseUrl}/${method}`, {
const response = await resilientFetch(`${baseUrl}/${method}`, {
method: 'POST',
headers,
body: JSON.stringify(params),
});
}, { fetchImpl: opts.fetchImpl });

// The Slack Web API always answers with JSON; `ok` is the real outcome.
const body = (await response.json()) as Record<string, unknown>;
Expand Down
5 changes: 3 additions & 2 deletions packages/plugins/embedder-openai/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*/

import type { IEmbedder } from '@objectstack/spec/contracts';
import { resilientFetch } from '@objectstack/spec/shared';

export interface OpenAIEmbedderOptions {
/** Bearer token sent as `Authorization: Bearer <apiKey>`. Required. */
Expand Down Expand Up @@ -154,15 +155,15 @@ export class OpenAIEmbedder implements IEmbedder {
if (texts.length === 0) return [];
const body: Record<string, unknown> = { model: this.model, input: texts };
if (this.requestedDims) body.dimensions = this.requestedDims;
const res = await this.fetchImpl(`${this.baseUrl}/embeddings`, {
const res = await resilientFetch(`${this.baseUrl}/embeddings`, {
method: 'POST',
headers: {
'content-type': 'application/json',
authorization: `Bearer ${this.apiKey}`,
...this.extraHeaders,
},
body: JSON.stringify(body),
});
}, { fetchImpl: this.fetchImpl });
if (!res.ok) {
const text = await res.text().catch(() => '');
throw new Error(
Expand Down
1 change: 1 addition & 0 deletions packages/plugins/embedder-openai/vitest.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export default defineConfig({
resolve: {
alias: {
'@objectstack/spec/contracts': path.resolve(__dirname, '../../spec/src/contracts/index.ts'),
'@objectstack/spec/shared': path.resolve(__dirname, '../../spec/src/shared/index.ts'),
'@objectstack/spec': path.resolve(__dirname, '../../spec/src/index.ts'),
},
},
Expand Down
1 change: 1 addition & 0 deletions packages/spec/src/shared/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ export * from './metadata-collection.zod';
export * from './lazy-schema';
export * from './expression.zod';
export * from './protection.zod';
export * from './resilient-fetch';
106 changes: 106 additions & 0 deletions packages/spec/src/shared/resilient-fetch.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license.

import { describe, it, expect, vi } from 'vitest';
import { resilientFetch } from './resilient-fetch';

/** Minimal Response stand-in — resilientFetch only reads `.status` + `.headers.get`. */
function resp(status: number, headers: Record<string, string> = {}): Response {
return {
status,
ok: status < 400,
headers: { get: (k: string) => headers[k.toLowerCase()] ?? null },
} as unknown as Response;
}

/** A fetch that returns the scripted statuses in order (last one repeats). */
function scripted(statuses: Array<number | Record<string, string> | [number, Record<string, string>]>) {
let i = 0;
return vi.fn(async () => {
const s = statuses[Math.min(i++, statuses.length - 1)];
if (Array.isArray(s)) return resp(s[0], s[1]);
return resp(s as number);
});
}

const noSleep = async () => {};

describe('resilientFetch', () => {
it('returns a successful response without retrying', async () => {
const fetchImpl = scripted([200]);
const res = await resilientFetch('http://x', {}, { fetchImpl, sleep: noSleep });
expect(res.status).toBe(200);
expect(fetchImpl).toHaveBeenCalledTimes(1);
});

it('retries a 429 then returns the success', async () => {
const fetchImpl = scripted([429, 200]);
const res = await resilientFetch('http://x', {}, { fetchImpl, sleep: noSleep, retries: 3 });
expect(res.status).toBe(200);
expect(fetchImpl).toHaveBeenCalledTimes(2);
});

it('retries a 5xx then returns the success', async () => {
const fetchImpl = scripted([503, 500, 200]);
const res = await resilientFetch('http://x', {}, { fetchImpl, sleep: noSleep, retries: 3 });
expect(res.status).toBe(200);
expect(fetchImpl).toHaveBeenCalledTimes(3);
});

it('gives up after `retries` attempts and returns the last response', async () => {
const fetchImpl = scripted([500, 500, 500]);
const res = await resilientFetch('http://x', {}, { fetchImpl, sleep: noSleep, retries: 3 });
expect(res.status).toBe(500);
expect(fetchImpl).toHaveBeenCalledTimes(3);
});

it('does NOT retry a non-retryable status (4xx other than 429)', async () => {
const fetchImpl = scripted([404, 200]);
const res = await resilientFetch('http://x', {}, { fetchImpl, sleep: noSleep, retries: 3 });
expect(res.status).toBe(404);
expect(fetchImpl).toHaveBeenCalledTimes(1);
});

it('honours a numeric Retry-After header on a 429', async () => {
const fetchImpl = scripted([[429, { 'retry-after': '2' }], 200]);
const sleep = vi.fn(noSleep);
await resilientFetch('http://x', {}, { fetchImpl, sleep, retries: 3 });
expect(sleep).toHaveBeenCalledWith(2000);
});

it('times out a hung request and surfaces the error', async () => {
const fetchImpl = vi.fn(
(_url: any, init: any) =>
new Promise<Response>((_, reject) => {
init.signal.addEventListener('abort', () => reject(new Error('aborted')), { once: true });
}),
);
await expect(
resilientFetch('http://x', {}, { fetchImpl, sleep: noSleep, retries: 1, timeoutMs: 10 }),
).rejects.toThrow();
expect(fetchImpl).toHaveBeenCalledTimes(1);
});

it('retries a network error before succeeding', async () => {
let n = 0;
const fetchImpl = vi.fn(async () => {
if (n++ === 0) throw new Error('ECONNRESET');
return resp(200);
});
const res = await resilientFetch('http://x', {}, { fetchImpl, sleep: noSleep, retries: 3 });
expect(res.status).toBe(200);
expect(fetchImpl).toHaveBeenCalledTimes(2);
});

it('does not retry when the caller aborts', async () => {
const ac = new AbortController();
ac.abort();
const fetchImpl = vi.fn(async (_u: any, init: any) => {
if (init.signal.aborted) throw new Error('aborted by caller');
return resp(200);
});
await expect(
resilientFetch('http://x', { signal: ac.signal }, { fetchImpl, sleep: noSleep, retries: 3 }),
).rejects.toThrow(/aborted/);
expect(fetchImpl).toHaveBeenCalledTimes(1);
});
});
Loading