Skip to content

Commit d05348d

Browse files
fix(tables): load workflow outside schema lock; use DbOrTx for getTableById
1 parent 20e1f28 commit d05348d

1 file changed

Lines changed: 117 additions & 89 deletions

File tree

apps/sim/lib/table/service.ts

Lines changed: 117 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ function applyColumnOrderToSchema(
231231

232232
export async function getTableById(
233233
tableId: string,
234-
options?: { includeArchived?: boolean; tx?: DbTransaction }
234+
options?: { includeArchived?: boolean; tx?: DbOrTx }
235235
): Promise<TableDefinition | null> {
236236
const { includeArchived = false, tx } = options ?? {}
237237
const executor = tx ?? db
@@ -3227,13 +3227,60 @@ export async function updateWorkflowGroup(
32273227
data: UpdateWorkflowGroupData,
32283228
requestId: string
32293229
): Promise<TableDefinition> {
3230+
const mappingUpdates = data.mappingUpdates ?? []
3231+
3232+
// Phase 1 (no lock): when there are mapping updates, load the workflow once to
3233+
// resolve each remap's new leaf type. Kept OFF the advisory-lock critical
3234+
// section so concurrent group edits on the same table don't time out waiting
3235+
// on this DB load. Best-effort — a resolution failure leaves column types
3236+
// unchanged (workflow deleted, block removed). The result is applied against
3237+
// the fresh schema under the lock in phase 2.
3238+
const remapLeafTypeByColumn = new Map<string, ColumnDefinition['type']>()
3239+
if (mappingUpdates.length > 0) {
3240+
try {
3241+
const preTable = await getTableById(data.tableId)
3242+
const preGroup = preTable?.schema.workflowGroups?.find((g) => g.id === data.groupId)
3243+
const targetWorkflowId = data.workflowId ?? preGroup?.workflowId
3244+
if (targetWorkflowId) {
3245+
const [
3246+
{ loadWorkflowFromNormalizedTables },
3247+
{ flattenWorkflowOutputs },
3248+
{ columnTypeForLeaf },
3249+
] = await Promise.all([
3250+
import('@/lib/workflows/persistence/utils'),
3251+
import('@/lib/workflows/blocks/flatten-outputs'),
3252+
import('./column-naming'),
3253+
])
3254+
const normalized = await loadWorkflowFromNormalizedTables(targetWorkflowId)
3255+
if (normalized) {
3256+
const blocks = Object.values(normalized.blocks ?? {}).map((b) => ({
3257+
id: b.id,
3258+
type: b.type,
3259+
name: b.name,
3260+
triggerMode: (b as { triggerMode?: boolean }).triggerMode,
3261+
subBlocks: b.subBlocks as Record<string, unknown> | undefined,
3262+
}))
3263+
const flattened = flattenWorkflowOutputs(blocks, normalized.edges ?? [])
3264+
const flatByKey = new Map(flattened.map((f) => [`${f.blockId}::${f.path}`, f]))
3265+
for (const u of mappingUpdates) {
3266+
const match = flatByKey.get(`${u.blockId}::${u.path}`)
3267+
if (!match) continue
3268+
const newType = columnTypeForLeaf(match.leafType)
3269+
if (newType) remapLeafTypeByColumn.set(u.columnName, newType)
3270+
}
3271+
}
3272+
}
3273+
} catch (err) {
3274+
logger.warn(
3275+
`[${requestId}] Could not resolve new leaf types for remap on group ${data.groupId}; leaving column types unchanged:`,
3276+
err
3277+
)
3278+
}
3279+
}
3280+
32303281
const { updatedTable, added, remappedColumnNames, newOutputs, previousAutoRun } =
32313282
await withLockedTable(data.tableId, async (table, trx) => {
3232-
// Generous idle timeout: the leaf-type resolution below loads the
3233-
// workflow on a separate connection, leaving this transaction idle while
3234-
// it runs — the default 5s `idle_in_transaction_session_timeout` would
3235-
// kill the held lock on a large workflow.
3236-
await setTableTxTimeouts(trx, { statementMs: 60_000, idleMs: 30_000 })
3283+
await setTableTxTimeouts(trx, { statementMs: 60_000 })
32373284

32383285
const schema = table.schema
32393286
const groups = schema.workflowGroups ?? []
@@ -3248,13 +3295,11 @@ export async function updateWorkflowGroup(
32483295
// of outputs so the downstream `(blockId, path)`-keyed diff doesn't see the
32493296
// swap as a remove+add. The corresponding row data is cleared after the
32503297
// schema write so stale values from the old source don't linger.
3251-
const mappingUpdates = data.mappingUpdates ?? []
32523298
const remappedColumnNames = new Set<string>()
3253-
// Per-column type override resolved from the new mapping's leaf type. Only
3254-
// populated when a remap actually changes the column's type — keeps the
3255-
// schema patch a no-op when the user repoints to an output of the same
3256-
// type. Falls back to leaving the existing type alone if the workflow or
3257-
// its target output can't be resolved (workflow deleted, block removed).
3299+
// Per-column type override resolved (out-of-lock) from the new mapping's
3300+
// leaf type. Only populated when a remap actually changes the column's
3301+
// type against the fresh schema — keeps the schema patch a no-op when the
3302+
// user repoints to an output of the same type.
32583303
const remappedColumnTypes = new Map<string, ColumnDefinition['type']>()
32593304
let oldOutputs = group.outputs
32603305
if (mappingUpdates.length > 0) {
@@ -3274,48 +3319,14 @@ export async function updateWorkflowGroup(
32743319
return { ...o, blockId: u.blockId, path: u.path }
32753320
})
32763321

3277-
// Resolve the new leaf type for each remap so the column's declared type
3278-
// matches what the new mapping produces. Without this, a string→number
3279-
// remap would keep `type: 'string'` and coerceRowToSchema would coerce
3280-
// every backfilled value toward the wrong type.
3281-
try {
3282-
const [
3283-
{ loadWorkflowFromNormalizedTables },
3284-
{ flattenWorkflowOutputs },
3285-
{ columnTypeForLeaf },
3286-
] = await Promise.all([
3287-
import('@/lib/workflows/persistence/utils'),
3288-
import('@/lib/workflows/blocks/flatten-outputs'),
3289-
import('./column-naming'),
3290-
])
3291-
const targetWorkflowId = data.workflowId ?? group.workflowId
3292-
const normalized = await loadWorkflowFromNormalizedTables(targetWorkflowId)
3293-
if (normalized) {
3294-
const blocks = Object.values(normalized.blocks ?? {}).map((b) => ({
3295-
id: b.id,
3296-
type: b.type,
3297-
name: b.name,
3298-
triggerMode: (b as { triggerMode?: boolean }).triggerMode,
3299-
subBlocks: b.subBlocks as Record<string, unknown> | undefined,
3300-
}))
3301-
const flattened = flattenWorkflowOutputs(blocks, normalized.edges ?? [])
3302-
const flatByKey = new Map(flattened.map((f) => [`${f.blockId}::${f.path}`, f]))
3303-
const colByName = new Map(schema.columns.map((c) => [c.name, c]))
3304-
for (const u of mappingUpdates) {
3305-
const match = flatByKey.get(`${u.blockId}::${u.path}`)
3306-
if (!match) continue
3307-
const newType = columnTypeForLeaf(match.leafType)
3308-
const oldType = colByName.get(u.columnName)?.type
3309-
if (newType && newType !== oldType) {
3310-
remappedColumnTypes.set(u.columnName, newType)
3311-
}
3312-
}
3322+
const colByName = new Map(schema.columns.map((c) => [c.name, c]))
3323+
for (const u of mappingUpdates) {
3324+
const newType = remapLeafTypeByColumn.get(u.columnName)
3325+
if (!newType) continue
3326+
const oldType = colByName.get(u.columnName)?.type
3327+
if (newType !== oldType) {
3328+
remappedColumnTypes.set(u.columnName, newType)
33133329
}
3314-
} catch (err) {
3315-
logger.warn(
3316-
`[${requestId}] Could not resolve new leaf types for remap on group ${data.groupId}; leaving column types unchanged:`,
3317-
err
3318-
)
33193330
}
33203331
}
33213332

@@ -3554,56 +3565,73 @@ export async function addWorkflowGroupOutput(
35543565
},
35553566
requestId: string
35563567
): Promise<TableDefinition> {
3557-
const { updatedTable, newOutput } = await withLockedTable(data.tableId, async (table, trx) => {
3558-
// Generous idle timeout: the workflow load below runs on a separate
3559-
// connection, leaving this transaction idle while it resolves the pickable
3560-
// output — the default 5s `idle_in_transaction_session_timeout` would kill
3561-
// the held lock on a large workflow.
3562-
await setTableTxTimeouts(trx, { idleMs: 30_000 })
3568+
// Phase 1 (no lock): load the workflow and resolve the pickable output plus
3569+
// its execution-order index. This depends only on the workflow graph (which
3570+
// is stable), so it runs OFF the advisory-lock critical section — holding the
3571+
// lock during this DB load would make concurrent adders on the same table
3572+
// time out waiting (the Mothership fan-out this fix targets). Phase 2
3573+
// re-validates that the group still maps to the same workflow under the lock.
3574+
const preTable = await getTableById(data.tableId)
3575+
if (!preTable) throw new Error('Table not found')
3576+
const preGroup = (preTable.schema.workflowGroups ?? []).find((g) => g.id === data.groupId)
3577+
if (!preGroup) {
3578+
throw new Error(`Workflow group "${data.groupId}" not found`)
3579+
}
3580+
const workflowId = preGroup.workflowId
3581+
3582+
const [
3583+
{ loadWorkflowFromNormalizedTables },
3584+
{ flattenWorkflowOutputs, getBlockExecutionOrder },
3585+
{ columnTypeForLeaf, deriveOutputColumnName },
3586+
] = await Promise.all([
3587+
import('@/lib/workflows/persistence/utils'),
3588+
import('@/lib/workflows/blocks/flatten-outputs'),
3589+
import('./column-naming'),
3590+
])
3591+
const normalized = await loadWorkflowFromNormalizedTables(workflowId)
3592+
if (!normalized) {
3593+
throw new Error(`Workflow ${workflowId} not found`)
3594+
}
3595+
const blocks = Object.values(normalized.blocks ?? {}).map((b) => ({
3596+
id: b.id,
3597+
type: b.type,
3598+
name: b.name,
3599+
triggerMode: (b as { triggerMode?: boolean }).triggerMode,
3600+
subBlocks: b.subBlocks as Record<string, unknown> | undefined,
3601+
}))
3602+
const flattened = flattenWorkflowOutputs(blocks, normalized.edges ?? [])
3603+
const match = flattened.find((f) => f.blockId === data.blockId && f.path === data.path)
3604+
if (!match) {
3605+
throw new Error(
3606+
`Output ${data.blockId}::${data.path} is not a valid pickable output on workflow ${workflowId}`
3607+
)
3608+
}
3609+
const newColumnType = columnTypeForLeaf(match.leafType)
3610+
const distances = getBlockExecutionOrder(blocks, normalized.edges ?? [])
3611+
const flatIndex = new Map(flattened.map((f, i) => [`${f.blockId}::${f.path}`, i]))
35633612

3613+
// Phase 2 (locked): re-read fresh, validate against the current schema, and
3614+
// write. The critical section holds no I/O — just the in-memory splice + the
3615+
// schema UPDATE — so concurrent adders queue behind it quickly.
3616+
const { updatedTable, newOutput } = await withLockedTable(data.tableId, async (table, trx) => {
35643617
const schema = table.schema
35653618
const groups = schema.workflowGroups ?? []
35663619
const groupIndex = groups.findIndex((g) => g.id === data.groupId)
35673620
if (groupIndex === -1) {
35683621
throw new Error(`Workflow group "${data.groupId}" not found`)
35693622
}
35703623
const group = groups[groupIndex]
3571-
3572-
if (group.outputs.some((o) => o.blockId === data.blockId && o.path === data.path)) {
3624+
if (group.workflowId !== workflowId) {
35733625
throw new Error(
3574-
`Workflow group "${data.groupId}" already has an output at ${data.blockId}::${data.path}`
3626+
`Workflow group "${data.groupId}" was remapped to a different workflow concurrently; retry the add.`
35753627
)
35763628
}
35773629

3578-
const [
3579-
{ loadWorkflowFromNormalizedTables },
3580-
{ flattenWorkflowOutputs, getBlockExecutionOrder },
3581-
{ columnTypeForLeaf, deriveOutputColumnName },
3582-
] = await Promise.all([
3583-
import('@/lib/workflows/persistence/utils'),
3584-
import('@/lib/workflows/blocks/flatten-outputs'),
3585-
import('./column-naming'),
3586-
])
3587-
const normalized = await loadWorkflowFromNormalizedTables(group.workflowId)
3588-
if (!normalized) {
3589-
throw new Error(`Workflow ${group.workflowId} not found`)
3590-
}
3591-
const blocks = Object.values(normalized.blocks ?? {}).map((b) => ({
3592-
id: b.id,
3593-
type: b.type,
3594-
name: b.name,
3595-
triggerMode: (b as { triggerMode?: boolean }).triggerMode,
3596-
subBlocks: b.subBlocks as Record<string, unknown> | undefined,
3597-
}))
3598-
const flattened = flattenWorkflowOutputs(blocks, normalized.edges ?? [])
3599-
const match = flattened.find((f) => f.blockId === data.blockId && f.path === data.path)
3600-
if (!match) {
3630+
if (group.outputs.some((o) => o.blockId === data.blockId && o.path === data.path)) {
36013631
throw new Error(
3602-
`Output ${data.blockId}::${data.path} is not a valid pickable output on workflow ${group.workflowId}`
3632+
`Workflow group "${data.groupId}" already has an output at ${data.blockId}::${data.path}`
36033633
)
36043634
}
3605-
const distances = getBlockExecutionOrder(blocks, normalized.edges ?? [])
3606-
const flatIndex = new Map(flattened.map((f, i) => [`${f.blockId}::${f.path}`, i]))
36073635

36083636
const taken = new Set(schema.columns.map((c) => c.name))
36093637
const columnName = data.columnName ?? deriveOutputColumnName(data.path, taken)
@@ -3621,7 +3649,7 @@ export async function addWorkflowGroupOutput(
36213649

36223650
const newColDef: ColumnDefinition = {
36233651
name: columnName,
3624-
type: columnTypeForLeaf(match.leafType),
3652+
type: newColumnType,
36253653
required: false,
36263654
unique: false,
36273655
workflowGroupId: data.groupId,

0 commit comments

Comments
 (0)