From 7d47c8446759dcebc67a8a7a2fca2fd827fe8720 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Mon, 18 May 2026 09:49:28 +0100 Subject: [PATCH] fix(webapp): dedupe realtimeStreams array push on stream create The PUT handler at /realtime/v1/streams/:runId/:target/:streamId unconditionally appended streamId to TaskRun.realtimeStreams on every call. SDK call patterns that re-initialize the same stream key on every chunk turned that into a per-write row UPDATE, bloating the array and contending on the row lock. Read the array first and only push when the streamId isn't already present, matching the existing append handler. First-time inits behave identically; repeat inits short-circuit to a single indexed read. --- .server-changes/realtimestreams-dedupe.md | 6 ++++ ...ime.v1.streams.$runId.$target.$streamId.ts | 28 +++++++++++++------ 2 files changed, 25 insertions(+), 9 deletions(-) create mode 100644 .server-changes/realtimestreams-dedupe.md diff --git a/.server-changes/realtimestreams-dedupe.md b/.server-changes/realtimestreams-dedupe.md new file mode 100644 index 00000000000..69987f7b4b6 --- /dev/null +++ b/.server-changes/realtimestreams-dedupe.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: fix +--- + +Dedupe the `realtimeStreams` array push on `PUT /realtime/v1/streams/:runId/:target/:streamId` so repeat stream-init calls for the same `(run, streamId)` skip the row UPDATE, mirroring the existing append handler. diff --git a/apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts b/apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts index 9ca8e36f4ef..dd3d3bf31dd 100644 --- a/apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts +++ b/apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts @@ -62,31 +62,41 @@ const { action } = createActionApiRoute( if (request.method === "PUT") { // This is the "create" endpoint - const updatedRun = await prisma.taskRun.update({ + const target = await prisma.taskRun.findFirst({ where: { friendlyId: targetId, runtimeEnvironmentId: authentication.environment.id, }, - data: { - realtimeStreams: { - push: params.streamId, - }, - }, select: { + id: true, + realtimeStreams: true, realtimeStreamsVersion: true, completedAt: true, }, }); - if (updatedRun.completedAt) { + if (!target) { + return new Response("Run not found", { status: 404 }); + } + + if (target.completedAt) { return new Response("Cannot initialize a realtime stream on a completed run", { status: 400, }); } + if (!target.realtimeStreams.includes(params.streamId)) { + await prisma.taskRun.update({ + where: { id: target.id }, + data: { + realtimeStreams: { push: params.streamId }, + }, + }); + } + const realtimeStream = getRealtimeStreamInstance( authentication.environment, - updatedRun.realtimeStreamsVersion, + target.realtimeStreamsVersion, basinContext ); @@ -94,7 +104,7 @@ const { action } = createActionApiRoute( return json( { - version: updatedRun.realtimeStreamsVersion, + version: target.realtimeStreamsVersion, }, { status: 202, headers: responseHeaders } );