From 4d842a8b634e4c25757c5766fd87da6f7a0b488f Mon Sep 17 00:00:00 2001 From: Jack Zhuang <277994282+os-zhuang@users.noreply.github.com> Date: Sun, 7 Jun 2026 05:24:00 +0800 Subject: [PATCH] =?UTF-8?q?feat(mcp):=20Streamable=20HTTP=20transport=20?= =?UTF-8?q?=E2=80=94=20apps=20as=20network-reachable=20MCP=20servers=20(AD?= =?UTF-8?q?R-0036=20Phase=202)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The MCP server plugin spoke stdio only, so remote agents (Claude Desktop / Cursor) could not reach a hosted env. This adds the Streamable HTTP transport and wires it into the runtime request path on top of the Phase 1a sys_api_key auth foundation. plugin-mcp-server: - MCPServerRuntime.handleHttpRequest(request, { bridge, parsedBody }) serves one MCP request over the Web-standard WebStandardStreamableHTTPServerTransport (Node 18+/Workers/Deno/Bun). Stateless: a fresh isolated McpServer + transport per request (SDK-recommended), JSON-response mode → fully buffered, no streaming pass-through concerns over the Worker→container hop. - New registerObjectTools + McpDataBridge (mcp-http-tools.ts): object-CRUD tool set (list_objects, describe_object, query_records, get_record, create_record, update_record, delete_record). Execution delegated to an injected, principal-bound bridge — the tool layer never touches the data engine. System (sys_*) objects are not exposed by default (fail-closed guard per tool). The internal AI/authoring toolRegistry is deliberately NOT bridged externally. runtime: - HttpDispatcher serves /mcp: opt-in via OS_MCP_SERVER_ENABLED=true (404 when off), fail-closed auth (anonymous → 401). Builds an McpDataBridge that runs every op through the existing callData path bound to the request's ExecutionContext — external agents run under the key's permissions + RLS, never a parallel/escalated path. Discovery advertises mcp only when enabled. Tests: plugin 36 (11 new: handshake/tools/list/call, sys_ guard, error mapping, 406), runtime 365 (5 new: gate/auth/principal-binding). tsc clean. Security: every external MCP entry runs as the scoped sys_api_key principal under existing permissions + RLS; opt-in per env; no raw keys/secrets on the wire. Co-Authored-By: Claude Opus 4.8 --- .changeset/mcp-streamable-http-transport.md | 42 +++ .../src/__tests__/mcp-server-runtime.test.ts | 10 +- .../plugins/plugin-mcp-server/src/index.ts | 6 + .../plugin-mcp-server/src/mcp-http-tools.ts | 280 ++++++++++++++++++ .../src/mcp-server-runtime.http.test.ts | 225 ++++++++++++++ .../src/mcp-server-runtime.ts | 77 ++++- .../runtime/src/http-dispatcher.mcp.test.ts | 125 ++++++++ packages/runtime/src/http-dispatcher.ts | 139 ++++++++- 8 files changed, 899 insertions(+), 5 deletions(-) create mode 100644 .changeset/mcp-streamable-http-transport.md create mode 100644 packages/plugins/plugin-mcp-server/src/mcp-http-tools.ts create mode 100644 packages/plugins/plugin-mcp-server/src/mcp-server-runtime.http.test.ts create mode 100644 packages/runtime/src/http-dispatcher.mcp.test.ts diff --git a/.changeset/mcp-streamable-http-transport.md b/.changeset/mcp-streamable-http-transport.md new file mode 100644 index 000000000..121cfae43 --- /dev/null +++ b/.changeset/mcp-streamable-http-transport.md @@ -0,0 +1,42 @@ +--- +'@objectstack/plugin-mcp-server': minor +'@objectstack/runtime': minor +--- + +feat(mcp): Streamable HTTP transport — every app is a network-reachable MCP server (ADR-0036 Phase 2) + +The MCP server plugin spoke **stdio only**, so a remote agent (Claude Desktop / +Cursor) could not connect to a hosted env. This adds the **Streamable HTTP** +transport and wires it into the runtime's request path, building on the Phase 1a +`sys_api_key` auth foundation. + +- **`@objectstack/plugin-mcp-server`** + - `MCPServerRuntime.handleHttpRequest(request, { bridge, parsedBody })` — + serves one MCP request over the Web-standard `WebStandardStreamableHTTPServerTransport` + (runs on Node 18+, Workers, Deno, Bun). **Stateless**: a fresh, isolated + `McpServer` + transport is built per request (the SDK-recommended pattern), + in JSON-response mode so the response is fully buffered — no streaming + pass-through concerns over the Worker→container hop. + - New `registerObjectTools` + `McpDataBridge` (`mcp-http-tools.ts`): the + object-CRUD tool set (`list_objects`, `describe_object`, `query_records`, + `get_record`, `create_record`, `update_record`, `delete_record`). All + execution is delegated to an injected, **principal-bound** bridge — the tool + layer never touches the data engine directly. System (`sys_*`) objects are + **not exposed** by default (fail-closed guard on every object-scoped tool). + The internal AI/authoring toolRegistry is deliberately NOT bridged onto the + external surface. + +- **`@objectstack/runtime`** + - `HttpDispatcher` serves `/mcp`: **opt-in** via `OS_MCP_SERVER_ENABLED=true` + (404 when off, so the surface isn't advertised); **fail-closed auth** + (anonymous → 401 — requires the principal resolved by Phase 1a's API-key + path or a session). It builds an `McpDataBridge` that runs every operation + through the existing `callData` path bound to the request's + `ExecutionContext`, so external agents run under the key's permissions + RLS, + never a parallel or escalated path. The discovery endpoint advertises `mcp` + only when enabled. + +Security: every external MCP entry runs as the scoped `sys_api_key` principal +under existing object permissions + RLS; MCP is opt-in per env; no raw keys or +secrets cross the wire. Fully unit-tested (transport handshake/tools, gate, +auth, principal binding). diff --git a/packages/plugins/plugin-mcp-server/src/__tests__/mcp-server-runtime.test.ts b/packages/plugins/plugin-mcp-server/src/__tests__/mcp-server-runtime.test.ts index 55727d686..f3d6e2f41 100644 --- a/packages/plugins/plugin-mcp-server/src/__tests__/mcp-server-runtime.test.ts +++ b/packages/plugins/plugin-mcp-server/src/__tests__/mcp-server-runtime.test.ts @@ -257,7 +257,7 @@ describe('MCPServerRuntime', () => { expect(runtime.isStarted).toBe(false); }); - it('should warn when HTTP transport is requested', async () => { + it('start() is a no-op for HTTP transport (served per-request via dispatcher)', async () => { const httpRuntime = new MCPServerRuntime({ transport: 'http', logger: mockLogger as any, @@ -265,10 +265,16 @@ describe('MCPServerRuntime', () => { await httpRuntime.start(); + // HTTP is served per-request through handleHttpRequest(), not a + // long-lived connect() — so start() does not mark the server started + // and must not warn that HTTP is unsupported. expect(httpRuntime.isStarted).toBe(false); - expect(mockLogger.warn).toHaveBeenCalledWith( + expect(mockLogger.warn).not.toHaveBeenCalledWith( '[MCP] HTTP transport is not yet supported. Use stdio transport.', ); + expect(mockLogger.info).toHaveBeenCalledWith( + '[MCP] HTTP transport ready (served per-request at /api/v1/mcp).', + ); }); it('should be idempotent on stop when not started', async () => { diff --git a/packages/plugins/plugin-mcp-server/src/index.ts b/packages/plugins/plugin-mcp-server/src/index.ts index 726df1b6c..a669774e2 100644 --- a/packages/plugins/plugin-mcp-server/src/index.ts +++ b/packages/plugins/plugin-mcp-server/src/index.ts @@ -13,3 +13,9 @@ export { MCPServerPlugin } from './plugin.js'; export type { MCPServerPluginOptions } from './plugin.js'; export { MCPServerRuntime } from './mcp-server-runtime.js'; export type { MCPServerRuntimeConfig } from './mcp-server-runtime.js'; +export { registerObjectTools } from './mcp-http-tools.js'; +export type { + McpDataBridge, + McpObjectSummary, + RegisterObjectToolsOptions, +} from './mcp-http-tools.js'; diff --git a/packages/plugins/plugin-mcp-server/src/mcp-http-tools.ts b/packages/plugins/plugin-mcp-server/src/mcp-http-tools.ts new file mode 100644 index 000000000..b300906e6 --- /dev/null +++ b/packages/plugins/plugin-mcp-server/src/mcp-http-tools.ts @@ -0,0 +1,280 @@ +// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. + +/** + * mcp-http-tools — object CRUD exposed as MCP tools for the HTTP transport. + * + * These are the tools an external agent (Claude Desktop / Cursor) drives over + * the network. Unlike the stdio bridge — which is a trusted local process — + * the HTTP surface is reached by arbitrary callers, so every operation MUST + * run under the caller's resolved principal. We never touch the data engine + * directly here: all reads/writes go through an injected {@link McpDataBridge} + * that the runtime wires to the SAME permission/RLS-enforcing path the REST + * API uses (`callData` with the request's ExecutionContext). This module owns + * the tool *shape*; the bridge owns *execution + security*. + * + * SECURITY (zero-tolerance): + * - System objects (`sys_*`) are NOT exposed by default — fail-closed guard on + * every tool that takes an object name, independent of the bridge. + * - The bridge is bound to the caller's principal; tools cannot widen it. + * - Errors are returned as tool errors (text), never thrown across the wire, + * and never include secrets. + */ + +import { z } from 'zod'; +import type { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'; + +export interface McpObjectSummary { + name: string; + label?: string; + fieldCount?: number; +} + +/** + * Data access seam for the HTTP MCP tools. Implemented by the runtime/dispatcher + * so execution flows through the existing permission + RLS path bound to the + * caller's ExecutionContext. Every method runs AS the authenticated principal. + */ +export interface McpDataBridge { + listObjects(): Promise; + describeObject(name: string): Promise; + query( + object: string, + opts: { + where?: Record; + fields?: string[]; + limit?: number; + offset?: number; + orderBy?: Array<{ field: string; order: 'asc' | 'desc' }>; + }, + ): Promise; + get(object: string, id: string): Promise; + create(object: string, data: Record): Promise; + update(object: string, id: string, data: Record): Promise; + remove(object: string, id: string): Promise; +} + +export interface RegisterObjectToolsOptions { + /** Expose `sys_*` system objects too. Default false (fail-closed). */ + allowSystemObjects?: boolean; + /** Hard cap on `query_records` page size. Default 50. */ + maxQueryLimit?: number; +} + +const DEFAULT_MAX_LIMIT = 50; + +/** A `sys_`-prefixed object is a system table — off-limits to external agents. */ +function isSystemObject(name: string): boolean { + return /^sys_/i.test(name); +} + +function textResult(value: unknown) { + return { content: [{ type: 'text' as const, text: jsonText(value) }] }; +} + +function errorResult(message: string) { + return { content: [{ type: 'text' as const, text: message }], isError: true as const }; +} + +function jsonText(value: unknown): string { + try { + return JSON.stringify(value, null, 2); + } catch { + return String(value); + } +} + +/** + * Register the object-CRUD tool set on a fresh per-request {@link McpServer}. + * All execution is delegated to `bridge`, which is bound to the caller's + * principal by the runtime. + */ +export function registerObjectTools( + server: McpServer, + bridge: McpDataBridge, + options: RegisterObjectToolsOptions = {}, +): void { + const allowSystem = options.allowSystemObjects === true; + const maxLimit = options.maxQueryLimit ?? DEFAULT_MAX_LIMIT; + + /** Fail-closed object-name guard shared by every object-scoped tool. */ + const guard = (objectName: string): string | undefined => { + if (!objectName || typeof objectName !== 'string') return 'objectName is required'; + if (!allowSystem && isSystemObject(objectName)) { + return `Object "${objectName}" is a system object and is not exposed via MCP`; + } + return undefined; + }; + + server.registerTool( + 'list_objects', + { + description: + 'List the data objects (tables) available in this app. Returns each object\'s name, label and field count.', + inputSchema: {}, + annotations: { readOnlyHint: true, destructiveHint: false, openWorldHint: false }, + }, + async () => { + try { + const objects = await bridge.listObjects(); + const visible = allowSystem ? objects : objects.filter((o) => !isSystemObject(o.name)); + return textResult({ objects: visible, totalCount: visible.length }); + } catch (err) { + return errorResult(messageOf(err)); + } + }, + ); + + server.registerTool( + 'describe_object', + { + description: + 'Get the schema of a data object: its fields (name, type, label, required) and enabled features.', + inputSchema: { objectName: z.string().describe('The object/table name, e.g. "task"') }, + annotations: { readOnlyHint: true, destructiveHint: false, openWorldHint: false }, + }, + async ({ objectName }) => { + const bad = guard(objectName); + if (bad) return errorResult(bad); + try { + const def = await bridge.describeObject(objectName); + if (!def) return errorResult(`Object "${objectName}" not found`); + return textResult(def); + } catch (err) { + return errorResult(messageOf(err)); + } + }, + ); + + server.registerTool( + 'query_records', + { + description: + 'Query records from an object with optional filter, field selection, sorting and pagination. ' + + 'Runs under the caller\'s permissions and row-level security.', + inputSchema: { + objectName: z.string().describe('The object/table name'), + where: z + .record(z.string(), z.unknown()) + .optional() + .describe('Filter conditions, e.g. {"status":"open"}'), + fields: z.array(z.string()).optional().describe('Field names to return (defaults to all)'), + limit: z.number().int().positive().max(maxLimit).optional().describe(`Max rows (≤ ${maxLimit})`), + offset: z.number().int().nonnegative().optional().describe('Rows to skip'), + orderBy: z + .array(z.object({ field: z.string(), order: z.enum(['asc', 'desc']) })) + .optional() + .describe('Sort order'), + }, + annotations: { readOnlyHint: true, destructiveHint: false, openWorldHint: false }, + }, + async ({ objectName, where, fields, limit, offset, orderBy }) => { + const bad = guard(objectName); + if (bad) return errorResult(bad); + try { + const result = await bridge.query(objectName, { + where, + fields, + limit: Math.min(limit ?? maxLimit, maxLimit), + offset, + orderBy, + }); + return textResult(result); + } catch (err) { + return errorResult(messageOf(err)); + } + }, + ); + + server.registerTool( + 'get_record', + { + description: 'Fetch a single record by id.', + inputSchema: { + objectName: z.string().describe('The object/table name'), + recordId: z.string().describe('The record id'), + }, + annotations: { readOnlyHint: true, destructiveHint: false, openWorldHint: false }, + }, + async ({ objectName, recordId }) => { + const bad = guard(objectName); + if (bad) return errorResult(bad); + try { + const record = await bridge.get(objectName, recordId); + if (record == null) return errorResult(`Record "${recordId}" not found in "${objectName}"`); + return textResult(record); + } catch (err) { + return errorResult(messageOf(err)); + } + }, + ); + + server.registerTool( + 'create_record', + { + description: 'Create a new record. Runs under the caller\'s permissions and validations.', + inputSchema: { + objectName: z.string().describe('The object/table name'), + data: z.record(z.string(), z.unknown()).describe('Field values for the new record'), + }, + annotations: { readOnlyHint: false, destructiveHint: false, openWorldHint: false }, + }, + async ({ objectName, data }) => { + const bad = guard(objectName); + if (bad) return errorResult(bad); + try { + return textResult(await bridge.create(objectName, data)); + } catch (err) { + return errorResult(messageOf(err)); + } + }, + ); + + server.registerTool( + 'update_record', + { + description: 'Update fields on an existing record by id.', + inputSchema: { + objectName: z.string().describe('The object/table name'), + recordId: z.string().describe('The record id'), + data: z.record(z.string(), z.unknown()).describe('Field values to change'), + }, + annotations: { readOnlyHint: false, destructiveHint: false, openWorldHint: false }, + }, + async ({ objectName, recordId, data }) => { + const bad = guard(objectName); + if (bad) return errorResult(bad); + try { + return textResult(await bridge.update(objectName, recordId, data)); + } catch (err) { + return errorResult(messageOf(err)); + } + }, + ); + + server.registerTool( + 'delete_record', + { + description: 'Delete a record by id. This is destructive.', + inputSchema: { + objectName: z.string().describe('The object/table name'), + recordId: z.string().describe('The record id'), + }, + annotations: { readOnlyHint: false, destructiveHint: true, openWorldHint: false }, + }, + async ({ objectName, recordId }) => { + const bad = guard(objectName); + if (bad) return errorResult(bad); + try { + return textResult(await bridge.remove(objectName, recordId)); + } catch (err) { + return errorResult(messageOf(err)); + } + }, + ); +} + +function messageOf(err: unknown): string { + if (err instanceof Error) return err.message; + if (err && typeof err === 'object' && 'message' in err) return String((err as any).message); + return String(err); +} diff --git a/packages/plugins/plugin-mcp-server/src/mcp-server-runtime.http.test.ts b/packages/plugins/plugin-mcp-server/src/mcp-server-runtime.http.test.ts new file mode 100644 index 000000000..4d3a98eec --- /dev/null +++ b/packages/plugins/plugin-mcp-server/src/mcp-server-runtime.http.test.ts @@ -0,0 +1,225 @@ +// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. + +import { describe, it, expect, beforeEach } from 'vitest'; + +import { MCPServerRuntime } from './mcp-server-runtime.js'; +import type { McpDataBridge } from './mcp-http-tools.js'; + +/** Records every bridge call so tests can assert principal-bound delegation. */ +function makeBridge(): McpDataBridge & { calls: any[] } { + const calls: any[] = []; + return { + calls, + async listObjects() { + calls.push(['listObjects']); + return [ + { name: 'task', label: 'Task', fieldCount: 4 }, + { name: 'sys_user', label: 'User', fieldCount: 9 }, + ]; + }, + async describeObject(name: string) { + calls.push(['describeObject', name]); + if (name === 'task') return { name: 'task', fields: [{ name: 'title', type: 'text' }] }; + return null; + }, + async query(object: string, opts: any) { + calls.push(['query', object, opts]); + return { object, records: [{ id: '1', title: 'a' }], total: 1 }; + }, + async get(object: string, id: string) { + calls.push(['get', object, id]); + return { id, title: 'a' }; + }, + async create(object: string, data: any) { + calls.push(['create', object, data]); + return { object, id: 'new1', record: data }; + }, + async update(object: string, id: string, data: any) { + calls.push(['update', object, id, data]); + return { object, id, record: data }; + }, + async remove(object: string, id: string) { + calls.push(['remove', object, id]); + return { object, id, deleted: true }; + }, + }; +} + +function mcpRequest(body: unknown): Request { + return new Request('http://localhost/api/v1/mcp', { + method: 'POST', + headers: { + 'content-type': 'application/json', + accept: 'application/json, text/event-stream', + }, + body: JSON.stringify(body), + }); +} + +async function call(runtime: MCPServerRuntime, body: unknown, bridge?: McpDataBridge) { + const res = await runtime.handleHttpRequest(mcpRequest(body), { bridge, parsedBody: body }); + const json = res.status === 202 ? null : await res.json(); + return { status: res.status, json }; +} + +const INIT = { + jsonrpc: '2.0', + id: 0, + method: 'initialize', + params: { protocolVersion: '2025-03-26', capabilities: {}, clientInfo: { name: 't', version: '1' } }, +}; + +describe('MCPServerRuntime.handleHttpRequest (Streamable HTTP)', () => { + let runtime: MCPServerRuntime; + let bridge: ReturnType; + + beforeEach(() => { + runtime = new MCPServerRuntime({ name: 'objectstack-test', version: '9.9.9' }); + bridge = makeBridge(); + }); + + it('handles initialize and reports server info', async () => { + const { status, json } = await call(runtime, INIT, bridge); + expect(status).toBe(200); + expect(json.result.serverInfo.name).toBe('objectstack-test'); + expect(json.result.capabilities.tools).toBeDefined(); + }); + + it('lists the object-CRUD tools', async () => { + const { json } = await call(runtime, { jsonrpc: '2.0', id: 1, method: 'tools/list' }, bridge); + const names = json.result.tools.map((t: any) => t.name).sort(); + expect(names).toEqual( + [ + 'create_record', + 'delete_record', + 'describe_object', + 'get_record', + 'list_objects', + 'query_records', + 'update_record', + ].sort(), + ); + }); + + it('marks delete_record destructive and query_records read-only', async () => { + const { json } = await call(runtime, { jsonrpc: '2.0', id: 1, method: 'tools/list' }, bridge); + const byName = Object.fromEntries(json.result.tools.map((t: any) => [t.name, t])); + expect(byName.delete_record.annotations.destructiveHint).toBe(true); + expect(byName.query_records.annotations.readOnlyHint).toBe(true); + }); + + it('exposes no tools (no tools capability) when no bridge is provided', async () => { + // Without a bridge, no tools are registered, so the SDK never wires the + // tools/list handler — the request resolves to a JSON-RPC error, not a + // tool list. The dispatcher always supplies a bridge in practice. + const { json } = await call(runtime, { jsonrpc: '2.0', id: 1, method: 'tools/list' }); + expect(json.result).toBeUndefined(); + expect(json.error).toBeDefined(); + }); + + it('list_objects filters out system objects by default', async () => { + const { json } = await call( + runtime, + { jsonrpc: '2.0', id: 2, method: 'tools/call', params: { name: 'list_objects', arguments: {} } }, + bridge, + ); + const payload = JSON.parse(json.result.content[0].text); + expect(payload.objects.map((o: any) => o.name)).toEqual(['task']); + expect(bridge.calls).toContainEqual(['listObjects']); + }); + + it('query_records delegates to the bridge with filter + capped limit', async () => { + const { json } = await call( + runtime, + { + jsonrpc: '2.0', + id: 3, + method: 'tools/call', + params: { name: 'query_records', arguments: { objectName: 'task', where: { status: 'open' }, limit: 5 } }, + }, + bridge, + ); + expect(json.result.isError).toBeFalsy(); + const queryCall = bridge.calls.find((c) => c[0] === 'query'); + expect(queryCall[1]).toBe('task'); + expect(queryCall[2].where).toEqual({ status: 'open' }); + expect(queryCall[2].limit).toBe(5); + }); + + it('rejects system objects on object-scoped tools (fail-closed)', async () => { + const { json } = await call( + runtime, + { + jsonrpc: '2.0', + id: 4, + method: 'tools/call', + params: { name: 'describe_object', arguments: { objectName: 'sys_user' } }, + }, + bridge, + ); + expect(json.result.isError).toBe(true); + expect(json.result.content[0].text).toMatch(/system object/i); + // The bridge must never be consulted for a blocked system object. + expect(bridge.calls.find((c) => c[0] === 'describeObject')).toBeUndefined(); + }); + + it('create_record delegates to the bridge', async () => { + await call( + runtime, + { + jsonrpc: '2.0', + id: 5, + method: 'tools/call', + params: { name: 'create_record', arguments: { objectName: 'task', data: { title: 'x' } } }, + }, + bridge, + ); + expect(bridge.calls).toContainEqual(['create', 'task', { title: 'x' }]); + }); + + it('delete_record delegates to the bridge', async () => { + await call( + runtime, + { + jsonrpc: '2.0', + id: 6, + method: 'tools/call', + params: { name: 'delete_record', arguments: { objectName: 'task', recordId: '1' } }, + }, + bridge, + ); + expect(bridge.calls).toContainEqual(['remove', 'task', '1']); + }); + + it('returns 406 when the client does not accept both JSON and SSE', async () => { + const req = new Request('http://localhost/api/v1/mcp', { + method: 'POST', + headers: { 'content-type': 'application/json', accept: 'application/json' }, + body: JSON.stringify(INIT), + }); + const res = await runtime.handleHttpRequest(req, { bridge, parsedBody: INIT }); + expect(res.status).toBe(406); + }); + + it('surfaces bridge errors as tool errors, not thrown across the wire', async () => { + const failing: McpDataBridge = { + ...bridge, + async query() { + throw new Error('RLS: not permitted'); + }, + }; + const { status, json } = await call( + runtime, + { + jsonrpc: '2.0', + id: 7, + method: 'tools/call', + params: { name: 'query_records', arguments: { objectName: 'task' } }, + }, + failing, + ); + expect(status).toBe(200); + expect(json.result.isError).toBe(true); + expect(json.result.content[0].text).toContain('RLS: not permitted'); + }); +}); diff --git a/packages/plugins/plugin-mcp-server/src/mcp-server-runtime.ts b/packages/plugins/plugin-mcp-server/src/mcp-server-runtime.ts index f62c8f6d8..4f493dcf1 100644 --- a/packages/plugins/plugin-mcp-server/src/mcp-server-runtime.ts +++ b/packages/plugins/plugin-mcp-server/src/mcp-server-runtime.ts @@ -2,9 +2,12 @@ import { McpServer, ResourceTemplate } from '@modelcontextprotocol/sdk/server/mcp.js'; import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js'; +import { WebStandardStreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/webStandardStreamableHttp.js'; import type { Logger, IMetadataService, IDataEngine, AIToolDefinition } from '@objectstack/spec/contracts'; import type { Agent } from '@objectstack/spec/ai'; import type { ToolRegistry, ToolExecutionResult } from './types.js'; +import { registerObjectTools } from './mcp-http-tools.js'; +import type { McpDataBridge, RegisterObjectToolsOptions } from './mcp-http-tools.js'; import { z } from 'zod'; /** @@ -476,8 +479,10 @@ export class MCPServerRuntime { this.started = true; logger?.info(`[MCP] Server started (transport: stdio, name: ${this.config.name})`); } else { - // HTTP transport support will be added in a future version - logger?.warn('[MCP] HTTP transport is not yet supported. Use stdio transport.'); + // HTTP is served per-request via `handleHttpRequest()` (mounted by the + // runtime dispatcher at `/api/v1/mcp`), not through a long-lived + // `connect()` like stdio — so there is nothing to start here. + logger?.info('[MCP] HTTP transport ready (served per-request at /api/v1/mcp).'); } } @@ -492,4 +497,72 @@ export class MCPServerRuntime { this.started = false; this.config.logger?.info('[MCP] Server stopped'); } + + // ── HTTP (Streamable HTTP) transport ─────────────────────────── + + /** + * Handle one MCP request over the **Streamable HTTP** transport (Web Standard + * `Request`/`Response`), the network-reachable surface for external agents. + * + * Stateless by design: a fresh {@link McpServer} + transport is built per + * request (the SDK-recommended pattern for stateless HTTP — it avoids any + * cross-request session/request-id collision and keeps each call isolated). + * The tool set is the object-CRUD bridge, bound to the **caller's principal** + * via `bridge`; the runtime wires that bridge to the existing permission + + * RLS path, so an external agent can never exceed the key's authority. + * + * Only the object-CRUD tools are exposed here — the internal AI/authoring + * toolRegistry (which can mutate metadata) is deliberately NOT bridged onto + * the external surface. + * + * @param request The inbound Web `Request` (headers/method/url). + * @param opts.bridge Principal-bound data accessor (required to expose tools). + * @param opts.parsedBody Pre-parsed JSON-RPC body (the dispatcher already read it). + * @param opts.authInfo Optional auth info forwarded to message handlers. + * @param opts.toolOptions Object-tool exposure options (system objects, limits). + */ + async handleHttpRequest( + request: Request, + opts: { + bridge?: McpDataBridge; + parsedBody?: unknown; + authInfo?: unknown; + toolOptions?: RegisterObjectToolsOptions; + } = {}, + ): Promise { + // Fresh, isolated server per request (stateless). + const server = new McpServer( + { name: this.config.name, version: this.config.version }, + { + capabilities: { tools: {} }, + instructions: + this.config.instructions ?? + 'ObjectStack MCP Server — query and modify your app\'s data objects as tools.', + }, + ); + + if (opts.bridge) { + registerObjectTools(server, opts.bridge, opts.toolOptions); + } + + const transport = new WebStandardStreamableHTTPServerTransport({ + // Stateless: no session id, single request/response. + sessionIdGenerator: undefined, + // Return a buffered JSON response (no long-lived SSE) — fits the + // Worker→container hop without streaming pass-through concerns. + enableJsonResponse: true, + }); + + await server.connect(transport); + try { + // JSON-response mode fully materialises the Response before resolving, + // so it is safe to close the per-request server in `finally`. + return await transport.handleRequest(request, { + parsedBody: opts.parsedBody, + authInfo: opts.authInfo as any, + }); + } finally { + await server.close().catch(() => {}); + } + } } diff --git a/packages/runtime/src/http-dispatcher.mcp.test.ts b/packages/runtime/src/http-dispatcher.mcp.test.ts new file mode 100644 index 000000000..f87e2cf19 --- /dev/null +++ b/packages/runtime/src/http-dispatcher.mcp.test.ts @@ -0,0 +1,125 @@ +// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. + +import { describe, it, expect, beforeEach, afterEach } from 'vitest'; + +import { HttpDispatcher } from './http-dispatcher.js'; + +/** + * These tests drive `handleMcp` directly to verify the gate + auth + bridge + * wiring without standing up a full kernel. The MCP transport itself is tested + * in @objectstack/plugin-mcp-server; here we assert: + * - opt-in gate (OS_MCP_SERVER_ENABLED) + * - fail-closed auth (anonymous → 401) + * - the injected bridge runs through callData bound to the request's + * ExecutionContext (RLS/permissions), proving principal binding. + */ + +function makeContext(overrides: any = {}) { + return { + request: new Request('http://localhost/api/v1/mcp', { + method: 'POST', + headers: { 'content-type': 'application/json', accept: 'application/json, text/event-stream' }, + body: '{}', + }), + response: {}, + environmentId: undefined, + executionContext: { userId: 'u1', isSystem: false, roles: [], permissions: [] }, + ...overrides, + }; +} + +/** A fake kernel exposing only the services handleMcp / callData need. */ +function makeKernel(opts: { withMcp?: boolean; recordedContexts?: any[] } = {}) { + const recorded = opts.recordedContexts ?? []; + const ql = { + insert: async (_o: string, data: any, o: any) => { + recorded.push(o?.context); + return { id: 'new1', ...data }; + }, + find: async () => [], + update: async () => ({}), + delete: async () => ({}), + }; + const metadata = { + listObjects: async () => [{ name: 'task', fields: { title: {} } }], + getObject: async (n: string) => (n === 'task' ? { name: 'task', fields: {} } : null), + }; + // The fake MCP service exercises the bridge so we can assert principal binding. + const mcpService: any = { + lastOpts: undefined, + handleHttpRequest: async (_req: Request, o: any) => { + mcpService.lastOpts = o; + const created = await o.bridge.create('task', { title: 'x' }); + return new Response(JSON.stringify({ ok: true, created }), { + status: 200, + headers: { 'content-type': 'application/json' }, + }); + }, + }; + const services: Record = { metadata, objectql: ql }; + if (opts.withMcp) services.mcp = mcpService; + const kernel: any = { + getService: (n: string) => services[n], + getServiceAsync: async (n: string) => services[n], + }; + return { kernel, mcpService, recorded }; +} + +describe('HttpDispatcher.handleMcp', () => { + const prev = process.env.OS_MCP_SERVER_ENABLED; + afterEach(() => { + if (prev === undefined) delete process.env.OS_MCP_SERVER_ENABLED; + else process.env.OS_MCP_SERVER_ENABLED = prev; + }); + + it('returns 404 when MCP is not enabled (opt-in gate)', async () => { + delete process.env.OS_MCP_SERVER_ENABLED; + const { kernel } = makeKernel({ withMcp: true }); + const d = new HttpDispatcher(kernel, undefined, { enforceProjectMembership: false }); + const res = await d.handleMcp({}, makeContext()); + expect(res.response.status).toBe(404); + }); + + describe('when enabled', () => { + beforeEach(() => { + process.env.OS_MCP_SERVER_ENABLED = 'true'; + }); + + it('returns 501 when no MCP service is registered', async () => { + const { kernel } = makeKernel({ withMcp: false }); + const d = new HttpDispatcher(kernel, undefined, { enforceProjectMembership: false }); + const res = await d.handleMcp({}, makeContext()); + expect(res.response.status).toBe(501); + }); + + it('returns 401 for an anonymous request (fail-closed auth)', async () => { + const { kernel } = makeKernel({ withMcp: true }); + const d = new HttpDispatcher(kernel, undefined, { enforceProjectMembership: false }); + const res = await d.handleMcp({}, makeContext({ executionContext: undefined })); + expect(res.response.status).toBe(401); + }); + + it('delegates to the MCP runtime with a bridge + parsedBody when authed', async () => { + const { kernel, mcpService } = makeKernel({ withMcp: true }); + const d = new HttpDispatcher(kernel, undefined, { enforceProjectMembership: false }); + const body = { jsonrpc: '2.0', id: 1, method: 'tools/list' }; + const res = await d.handleMcp(body, makeContext()); + expect(res.response.status).toBe(200); + expect(res.response.body.ok).toBe(true); + expect(mcpService.lastOpts.parsedBody).toEqual(body); + expect(typeof mcpService.lastOpts.bridge.query).toBe('function'); + }); + + it('binds the bridge to the request ExecutionContext (RLS/permissions)', async () => { + const recorded: any[] = []; + const { kernel } = makeKernel({ withMcp: true, recordedContexts: recorded }); + const d = new HttpDispatcher(kernel, undefined, { enforceProjectMembership: false }); + await d.handleMcp({ jsonrpc: '2.0', id: 1, method: 'tools/list' }, makeContext()); + // The fake MCP service called bridge.create → callData → ql.insert with + // { context }. That context MUST be the caller's principal, not system. + expect(recorded.length).toBe(1); + expect(recorded[0]?.userId).toBe('u1'); + expect(recorded[0]?.isSystem).toBe(false); + }); + }); +}); diff --git a/packages/runtime/src/http-dispatcher.ts b/packages/runtime/src/http-dispatcher.ts index 7a1883c42..1c6e276bc 100644 --- a/packages/runtime/src/http-dispatcher.ts +++ b/packages/runtime/src/http-dispatcher.ts @@ -264,6 +264,135 @@ export class HttpDispatcher { throw { statusCode: 400, message: `Unknown data action: ${action}` }; } + /** + * Handle an MCP request over the Streamable HTTP transport (`/mcp`). + * + * Gating + auth (fail-closed): + * - **opt-in**: only served when `OS_MCP_SERVER_ENABLED=true` (single-env + * runtime). Multi-tenant cloud overrides this gate per env. When off we + * return 404 so the surface isn't advertised. + * - **auth**: requires a principal already resolved by + * `resolveExecutionContext` (the `sys_api_key` Bearer/header path or a + * session). Anonymous → 401. + * + * Execution: the MCP runtime builds a stateless per-request server whose + * object-CRUD tools run through {@link callData} bound to THIS request's + * ExecutionContext — i.e. the exact permission + RLS path the REST API + * uses. An external agent can never exceed the key's authority. + */ + async handleMcp(body: any, context: HttpProtocolContext): Promise { + if (!HttpDispatcher.isMcpEnabled()) { + return { handled: true, response: this.error('MCP server is not enabled for this environment', 404) }; + } + + const mcp: any = await this.resolveService('mcp', context.environmentId); + if (!mcp || typeof mcp.handleHttpRequest !== 'function') { + return { handled: true, response: this.error('MCP server is not available', 501) }; + } + + const ec = context.executionContext; + if (!ec || (!ec.userId && !ec.isSystem)) { + return { handled: true, response: this.error('Unauthorized: a valid API key is required', 401) }; + } + + const request = context.request; + if (!request || typeof (request as any).headers?.get !== 'function') { + // The MCP transport needs a Web-standard Request (headers/method/url). + return { handled: true, response: this.error('MCP transport requires a standard HTTP request', 400) }; + } + + const bridge = this.buildMcpBridge(context); + let webRes: Response; + try { + webRes = await mcp.handleHttpRequest(request, { bridge, parsedBody: body }); + } catch (err: any) { + return { handled: true, response: this.error(err?.message ?? 'MCP request failed', 500) }; + } + + // Convert the transport's buffered Web Response into the dispatcher's + // `{ status, headers, body }` shape (JSON-response mode → fully buffered). + const headers: Record = {}; + try { webRes.headers.forEach((v, k) => { headers[k] = v; }); } catch { /* no headers */ } + const text = await webRes.text().catch(() => ''); + let responseBody: any = null; + if (text) { + const ct = headers['content-type'] ?? ''; + if (ct.includes('application/json')) { + try { responseBody = JSON.parse(text); } catch { responseBody = text; } + } else { + responseBody = text; + } + } + return { handled: true, response: { status: webRes.status, headers, body: responseBody } }; + } + + /** Whether the MCP HTTP surface is opted in for this single-env runtime. */ + private static isMcpEnabled(): boolean { + return typeof process !== 'undefined' && process.env?.OS_MCP_SERVER_ENABLED === 'true'; + } + + /** + * Build a principal-bound {@link McpDataBridge}: every method runs AS the + * request's ExecutionContext through {@link callData} (RLS/permissions) and + * the per-env metadata service. Keeps the MCP tool layer free of any direct + * engine access. + */ + private buildMcpBridge(context: HttpProtocolContext): any { + const ec = context.executionContext; + const envId = context.environmentId; + const driver = (context as any).dataDriver; + const callData = this.callData.bind(this); + const getMeta = () => this.resolveService('metadata', envId); + + return { + listObjects: async () => { + const meta: any = await getMeta(); + const objs: any[] = (await meta?.listObjects?.()) ?? []; + return objs.map((o) => ({ + name: o.name, + label: o.label ?? o.name, + fieldCount: o.fields ? Object.keys(o.fields).length : undefined, + })); + }, + describeObject: async (name: string) => { + const meta: any = await getMeta(); + const def: any = await meta?.getObject?.(name); + if (!def) return null; + const fields = def.fields ?? {}; + return { + name: def.name, + label: def.label ?? def.name, + fields: Object.entries(fields).map(([k, f]: [string, any]) => ({ + name: k, + type: f?.type, + label: f?.label ?? k, + required: f?.required ?? false, + })), + enableFeatures: def.enable ?? {}, + }; + }, + query: async (object: string, o: any) => { + const query: any = {}; + if (o?.where) query.where = o.where; + if (o?.fields) query.fields = o.fields; + if (typeof o?.limit === 'number') query.limit = o.limit; + if (typeof o?.offset === 'number') query.offset = o.offset; + if (o?.orderBy) query.orderBy = o.orderBy; + return await callData('query', { object, query }, driver, envId, ec); + }, + get: async (object: string, id: string) => { + const res: any = await callData('get', { object, id }, driver, envId, ec); + return res?.record ?? res ?? null; + }, + create: async (object: string, data: any) => + await callData('create', { object, data }, driver, envId, ec), + update: async (object: string, id: string, data: any) => + await callData('update', { object, id, data }, driver, envId, ec), + remove: async (object: string, id: string) => + await callData('delete', { object, id }, driver, envId, ec), + }; + } + /** * Parse a project UUID out of a scoped URL path such as * `/api/v1/environments/abc-123/data/task` or `/projects/abc-123/meta`. @@ -609,6 +738,10 @@ export class HttpDispatcher { notifications: hasNotification ? `${prefix}/notifications` : undefined, ai: hasAi ? `${prefix}/ai` : undefined, i18n: hasI18n ? `${prefix}/i18n` : undefined, + // MCP (Streamable HTTP) is opt-in per env — only advertised + // when OS_MCP_SERVER_ENABLED=true so the surface isn't exposed + // by default. The objectui Integrations page reads this. + mcp: HttpDispatcher.isMcpEnabled() ? `${prefix}/mcp` : undefined, }; // Build per-service status map @@ -2599,7 +2732,11 @@ export class HttpDispatcher { if (cleanPath.startsWith('/data')) { return this.handleData(cleanPath.substring(5), method, body, query, context); } - + + if (cleanPath === '/mcp' || cleanPath.startsWith('/mcp/') || cleanPath.startsWith('/mcp?')) { + return this.handleMcp(body, context); + } + if (cleanPath.startsWith('/graphql')) { if (method === 'POST') return this.handleGraphQL(body, context); // GraphQL usually GET for Playground is handled by middleware but we can return 405 or handle it