Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/agents-server-stream-paths.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"@electric-ax/agents-server": patch
"@electric-ax/agents-runtime": patch
---

Keep Durable Streams paths service-agnostic in the OSS agents-server. Subscription payloads, webhook wake paths, callback ack paths, and routing adapters now treat stream names as relative to the configured Durable Streams base URL instead of applying service-id path transforms.

Persist and verify Durable Streams webhook signing secrets before forwarding webhook wakes through agents-server. Runtime handlers that use server auth headers now send Durable Streams claim tokens via `electric-claim-token`, preserving the configured server `Authorization` header for cloud callback routes.
6 changes: 6 additions & 0 deletions packages/agents-runtime/src/create-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,12 @@ export function createRuntimeRouter(
createElectricTools,
idleTimeout,
heartbeatInterval,
...(serverHeaders
? {
claimHeaders: serverHeaders,
claimTokenHeader: `electric-claim-token` as const,
}
: {}),
}
const getRegisteredType = (name: string) =>
registry ? registry.get(name) : getEntityType(name)
Expand Down
53 changes: 53 additions & 0 deletions packages/agents-runtime/test/create-handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,59 @@ describe(`createRuntimeHandler`, () => {
expect(headers.get(`content-type`)).toBe(`application/json`)
})

it(`uses configured server headers for webhook wake callbacks`, async () => {
defineEntity(`test-agent`, { handler: async () => {} })
processWakeMock.mockResolvedValueOnce(null)

const notification = {
consumerId: `consumer-1`,
epoch: 1,
wakeId: `wake-1`,
streamPath: `/streams/entity:test-1`,
streams: [{ path: `/streams/entity:test-1`, offset: `0_0` }],
callback: `http://localhost:3000/_electric/callback-forward/wake-1`,
claimToken: `tok-1`,
entity: {
type: `test-agent`,
status: `active`,
url: `http://localhost:3000/test-agent/test-1`,
streams: {
main: `/streams/entity:test-1`,
error: `/streams/entity-error:test-1`,
},
},
}

const handler = createRuntimeHandler({
baseUrl: `http://localhost:3000`,
handlerUrl: `http://localhost:4000/electric-agents`,
serverHeaders: {
Authorization: `Bearer tenant-token`,
'X-Tenant': `tenant-a`,
},
})

const response = await handler.handleWebhookRequest(
new Request(`http://localhost/electric-agents`, {
method: `POST`,
headers: { 'content-type': `application/json` },
body: JSON.stringify(notification),
})
)

expect(response.status).toBe(200)
expect(processWakeMock).toHaveBeenCalledWith(
notification,
expect.objectContaining({
claimHeaders: {
Authorization: `Bearer tenant-token`,
'X-Tenant': `tenant-a`,
},
claimTokenHeader: `electric-claim-token`,
})
)
})

it(`registers custom state collections as output schemas`, async () => {
defineEntity(`stateful-agent`, {
state: {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE subscription_webhooks
ADD COLUMN webhook_secret text;
7 changes: 7 additions & 0 deletions packages/agents-server/drizzle/meta/_journal.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@
"when": 1778976000000,
"tag": "0008_runner_runtime_diagnostics",
"breakpoints": true
},
{
"idx": 9,
"version": "7",
"when": 1779207600000,
"tag": "0009_signed_webhook_forwarding",
"breakpoints": true
}
]
}
1 change: 1 addition & 0 deletions packages/agents-server/src/db/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ export const subscriptionWebhooks = pgTable(
tenantId: text(`tenant_id`).notNull().default(`default`),
subscriptionId: text(`subscription_id`).notNull(),
webhookUrl: text(`webhook_url`).notNull(),
webhookSecret: text(`webhook_secret`),
createdAt: timestamp(`created_at`, { withTimezone: true })
.notNull()
.defaultNow(),
Expand Down
28 changes: 17 additions & 11 deletions packages/agents-server/src/routing/dispatch-policy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ import {
ErrCodeUnauthorized,
} from '../electric-agents-types.js'
import { runnerWakeStream } from '../entity-registry.js'
import { DurableStreamsSubscriptionError } from '../stream-client.js'
import {
DurableStreamsSubscriptionError,
type SubscriptionResponse,
} from '../stream-client.js'
import { rewriteLoopbackWebhookUrl } from '../utils/webhook-url.js'
import { serverLog } from '../utils/log.js'
import type {
Expand Down Expand Up @@ -114,18 +117,16 @@ function sameDispatchDestination(
}

function subscriptionHasStream(
ctx: TenantContext,
existing: { streams?: Array<string | { path?: string }> },
streamPath: string
): boolean {
const normalizedStream = streamPath.replace(/^\/+/, ``)
const backendStream = `${ctx.service}/${normalizedStream}`
return (
existing.streams?.some((stream) => {
const path = typeof stream === `string` ? stream : stream.path
if (!path) return false
const normalized = path.replace(/^\/+/, ``)
return normalized === normalizedStream || normalized === backendStream
return normalized === normalizedStream
}) ?? false
)
}
Expand Down Expand Up @@ -163,11 +164,10 @@ async function ensureSubscriptionIncludesStream(
streamPath: string,
input: SubscriptionCreateInput,
existing: { streams?: Array<string | { path?: string }> } | null
): Promise<void> {
): Promise<SubscriptionResponse | null> {
if (!existing) {
try {
await ctx.streamClient.putSubscription(subscriptionId, input)
return
return await ctx.streamClient.putSubscription(subscriptionId, input)
} catch (err) {
if (!isSubscriptionAlreadyExistsError(err)) throw err
existing = await ctx.streamClient.getSubscription(subscriptionId)
Expand All @@ -176,14 +176,15 @@ async function ensureSubscriptionIncludesStream(
`[dispatch-policy] subscription create raced with existing subscription but it could not be read`,
{ subscriptionId, stream: streamPath }
)
return
return null
}
}
}

if (!subscriptionHasStream(ctx, existing, streamPath)) {
if (!subscriptionHasStream(existing, streamPath)) {
await ctx.streamClient.addSubscriptionStreams(subscriptionId, [streamPath])
}
return existing as SubscriptionResponse
}

export async function assertDispatchPolicyAllowed(
Expand Down Expand Up @@ -322,7 +323,7 @@ async function linkStreamToTargetSubscription(
ctx.publicUrl,
`/_electric/webhook-forward/${encodeURIComponent(subscriptionId)}`
)
await ensureSubscriptionIncludesStream(
const subscription = await ensureSubscriptionIncludesStream(
ctx,
subscriptionId,
streamPath,
Expand All @@ -334,18 +335,23 @@ async function linkStreamToTargetSubscription(
},
existing
)
const webhookSecret = subscription?.webhook_secret
await ctx.pgDb
.insert(subscriptionWebhooks)
.values({
tenantId: ctx.service,
subscriptionId,
webhookUrl,
...(webhookSecret ? { webhookSecret } : {}),
})
.onConflictDoUpdate({
target: [
subscriptionWebhooks.tenantId,
subscriptionWebhooks.subscriptionId,
],
set: { webhookUrl },
set: {
webhookUrl,
...(webhookSecret ? { webhookSecret } : {}),
},
})
}
Loading
Loading