Skip to content

Commit eee2da0

Browse files
committed
Fix
1 parent c678109 commit eee2da0

2 files changed

Lines changed: 24 additions & 26 deletions

File tree

apps/sim/app/api/mothership/execute/route.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ const MOTHERSHIP_EXECUTE_STREAM_HEADER = 'x-mothership-execute-stream'
2828
const MOTHERSHIP_EXECUTE_STREAM_VALUE = 'ndjson'
2929
const MOTHERSHIP_EXECUTE_STREAM_CONTENT_TYPE = 'application/x-ndjson'
3030
const MOTHERSHIP_EXECUTE_HEARTBEAT_INTERVAL_MS = 15_000
31+
const ndjsonEncoder = new TextEncoder()
3132

3233
function isAbortError(error: unknown): boolean {
3334
return error instanceof Error && error.name === 'AbortError'
@@ -41,7 +42,7 @@ function wantsStreamedExecuteResponse(req: NextRequest): boolean {
4142
}
4243

4344
function encodeNdjson(value: unknown): Uint8Array {
44-
return new TextEncoder().encode(`${JSON.stringify(value)}\n`)
45+
return ndjsonEncoder.encode(`${JSON.stringify(value)}\n`)
4546
}
4647

4748
function buildExecuteResponsePayload(

apps/sim/executor/handlers/mothership/mothership-handler.ts

Lines changed: 22 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -92,14 +92,7 @@ function isContentSelectedForStreaming(ctx: ExecutionContext, block: SerializedB
9292
return (
9393
ctx.selectedOutputs?.some((outputId) => {
9494
if (outputId === block.id) return true
95-
if (outputId === `${block.id}.content` || outputId === `${block.id}_content`) return true
96-
97-
const firstUnderscoreIndex = outputId.indexOf('_')
98-
if (firstUnderscoreIndex === -1) return false
99-
return (
100-
outputId.substring(0, firstUnderscoreIndex) === block.id &&
101-
outputId.substring(firstUnderscoreIndex + 1) === 'content'
102-
)
95+
return outputId === `${block.id}.content` || outputId === `${block.id}_content`
10396
}) ?? false
10497
)
10598
}
@@ -139,26 +132,30 @@ async function readMothershipExecuteResponse(response: Response): Promise<Mother
139132
throw new Error('Mothership execution stream returned an unknown event')
140133
}
141134

142-
while (true) {
143-
const { done, value } = await reader.read()
144-
if (done) break
145-
146-
buffer += decoder.decode(value, { stream: true })
147-
const lines = buffer.split('\n')
148-
buffer = lines.pop() ?? ''
149-
for (const line of lines) {
150-
processLine(line)
135+
try {
136+
while (true) {
137+
const { done, value } = await reader.read()
138+
if (done) break
139+
140+
buffer += decoder.decode(value, { stream: true })
141+
const lines = buffer.split('\n')
142+
buffer = lines.pop() ?? ''
143+
for (const line of lines) {
144+
processLine(line)
145+
}
151146
}
152-
}
153147

154-
buffer += decoder.decode()
155-
processLine(buffer)
148+
buffer += decoder.decode()
149+
processLine(buffer)
156150

157-
if (!finalResult) {
158-
throw new Error('Mothership execution stream ended without a final result')
159-
}
151+
if (!finalResult) {
152+
throw new Error('Mothership execution stream ended without a final result')
153+
}
160154

161-
return finalResult
155+
return finalResult
156+
} finally {
157+
reader.releaseLock()
158+
}
162159
}
163160

164161
function createMothershipStreamingExecution(
@@ -273,7 +270,7 @@ function createMothershipStreamingExecution(
273270
startTime: new Date().toISOString(),
274271
},
275272
isStreaming: true,
276-
},
273+
} as StreamingExecution['execution'] & { blockId: string },
277274
}
278275
}
279276

0 commit comments

Comments
 (0)