diff --git a/.changeset/agents-server-stream-paths.md b/.changeset/agents-server-stream-paths.md new file mode 100644 index 0000000000..c6f410b18d --- /dev/null +++ b/.changeset/agents-server-stream-paths.md @@ -0,0 +1,8 @@ +--- +"@electric-ax/agents-server": patch +"@electric-ax/agents-runtime": patch +--- + +Keep Durable Streams paths service-agnostic in the OSS agents-server. Subscription payloads, webhook wake paths, callback ack paths, and routing adapters now treat stream names as relative to the configured Durable Streams base URL instead of applying service-id path transforms. + +Persist and verify Durable Streams webhook signing secrets before forwarding webhook wakes through agents-server. Runtime handlers that use server auth headers now send Durable Streams claim tokens via `electric-claim-token`, preserving the configured server `Authorization` header for cloud callback routes. diff --git a/packages/agents-runtime/src/create-handler.ts b/packages/agents-runtime/src/create-handler.ts index 8128cb82bc..45fcd82b3f 100644 --- a/packages/agents-runtime/src/create-handler.ts +++ b/packages/agents-runtime/src/create-handler.ts @@ -189,6 +189,12 @@ export function createRuntimeRouter( createElectricTools, idleTimeout, heartbeatInterval, + ...(serverHeaders + ? { + claimHeaders: serverHeaders, + claimTokenHeader: `electric-claim-token` as const, + } + : {}), } const getRegisteredType = (name: string) => registry ? registry.get(name) : getEntityType(name) diff --git a/packages/agents-runtime/test/create-handler.test.ts b/packages/agents-runtime/test/create-handler.test.ts index 7907b2d522..12e3975d0c 100644 --- a/packages/agents-runtime/test/create-handler.test.ts +++ b/packages/agents-runtime/test/create-handler.test.ts @@ -602,6 +602,59 @@ describe(`createRuntimeHandler`, () => { expect(headers.get(`content-type`)).toBe(`application/json`) }) + it(`uses configured server headers for webhook wake callbacks`, async () => { + defineEntity(`test-agent`, { handler: async () => {} }) + processWakeMock.mockResolvedValueOnce(null) + + const notification = { + consumerId: `consumer-1`, + epoch: 1, + wakeId: `wake-1`, + streamPath: `/streams/entity:test-1`, + streams: [{ path: `/streams/entity:test-1`, offset: `0_0` }], + callback: `http://localhost:3000/_electric/callback-forward/wake-1`, + claimToken: `tok-1`, + entity: { + type: `test-agent`, + status: `active`, + url: `http://localhost:3000/test-agent/test-1`, + streams: { + main: `/streams/entity:test-1`, + error: `/streams/entity-error:test-1`, + }, + }, + } + + const handler = createRuntimeHandler({ + baseUrl: `http://localhost:3000`, + handlerUrl: `http://localhost:4000/electric-agents`, + serverHeaders: { + Authorization: `Bearer tenant-token`, + 'X-Tenant': `tenant-a`, + }, + }) + + const response = await handler.handleWebhookRequest( + new Request(`http://localhost/electric-agents`, { + method: `POST`, + headers: { 'content-type': `application/json` }, + body: JSON.stringify(notification), + }) + ) + + expect(response.status).toBe(200) + expect(processWakeMock).toHaveBeenCalledWith( + notification, + expect.objectContaining({ + claimHeaders: { + Authorization: `Bearer tenant-token`, + 'X-Tenant': `tenant-a`, + }, + claimTokenHeader: `electric-claim-token`, + }) + ) + }) + it(`registers custom state collections as output schemas`, async () => { defineEntity(`stateful-agent`, { state: { diff --git a/packages/agents-server/drizzle/0009_signed_webhook_forwarding.sql b/packages/agents-server/drizzle/0009_signed_webhook_forwarding.sql new file mode 100644 index 0000000000..46f21947fd --- /dev/null +++ b/packages/agents-server/drizzle/0009_signed_webhook_forwarding.sql @@ -0,0 +1,2 @@ +ALTER TABLE subscription_webhooks + ADD COLUMN webhook_secret text; diff --git a/packages/agents-server/drizzle/meta/_journal.json b/packages/agents-server/drizzle/meta/_journal.json index 4574ed99be..809750266e 100644 --- a/packages/agents-server/drizzle/meta/_journal.json +++ b/packages/agents-server/drizzle/meta/_journal.json @@ -64,6 +64,13 @@ "when": 1778976000000, "tag": "0008_runner_runtime_diagnostics", "breakpoints": true + }, + { + "idx": 9, + "version": "7", + "when": 1779207600000, + "tag": "0009_signed_webhook_forwarding", + "breakpoints": true } ] } diff --git a/packages/agents-server/src/db/schema.ts b/packages/agents-server/src/db/schema.ts index 15a1d234bc..7f62a67dfa 100644 --- a/packages/agents-server/src/db/schema.ts +++ b/packages/agents-server/src/db/schema.ts @@ -352,6 +352,7 @@ export const subscriptionWebhooks = pgTable( tenantId: text(`tenant_id`).notNull().default(`default`), subscriptionId: text(`subscription_id`).notNull(), webhookUrl: text(`webhook_url`).notNull(), + webhookSecret: text(`webhook_secret`), createdAt: timestamp(`created_at`, { withTimezone: true }) .notNull() .defaultNow(), diff --git a/packages/agents-server/src/routing/dispatch-policy.ts b/packages/agents-server/src/routing/dispatch-policy.ts index c3c0985ae8..af0aafdd77 100644 --- a/packages/agents-server/src/routing/dispatch-policy.ts +++ b/packages/agents-server/src/routing/dispatch-policy.ts @@ -8,7 +8,10 @@ import { ErrCodeUnauthorized, } from '../electric-agents-types.js' import { runnerWakeStream } from '../entity-registry.js' -import { DurableStreamsSubscriptionError } from '../stream-client.js' +import { + DurableStreamsSubscriptionError, + type SubscriptionResponse, +} from '../stream-client.js' import { rewriteLoopbackWebhookUrl } from '../utils/webhook-url.js' import { serverLog } from '../utils/log.js' import type { @@ -114,18 +117,16 @@ function sameDispatchDestination( } function subscriptionHasStream( - ctx: TenantContext, existing: { streams?: Array }, streamPath: string ): boolean { const normalizedStream = streamPath.replace(/^\/+/, ``) - const backendStream = `${ctx.service}/${normalizedStream}` return ( existing.streams?.some((stream) => { const path = typeof stream === `string` ? stream : stream.path if (!path) return false const normalized = path.replace(/^\/+/, ``) - return normalized === normalizedStream || normalized === backendStream + return normalized === normalizedStream }) ?? false ) } @@ -163,11 +164,10 @@ async function ensureSubscriptionIncludesStream( streamPath: string, input: SubscriptionCreateInput, existing: { streams?: Array } | null -): Promise { +): Promise { if (!existing) { try { - await ctx.streamClient.putSubscription(subscriptionId, input) - return + return await ctx.streamClient.putSubscription(subscriptionId, input) } catch (err) { if (!isSubscriptionAlreadyExistsError(err)) throw err existing = await ctx.streamClient.getSubscription(subscriptionId) @@ -176,14 +176,15 @@ async function ensureSubscriptionIncludesStream( `[dispatch-policy] subscription create raced with existing subscription but it could not be read`, { subscriptionId, stream: streamPath } ) - return + return null } } } - if (!subscriptionHasStream(ctx, existing, streamPath)) { + if (!subscriptionHasStream(existing, streamPath)) { await ctx.streamClient.addSubscriptionStreams(subscriptionId, [streamPath]) } + return existing as SubscriptionResponse } export async function assertDispatchPolicyAllowed( @@ -322,7 +323,7 @@ async function linkStreamToTargetSubscription( ctx.publicUrl, `/_electric/webhook-forward/${encodeURIComponent(subscriptionId)}` ) - await ensureSubscriptionIncludesStream( + const subscription = await ensureSubscriptionIncludesStream( ctx, subscriptionId, streamPath, @@ -334,18 +335,23 @@ async function linkStreamToTargetSubscription( }, existing ) + const webhookSecret = subscription?.webhook_secret await ctx.pgDb .insert(subscriptionWebhooks) .values({ tenantId: ctx.service, subscriptionId, webhookUrl, + ...(webhookSecret ? { webhookSecret } : {}), }) .onConflictDoUpdate({ target: [ subscriptionWebhooks.tenantId, subscriptionWebhooks.subscriptionId, ], - set: { webhookUrl }, + set: { + webhookUrl, + ...(webhookSecret ? { webhookSecret } : {}), + }, }) } diff --git a/packages/agents-server/src/routing/durable-streams-router.ts b/packages/agents-server/src/routing/durable-streams-router.ts index e3243337dd..7598cce88d 100644 --- a/packages/agents-server/src/routing/durable-streams-router.ts +++ b/packages/agents-server/src/routing/durable-streams-router.ts @@ -15,10 +15,8 @@ import { import { validateBody } from './schema.js' import { rewriteLoopbackWebhookUrl } from '../utils/webhook-url.js' import { forwardFetchRequest } from '../utils/server-utils.js' -import { resolveDurableStreamsRoutingAdapter } from './durable-streams-routing-adapter.js' import type { IRequest, RouterType } from 'itty-router' import type { TenantContext } from './context.js' -import type { DurableStreamsRoutingAdapter } from './durable-streams-routing-adapter.js' const subscriptionProxyBodySchema = Type.Object( { @@ -128,10 +126,10 @@ async function forwardToDurableStreams( }, body: requestBody, durableStreamsUrl: ctx.durableStreamsUrl, + serviceId: ctx.service, durableStreamsBearer: ctx.durableStreamsBearer, durableStreamsBearerMode, durableStreamsRouting: ctx.durableStreamsRouting, - serviceId: ctx.service, dispatcher: ctx.durableStreamsDispatcher, route, }) @@ -140,38 +138,28 @@ async function forwardToDurableStreams( type SubscriptionControlAction = (typeof subscriptionControlActions)[number] function rewriteSubscriptionBodyForBackend( - payload: Record, - service: string, - routingAdapter: DurableStreamsRoutingAdapter + payload: Record ): void { if (typeof payload.pattern === `string`) { - payload.pattern = routingAdapter.toBackendStreamPath( - service, - payload.pattern - ) + payload.pattern = normalizeSubscriptionPath(payload.pattern) } if (Array.isArray(payload.streams)) { payload.streams = payload.streams.map((stream) => - typeof stream === `string` - ? routingAdapter.toBackendStreamPath(service, stream) - : stream + typeof stream === `string` ? normalizeSubscriptionPath(stream) : stream ) } if (typeof payload.wake_stream === `string`) { - payload.wake_stream = routingAdapter.toBackendStreamPath( - service, - payload.wake_stream - ) + payload.wake_stream = normalizeSubscriptionPath(payload.wake_stream) } if (Array.isArray(payload.acks)) { payload.acks = payload.acks.map((ack) => { if (!ack || typeof ack !== `object`) return ack const next = { ...(ack as Record) } if (typeof next.stream === `string`) { - next.stream = routingAdapter.toBackendStreamPath(service, next.stream) + next.stream = normalizeSubscriptionPath(next.stream) } if (typeof next.path === `string`) { - next.path = routingAdapter.toBackendStreamPath(service, next.path) + next.path = normalizeSubscriptionPath(next.path) } return next }) @@ -180,9 +168,7 @@ function rewriteSubscriptionBodyForBackend( function rewriteSubscriptionResponseForClient( bytes: Uint8Array, - response: Response, - service: string, - routingAdapter: DurableStreamsRoutingAdapter + response: Response ): Uint8Array { if (!response.headers.get(`content-type`)?.includes(`application/json`)) { return bytes @@ -191,15 +177,12 @@ function rewriteSubscriptionResponseForClient( if (!payload) return bytes if (typeof payload.pattern === `string`) { - payload.pattern = routingAdapter.toRuntimeStreamPath( - service, - payload.pattern - ) + payload.pattern = normalizeSubscriptionPath(payload.pattern) } if (Array.isArray(payload.streams)) { payload.streams = payload.streams.map((stream) => { if (typeof stream === `string`) { - return routingAdapter.toRuntimeStreamPath(service, stream) + return normalizeSubscriptionPath(stream) } if ( stream && @@ -208,8 +191,7 @@ function rewriteSubscriptionResponseForClient( ) { return { ...(stream as Record), - path: routingAdapter.toRuntimeStreamPath( - service, + path: normalizeSubscriptionPath( (stream as Record).path ), } @@ -218,23 +200,20 @@ function rewriteSubscriptionResponseForClient( }) } if (typeof payload.wake_stream === `string`) { - payload.wake_stream = routingAdapter.toRuntimeStreamPath( - service, - payload.wake_stream - ) + payload.wake_stream = normalizeSubscriptionPath(payload.wake_stream) } if (typeof payload.stream === `string`) { - payload.stream = routingAdapter.toRuntimeStreamPath(service, payload.stream) + payload.stream = normalizeSubscriptionPath(payload.stream) } if (Array.isArray(payload.acks)) { payload.acks = payload.acks.map((ack) => { if (!ack || typeof ack !== `object`) return ack const next = { ...(ack as Record) } if (typeof next.stream === `string`) { - next.stream = routingAdapter.toRuntimeStreamPath(service, next.stream) + next.stream = normalizeSubscriptionPath(next.stream) } if (typeof next.path === `string`) { - next.path = routingAdapter.toRuntimeStreamPath(service, next.path) + next.path = normalizeSubscriptionPath(next.path) } return next }) @@ -243,6 +222,10 @@ function rewriteSubscriptionResponseForClient( return new TextEncoder().encode(JSON.stringify(payload)) } +function normalizeSubscriptionPath(path: string): string { + return path.replace(/^\/+/, ``) +} + function decodeJson(bytes: Uint8Array): Record | null { try { const parsed = JSON.parse(new TextDecoder().decode(bytes)) as unknown @@ -260,20 +243,10 @@ function routeParam(request: IRequest, name: string): string { return decodeURIComponent(raw ?? ``) } -function subscriptionRoutingAdapter( - ctx: TenantContext -): DurableStreamsRoutingAdapter { - return resolveDurableStreamsRoutingAdapter( - ctx.durableStreamsRouting, - ctx.durableStreamsUrl - ) -} - async function rewriteSubscriptionRequestBody( request: IRequest, ctx: TenantContext, - subscriptionId: string, - routingAdapter: DurableStreamsRoutingAdapter + subscriptionId: string ): Promise< | { ok: true @@ -300,11 +273,7 @@ async function rewriteSubscriptionRequestBody( ) } - rewriteSubscriptionBodyForBackend( - payload as Record, - ctx.service, - routingAdapter - ) + rewriteSubscriptionBodyForBackend(payload as Record) return { ok: true, @@ -316,13 +285,16 @@ async function rewriteSubscriptionRequestBody( async function forwardSubscriptionRequest( request: IRequest, ctx: TenantContext, - routingAdapter: DurableStreamsRoutingAdapter, opts: { body?: Uint8Array requestUrl?: string bearerMode?: `overwrite` | `if-missing` } = {} -): Promise<{ upstream: Response; response: Response }> { +): Promise<{ + upstream: Response + response: Response + responseBytes: Uint8Array +}> { const upstream = await forwardToDurableStreams( ctx, request, @@ -334,36 +306,55 @@ async function forwardSubscriptionRequest( let responseBytes: Uint8Array = upstream.body ? new Uint8Array(await upstream.arrayBuffer()) : new Uint8Array() - responseBytes = rewriteSubscriptionResponseForClient( - responseBytes, - upstream, - ctx.service, - routingAdapter - ) + responseBytes = rewriteSubscriptionResponseForClient(responseBytes, upstream) return { upstream, response: responseFromUpstream(upstream, responseBytes), + responseBytes, + } +} + +function webhookSecretFromSubscriptionResponse( + body: Uint8Array +): string | undefined { + if (body.length === 0) return undefined + try { + const json = JSON.parse(new TextDecoder().decode(body)) as { + webhook_secret?: unknown + } + return typeof json.webhook_secret === `string` + ? json.webhook_secret + : undefined + } catch { + return undefined } } async function upsertSubscriptionWebhook( ctx: TenantContext, subscriptionId: string, - targetWebhookUrl: string + targetWebhookUrl: string, + webhookSecret: string | undefined ): Promise { + const values = { + tenantId: ctx.service, + subscriptionId, + webhookUrl: targetWebhookUrl, + ...(webhookSecret ? { webhookSecret } : {}), + } + const set = { + webhookUrl: targetWebhookUrl, + ...(webhookSecret ? { webhookSecret } : {}), + } await ctx.pgDb .insert(subscriptionWebhooks) - .values({ - tenantId: ctx.service, - subscriptionId, - webhookUrl: targetWebhookUrl, - }) + .values(values) .onConflictDoUpdate({ target: [ subscriptionWebhooks.tenantId, subscriptionWebhooks.subscriptionId, ], - set: { webhookUrl: targetWebhookUrl }, + set, }) } @@ -383,8 +374,6 @@ async function deleteSubscriptionWebhook( function rewriteSubscriptionStreamPathInUrl( requestUrl: URL, - service: string, - routingAdapter: DurableStreamsRoutingAdapter, streamPath: string ): string { const prefix = requestUrl.pathname.slice( @@ -392,7 +381,7 @@ function rewriteSubscriptionStreamPathInUrl( requestUrl.pathname.indexOf(`/streams/`) + `/streams/`.length ) requestUrl.pathname = `${prefix}${encodeURIComponent( - routingAdapter.toBackendStreamPath(service, streamPath) + normalizeSubscriptionPath(streamPath) )}` return requestUrl.toString() } @@ -402,26 +391,21 @@ async function putSubscriptionBase( ctx: TenantContext ): Promise { const subscriptionId = routeParam(request, `subscriptionId`) - const routingAdapter = subscriptionRoutingAdapter(ctx) const rewrite = await rewriteSubscriptionRequestBody( request, ctx, - subscriptionId, - routingAdapter + subscriptionId ) if (!rewrite.ok) return rewrite.response - const { upstream, response } = await forwardSubscriptionRequest( - request, - ctx, - routingAdapter, - { body: rewrite.body } - ) + const { upstream, response, responseBytes } = + await forwardSubscriptionRequest(request, ctx, { body: rewrite.body }) if (upstream.ok && rewrite.targetWebhookUrl) { await upsertSubscriptionWebhook( ctx, subscriptionId, - rewrite.targetWebhookUrl + rewrite.targetWebhookUrl, + webhookSecretFromSubscriptionResponse(responseBytes) ) } return response @@ -431,9 +415,7 @@ async function getSubscriptionBase( request: IRequest, ctx: TenantContext ): Promise { - const routingAdapter = subscriptionRoutingAdapter(ctx) - return (await forwardSubscriptionRequest(request, ctx, routingAdapter)) - .response + return (await forwardSubscriptionRequest(request, ctx)).response } async function deleteSubscriptionBase( @@ -441,12 +423,7 @@ async function deleteSubscriptionBase( ctx: TenantContext ): Promise { const subscriptionId = routeParam(request, `subscriptionId`) - const routingAdapter = subscriptionRoutingAdapter(ctx) - const { upstream, response } = await forwardSubscriptionRequest( - request, - ctx, - routingAdapter - ) + const { upstream, response } = await forwardSubscriptionRequest(request, ctx) if (upstream.ok) { await deleteSubscriptionWebhook(ctx, subscriptionId) } @@ -458,17 +435,15 @@ async function postSubscriptionStreams( ctx: TenantContext ): Promise { const subscriptionId = routeParam(request, `subscriptionId`) - const routingAdapter = subscriptionRoutingAdapter(ctx) const rewrite = await rewriteSubscriptionRequestBody( request, ctx, - subscriptionId, - routingAdapter + subscriptionId ) if (!rewrite.ok) return rewrite.response return ( - await forwardSubscriptionRequest(request, ctx, routingAdapter, { + await forwardSubscriptionRequest(request, ctx, { body: rewrite.body, }) ).response @@ -478,15 +453,12 @@ async function deleteSubscriptionStream( request: IRequest, ctx: TenantContext ): Promise { - const routingAdapter = subscriptionRoutingAdapter(ctx) const requestUrl = rewriteSubscriptionStreamPathInUrl( new URL(request.url), - ctx.service, - routingAdapter, routeParam(request, `streamPath`) ) return ( - await forwardSubscriptionRequest(request, ctx, routingAdapter, { + await forwardSubscriptionRequest(request, ctx, { requestUrl, }) ).response @@ -495,12 +467,10 @@ async function deleteSubscriptionStream( function subscriptionAction(action: SubscriptionControlAction) { return async (request: IRequest, ctx: TenantContext): Promise => { const subscriptionId = routeParam(request, `subscriptionId`) - const routingAdapter = subscriptionRoutingAdapter(ctx) const rewrite = await rewriteSubscriptionRequestBody( request, ctx, - subscriptionId, - routingAdapter + subscriptionId ) if (!rewrite.ok) return rewrite.response @@ -509,7 +479,7 @@ function subscriptionAction(action: SubscriptionControlAction) { ? `if-missing` : `overwrite` return ( - await forwardSubscriptionRequest(request, ctx, routingAdapter, { + await forwardSubscriptionRequest(request, ctx, { body: rewrite.body, bearerMode, }) @@ -546,10 +516,10 @@ async function streamAppend( }, body, durableStreamsUrl: ctx.durableStreamsUrl, + serviceId: ctx.service, durableStreamsBearer: ctx.durableStreamsBearer, durableStreamsBearerMode: `overwrite`, durableStreamsRouting: ctx.durableStreamsRouting, - serviceId: ctx.service, dispatcher: ctx.durableStreamsDispatcher, }) ) diff --git a/packages/agents-server/src/routing/durable-streams-routing-adapter.ts b/packages/agents-server/src/routing/durable-streams-routing-adapter.ts index 38d258a339..01b1531971 100644 --- a/packages/agents-server/src/routing/durable-streams-routing-adapter.ts +++ b/packages/agents-server/src/routing/durable-streams-routing-adapter.ts @@ -1,21 +1,30 @@ export interface DurableStreamsRoutingInput { durableStreamsUrl: string - serviceId: string requestUrl: string + /** Tenant identity for external routing adapters; the OSS adapter ignores it. */ + serviceId: string } export interface DurableStreamsRoutingAdapter { streamUrl(input: DurableStreamsRoutingInput): URL controlUrl(input: DurableStreamsRoutingInput): URL - toBackendStreamPath(serviceId: string, streamPath: string): string - toRuntimeStreamPath(serviceId: string, streamPath: string): string + /** + * @deprecated Subscription stream paths are logical paths relative to the + * configured Durable Streams URL. This hook is kept only so existing + * external routing adapters can continue to typecheck. + */ + toBackendStreamPath?: (serviceId: string, streamPath: string) => string + /** + * @deprecated Subscription stream paths are logical paths relative to the + * configured Durable Streams URL. This hook is kept only so existing + * external routing adapters can continue to typecheck. + */ + toRuntimeStreamPath?: (serviceId: string, streamPath: string) => string } function appendSearch(target: URL, source: URL): URL { source.searchParams.forEach((value, key) => { - if (key !== `service`) { - target.searchParams.append(key, value) - } + target.searchParams.append(key, value) }) return target } @@ -24,13 +33,24 @@ function withoutTrailingSlash(pathname: string): string { return pathname.replace(/\/+$/, ``) || `/` } +function appendPath(pathname: string, path: string): string { + if (!path) return withoutTrailingSlash(pathname) + const base = withoutTrailingSlash(pathname) + return base === `/` ? `/${path}` : `${base}/${path}` +} + function appendRequestPathToStreamRoot(input: DurableStreamsRoutingInput): URL { const incomingUrl = new URL(input.requestUrl, `http://localhost`) const path = incomingUrl.pathname.replace(/^\/+/, ``) const target = new URL(input.durableStreamsUrl) - target.pathname = path - ? `${withoutTrailingSlash(target.pathname)}/${path}` - : withoutTrailingSlash(target.pathname) + const basePath = withoutTrailingSlash(target.pathname) + const requestPath = path ? `/${path}` : `/` + + target.pathname = + basePath !== `/` && + (requestPath === basePath || requestPath.startsWith(`${basePath}/`)) + ? requestPath + : appendPath(basePath, path) return appendSearch(target, incomingUrl) } @@ -39,14 +59,6 @@ export const streamRootDurableStreamsRoutingAdapter: DurableStreamsRoutingAdapte streamUrl: appendRequestPathToStreamRoot, controlUrl: appendRequestPathToStreamRoot, - - toBackendStreamPath(_serviceId, streamPath) { - return streamPath.replace(/^\/+/, ``) - }, - - toRuntimeStreamPath(_serviceId, streamPath) { - return streamPath.replace(/^\/+/, ``) - }, } export const pathPrefixedSingleTenantDurableStreamsRoutingAdapter = diff --git a/packages/agents-server/src/routing/internal-router.ts b/packages/agents-server/src/routing/internal-router.ts index caa5067ae3..dc3a7ade1c 100644 --- a/packages/agents-server/src/routing/internal-router.ts +++ b/packages/agents-server/src/routing/internal-router.ts @@ -2,6 +2,7 @@ * Sub-router for /_electric/* control-plane routes. */ +import { createHmac, timingSafeEqual } from 'node:crypto' import { appendPathToUrl } from '@electric-ax/agents-runtime' import { Type, type Static } from '@sinclair/typebox' import { and, eq } from 'drizzle-orm' @@ -16,12 +17,12 @@ import { ErrCodeCallbackNotFound, ErrCodeForkInProgress, ErrCodeSubscriptionNotFound, + ErrCodeUnauthorized, } from '../electric-agents-types.js' import { ATTR, tracer } from '../tracing.js' import { decodeJsonObject } from '../utils/server-utils.js' import { serverLog } from '../utils/log.js' import { cronRouter } from './cron-router.js' -import { resolveDurableStreamsRoutingAdapter } from './durable-streams-routing-adapter.js' import { electricProxyRouter } from './electric-proxy-router.js' import { entitiesRouter } from './entities-router.js' import { entityTypesRouter } from './entity-types-router.js' @@ -31,7 +32,6 @@ import { routeBody, validateOptionalJsonBody, withSchema } from './schema.js' import { withLeadingSlash } from './tenant-stream-paths.js' import type { IRequest, RouterType } from 'itty-router' import type { TenantContext } from './context.js' -import type { DurableStreamsRoutingAdapter } from './durable-streams-routing-adapter.js' const wakeRegistrationBodySchema = Type.Object({ subscriberUrl: Type.String(), @@ -93,6 +93,7 @@ type WebhookForwardBody = Static type CallbackForwardBody = Static const DS_SUBSCRIPTION_CALLBACK_PREFIX = `ds-subscription:` +const WEBHOOK_SIGNATURE_TOLERANCE_MS = 5 * 60_000 export type InternalRoutes = RouterType< IRequest, @@ -167,6 +168,33 @@ function claimTokenFromRequest(request: IRequest): string | undefined { ) } +function verifyWebhookSignature( + secret: string | null | undefined, + body: Uint8Array, + header: string | null +): boolean { + if (!secret || !header) return false + + const match = header.match(/^t=(\d+),sha256=([0-9a-f]{64})$/) + if (!match) return false + + const timestampSeconds = Number(match[1]) + if (!Number.isSafeInteger(timestampSeconds)) return false + + const timestampMs = timestampSeconds * 1000 + if (Math.abs(Date.now() - timestampMs) > WEBHOOK_SIGNATURE_TOLERANCE_MS) { + return false + } + + const hmac = createHmac(`sha256`, secret) + hmac.update(`${timestampSeconds}.`) + hmac.update(body) + const expected = Buffer.from(hmac.digest(`hex`), `hex`) + const actual = Buffer.from(match[2], `hex`) + + return expected.length === actual.length && timingSafeEqual(expected, actual) +} + function newWebhookPayload(body: WebhookForwardBody | undefined): { wakeId: string generation: number @@ -220,14 +248,6 @@ function newWebhookPayload(body: WebhookForwardBody | undefined): { } } -function toRuntimeStreamPath( - path: string, - service: string, - routingAdapter: DurableStreamsRoutingAdapter -): string { - return withLeadingSlash(routingAdapter.toRuntimeStreamPath(service, path)) -} - async function registerWake( request: IRequest, ctx: TenantContext @@ -249,39 +269,56 @@ async function webhookForward( subscriptionId ) - const lookupPromise: Promise = tracer.startActiveSpan( - `db.lookupSubscription`, - async (span) => { - try { - const rows = await ctx.pgDb - .select() - .from(subscriptionWebhooks) - .where( - and( - eq(subscriptionWebhooks.tenantId, ctx.service), - eq(subscriptionWebhooks.subscriptionId, subscriptionId) - ) + const lookupPromise: Promise<{ + webhookUrl: string + webhookSecret: string | null + } | null> = tracer.startActiveSpan(`db.lookupSubscription`, async (span) => { + try { + const rows = await ctx.pgDb + .select() + .from(subscriptionWebhooks) + .where( + and( + eq(subscriptionWebhooks.tenantId, ctx.service), + eq(subscriptionWebhooks.subscriptionId, subscriptionId) ) - .limit(1) - return rows[0]?.webhookUrl ?? null - } finally { - span.end() - } + ) + .limit(1) + const row = rows[0] + return row + ? { + webhookUrl: row.webhookUrl, + webhookSecret: row.webhookSecret ?? null, + } + : null + } finally { + span.end() } - ) + }) - const [targetWebhookUrl, body] = await Promise.all([ + const [target, body] = await Promise.all([ lookupPromise, readRequestBody(request as Request), ]) - if (!targetWebhookUrl) { + if (!target) { return apiError( 404, ErrCodeSubscriptionNotFound, `Unknown webhook subscription` ) } + + if ( + !verifyWebhookSignature( + target.webhookSecret, + body, + request.headers.get(`webhook-signature`) + ) + ) { + return apiError(401, ErrCodeUnauthorized, `Invalid webhook signature`) + } + const parsedBodyResult = validateOptionalJsonBody( webhookForwardBodySchema, body, @@ -293,10 +330,6 @@ async function webhookForward( let runningEntityUrl: string | null = null const parsedBody = parsedBodyResult.value as WebhookForwardBody | undefined const newWebhook = newWebhookPayload(parsedBody) - const routingAdapter = resolveDurableStreamsRoutingAdapter( - ctx.durableStreamsRouting, - ctx.durableStreamsUrl - ) if (parsedBody) { const rawPrimaryStream = @@ -307,7 +340,7 @@ async function webhookForward( null const primaryStream = typeof rawPrimaryStream === `string` - ? toRuntimeStreamPath(rawPrimaryStream, ctx.service, routingAdapter) + ? withLeadingSlash(rawPrimaryStream) : null const consumerId = newWebhook?.wakeId ?? @@ -445,9 +478,9 @@ async function webhookForward( upstream = await tracer.startActiveSpan( `fetch.agent-handler`, async (span) => { - span.setAttribute(`http.url`, targetWebhookUrl) + span.setAttribute(`http.url`, target.webhookUrl) try { - return await fetch(targetWebhookUrl, { + return await fetch(target.webhookUrl, { method: request.method, headers, body: bodyFromBytes(forwardBody), @@ -534,15 +567,7 @@ async function callbackForward( return json(responseBody) } - const upstreamBody = encodeCallbackForwardBody( - ctx.service, - consumerId, - requestBody, - resolveDurableStreamsRoutingAdapter( - ctx.durableStreamsRouting, - ctx.durableStreamsUrl - ) - ) + const upstreamBody = encodeCallbackForwardBody(consumerId, requestBody) let upstream: Response try { @@ -566,6 +591,11 @@ async function callbackForward( ) upstream = json(result) } else { + const token = claimTokenFromRequest(request) + if (token) { + headers.set(`authorization`, `Bearer ${token}`) + headers.delete(`electric-claim-token`) + } upstream = await fetch(target.callbackUrl, { method: request.method, headers, @@ -697,13 +727,11 @@ async function mintClaimWriteToken( } function encodeCallbackForwardBody( - service: string, consumerId: string, - body: CallbackForwardBody | undefined, - routingAdapter: DurableStreamsRoutingAdapter + body: CallbackForwardBody | undefined ): Uint8Array { const payload = encodeCallbackForwardPayload(consumerId, body, (stream) => - routingAdapter.toBackendStreamPath(service, stream) + stream.replace(/^\/+/, ``) ) return new TextEncoder().encode(JSON.stringify(payload)) } diff --git a/packages/agents-server/src/routing/tenant-stream-paths.ts b/packages/agents-server/src/routing/tenant-stream-paths.ts index e0338951c7..a5898e5c18 100644 --- a/packages/agents-server/src/routing/tenant-stream-paths.ts +++ b/packages/agents-server/src/routing/tenant-stream-paths.ts @@ -1,26 +1,3 @@ -export function withoutLeadingSlash(path: string): string { - return path.replace(/^\/+/, ``) -} - export function withLeadingSlash(path: string): string { return path.startsWith(`/`) ? path : `/${path}` } - -export function prefixTenantStreamPath(path: string, tenantId: string): string { - const normalized = withoutLeadingSlash(path) - if (!normalized || normalized === tenantId) return tenantId - if (normalized.startsWith(`${tenantId}/`)) return normalized - return `${tenantId}/${normalized}` -} - -export function stripTenantStreamPrefix( - path: string, - tenantId: string -): string { - const normalized = withoutLeadingSlash(path) - if (normalized === tenantId) return `` - if (normalized.startsWith(`${tenantId}/`)) { - return normalized.slice(tenantId.length + 1) - } - return normalized -} diff --git a/packages/agents-server/src/utils/server-utils.ts b/packages/agents-server/src/utils/server-utils.ts index d5df79927e..f618589d3a 100644 --- a/packages/agents-server/src/utils/server-utils.ts +++ b/packages/agents-server/src/utils/server-utils.ts @@ -174,8 +174,8 @@ export async function forwardFetchRequest(options: { headers: Headers } durableStreamsUrl: string - durableStreamsRouting?: DurableStreamsRoutingAdapter serviceId: string + durableStreamsRouting?: DurableStreamsRoutingAdapter body?: Uint8Array dispatcher?: Agent route?: `stream` | `control` diff --git a/packages/agents-server/test/dispatch-policy-routing.test.ts b/packages/agents-server/test/dispatch-policy-routing.test.ts index 3352e85fbc..99cf4e5531 100644 --- a/packages/agents-server/test/dispatch-policy-routing.test.ts +++ b/packages/agents-server/test/dispatch-policy-routing.test.ts @@ -314,7 +314,7 @@ describe(`dispatch policy routing`, () => { entity(dispatchPolicy) ) ;(ctx.streamClient.getSubscription as any).mockResolvedValue({ - streams: [{ path: `tenant-test/chat/one/main` }], + streams: [{ path: `chat/one/main` }], }) ctx.entityManager.send = vi.fn(async () => undefined) @@ -353,6 +353,33 @@ describe(`dispatch policy routing`, () => { ) }) + it(`re-adds runner streams when an old subscription contains a service-prefixed path`, async () => { + const dispatchPolicy: DispatchPolicy = { + targets: [{ type: `runner`, runnerId: `runner-1` }], + } + const ctx = buildContext() + ;(ctx.entityManager.registry.getEntity as any).mockResolvedValue( + entity(dispatchPolicy) + ) + ;(ctx.streamClient.getSubscription as any).mockResolvedValue({ + streams: [{ path: `tenant-test/chat/one/main` }], + }) + ctx.entityManager.send = vi.fn(async () => undefined) + + const response = await globalRouter.fetch( + request(`POST`, `/_electric/entities/chat/one/send`, { + payload: `hello`, + }), + ctx + ) + + expect(response.status).toBe(204) + expect(ctx.streamClient.addSubscriptionStreams).toHaveBeenCalledWith( + expect.stringMatching(/^runner:runner-1:/), + [`/chat/one/main`] + ) + }) + it(`treats runner subscription create conflicts as an idempotent spawn link`, async () => { const dispatchPolicy: DispatchPolicy = { targets: [{ type: `runner`, runnerId: `runner-1` }], @@ -361,7 +388,7 @@ describe(`dispatch policy routing`, () => { ;(ctx.streamClient.getSubscription as any) .mockResolvedValueOnce(null) .mockResolvedValueOnce({ - streams: [{ path: `tenant-test/chat/one/main` }], + streams: [{ path: `chat/one/main` }], }) ;(ctx.streamClient.putSubscription as any).mockRejectedValueOnce( new DurableStreamsSubscriptionError( diff --git a/packages/agents-server/test/electric-agents-routes.test.ts b/packages/agents-server/test/electric-agents-routes.test.ts index 8153b578f4..25ae070e4a 100644 --- a/packages/agents-server/test/electric-agents-routes.test.ts +++ b/packages/agents-server/test/electric-agents-routes.test.ts @@ -68,12 +68,12 @@ function fakeInsertDb() { } } -const serviceRoutedTestAdapter: DurableStreamsRoutingAdapter = { +const customRootTestAdapter: DurableStreamsRoutingAdapter = { streamUrl(input) { const incomingUrl = new URL(input.requestUrl, `http://localhost`) const path = incomingUrl.pathname.replace(/^\/+/, ``) const target = new URL( - `/v1/streams/${input.serviceId}/${path}`, + `/custom-stream-root/${path}`, input.durableStreamsUrl ) target.search = incomingUrl.search @@ -82,18 +82,12 @@ const serviceRoutedTestAdapter: DurableStreamsRoutingAdapter = { controlUrl(input) { const incomingUrl = new URL(input.requestUrl, `http://localhost`) const target = new URL( - `/v1/streams/${input.serviceId}${incomingUrl.pathname}`, + `/custom-stream-root${incomingUrl.pathname}`, input.durableStreamsUrl ) target.search = incomingUrl.search return target }, - toBackendStreamPath(_serviceId, streamPath) { - return streamPath.replace(/^\/+/, ``) - }, - toRuntimeStreamPath(_serviceId, streamPath) { - return streamPath.replace(/^\/+/, ``) - }, } describe(`ElectricAgentsRoutes schedule endpoints`, () => { @@ -438,9 +432,12 @@ describe(`ElectricAgentsRoutes shared-state streams`, () => { }) it(`rewrites webhook subscription targets and keeps the original target locally`, async () => { - const fetchSpy = vi - .spyOn(globalThis, `fetch`) - .mockResolvedValue(new Response(null, { status: 201 })) + const fetchSpy = vi.spyOn(globalThis, `fetch`).mockResolvedValue( + new Response(JSON.stringify({ webhook_secret: `whsec_route_test` }), { + status: 201, + headers: { 'content-type': `application/json` }, + }) + ) const db = fakeInsertDb() try { @@ -451,9 +448,9 @@ describe(`ElectricAgentsRoutes shared-state streams`, () => { webhook: { url: `http://localhost:4448/runtime-webhook` }, }), { - service: `tenant-a`, + service: `svc-agent-1`, publicUrl: `http://agents.local`, - durableStreamsUrl: `http://durable.local/v1/stream/tenant-a`, + durableStreamsUrl: `http://durable.local/v1/stream/svc-agent-1`, pgDb: db.db, isShuttingDown: () => false, } as unknown as TenantContext @@ -462,7 +459,7 @@ describe(`ElectricAgentsRoutes shared-state streams`, () => { expect(result.status).toBe(201) const [url, init] = fetchSpy.mock.calls[0]! expect(String(url)).toBe( - `http://durable.local/v1/stream/tenant-a/__ds/subscriptions/horton-handler` + `http://durable.local/v1/stream/svc-agent-1/__ds/subscriptions/horton-handler` ) expect(JSON.parse(requestBodyText(init?.body))).toEqual({ type: `webhook`, @@ -472,16 +469,17 @@ describe(`ElectricAgentsRoutes shared-state streams`, () => { }, }) expect(db.values).toHaveBeenCalledWith({ - tenantId: `tenant-a`, + tenantId: `svc-agent-1`, subscriptionId: `horton-handler`, webhookUrl: `http://localhost:4448/runtime-webhook`, + webhookSecret: `whsec_route_test`, }) } finally { fetchSpy.mockRestore() } }) - it(`lets a routing adapter own service-routed subscription URLs and stream names`, async () => { + it(`lets a routing adapter own custom subscription URLs without changing stream names`, async () => { const fetchSpy = vi.spyOn(globalThis, `fetch`).mockResolvedValue( new Response( JSON.stringify({ @@ -512,7 +510,7 @@ describe(`ElectricAgentsRoutes shared-state streams`, () => { service: `tenant-a`, publicUrl: `http://agents.local`, durableStreamsUrl: `http://durable.local`, - durableStreamsRouting: serviceRoutedTestAdapter, + durableStreamsRouting: customRootTestAdapter, pgDb: db.db, isShuttingDown: () => false, } as unknown as TenantContext @@ -521,7 +519,7 @@ describe(`ElectricAgentsRoutes shared-state streams`, () => { expect(result.status).toBe(201) const [url, init] = fetchSpy.mock.calls[0]! expect(String(url)).toBe( - `http://durable.local/v1/streams/tenant-a/__ds/subscriptions/horton-handler` + `http://durable.local/custom-stream-root/__ds/subscriptions/horton-handler` ) expect(JSON.parse(requestBodyText(init?.body))).toMatchObject({ pattern: `horton/**`, @@ -564,15 +562,19 @@ describe(`ElectricAgentsRoutes shared-state streams`, () => { webhook: { url: `http://localhost:4448/runtime-webhook` }, }), { - service: `tenant-a`, + service: `svc-agent-1`, publicUrl: `http://agents.local`, - durableStreamsUrl: `http://durable.local/v1/stream/tenant-a`, + durableStreamsUrl: `http://durable.local/v1/stream/svc-agent-1`, pgDb: db.db, isShuttingDown: () => false, } as unknown as TenantContext ) expect(result.status).toBe(201) + const [url] = fetchSpy.mock.calls[0]! + expect(String(url)).toBe( + `http://durable.local/v1/stream/svc-agent-1/__ds/subscriptions/horton-handler` + ) const [, init] = fetchSpy.mock.calls[0]! expect(JSON.parse(requestBodyText(init?.body))).toMatchObject({ pattern: `horton/**`, diff --git a/packages/agents-server/test/entity-projector.test.ts b/packages/agents-server/test/entity-projector.test.ts index fbb6d21c1c..fba0fc44f2 100644 --- a/packages/agents-server/test/entity-projector.test.ts +++ b/packages/agents-server/test/entity-projector.test.ts @@ -99,7 +99,7 @@ describe(`EntityProjector`, () => { it(`projects all tenants from one shared entities shape into tenant streams`, async () => { const streamClient = { - baseUrl: `https://streams.test/v1/stream/svc-a`, + baseUrl: `https://streams.test/v1/stream`, exists: vi.fn().mockResolvedValue(false), create: vi.fn().mockResolvedValue(undefined), readJson: vi.fn().mockResolvedValue([]), @@ -161,7 +161,7 @@ describe(`EntityProjector`, () => { contentType: `application/json`, }) expect(mockState.producerStreams[0]!.url).toBe( - `https://streams.test/v1/stream/svc-a${result.streamUrl}` + `https://streams.test/v1/stream${result.streamUrl}` ) expect(mockState.producerAppends).toHaveLength(1) diff --git a/packages/agents-server/test/horton-pull-wake-e2e.test.ts b/packages/agents-server/test/horton-pull-wake-e2e.test.ts index c7bc259d92..999ad978d8 100644 --- a/packages/agents-server/test/horton-pull-wake-e2e.test.ts +++ b/packages/agents-server/test/horton-pull-wake-e2e.test.ts @@ -82,14 +82,6 @@ function subscriptionUrl( subscriptionId: string ): string { const url = new URL(streamBaseUrl) - const match = /^(.*)\/v1\/stream\/([^/]+)\/?$/.exec(url.pathname) - if (match) { - const [, prefix = ``, serviceId] = match - url.pathname = `${prefix}/v1/stream/__ds/subscriptions/${encodeURIComponent(subscriptionId)}` - url.searchParams.set(`service`, decodeURIComponent(serviceId!)) - return url.toString() - } - url.pathname = `${url.pathname.replace(/\/+$/, ``)}/__ds/subscriptions/${encodeURIComponent(subscriptionId)}` return url.toString() } diff --git a/packages/agents-server/test/host.test.ts b/packages/agents-server/test/host.test.ts index 3ac40d548a..8b1188d9d9 100644 --- a/packages/agents-server/test/host.test.ts +++ b/packages/agents-server/test/host.test.ts @@ -53,22 +53,18 @@ describe(`AgentsHost`, () => { const runtime = await host.registerTenant({ serviceId: `svc-coastal-stork`, - durableStreamsUrl: `https://streams.test/v1/streams/svc-coastal-stork`, + durableStreamsUrl: `https://streams.test/v1/stream`, }) expect(runtime.serviceId).toBe(`svc-coastal-stork`) expect(host.getTenant(`svc-coastal-stork`)).toBe(runtime) - expect(runtime.streamClient.baseUrl).toBe( - `https://streams.test/v1/streams/svc-coastal-stork` - ) + expect(runtime.streamClient.baseUrl).toBe(`https://streams.test/v1/stream`) expect(runtime.wakeRegistry).toBe(host.wakeRegistry) expect(runtime.manager.registry.tenantId).toBe(`svc-coastal-stork`) }) it(`uses an explicitly supplied tenant stream client`, async () => { - const streamClient = new StreamClient( - `https://streams.test/v1/streams/svc-direct-client` - ) + const streamClient = new StreamClient(`https://streams.test/v1/stream`) const host = new AgentsHost({ db: createMockDb(), pgClient: vi.fn() as any, @@ -90,7 +86,7 @@ describe(`AgentsHost`, () => { const runtime = await host.registerTenant({ serviceId: `svc-before-start`, - durableStreamsUrl: `https://streams.test/v1/streams/svc-before-start`, + durableStreamsUrl: `https://streams.test/v1/stream`, }) const rehydrate = vi .spyOn(runtime, `rehydrateCronSchedules`) @@ -140,7 +136,7 @@ describe(`AgentsHost`, () => { const registration = host.registerTenant({ serviceId: `svc-race`, - durableStreamsUrl: `https://streams.test/v1/streams/svc-race`, + durableStreamsUrl: `https://streams.test/v1/stream`, }) await Promise.resolve() diff --git a/packages/agents-server/test/oss-server-router.test.ts b/packages/agents-server/test/oss-server-router.test.ts index 84dcad49bf..a95410e077 100644 --- a/packages/agents-server/test/oss-server-router.test.ts +++ b/packages/agents-server/test/oss-server-router.test.ts @@ -20,7 +20,7 @@ function buildTenantContext( url: `/principal/system:framework`, }, publicUrl: `http://server`, - durableStreamsUrl: `http://durable.local/v1/stream/tenant-test`, + durableStreamsUrl: `http://durable.local/v1/stream`, durableStreamsDispatcher: undefined as any, pgDb: undefined as any, entityManager: undefined as any, @@ -60,7 +60,7 @@ describe(`OSS server routing wrapper`, () => { expect(response.headers.get(`location`)).toBeNull() expect(fetchSpy).toHaveBeenCalledOnce() expect(String(fetchSpy.mock.calls[0]![0])).toBe( - `http://durable.local/v1/stream/tenant-test` + `http://durable.local/v1/stream` ) } finally { fetchSpy.mockRestore() diff --git a/packages/agents-server/test/stream-client.test.ts b/packages/agents-server/test/stream-client.test.ts index 6daa4aa173..17d8a90e33 100644 --- a/packages/agents-server/test/stream-client.test.ts +++ b/packages/agents-server/test/stream-client.test.ts @@ -131,14 +131,14 @@ describe(`StreamClient`, () => { } }) - it(`does not tenant-prefix subscription streams for tenant-root URLs`, async () => { + it(`keeps subscription streams relative to the configured base URL`, async () => { const fetchMock = vi.spyOn(globalThis, `fetch`).mockResolvedValueOnce( new Response(JSON.stringify({ subscription_id: `sub-1` }), { headers: { 'content-type': `application/json` }, }) ) const client = new StreamClient( - `https://streams.test/v1/streams/svc-tenant-a` + `https://streams.test/v1/stream/svc-agent-1` ) try { @@ -149,7 +149,7 @@ describe(`StreamClient`, () => { }) expect(fetchMock).toHaveBeenCalledWith( - `https://streams.test/v1/streams/svc-tenant-a/__ds/subscriptions/sub-1`, + `https://streams.test/v1/stream/svc-agent-1/__ds/subscriptions/sub-1`, expect.objectContaining({ method: `PUT` }) ) const [, init] = fetchMock.mock.calls[0]! @@ -168,10 +168,9 @@ describe(`StreamClient`, () => { headers: { 'content-type': `application/json` }, }) ) - const client = new StreamClient( - `http://127.0.0.1:4545/v1/stream/tenant-a`, - { bearer: `service-token` } - ) + const client = new StreamClient(`http://127.0.0.1:4545/v1/stream`, { + bearer: `service-token`, + }) try { await client.putSubscription(`sub-1`, { @@ -202,10 +201,9 @@ describe(`StreamClient`, () => { }) ) let token = 0 - const client = new StreamClient( - `http://127.0.0.1:4545/v1/stream/tenant-a`, - { bearer: () => `service-token-${++token}` } - ) + const client = new StreamClient(`http://127.0.0.1:4545/v1/stream`, { + bearer: () => `service-token-${++token}`, + }) try { await client.getSubscription(`sub-1`) @@ -228,10 +226,9 @@ describe(`StreamClient`, () => { headers: { 'content-type': `application/json` }, }) ) - const client = new StreamClient( - `http://127.0.0.1:4545/v1/stream/tenant-a`, - { bearer: `service-token` } - ) + const client = new StreamClient(`http://127.0.0.1:4545/v1/stream`, { + bearer: `service-token`, + }) try { await client.ackSubscription(`sub-1`, `claim-token`, { diff --git a/packages/agents-server/test/webhook-forward-routing.test.ts b/packages/agents-server/test/webhook-forward-routing.test.ts index 409021d32e..b592d7d51c 100644 --- a/packages/agents-server/test/webhook-forward-routing.test.ts +++ b/packages/agents-server/test/webhook-forward-routing.test.ts @@ -1,9 +1,12 @@ +import { createHmac } from 'node:crypto' import { describe, expect, it, vi } from 'vitest' import { ClaimWriteTokenStore } from '../src/claim-write-token-store' import { globalRouter } from '../src/routing/global-router' import type { TenantContext } from '../src/routing/context' import type { DurableStreamsRoutingAdapter } from '../src/routing/durable-streams-routing-adapter' +const TEST_WEBHOOK_SECRET = `whsec_test` + function request(method: string, path: string, body?: unknown): Request { return new Request(`http://agents.local${path}`, { method, @@ -13,6 +16,22 @@ function request(method: string, path: string, body?: unknown): Request { }) } +function signedWebhookForwardRequest(path: string, body: unknown): Request { + const rawBody = JSON.stringify(body) + const timestamp = Math.floor(Date.now() / 1000) + const signature = createHmac(`sha256`, TEST_WEBHOOK_SECRET) + .update(`${timestamp}.${rawBody}`) + .digest(`hex`) + return new Request(`http://agents.local${path}`, { + method: `POST`, + headers: { + 'content-type': `application/json`, + 'webhook-signature': `t=${timestamp},sha256=${signature}`, + }, + body: rawBody, + }) +} + function responseJson(response: Response): Promise { return response.json() } @@ -42,12 +61,12 @@ function insertDb() { return { insert, values, onConflictDoUpdate } } -const serviceRoutedTestAdapter: DurableStreamsRoutingAdapter = { +const customRootTestAdapter: DurableStreamsRoutingAdapter = { streamUrl(input) { const incomingUrl = new URL(input.requestUrl, `http://localhost`) const path = incomingUrl.pathname.replace(/^\/+/, ``) const target = new URL( - `/v1/streams/${input.serviceId}/${path}`, + `/custom-stream-root/${path}`, input.durableStreamsUrl ) target.search = incomingUrl.search @@ -56,18 +75,12 @@ const serviceRoutedTestAdapter: DurableStreamsRoutingAdapter = { controlUrl(input) { const incomingUrl = new URL(input.requestUrl, `http://localhost`) const target = new URL( - `/v1/streams/${input.serviceId}${incomingUrl.pathname}`, + `/custom-stream-root${incomingUrl.pathname}`, input.durableStreamsUrl ) target.search = incomingUrl.search return target }, - toBackendStreamPath(_serviceId, streamPath) { - return streamPath.replace(/^\/+/, ``) - }, - toRuntimeStreamPath(_serviceId, streamPath) { - return streamPath.replace(/^\/+/, ``) - }, } function buildContext(overrides: Partial = {}): TenantContext { @@ -93,7 +106,7 @@ function buildContext(overrides: Partial = {}): TenantContext { url: `/principal/system:framework`, }, publicUrl: `http://agents.local`, - durableStreamsUrl: `http://durable.local/v1/stream/tenant-a`, + durableStreamsUrl: `http://durable.local/v1/stream`, durableStreamsDispatcher: undefined as any, pgDb: undefined as any, entityManager: { @@ -126,9 +139,43 @@ function buildContext(overrides: Partial = {}): TenantContext { } describe(`webhook forwarding for Durable Streams subscriptions`, () => { + it(`rejects unsigned webhook-forward deliveries`, async () => { + const select = selectDb([ + { + webhookUrl: `http://runtime.local/_electric/builtin-agent-handler`, + webhookSecret: TEST_WEBHOOK_SECRET, + }, + ]) + const fetchSpy = vi.spyOn(globalThis, `fetch`) + + try { + const response = await globalRouter.fetch( + request(`POST`, `/_electric/webhook-forward/horton-handler`, { + subscription_id: `horton-handler`, + wake_id: `wake-unsigned`, + generation: 1, + streams: [], + callback_url: `http://durable.local/v1/stream/__ds/subscriptions/horton-handler/callback`, + callback_token: `callback-token`, + }), + buildContext({ + pgDb: { select: select.select } as any, + }) + ) + + expect(response.status).toBe(401) + expect(fetchSpy).not.toHaveBeenCalled() + } finally { + fetchSpy.mockRestore() + } + }) + it(`adapts the new webhook wake payload into the runtime wake payload`, async () => { const select = selectDb([ - { webhookUrl: `http://runtime.local/_electric/builtin-agent-handler` }, + { + webhookUrl: `http://runtime.local/_electric/builtin-agent-handler`, + webhookSecret: TEST_WEBHOOK_SECRET, + }, ]) const insert = insertDb() const fetchSpy = vi.spyOn(globalThis, `fetch`).mockResolvedValue( @@ -139,21 +186,24 @@ describe(`webhook forwarding for Durable Streams subscriptions`, () => { try { const response = await globalRouter.fetch( - request(`POST`, `/_electric/webhook-forward/horton-handler`, { - subscription_id: `horton-handler`, - wake_id: `wake-1`, - generation: 7, - streams: [ - { - path: `horton/demo/main`, - acked_offset: `0`, - tail_offset: `1`, - has_pending: true, - }, - ], - callback_url: `http://durable.local/v1/stream/tenant-a/__ds/subscriptions/horton-handler/callback`, - callback_token: `callback-token`, - }), + signedWebhookForwardRequest( + `/_electric/webhook-forward/horton-handler`, + { + subscription_id: `horton-handler`, + wake_id: `wake-1`, + generation: 7, + streams: [ + { + path: `horton/demo/main`, + acked_offset: `0`, + tail_offset: `1`, + has_pending: true, + }, + ], + callback_url: `http://durable.local/v1/stream/__ds/subscriptions/horton-handler/callback`, + callback_token: `callback-token`, + } + ), buildContext({ pgDb: { select: select.select, insert: insert.insert } as any, }) @@ -180,7 +230,7 @@ describe(`webhook forwarding for Durable Streams subscriptions`, () => { expect(insert.values).toHaveBeenCalledWith({ tenantId: `tenant-a`, consumerId: `wake-1`, - callbackUrl: `http://durable.local/v1/stream/tenant-a/__ds/subscriptions/horton-handler/callback`, + callbackUrl: `http://durable.local/v1/stream/__ds/subscriptions/horton-handler/callback`, primaryStream: `/horton/demo/main`, }) } finally { @@ -190,7 +240,10 @@ describe(`webhook forwarding for Durable Streams subscriptions`, () => { it(`routes new webhook wakes to the pending stream when DS includes stale streams first`, async () => { const select = selectDb([ - { webhookUrl: `http://runtime.local/_electric/builtin-agent-handler` }, + { + webhookUrl: `http://runtime.local/_electric/builtin-agent-handler`, + webhookSecret: TEST_WEBHOOK_SECRET, + }, ]) const insert = insertDb() const fetchSpy = vi.spyOn(globalThis, `fetch`).mockResolvedValue( @@ -216,27 +269,30 @@ describe(`webhook forwarding for Durable Streams subscriptions`, () => { try { const response = await globalRouter.fetch( - request(`POST`, `/_electric/webhook-forward/horton-handler`, { - subscription_id: `horton-handler`, - wake_id: `wake-2`, - generation: 8, - streams: [ - { - path: `horton/old/main`, - acked_offset: `10`, - tail_offset: `10`, - has_pending: false, - }, - { - path: `horton/pending/main`, - acked_offset: `0`, - tail_offset: `1`, - has_pending: true, - }, - ], - callback_url: `http://durable.local/v1/stream/tenant-a/__ds/subscriptions/horton-handler/callback`, - callback_token: `callback-token`, - }), + signedWebhookForwardRequest( + `/_electric/webhook-forward/horton-handler`, + { + subscription_id: `horton-handler`, + wake_id: `wake-2`, + generation: 8, + streams: [ + { + path: `horton/old/main`, + acked_offset: `10`, + tail_offset: `10`, + has_pending: false, + }, + { + path: `horton/pending/main`, + acked_offset: `0`, + tail_offset: `1`, + has_pending: true, + }, + ], + callback_url: `http://durable.local/v1/stream/__ds/subscriptions/horton-handler/callback`, + callback_token: `callback-token`, + } + ), buildContext({ pgDb: { select: select.select, insert: insert.insert } as any, entityManager: { @@ -274,7 +330,7 @@ describe(`webhook forwarding for Durable Streams subscriptions`, () => { expect(insert.values).toHaveBeenCalledWith({ tenantId: `tenant-a`, consumerId: `wake-2`, - callbackUrl: `http://durable.local/v1/stream/tenant-a/__ds/subscriptions/horton-handler/callback`, + callbackUrl: `http://durable.local/v1/stream/__ds/subscriptions/horton-handler/callback`, primaryStream: `/horton/pending/main`, }) } finally { @@ -284,7 +340,10 @@ describe(`webhook forwarding for Durable Streams subscriptions`, () => { it(`keeps root-relative DS wake stream paths before forwarding to the runtime`, async () => { const select = selectDb([ - { webhookUrl: `http://runtime.local/_electric/builtin-agent-handler` }, + { + webhookUrl: `http://runtime.local/_electric/builtin-agent-handler`, + webhookSecret: TEST_WEBHOOK_SECRET, + }, ]) const insert = insertDb() const getEntityByStream = vi.fn().mockResolvedValue({ @@ -307,21 +366,24 @@ describe(`webhook forwarding for Durable Streams subscriptions`, () => { try { const response = await globalRouter.fetch( - request(`POST`, `/_electric/webhook-forward/horton-handler`, { - subscription_id: `horton-handler`, - wake_id: `wake-prefixed`, - generation: 9, - streams: [ - { - path: `horton/demo/main`, - acked_offset: `0`, - tail_offset: `1`, - has_pending: true, - }, - ], - callback_url: `http://durable.local/v1/stream/tenant-a/__ds/subscriptions/horton-handler/callback`, - callback_token: `callback-token`, - }), + signedWebhookForwardRequest( + `/_electric/webhook-forward/horton-handler`, + { + subscription_id: `horton-handler`, + wake_id: `wake-prefixed`, + generation: 9, + streams: [ + { + path: `horton/demo/main`, + acked_offset: `0`, + tail_offset: `1`, + has_pending: true, + }, + ], + callback_url: `http://durable.local/v1/stream/__ds/subscriptions/horton-handler/callback`, + callback_token: `callback-token`, + } + ), buildContext({ pgDb: { select: select.select, insert: insert.insert } as any, entityManager: { @@ -359,7 +421,7 @@ describe(`webhook forwarding for Durable Streams subscriptions`, () => { expect(insert.values).toHaveBeenCalledWith({ tenantId: `tenant-a`, consumerId: `wake-prefixed`, - callbackUrl: `http://durable.local/v1/stream/tenant-a/__ds/subscriptions/horton-handler/callback`, + callbackUrl: `http://durable.local/v1/stream/__ds/subscriptions/horton-handler/callback`, primaryStream: `/horton/demo/main`, }) } finally { @@ -370,7 +432,7 @@ describe(`webhook forwarding for Durable Streams subscriptions`, () => { it(`claims new webhook wakes locally and returns a tenant-scoped claim write token`, async () => { const select = selectDb([ { - callbackUrl: `http://durable.local/v1/stream/tenant-a/__ds/subscriptions/horton-handler/callback`, + callbackUrl: `http://durable.local/v1/stream/__ds/subscriptions/horton-handler/callback`, primaryStream: `/horton/demo/main`, }, ]) @@ -409,7 +471,10 @@ describe(`webhook forwarding for Durable Streams subscriptions`, () => { it(`auto-acks webhook wakes for stopped entities`, async () => { const select = selectDb([ - { webhookUrl: `http://runtime.local/_electric/builtin-agent-handler` }, + { + webhookUrl: `http://runtime.local/_electric/builtin-agent-handler`, + webhookSecret: TEST_WEBHOOK_SECRET, + }, ]) const insert = insertDb() const stoppedEntity = { @@ -425,21 +490,24 @@ describe(`webhook forwarding for Durable Streams subscriptions`, () => { try { const response = await globalRouter.fetch( - request(`POST`, `/_electric/webhook-forward/horton-handler`, { - subscription_id: `horton-handler`, - wake_id: `wake-stopped`, - generation: 8, - streams: [ - { - path: `horton/demo/main`, - acked_offset: `1`, - tail_offset: `2`, - has_pending: true, - }, - ], - callback_url: `http://durable.local/v1/stream/tenant-a/__ds/subscriptions/horton-handler/callback`, - callback_token: `callback-token`, - }), + signedWebhookForwardRequest( + `/_electric/webhook-forward/horton-handler`, + { + subscription_id: `horton-handler`, + wake_id: `wake-stopped`, + generation: 8, + streams: [ + { + path: `horton/demo/main`, + acked_offset: `1`, + tail_offset: `2`, + has_pending: true, + }, + ], + callback_url: `http://durable.local/v1/stream/__ds/subscriptions/horton-handler/callback`, + callback_token: `callback-token`, + } + ), buildContext({ pgDb: { select: select.select, insert: insert.insert } as any, entityManager: { @@ -464,7 +532,7 @@ describe(`webhook forwarding for Durable Streams subscriptions`, () => { it(`translates runtime done callbacks to the new Durable Streams callback shape`, async () => { const select = selectDb([ { - callbackUrl: `http://durable.local/v1/stream/tenant-a/__ds/subscriptions/horton-handler/callback`, + callbackUrl: `http://durable.local/v1/stream/__ds/subscriptions/horton-handler/callback`, primaryStream: `/horton/demo/main`, }, ]) @@ -502,7 +570,7 @@ describe(`webhook forwarding for Durable Streams subscriptions`, () => { expect(response.status).toBe(200) const [url, init] = fetchSpy.mock.calls[0]! expect(String(url)).toBe( - `http://durable.local/v1/stream/tenant-a/__ds/subscriptions/horton-handler/callback` + `http://durable.local/v1/stream/__ds/subscriptions/horton-handler/callback` ) expect(init).toMatchObject({ method: `POST`, @@ -535,10 +603,122 @@ describe(`webhook forwarding for Durable Streams subscriptions`, () => { } }) - it(`lets a routing adapter own callback ack stream paths`, async () => { + it(`forwards electric claim tokens as Durable Streams callback authorization`, async () => { + const select = selectDb([ + { + callbackUrl: `http://durable.local/v1/stream/__ds/subscriptions/horton-handler/callback`, + primaryStream: `/horton/demo/main`, + }, + ]) + const fetchSpy = vi.spyOn(globalThis, `fetch`).mockResolvedValue( + new Response(JSON.stringify({ ok: true, next_wake: false }), { + headers: { 'content-type': `application/json` }, + }) + ) + + try { + const response = await globalRouter.fetch( + new Request(`http://agents.local/_electric/callback-forward/wake-1`, { + method: `POST`, + headers: { + 'content-type': `application/json`, + authorization: `Bearer tenant-token`, + 'electric-claim-token': `callback-token`, + }, + body: JSON.stringify({ + epoch: 7, + acks: [{ path: `/horton/demo/main`, offset: `1` }], + done: true, + }), + }), + buildContext({ + pgDb: { select: select.select } as any, + }) + ) + + expect(response.status).toBe(200) + const [, init] = fetchSpy.mock.calls[0]! + const headers = init?.headers as Headers + expect(headers.get(`authorization`)).toBe(`Bearer callback-token`) + expect(headers.get(`electric-claim-token`)).toBeNull() + expect(requestBodyJson(init?.body)).toEqual({ + wake_id: `wake-1`, + generation: 7, + acks: [{ stream: `horton/demo/main`, offset: `1` }], + done: true, + }) + } finally { + fetchSpy.mockRestore() + } + }) + + it(`acks Durable Streams subscription callbacks through the stream client`, async () => { + const select = selectDb([ + { + callbackUrl: `ds-subscription:horton-handler`, + primaryStream: `/horton/demo/main`, + }, + ]) + const fetchSpy = vi.spyOn(globalThis, `fetch`) + const ackSubscription = vi + .fn() + .mockResolvedValue({ ok: true, next_wake: false }) + const ctx = buildContext({ + pgDb: { select: select.select } as any, + streamClient: { ackSubscription } as any, + }) + const claimToken = ctx.runtime.claimWriteTokens.mint( + `tenant-a`, + `/horton/demo/main`, + `wake-1` + ) + + try { + const response = await globalRouter.fetch( + new Request(`http://agents.local/_electric/callback-forward/wake-1`, { + method: `POST`, + headers: { + 'content-type': `application/json`, + authorization: `Bearer tenant-token`, + 'electric-claim-token': `callback-token`, + }, + body: JSON.stringify({ + epoch: 7, + acks: [{ path: `/horton/demo/main`, offset: `1` }], + done: true, + }), + }), + ctx + ) + + expect(response.status).toBe(200) + expect(fetchSpy).not.toHaveBeenCalled() + expect(ackSubscription).toHaveBeenCalledWith( + `horton-handler`, + `callback-token`, + { + wake_id: `wake-1`, + generation: 7, + acks: [{ stream: `horton/demo/main`, offset: `1` }], + done: true, + } + ) + expect( + ctx.runtime.claimWriteTokens.isValid( + `tenant-a`, + `/horton/demo/main`, + claimToken + ) + ).toBe(false) + } finally { + fetchSpy.mockRestore() + } + }) + + it(`keeps callback ack stream paths independent of routing adapter URLs`, async () => { const select = selectDb([ { - callbackUrl: `http://durable.local/v1/stream/tenant-a/__ds/subscriptions/horton-handler/callback`, + callbackUrl: `http://durable.local/v1/stream/__ds/subscriptions/horton-handler/callback`, primaryStream: `/horton/demo/main`, }, ]) @@ -563,7 +743,7 @@ describe(`webhook forwarding for Durable Streams subscriptions`, () => { }), }), buildContext({ - durableStreamsRouting: serviceRoutedTestAdapter, + durableStreamsRouting: customRootTestAdapter, pgDb: { select: select.select } as any, }) )