Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 62 additions & 61 deletions packages/opencode/src/mcp/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import {
CallToolResultSchema,
ListToolsResultSchema,
ToolSchema,
type Tool as MCPToolDef,
ToolListChangedNotificationSchema,
} from "@modelcontextprotocol/sdk/types.js"
import { Config } from "@/config/config"
Expand Down Expand Up @@ -71,6 +70,7 @@ export class NotFoundError extends Schema.TaggedErrorClass<NotFoundError>()("MCP
}) {}

type MCPClient = Client
type MCPToolDef = Awaited<ReturnType<MCPClient["listTools"]>>["tools"][number]

const StatusConnected = Schema.Struct({ status: Schema.Literal("connected") }).annotate({
identifier: "MCPStatusConnected",
Expand Down Expand Up @@ -124,6 +124,10 @@ function isOutputSchemaValidationError(error: Error) {
)
}

function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value)
}

async function paginate<T, R extends { nextCursor?: string }>(
list: (cursor?: string) => Promise<R>,
items: (result: R) => T[],
Expand Down Expand Up @@ -168,12 +172,12 @@ function convertMcpTool(mcpTool: MCPToolDef, client: MCPClient, timeout?: number
const inputSchema = mcpTool.inputSchema

// Spread first, then override type to ensure it's always "object"
const schema: JSONSchema7 = {
...(inputSchema as JSONSchema7),
const schema = {
...inputSchema,
type: "object",
properties: (inputSchema.properties ?? {}) as JSONSchema7["properties"],
properties: inputSchema.properties ?? {},
additionalProperties: false,
}
} satisfies JSONSchema7

return dynamicTool({
description: mcpTool.description ?? "",
Expand All @@ -182,7 +186,7 @@ function convertMcpTool(mcpTool: MCPToolDef, client: MCPClient, timeout?: number
return client.callTool(
{
name: mcpTool.name,
arguments: (args || {}) as Record<string, unknown>,
arguments: isRecord(args) ? args : {},
},
CallToolResultSchema,
{
Expand Down Expand Up @@ -217,11 +221,11 @@ function fetchFromClient<T extends { name: string }>(
)
}

interface CreateResult {
mcpClient?: MCPClient
status: Status
defs?: MCPToolDef[]
}
type UnavailableStatus = Exclude<Status, { status: "connected" }>
type ConnectResult = { type: "connected"; client: MCPClient } | { type: "unavailable"; status: UnavailableStatus }
type CreateResult =
| { type: "connected"; client: MCPClient; defs: MCPToolDef[] }
| { type: "unavailable"; status: UnavailableStatus }

interface AuthResult {
authorizationUrl: string
Expand Down Expand Up @@ -298,20 +302,20 @@ export const layer = Layer.effect(
(t, exit) => (Exit.isFailure(exit) ? Effect.tryPromise(() => t.close()).pipe(Effect.ignore) : Effect.void),
)

const DISABLED_RESULT: CreateResult = { status: { status: "disabled" } }
const DISABLED_RESULT = {
type: "unavailable",
status: { status: "disabled" },
} satisfies CreateResult

const connectRemote = Effect.fn("MCP.connectRemote")(function* (
key: string,
mcp: ConfigMCPV1.Info & { type: "remote" },
) {
const connectRemote = Effect.fn("MCP.connectRemote")(function* (key: string, mcp: ConfigMCPV1.Remote) {
const oauthDisabled = mcp.oauth === false
const oauthConfig = typeof mcp.oauth === "object" ? mcp.oauth : undefined
const url = remoteURL(mcp.url)
if (!url) {
return {
client: undefined as MCPClient | undefined,
status: { status: "failed" as const, error: `Invalid MCP URL for "${key}"` },
}
type: "unavailable",
status: { status: "failed", error: `Invalid MCP URL for "${key}"` },
} satisfies ConnectResult
}
let authProvider: McpOAuthProvider | undefined

Expand Down Expand Up @@ -351,7 +355,7 @@ export const layer = Layer.effect(
]

const connectTimeout = mcp.timeout ?? DEFAULT_TIMEOUT
let lastStatus: Status | undefined
let lastStatus: UnavailableStatus | undefined

for (const { name, transport } of transports) {
const result = yield* connectTransport(transport, connectTimeout).pipe(
Expand All @@ -364,7 +368,7 @@ export const layer = Layer.effect(
if (isAuthError) {
if (lastError.message.includes("registration") || lastError.message.includes("client_id")) {
lastStatus = {
status: "needs_client_registration" as const,
status: "needs_client_registration",
error: "Server does not support dynamic client registration. Please provide clientId in config.",
}
return events
Expand All @@ -377,7 +381,7 @@ export const layer = Layer.effect(
.pipe(Effect.ignore, Effect.as(undefined))
} else {
pendingOAuthTransports.set(key, transport)
lastStatus = { status: "needs_auth" as const }
lastStatus = { status: "needs_auth" }
return events
.publish(TuiEvent.ToastShow, {
title: "MCP Authentication Required",
Expand All @@ -389,25 +393,22 @@ export const layer = Layer.effect(
}
}

lastStatus = { status: "failed" as const, error: lastError.message }
lastStatus = { status: "failed", error: lastError.message }
return Effect.void
}),
)
if (result) return { client: result.client, status: { status: "connected" } as Status }
if (result) return { type: "connected", client: result.client } satisfies ConnectResult
// If this was an auth error, stop trying other transports
if (lastStatus?.status === "needs_auth" || lastStatus?.status === "needs_client_registration") break
}

return {
client: undefined as MCPClient | undefined,
status: (lastStatus ?? { status: "failed", error: "Unknown error" }) as Status,
}
type: "unavailable",
status: lastStatus ?? { status: "failed", error: "Unknown error" },
} satisfies ConnectResult
})

const connectLocal = Effect.fn("MCP.connectLocal")(function* (
key: string,
mcp: ConfigMCPV1.Info & { type: "local" },
) {
const connectLocal = Effect.fn("MCP.connectLocal")(function* (mcp: ConfigMCPV1.Local) {
const [cmd, ...args] = mcp.command
const cwd = yield* InstanceState.directory
const transport = new StdioClientTransport({
Expand All @@ -424,13 +425,13 @@ export const layer = Layer.effect(

const connectTimeout = mcp.timeout ?? DEFAULT_TIMEOUT
return yield* connectTransport(transport, connectTimeout).pipe(
Effect.map((client): { client: MCPClient | undefined; status: Status } => ({
client,
status: { status: "connected" },
})),
Effect.catch((error): Effect.Effect<{ client: MCPClient | undefined; status: Status }> => {
Effect.map((client) => ({ type: "connected", client }) satisfies ConnectResult),
Effect.catch((error) => {
const msg = error instanceof Error ? error.message : String(error)
return Effect.succeed({ client: undefined, status: { status: "failed", error: msg } })
return Effect.succeed({
type: "unavailable",
status: { status: "failed", error: msg },
} satisfies ConnectResult)
}),
)
})
Expand All @@ -440,31 +441,29 @@ export const layer = Layer.effect(
return DISABLED_RESULT
}

const { client: mcpClient, status } =
mcp.type === "remote"
? yield* connectRemote(key, mcp as ConfigMCPV1.Info & { type: "remote" })
: yield* connectLocal(key, mcp as ConfigMCPV1.Info & { type: "local" })
const result = mcp.type === "remote" ? yield* connectRemote(key, mcp) : yield* connectLocal(mcp)

if (!mcpClient) {
if (status.status !== "connected" && status.status !== "disabled") {
yield* Effect.logWarning("server unavailable", { key, type: mcp.type, status: status.status })
}
return { status } satisfies CreateResult
if (result.type === "unavailable") {
yield* Effect.logWarning("server unavailable", { key, type: mcp.type, status: result.status.status })
return result satisfies CreateResult
}

const listed = mcpClient.getServerCapabilities()?.tools ? yield* defs(mcpClient, mcp.timeout) : []
const listed = result.client.getServerCapabilities()?.tools ? yield* defs(result.client, mcp.timeout) : []
if (!listed) {
yield* Effect.tryPromise(() => mcpClient.close()).pipe(Effect.ignore)
return { status: { status: "failed", error: "Failed to get tools" } } satisfies CreateResult
yield* Effect.tryPromise(() => result.client.close()).pipe(Effect.ignore)
return {
type: "unavailable",
status: { status: "failed", error: "Failed to get tools" },
} satisfies CreateResult
}

return { mcpClient, status, defs: listed } satisfies CreateResult
return { type: "connected", client: result.client, defs: listed } satisfies CreateResult
})
const cfgSvc = yield* Config.Service

const descendants = Effect.fnUntraced(
function* (pid: number) {
if (process.platform === "win32") return [] as number[]
if (process.platform === "win32") return Array<number>()
const pids: number[] = []
const queue = [pid]
for (let index = 0; index < queue.length; index++) {
Expand All @@ -483,7 +482,7 @@ export const layer = Layer.effect(
return pids
},
Effect.scoped,
Effect.catch(() => Effect.succeed([] as number[])),
Effect.catch(() => Effect.succeed(Array<number>())),
)

function watch(s: State, name: string, client: MCPClient, bridge: EffectBridge.Shape, timeout?: number) {
Expand Down Expand Up @@ -528,12 +527,14 @@ export const layer = Layer.effect(
const result = yield* create(key, mcp).pipe(Effect.catch(() => Effect.void))
if (!result) return

s.status[key] = result.status
if (result.mcpClient) {
s.clients[key] = result.mcpClient
s.defs[key] = result.defs!
watch(s, key, result.mcpClient, bridge, mcp.timeout)
if (result.type === "unavailable") {
s.status[key] = result.status
return
}
s.status[key] = { status: "connected" }
s.clients[key] = result.client
s.defs[key] = result.defs
watch(s, key, result.client, bridge, mcp.timeout)
}),
{ concurrency: "unbounded" },
)
Expand Down Expand Up @@ -616,14 +617,14 @@ export const layer = Layer.effect(
const s = yield* InstanceState.get(state)
const result = yield* create(name, mcp)

s.status[name] = result.status
if (!result.mcpClient) {
if (result.type === "unavailable") {
s.status[name] = result.status
yield* closeClient(s, name)
delete s.clients[name]
return result.status
}

return yield* storeClient(s, name, result.mcpClient, result.defs!, mcp.timeout)
return yield* storeClient(s, name, result.client, result.defs, mcp.timeout)
})

const add = Effect.fn("MCP.add")(function* (name: string, mcp: ConfigMCPV1.Info) {
Expand Down Expand Up @@ -861,7 +862,7 @@ export const layer = Layer.effect(
if (!transport) throw new Error(`No pending OAuth flow for MCP server: ${mcpName}`)

const result = yield* Effect.tryPromise({
try: () => transport.finishAuth(authorizationCode).then(() => true as const),
try: () => transport.finishAuth(authorizationCode).then((): true => true),
catch: (error) => {
return error
},
Expand Down
Loading