Skip to content

Commit 6d67cb7

Browse files
committed
fix(mcp): bugbot — TTL watchdog on OAuth lock + don't close evicted pinned agents
- Redis refresh lock now uses a 15s TTL with a watchdog that extends it every 5s while fn() runs. Long-running OAuth refreshes no longer lose the lock mid-flight and let another process race the same refresh.
- Pinned-agent LRU eviction no longer calls `agent.close()`. Existing `createMcpPinnedFetch` closures hold the dispatcher reference and were using a closed Agent after eviction. We drop the entry from the cache and let GC release the dispatcher once the last closure dies; undici closes idle keep-alive connections via its own internal timeout.
- New tests: the watchdog extends the lock while fn() runs and stops once it settles; evicted agents are not closed and captured closures still work.
1 parent c5069c0 commit 6d67cb7

4 files changed

Lines changed: 83 additions & 19 deletions

File tree

apps/sim/lib/mcp/oauth/storage.test.ts

Lines changed: 38 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -11,9 +11,10 @@ import {
1111
} from '@sim/testing'
1212
import { beforeEach, describe, expect, it, vi } from 'vitest'
1313

14-
const { mockAcquireLock, mockReleaseLock } = vi.hoisted(() => ({
14+
const { mockAcquireLock, mockReleaseLock, mockExtendLock } = vi.hoisted(() => ({
1515
mockAcquireLock: vi.fn(),
1616
mockReleaseLock: vi.fn(),
17+
mockExtendLock: vi.fn(),
1718
}))
1819

1920
vi.mock('@sim/db', () => dbChainMock)
@@ -22,6 +23,7 @@ vi.mock('@/lib/core/security/encryption', () => encryptionMock)
2223
vi.mock('@/lib/core/config/redis', () => ({
2324
acquireLock: mockAcquireLock,
2425
releaseLock: mockReleaseLock,
26+
extendLock: mockExtendLock,
2527
}))
2628

2729
import {
@@ -112,7 +114,9 @@ describe('withMcpOauthRefreshLock', () => {
112114
vi.clearAllMocks()
113115
mockAcquireLock.mockReset()
114116
mockReleaseLock.mockReset()
117+
mockExtendLock.mockReset()
115118
mockReleaseLock.mockResolvedValue(true)
119+
mockExtendLock.mockResolvedValue(true)
116120
})
117121

118122
it('serializes concurrent in-process callers, each running its own fn()', async () => {
@@ -197,4 +201,37 @@ describe('withMcpOauthRefreshLock', () => {
197201
expect(keys).toContain('mcp:oauth:refresh:row-a')
198202
expect(keys).toContain('mcp:oauth:refresh:row-b')
199203
})
204+
205+
it('extends the lock TTL while fn() is running so long refreshes do not lose the lock', async () => {
206+
vi.useFakeTimers()
207+
try {
208+
mockAcquireLock.mockResolvedValue(true)
209+
let resolveFn: (v: string) => void
210+
const fn = vi.fn(
211+
() =>
212+
new Promise<string>((resolve) => {
213+
resolveFn = resolve
214+
})
215+
)
216+
217+
const pending = withMcpOauthRefreshLock('row-watchdog', fn)
218+
219+
// Advance time past two extend intervals (5s + 5s = 10s).
220+
await vi.advanceTimersByTimeAsync(11_000)
221+
expect(mockExtendLock.mock.calls.length).toBeGreaterThanOrEqual(2)
222+
for (const call of mockExtendLock.mock.calls) {
223+
expect(call[0]).toBe('mcp:oauth:refresh:row-watchdog')
224+
}
225+
226+
resolveFn!('done')
227+
await expect(pending).resolves.toBe('done')
228+
229+
// Watchdog must stop once fn() settles — no more extend calls.
230+
const extendCallsAtFinish = mockExtendLock.mock.calls.length
231+
await vi.advanceTimersByTimeAsync(20_000)
232+
expect(mockExtendLock.mock.calls.length).toBe(extendCallsAtFinish)
233+
} finally {
234+
vi.useRealTimers()
235+
}
236+
})
200237
})

apps/sim/lib/mcp/oauth/storage.ts

Lines changed: 16 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -10,7 +10,7 @@ import { toError } from '@sim/utils/errors'
1010
import { sleep } from '@sim/utils/helpers'
1111
import { generateId, generateShortId } from '@sim/utils/id'
1212
import { and, eq, gt } from 'drizzle-orm'
13-
import { acquireLock, releaseLock } from '@/lib/core/config/redis'
13+
import { acquireLock, extendLock, releaseLock } from '@/lib/core/config/redis'
1414
import { decryptSecret, encryptSecret } from '@/lib/core/security/encryption'
1515

1616
const logger = createLogger('McpOauthStorage')
@@ -238,13 +238,16 @@ export async function clearState(rowId: string): Promise<void> {
238238
* `McpClient` instances that can't be shared, unlike a scalar access token):
239239
* 1) In-process: per-row Promise chain. Concurrent callers queue; each
240240
* runs `fn()` after the previous settles.
241-
* 2) Cross-process: Redis mutex (`acquireLock` / `releaseLock`). Followers
242-
* poll until the holder releases, then acquire and run `fn()`.
241+
* 2) Cross-process: Redis mutex (`acquireLock` / `releaseLock`) with a TTL
242+
* watchdog that periodically extends the lock while `fn()` runs, so
243+
* long-running refreshes don't drop the lock and let another process
244+
* race onto the same refresh.
243245
*
244246
* Falls open if Redis is unavailable — `acquireLock` no-ops, but in-process
245247
* serialization still holds within a single Node process.
246248
*/
247-
const REFRESH_LOCK_TTL_SEC = 30
249+
const REFRESH_LOCK_TTL_SEC = 15
250+
const REFRESH_LOCK_EXTEND_INTERVAL_MS = 5_000
248251
const REFRESH_POLL_INTERVAL_MS = 100
249252
const REFRESH_MAX_WAIT_MS = 30_000
250253

@@ -283,9 +286,18 @@ async function runWithRedisMutex<T>(
283286
}
284287

285288
if (acquired) {
289+
const watchdog = setInterval(() => {
290+
extendLock(lockKey, ownerToken, REFRESH_LOCK_TTL_SEC).catch((error) => {
291+
logger.warn('Refresh lock extend failed', {
292+
rowId,
293+
error: toError(error).message,
294+
})
295+
})
296+
}, REFRESH_LOCK_EXTEND_INTERVAL_MS)
286297
try {
287298
return await fn()
288299
} finally {
300+
clearInterval(watchdog)
289301
await releaseLock(lockKey, ownerToken).catch((error) => {
290302
logger.warn('Refresh lock release failed (will expire via TTL)', {
291303
rowId,

apps/sim/lib/mcp/pinned-fetch.test.ts

Lines changed: 23 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -3,22 +3,27 @@
33
*/
44
import { beforeEach, describe, expect, it, vi } from 'vitest'
55

6-
const { mockAgent, mockCreatePinnedLookup, mockUndiciFetch, capturedAgentOptions } = vi.hoisted(
7-
() => {
6+
const { mockAgent, mockCreatePinnedLookup, mockUndiciFetch, capturedAgentOptions, agentCloses } =
7+
vi.hoisted(() => {
88
const capturedAgentOptions: unknown[] = []
9+
const agentCloses: unknown[] = []
910
class MockAgent {
1011
constructor(options: unknown) {
1112
capturedAgentOptions.push(options)
1213
}
14+
close() {
15+
agentCloses.push(this)
16+
return Promise.resolve()
17+
}
1318
}
1419
return {
1520
mockAgent: MockAgent,
1621
mockCreatePinnedLookup: vi.fn(),
1722
mockUndiciFetch: vi.fn(),
1823
capturedAgentOptions,
24+
agentCloses,
1925
}
20-
}
21-
)
26+
})
2227

2328
vi.mock('undici', () => ({ Agent: mockAgent, fetch: mockUndiciFetch }))
2429
vi.mock('@/lib/core/security/input-validation.server', () => ({
@@ -31,6 +36,7 @@ describe('createMcpPinnedFetch', () => {
3136
beforeEach(() => {
3237
vi.clearAllMocks()
3338
capturedAgentOptions.length = 0
39+
agentCloses.length = 0
3440
__resetPinnedAgentsForTests()
3541
mockCreatePinnedLookup.mockReturnValue('pinned-lookup-fn')
3642
mockUndiciFetch.mockResolvedValue(new Response('ok'))
@@ -105,4 +111,17 @@ describe('createMcpPinnedFetch', () => {
105111
const d2 = (mockUndiciFetch.mock.calls[1][1] as { dispatcher: unknown }).dispatcher
106112
expect(d1).not.toBe(d2)
107113
})
114+
115+
it('does not close evicted agents — captured closures keep working', async () => {
116+
// Build an early closure whose agent will get evicted by later IPs.
117+
const earlyClient = createMcpPinnedFetch('10.0.0.1')
118+
// Fill the cache past its 64-entry limit so the early entry is evicted.
119+
for (let i = 0; i < 64; i++) createMcpPinnedFetch(`10.1.${Math.floor(i / 256)}.${i % 256}`)
120+
121+
// Eviction must NOT have closed any agents.
122+
expect(agentCloses).toHaveLength(0)
123+
// The early closure's captured dispatcher is still callable.
124+
await earlyClient('https://example.com/still-works')
125+
expect(mockUndiciFetch).toHaveBeenCalledTimes(1)
126+
})
108127
})

apps/sim/lib/mcp/pinned-fetch.ts

Lines changed: 6 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -23,24 +23,20 @@ function getPinnedAgent(resolvedIP: string): Agent {
2323
return existing
2424
}
2525
if (pinnedAgents.size >= MAX_POOLED_AGENTS) {
26+
// Drop the oldest entry WITHOUT closing it — existing `createMcpPinnedFetch`
27+
// closures may still hold a reference and have in-flight requests. The
28+
// dispatcher is GC'd (and its sockets cleaned up) when the last closure
29+
// releases it; undici closes idle keep-alive connections after its own
30+
// timeout (default 4s).
2631
const oldestKey = pinnedAgents.keys().next().value
27-
if (oldestKey !== undefined) {
28-
const oldest = pinnedAgents.get(oldestKey)
29-
pinnedAgents.delete(oldestKey)
30-
oldest?.close().catch(() => {})
31-
}
32+
if (oldestKey !== undefined) pinnedAgents.delete(oldestKey)
3233
}
3334
const agent = new Agent({ connect: { lookup: createPinnedLookup(resolvedIP) } })
3435
pinnedAgents.set(resolvedIP, agent)
3536
return agent
3637
}
3738

3839
export function __resetPinnedAgentsForTests(): void {
39-
for (const agent of pinnedAgents.values()) {
40-
try {
41-
void agent.close?.()
42-
} catch {}
43-
}
4440
pinnedAgents.clear()
4541
}
4642

0 commit comments

Comments
 (0)