From 7e751b82d38f32401b65746de09800e0623e5089 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Thu, 28 May 2026 19:55:21 -0700 Subject: [PATCH 1/6] Make workflow description nullable --- apps/sim/lib/api/contracts/workflows.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/sim/lib/api/contracts/workflows.ts b/apps/sim/lib/api/contracts/workflows.ts index 0d99ea83ad2..38b6f64db05 100644 --- a/apps/sim/lib/api/contracts/workflows.ts +++ b/apps/sim/lib/api/contracts/workflows.ts @@ -166,7 +166,7 @@ export const workflowStateSchema = z.object({ metadata: z .object({ name: z.string().optional(), - description: z.string().optional(), + description: z.string().nullable().optional(), }) .optional(), }) From 20e1f28cdf5ec5007d14c0e89081f3fee308163d Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Sat, 30 May 2026 11:34:24 -0700 Subject: [PATCH 2/6] fix(tables): serialize schema mutations to prevent parallel column clobber --- apps/sim/lib/table/service.ts | 1851 +++++++++++++++++---------------- 1 file changed, 942 insertions(+), 909 deletions(-) diff --git a/apps/sim/lib/table/service.ts b/apps/sim/lib/table/service.ts index 18f1fda418e..1db31037a63 100644 --- a/apps/sim/lib/table/service.ts +++ b/apps/sim/lib/table/service.ts @@ -125,6 +125,39 @@ async function acquireTablePositionLock(trx: DbTransaction, tableId: string) { ) } +/** + * Serializes schema/metadata read-modify-writes for a single table so + * concurrent mutators can't clobber each other's `schema` JSONB + * (last-writer-wins). Takes a transaction-scoped advisory lock keyed on + * `tableId`, then re-reads the table INSIDE the lock and hands the fresh + * definition + transaction to `mutate`. Each serialized writer therefore + * validates and computes against the prior writer's committed columns. + * + * Uses an advisory lock (not `SELECT ... FOR UPDATE` on the definition row) so + * it adds no edges to the row-lock graph — the row-count trigger (migration + * 0198) locks the definition row from `insertRow`/`deleteRow`, and a FOR UPDATE + * here would invert that order. Mirrors `acquireTablePositionLock`. The lock and + * the read both release at COMMIT/ROLLBACK; the wait is bounded by the + * `statement_timeout` set in `setTableTxTimeouts`. + */ +async function withLockedTable( + tableId: string, + mutate: (table: TableDefinition, trx: DbTransaction) => Promise, + opts?: { includeArchived?: boolean } +): Promise { + return db.transaction(async (trx) => { + await setTableTxTimeouts(trx) + await trx.execute( + sql`SELECT pg_advisory_xact_lock(hashtextextended(${`user_table_schema:${tableId}`}, 0))` + ) + const table = await getTableById(tableId, { tx: trx, includeArchived: opts?.includeArchived }) + if (!table) { + throw new Error('Table not found') + } + return mutate(table, trx) + }) +} + /** * Returns the next auto-assigned `position` for a table (max(position) + 1, or 0 * if empty). Callers must hold `acquireTablePositionLock` to avoid two concurrent @@ -198,10 +231,11 @@ function applyColumnOrderToSchema( export async function getTableById( tableId: string, - options?: { includeArchived?: boolean } + options?: { includeArchived?: boolean; tx?: DbTransaction } ): Promise { - const { includeArchived = false } = options ?? {} - const results = await db + const { includeArchived = false, tx } = options ?? {} + const executor = tx ?? db + const results = await executor .select({ id: userTableDefinitions.id, name: userTableDefinitions.name, @@ -463,95 +497,92 @@ export async function addTableColumn( }, requestId: string ): Promise { - const table = await getTableById(tableId) - if (!table) { - throw new Error('Table not found') - } - - if (!NAME_PATTERN.test(column.name)) { - throw new Error( - `Invalid column name "${column.name}". Must start with a letter or underscore and contain only alphanumeric characters and underscores.` - ) - } + return withLockedTable(tableId, async (table, trx) => { + if (!NAME_PATTERN.test(column.name)) { + throw new Error( + `Invalid column name "${column.name}". Must start with a letter or underscore and contain only alphanumeric characters and underscores.` + ) + } - if (column.name.length > TABLE_LIMITS.MAX_COLUMN_NAME_LENGTH) { - throw new Error( - `Column name exceeds maximum length (${TABLE_LIMITS.MAX_COLUMN_NAME_LENGTH} characters)` - ) - } + if (column.name.length > TABLE_LIMITS.MAX_COLUMN_NAME_LENGTH) { + throw new Error( + `Column name exceeds maximum length (${TABLE_LIMITS.MAX_COLUMN_NAME_LENGTH} characters)` + ) + } - if (!COLUMN_TYPES.includes(column.type as (typeof COLUMN_TYPES)[number])) { - throw new Error( - `Invalid column type "${column.type}". Must be one of: ${COLUMN_TYPES.join(', ')}` - ) - } + if (!COLUMN_TYPES.includes(column.type as (typeof COLUMN_TYPES)[number])) { + throw new Error( + `Invalid column type "${column.type}". Must be one of: ${COLUMN_TYPES.join(', ')}` + ) + } - const schema = table.schema - if (schema.columns.some((c) => c.name.toLowerCase() === column.name.toLowerCase())) { - throw new Error(`Column "${column.name}" already exists`) - } + const schema = table.schema + if (schema.columns.some((c) => c.name.toLowerCase() === column.name.toLowerCase())) { + throw new Error(`Column "${column.name}" already exists`) + } - if (schema.columns.length >= TABLE_LIMITS.MAX_COLUMNS_PER_TABLE) { - throw new Error( - `Table has reached maximum column limit (${TABLE_LIMITS.MAX_COLUMNS_PER_TABLE})` - ) - } + if (schema.columns.length >= TABLE_LIMITS.MAX_COLUMNS_PER_TABLE) { + throw new Error( + `Table has reached maximum column limit (${TABLE_LIMITS.MAX_COLUMNS_PER_TABLE})` + ) + } - const newColumn: TableSchema['columns'][number] = { - name: column.name, - type: column.type as TableSchema['columns'][number]['type'], - required: column.required ?? false, - unique: column.unique ?? false, - } + const newColumn: TableSchema['columns'][number] = { + name: column.name, + type: column.type as TableSchema['columns'][number]['type'], + required: column.required ?? false, + unique: column.unique ?? false, + } - const columns = [...schema.columns] - if (column.position !== undefined && column.position >= 0 && column.position < columns.length) { - columns.splice(column.position, 0, newColumn) - } else { - columns.push(newColumn) - } - - const updatedSchema: TableSchema = { ...schema, columns } - - // Keep `metadata.columnOrder` in sync: when present, it must list every - // column in `schema.columns`. Splicing the new name in at the same index - // we used in `columns` keeps display ordering aligned with the user's - // intent for `position`-based inserts. - const existingOrder = table.metadata?.columnOrder - let updatedMetadata = table.metadata - if (existingOrder && existingOrder.length > 0 && !existingOrder.includes(column.name)) { - let insertIdx = existingOrder.length - if (column.position !== undefined && column.position >= 0) { - // Anchor on the column previously at `position` — that column shifted - // right by one in `columns`, so the new name slots in at its old spot. - const anchor = schema.columns[column.position]?.name - if (anchor) { - const anchorIdx = existingOrder.indexOf(anchor) - if (anchorIdx !== -1) insertIdx = anchorIdx + const columns = [...schema.columns] + if (column.position !== undefined && column.position >= 0 && column.position < columns.length) { + columns.splice(column.position, 0, newColumn) + } else { + columns.push(newColumn) + } + + const updatedSchema: TableSchema = { ...schema, columns } + + // Keep `metadata.columnOrder` in sync: when present, it must list every + // column in `schema.columns`. Splicing the new name in at the same index + // we used in `columns` keeps display ordering aligned with the user's + // intent for `position`-based inserts. + const existingOrder = table.metadata?.columnOrder + let updatedMetadata = table.metadata + if (existingOrder && existingOrder.length > 0 && !existingOrder.includes(column.name)) { + let insertIdx = existingOrder.length + if (column.position !== undefined && column.position >= 0) { + // Anchor on the column previously at `position` — that column shifted + // right by one in `columns`, so the new name slots in at its old spot. + const anchor = schema.columns[column.position]?.name + if (anchor) { + const anchorIdx = existingOrder.indexOf(anchor) + if (anchorIdx !== -1) insertIdx = anchorIdx + } } + const nextOrder = [...existingOrder] + nextOrder.splice(insertIdx, 0, column.name) + updatedMetadata = { ...table.metadata, columnOrder: nextOrder } } - const nextOrder = [...existingOrder] - nextOrder.splice(insertIdx, 0, column.name) - updatedMetadata = { ...table.metadata, columnOrder: nextOrder } - } - assertValidSchema(updatedSchema, updatedMetadata?.columnOrder) + assertValidSchema(updatedSchema, updatedMetadata?.columnOrder) - const now = new Date() + const now = new Date() - await db - .update(userTableDefinitions) - .set({ schema: updatedSchema, metadata: updatedMetadata, updatedAt: now }) - .where(eq(userTableDefinitions.id, tableId)) + await trx + .update(userTableDefinitions) + .set({ schema: updatedSchema, metadata: updatedMetadata, updatedAt: now }) + .where(eq(userTableDefinitions.id, tableId)) - logger.info(`[${requestId}] Added column "${column.name}" to table ${tableId}`) + logger.info(`[${requestId}] Added column "${column.name}" to table ${tableId}`) - return { - ...table, - schema: updatedSchema, - metadata: updatedMetadata, - updatedAt: now, - } + return { + ...table, + schema: updatedSchema, + metadata: updatedMetadata, + updatedAt: now, + } + }) } /** @@ -2648,94 +2679,91 @@ export async function renameColumn( data: RenameColumnData, requestId: string ): Promise { - const table = await getTableById(data.tableId) - if (!table) { - throw new Error('Table not found') - } - - if (!NAME_PATTERN.test(data.newName)) { - throw new Error( - `Invalid column name "${data.newName}". Column names must start with a letter or underscore, followed by alphanumeric characters or underscores.` - ) - } - - if (data.newName.length > TABLE_LIMITS.MAX_COLUMN_NAME_LENGTH) { - throw new Error( - `Column name exceeds maximum length (${TABLE_LIMITS.MAX_COLUMN_NAME_LENGTH} characters)` - ) - } - - const schema = table.schema - const columnIndex = schema.columns.findIndex( - (c) => c.name.toLowerCase() === data.oldName.toLowerCase() - ) - if (columnIndex === -1) { - throw new Error(`Column "${data.oldName}" not found`) - } + return withLockedTable(data.tableId, async (table, trx) => { + if (!NAME_PATTERN.test(data.newName)) { + throw new Error( + `Invalid column name "${data.newName}". Column names must start with a letter or underscore, followed by alphanumeric characters or underscores.` + ) + } - if ( - schema.columns.some( - (c, i) => i !== columnIndex && c.name.toLowerCase() === data.newName.toLowerCase() - ) - ) { - throw new Error(`Column "${data.newName}" already exists`) - } + if (data.newName.length > TABLE_LIMITS.MAX_COLUMN_NAME_LENGTH) { + throw new Error( + `Column name exceeds maximum length (${TABLE_LIMITS.MAX_COLUMN_NAME_LENGTH} characters)` + ) + } - const actualOldName = schema.columns[columnIndex].name - const updatedColumns = schema.columns.map((c, i) => - i === columnIndex ? { ...c, name: data.newName } : c - ) - // Cascade rename into every workflow group: its output `columnName` refs, - // its `dependencies.columns` entries, and its `inputMappings` source columns. - const updatedGroups = (schema.workflowGroups ?? []).map((group) => { - const renamedOutputs = group.outputs.map((o) => - o.columnName === actualOldName ? { ...o, columnName: data.newName } : o + const schema = table.schema + const columnIndex = schema.columns.findIndex( + (c) => c.name.toLowerCase() === data.oldName.toLowerCase() ) - const renamedDeps = group.dependencies?.columns?.map((d) => - d === actualOldName ? data.newName : d - ) - const renamedMappings = group.inputMappings?.map((m) => - m.columnName === actualOldName ? { ...m, columnName: data.newName } : m - ) - return { - ...group, - outputs: renamedOutputs, - ...(renamedDeps ? { dependencies: { columns: renamedDeps } } : {}), - ...(renamedMappings ? { inputMappings: renamedMappings } : {}), + if (columnIndex === -1) { + throw new Error(`Column "${data.oldName}" not found`) } - }) - const updatedSchema: TableSchema = { - ...schema, - columns: updatedColumns, - ...(updatedGroups.length > 0 ? { workflowGroups: updatedGroups } : {}), - } - const metadata = table.metadata as TableMetadata | null - let updatedMetadata = metadata - if (metadata?.columnWidths && actualOldName in metadata.columnWidths) { - const { [actualOldName]: width, ...rest } = metadata.columnWidths - updatedMetadata = { ...metadata, columnWidths: { ...rest, [data.newName]: width } } - } - if (updatedMetadata?.columnOrder?.includes(actualOldName)) { - updatedMetadata = { - ...updatedMetadata, - columnOrder: updatedMetadata.columnOrder.map((n) => (n === actualOldName ? data.newName : n)), + if ( + schema.columns.some( + (c, i) => i !== columnIndex && c.name.toLowerCase() === data.newName.toLowerCase() + ) + ) { + throw new Error(`Column "${data.newName}" already exists`) } - } - // Validate against the *post-rename* column order. The schema's workflow - // group outputs already reference the new name, so checking against the old - // columnOrder makes the renamed output look "missing" from its group and - // falsely flags the remaining siblings as non-contiguous. - assertValidSchema(updatedSchema, updatedMetadata?.columnOrder) - const now = new Date() - const statementMs = scaledStatementTimeoutMs(table.rowCount ?? 0, { - baseMs: 60_000, - perRowMs: 2, - }) + const actualOldName = schema.columns[columnIndex].name + const updatedColumns = schema.columns.map((c, i) => + i === columnIndex ? { ...c, name: data.newName } : c + ) + // Cascade rename into every workflow group: its output `columnName` refs, + // its `dependencies.columns` entries, and its `inputMappings` source columns. + const updatedGroups = (schema.workflowGroups ?? []).map((group) => { + const renamedOutputs = group.outputs.map((o) => + o.columnName === actualOldName ? { ...o, columnName: data.newName } : o + ) + const renamedDeps = group.dependencies?.columns?.map((d) => + d === actualOldName ? data.newName : d + ) + const renamedMappings = group.inputMappings?.map((m) => + m.columnName === actualOldName ? { ...m, columnName: data.newName } : m + ) + return { + ...group, + outputs: renamedOutputs, + ...(renamedDeps ? { dependencies: { columns: renamedDeps } } : {}), + ...(renamedMappings ? { inputMappings: renamedMappings } : {}), + } + }) + const updatedSchema: TableSchema = { + ...schema, + columns: updatedColumns, + ...(updatedGroups.length > 0 ? { workflowGroups: updatedGroups } : {}), + } + + const metadata = table.metadata as TableMetadata | null + let updatedMetadata = metadata + if (metadata?.columnWidths && actualOldName in metadata.columnWidths) { + const { [actualOldName]: width, ...rest } = metadata.columnWidths + updatedMetadata = { ...metadata, columnWidths: { ...rest, [data.newName]: width } } + } + if (updatedMetadata?.columnOrder?.includes(actualOldName)) { + updatedMetadata = { + ...updatedMetadata, + columnOrder: updatedMetadata.columnOrder.map((n) => + n === actualOldName ? data.newName : n + ), + } + } + // Validate against the *post-rename* column order. The schema's workflow + // group outputs already reference the new name, so checking against the old + // columnOrder makes the renamed output look "missing" from its group and + // falsely flags the remaining siblings as non-contiguous. + assertValidSchema(updatedSchema, updatedMetadata?.columnOrder) - await db.transaction(async (trx) => { + const now = new Date() + const statementMs = scaledStatementTimeoutMs(table.rowCount ?? 0, { + baseMs: 60_000, + perRowMs: 2, + }) await setTableTxTimeouts(trx, { statementMs }) + await trx .update(userTableDefinitions) .set({ schema: updatedSchema, metadata: updatedMetadata, updatedAt: now }) @@ -2746,13 +2774,13 @@ export async function renameColumn( await trx.execute( sql`UPDATE user_table_rows SET data = data - ${actualOldName}::text || jsonb_build_object(${data.newName}::text, data->${actualOldName}::text) WHERE table_id = ${data.tableId} AND data ? ${actualOldName}::text` ) - }) - logger.info( - `[${requestId}] Renamed column "${actualOldName}" to "${data.newName}" in table ${data.tableId}` - ) + logger.info( + `[${requestId}] Renamed column "${actualOldName}" to "${data.newName}" in table ${data.tableId}` + ) - return { ...table, schema: updatedSchema, metadata: updatedMetadata, updatedAt: now } + return { ...table, schema: updatedSchema, metadata: updatedMetadata, updatedAt: now } + }) } /** @@ -2767,67 +2795,62 @@ export async function deleteColumn( data: DeleteColumnData, requestId: string ): Promise { - const table = await getTableById(data.tableId) - if (!table) { - throw new Error('Table not found') - } - - const schema = table.schema - const columnIndex = schema.columns.findIndex( - (c) => c.name.toLowerCase() === data.columnName.toLowerCase() - ) - if (columnIndex === -1) { - throw new Error(`Column "${data.columnName}" not found`) - } + return withLockedTable(data.tableId, async (table, trx) => { + const schema = table.schema + const columnIndex = schema.columns.findIndex( + (c) => c.name.toLowerCase() === data.columnName.toLowerCase() + ) + if (columnIndex === -1) { + throw new Error(`Column "${data.columnName}" not found`) + } - if (schema.columns.length <= 1) { - throw new Error('Cannot delete the last column in a table') - } + if (schema.columns.length <= 1) { + throw new Error('Cannot delete the last column in a table') + } - const targetColumn = schema.columns[columnIndex] - const actualName = targetColumn.name - const ownerGroupId = targetColumn.workflowGroupId + const targetColumn = schema.columns[columnIndex] + const actualName = targetColumn.name + const ownerGroupId = targetColumn.workflowGroupId - // Drop this column's reference from every group's outputs and `columns` - // dependency. If the column is the last output of its parent group, the - // group itself is also removed (a group with zero outputs is invalid). - let groupRemovedId: string | null = null - const updatedGroups = (schema.workflowGroups ?? []) - .map((group) => { - let next = group - if (ownerGroupId && group.id === ownerGroupId) { - const remaining = group.outputs.filter((o) => o.columnName !== actualName) - if (remaining.length === 0) { - groupRemovedId = group.id + // Drop this column's reference from every group's outputs and `columns` + // dependency. If the column is the last output of its parent group, the + // group itself is also removed (a group with zero outputs is invalid). + let groupRemovedId: string | null = null + const updatedGroups = (schema.workflowGroups ?? []) + .map((group) => { + let next = group + if (ownerGroupId && group.id === ownerGroupId) { + const remaining = group.outputs.filter((o) => o.columnName !== actualName) + if (remaining.length === 0) { + groupRemovedId = group.id + } + next = { ...next, outputs: remaining } } - next = { ...next, outputs: remaining } - } - return stripGroupDeps(next, new Set([actualName])) - }) - .filter((g) => g.id !== groupRemovedId) - - const updatedSchema: TableSchema = { - ...schema, - columns: schema.columns.filter((_, i) => i !== columnIndex), - ...(updatedGroups.length > 0 ? { workflowGroups: updatedGroups } : {}), - } - assertValidSchema(updatedSchema, table.metadata?.columnOrder) + return stripGroupDeps(next, new Set([actualName])) + }) + .filter((g) => g.id !== groupRemovedId) - const metadata = table.metadata as TableMetadata | null - let updatedMetadata = metadata - if (metadata?.columnWidths && actualName in metadata.columnWidths) { - const { [actualName]: _, ...rest } = metadata.columnWidths - updatedMetadata = { ...metadata, columnWidths: rest } - } + const updatedSchema: TableSchema = { + ...schema, + columns: schema.columns.filter((_, i) => i !== columnIndex), + ...(updatedGroups.length > 0 ? { workflowGroups: updatedGroups } : {}), + } + assertValidSchema(updatedSchema, table.metadata?.columnOrder) - const now = new Date() - const statementMs = scaledStatementTimeoutMs(table.rowCount ?? 0, { - baseMs: 60_000, - perRowMs: 2, - }) + const metadata = table.metadata as TableMetadata | null + let updatedMetadata = metadata + if (metadata?.columnWidths && actualName in metadata.columnWidths) { + const { [actualName]: _, ...rest } = metadata.columnWidths + updatedMetadata = { ...metadata, columnWidths: rest } + } - await db.transaction(async (trx) => { + const now = new Date() + const statementMs = scaledStatementTimeoutMs(table.rowCount ?? 0, { + baseMs: 60_000, + perRowMs: 2, + }) await setTableTxTimeouts(trx, { statementMs }) + await trx .update(userTableDefinitions) .set({ schema: updatedSchema, metadata: updatedMetadata, updatedAt: now }) @@ -2837,11 +2860,11 @@ export async function deleteColumn( sql`UPDATE user_table_rows SET data = data - ${actualName}::text WHERE table_id = ${data.tableId} AND data ? ${actualName}::text` ) if (groupRemovedId) await stripGroupExecutions(trx, data.tableId, [groupRemovedId]) - }) - logger.info(`[${requestId}] Deleted column "${actualName}" from table ${data.tableId}`) + logger.info(`[${requestId}] Deleted column "${actualName}" from table ${data.tableId}`) - return { ...table, schema: updatedSchema, metadata: updatedMetadata, updatedAt: now } + return { ...table, schema: updatedSchema, metadata: updatedMetadata, updatedAt: now } + }) } /** @@ -2852,73 +2875,68 @@ export async function deleteColumns( data: { tableId: string; columnNames: string[] }, requestId: string ): Promise { - const table = await getTableById(data.tableId) - if (!table) { - throw new Error('Table not found') - } - - const schema = table.schema - const namesToDelete = new Set() - const notFound: string[] = [] - - for (const name of data.columnNames) { - const col = schema.columns.find((c) => c.name.toLowerCase() === name.toLowerCase()) - if (!col) { - notFound.push(name) - } else { - namesToDelete.add(col.name) + return withLockedTable(data.tableId, async (table, trx) => { + const schema = table.schema + const namesToDelete = new Set() + const notFound: string[] = [] + + for (const name of data.columnNames) { + const col = schema.columns.find((c) => c.name.toLowerCase() === name.toLowerCase()) + if (!col) { + notFound.push(name) + } else { + namesToDelete.add(col.name) + } } - } - if (notFound.length > 0) { - throw new Error(`Columns not found: ${notFound.join(', ')}`) - } - - const remaining = schema.columns.filter((c) => !namesToDelete.has(c.name)) - if (remaining.length === 0) { - throw new Error('Cannot delete all columns from a table') - } + if (notFound.length > 0) { + throw new Error(`Columns not found: ${notFound.join(', ')}`) + } - // For each group, drop outputs whose column is being deleted. Groups that - // end up with zero outputs are removed entirely (they'd be invalid). Then - // any remaining group's dependencies referencing a removed group or - // deleted column are cleaned up. - const removedGroupIds = new Set() - let updatedGroups = (schema.workflowGroups ?? []).map((group) => { - const remainingOutputs = group.outputs.filter((o) => !namesToDelete.has(o.columnName)) - if (remainingOutputs.length === 0) { - removedGroupIds.add(group.id) + const remaining = schema.columns.filter((c) => !namesToDelete.has(c.name)) + if (remaining.length === 0) { + throw new Error('Cannot delete all columns from a table') } - return remainingOutputs.length === group.outputs.length - ? group - : { ...group, outputs: remainingOutputs } - }) - updatedGroups = updatedGroups - .filter((g) => !removedGroupIds.has(g.id)) - .map((group) => stripGroupDeps(group, namesToDelete)) - const updatedSchema: TableSchema = { - ...schema, - columns: remaining, - ...(updatedGroups.length > 0 ? { workflowGroups: updatedGroups } : {}), - } - assertValidSchema(updatedSchema, table.metadata?.columnOrder) - const metadata = table.metadata as TableMetadata | null - let updatedMetadata = metadata - if (metadata?.columnWidths) { - const widths = { ...metadata.columnWidths } - for (const n of namesToDelete) delete widths[n] - updatedMetadata = { ...metadata, columnWidths: widths } - } + // For each group, drop outputs whose column is being deleted. Groups that + // end up with zero outputs are removed entirely (they'd be invalid). Then + // any remaining group's dependencies referencing a removed group or + // deleted column are cleaned up. + const removedGroupIds = new Set() + let updatedGroups = (schema.workflowGroups ?? []).map((group) => { + const remainingOutputs = group.outputs.filter((o) => !namesToDelete.has(o.columnName)) + if (remainingOutputs.length === 0) { + removedGroupIds.add(group.id) + } + return remainingOutputs.length === group.outputs.length + ? group + : { ...group, outputs: remainingOutputs } + }) + updatedGroups = updatedGroups + .filter((g) => !removedGroupIds.has(g.id)) + .map((group) => stripGroupDeps(group, namesToDelete)) + const updatedSchema: TableSchema = { + ...schema, + columns: remaining, + ...(updatedGroups.length > 0 ? { workflowGroups: updatedGroups } : {}), + } + assertValidSchema(updatedSchema, table.metadata?.columnOrder) - const now = new Date() - const statementMs = scaledStatementTimeoutMs(table.rowCount ?? 0, { - baseMs: 60_000, - perRowMs: 2 * namesToDelete.size, - }) + const metadata = table.metadata as TableMetadata | null + let updatedMetadata = metadata + if (metadata?.columnWidths) { + const widths = { ...metadata.columnWidths } + for (const n of namesToDelete) delete widths[n] + updatedMetadata = { ...metadata, columnWidths: widths } + } - await db.transaction(async (trx) => { + const now = new Date() + const statementMs = scaledStatementTimeoutMs(table.rowCount ?? 0, { + baseMs: 60_000, + perRowMs: 2 * namesToDelete.size, + }) await setTableTxTimeouts(trx, { statementMs }) + await trx .update(userTableDefinitions) .set({ schema: updatedSchema, metadata: updatedMetadata, updatedAt: now }) @@ -2930,13 +2948,13 @@ export async function deleteColumns( ) } await stripGroupExecutions(trx, data.tableId, removedGroupIds) - }) - logger.info( - `[${requestId}] Deleted columns [${[...namesToDelete].join(', ')}] from table ${data.tableId}` - ) + logger.info( + `[${requestId}] Deleted columns [${[...namesToDelete].join(', ')}] from table ${data.tableId}` + ) - return { ...table, schema: updatedSchema, metadata: updatedMetadata, updatedAt: now } + return { ...table, schema: updatedSchema, metadata: updatedMetadata, updatedAt: now } + }) } /** @@ -2951,75 +2969,76 @@ export async function updateColumnType( data: UpdateColumnTypeData, requestId: string ): Promise { - const table = await getTableById(data.tableId) - if (!table) { - throw new Error('Table not found') - } + return withLockedTable(data.tableId, async (table, trx) => { + await setTableTxTimeouts(trx, { + statementMs: scaledStatementTimeoutMs(table.rowCount ?? 0, { baseMs: 60_000, perRowMs: 2 }), + }) - if (!(COLUMN_TYPES as readonly string[]).includes(data.newType)) { - throw new Error( - `Invalid column type "${data.newType}". Valid types: ${COLUMN_TYPES.join(', ')}` - ) - } + if (!(COLUMN_TYPES as readonly string[]).includes(data.newType)) { + throw new Error( + `Invalid column type "${data.newType}". Valid types: ${COLUMN_TYPES.join(', ')}` + ) + } - const schema = table.schema - const columnIndex = schema.columns.findIndex( - (c) => c.name.toLowerCase() === data.columnName.toLowerCase() - ) - if (columnIndex === -1) { - throw new Error(`Column "${data.columnName}" not found`) - } + const schema = table.schema + const columnIndex = schema.columns.findIndex( + (c) => c.name.toLowerCase() === data.columnName.toLowerCase() + ) + if (columnIndex === -1) { + throw new Error(`Column "${data.columnName}" not found`) + } - const column = schema.columns[columnIndex] - if (column.type === data.newType) { - return table - } + const column = schema.columns[columnIndex] + if (column.type === data.newType) { + return table + } - // Validate existing data is compatible with the new type - const rows = await db - .select({ id: userTableRows.id, data: userTableRows.data }) - .from(userTableRows) - .where( - and( - eq(userTableRows.tableId, data.tableId), - sql`${userTableRows.data} ? ${column.name}`, - sql`${userTableRows.data}->>${column.name}::text IS NOT NULL` + // Validate existing data is compatible with the new type + const rows = await trx + .select({ id: userTableRows.id, data: userTableRows.data }) + .from(userTableRows) + .where( + and( + eq(userTableRows.tableId, data.tableId), + sql`${userTableRows.data} ? ${column.name}`, + sql`${userTableRows.data}->>${column.name}::text IS NOT NULL` + ) ) - ) - let incompatibleCount = 0 - for (const row of rows) { - const rowData = row.data as RowData - const value = rowData[column.name] - if (value === null || value === undefined) continue + let incompatibleCount = 0 + for (const row of rows) { + const rowData = row.data as RowData + const value = rowData[column.name] + if (value === null || value === undefined) continue - if (!isValueCompatibleWithType(value, data.newType)) { - incompatibleCount++ + if (!isValueCompatibleWithType(value, data.newType)) { + incompatibleCount++ + } } - } - if (incompatibleCount > 0) { - throw new Error( - `Cannot change column "${column.name}" to type "${data.newType}": ${incompatibleCount} row(s) have incompatible values. Fix or remove the incompatible values first.` - ) - } + if (incompatibleCount > 0) { + throw new Error( + `Cannot change column "${column.name}" to type "${data.newType}": ${incompatibleCount} row(s) have incompatible values. Fix or remove the incompatible values first.` + ) + } - const updatedColumns = schema.columns.map((c, i) => - i === columnIndex ? { ...c, type: data.newType } : c - ) - const updatedSchema: TableSchema = { ...schema, columns: updatedColumns } - const now = new Date() + const updatedColumns = schema.columns.map((c, i) => + i === columnIndex ? { ...c, type: data.newType } : c + ) + const updatedSchema: TableSchema = { ...schema, columns: updatedColumns } + const now = new Date() - await db - .update(userTableDefinitions) - .set({ schema: updatedSchema, updatedAt: now }) - .where(eq(userTableDefinitions.id, data.tableId)) + await trx + .update(userTableDefinitions) + .set({ schema: updatedSchema, updatedAt: now }) + .where(eq(userTableDefinitions.id, data.tableId)) - logger.info( - `[${requestId}] Changed column "${column.name}" type from "${column.type}" to "${data.newType}" in table ${data.tableId}` - ) + logger.info( + `[${requestId}] Changed column "${column.name}" type from "${column.type}" to "${data.newType}" in table ${data.tableId}` + ) - return { ...table, schema: updatedSchema, updatedAt: now } + return { ...table, schema: updatedSchema, updatedAt: now } + }) } /** @@ -3034,75 +3053,76 @@ export async function updateColumnConstraints( data: UpdateColumnConstraintsData, requestId: string ): Promise { - const table = await getTableById(data.tableId) - if (!table) { - throw new Error('Table not found') - } - - const schema = table.schema - const columnIndex = schema.columns.findIndex( - (c) => c.name.toLowerCase() === data.columnName.toLowerCase() - ) - if (columnIndex === -1) { - throw new Error(`Column "${data.columnName}" not found`) - } + return withLockedTable(data.tableId, async (table, trx) => { + await setTableTxTimeouts(trx, { + statementMs: scaledStatementTimeoutMs(table.rowCount ?? 0, { baseMs: 60_000, perRowMs: 2 }), + }) - const column = schema.columns[columnIndex] - if (column.workflowGroupId) { - throw new Error( - `Cannot change constraints on workflow-output column "${column.name}". Constraints aren't applicable to columns whose values come from workflow execution.` + const schema = table.schema + const columnIndex = schema.columns.findIndex( + (c) => c.name.toLowerCase() === data.columnName.toLowerCase() ) - } - if (data.required === true && !column.required) { - const [result] = await db - .select({ count: count() }) - .from(userTableRows) - .where( - and( - eq(userTableRows.tableId, data.tableId), - sql`(NOT (${userTableRows.data} ? ${column.name}) OR ${userTableRows.data}->>${column.name}::text IS NULL)` - ) - ) + if (columnIndex === -1) { + throw new Error(`Column "${data.columnName}" not found`) + } - if (result.count > 0) { + const column = schema.columns[columnIndex] + if (column.workflowGroupId) { throw new Error( - `Cannot set column "${column.name}" as required: ${result.count} row(s) have null or missing values` + `Cannot change constraints on workflow-output column "${column.name}". Constraints aren't applicable to columns whose values come from workflow execution.` ) } - } + if (data.required === true && !column.required) { + const [result] = await trx + .select({ count: count() }) + .from(userTableRows) + .where( + and( + eq(userTableRows.tableId, data.tableId), + sql`(NOT (${userTableRows.data} ? ${column.name}) OR ${userTableRows.data}->>${column.name}::text IS NULL)` + ) + ) + + if (result.count > 0) { + throw new Error( + `Cannot set column "${column.name}" as required: ${result.count} row(s) have null or missing values` + ) + } + } - if (data.unique === true && !column.unique) { - const duplicates = (await db.execute( - sql`SELECT ${userTableRows.data}->>${column.name}::text AS val, count(*) AS cnt FROM ${userTableRows} WHERE table_id = ${data.tableId} AND ${userTableRows.data} ? ${column.name} AND ${userTableRows.data}->>${column.name}::text IS NOT NULL GROUP BY val HAVING count(*) > 1 LIMIT 1` - )) as { val: string; cnt: number }[] + if (data.unique === true && !column.unique) { + const duplicates = (await trx.execute( + sql`SELECT ${userTableRows.data}->>${column.name}::text AS val, count(*) AS cnt FROM ${userTableRows} WHERE table_id = ${data.tableId} AND ${userTableRows.data} ? ${column.name} AND ${userTableRows.data}->>${column.name}::text IS NOT NULL GROUP BY val HAVING count(*) > 1 LIMIT 1` + )) as { val: string; cnt: number }[] - if (duplicates.length > 0) { - throw new Error(`Cannot set column "${column.name}" as unique: duplicate values exist`) + if (duplicates.length > 0) { + throw new Error(`Cannot set column "${column.name}" as unique: duplicate values exist`) + } } - } - const updatedColumns = schema.columns.map((c, i) => - i === columnIndex - ? { - ...c, - ...(data.required !== undefined ? { required: data.required } : {}), - ...(data.unique !== undefined ? { unique: data.unique } : {}), - } - : c - ) - const updatedSchema: TableSchema = { ...schema, columns: updatedColumns } - const now = new Date() + const updatedColumns = schema.columns.map((c, i) => + i === columnIndex + ? { + ...c, + ...(data.required !== undefined ? { required: data.required } : {}), + ...(data.unique !== undefined ? { unique: data.unique } : {}), + } + : c + ) + const updatedSchema: TableSchema = { ...schema, columns: updatedColumns } + const now = new Date() - await db - .update(userTableDefinitions) - .set({ schema: updatedSchema, updatedAt: now }) - .where(eq(userTableDefinitions.id, data.tableId)) + await trx + .update(userTableDefinitions) + .set({ schema: updatedSchema, updatedAt: now }) + .where(eq(userTableDefinitions.id, data.tableId)) - logger.info( - `[${requestId}] Updated constraints for column "${column.name}" in table ${data.tableId}` - ) + logger.info( + `[${requestId}] Updated constraints for column "${column.name}" in table ${data.tableId}` + ) - return { ...table, schema: updatedSchema, updatedAt: now } + return { ...table, schema: updatedSchema, updatedAt: now } + }) } /** @@ -3114,72 +3134,69 @@ export async function addWorkflowGroup( data: AddWorkflowGroupData, requestId: string ): Promise { - const table = await getTableById(data.tableId) - if (!table) { - throw new Error('Table not found') - } + const updatedTable = await withLockedTable(data.tableId, async (table, trx) => { + const schema = table.schema + const groups = schema.workflowGroups ?? [] + if (groups.some((g) => g.id === data.group.id)) { + throw new Error(`Workflow group "${data.group.id}" already exists`) + } - const schema = table.schema - const groups = schema.workflowGroups ?? [] - if (groups.some((g) => g.id === data.group.id)) { - throw new Error(`Workflow group "${data.group.id}" already exists`) - } + const existingNames = new Set(schema.columns.map((c) => c.name.toLowerCase())) + for (const col of data.outputColumns) { + if (!NAME_PATTERN.test(col.name)) { + throw new Error( + `Invalid output column name "${col.name}". Must satisfy ${NAME_PATTERN.source}.` + ) + } + if (existingNames.has(col.name.toLowerCase())) { + throw new Error(`Column "${col.name}" already exists`) + } + } - const existingNames = new Set(schema.columns.map((c) => c.name.toLowerCase())) - for (const col of data.outputColumns) { - if (!NAME_PATTERN.test(col.name)) { + if (schema.columns.length + data.outputColumns.length > TABLE_LIMITS.MAX_COLUMNS_PER_TABLE) { throw new Error( - `Invalid output column name "${col.name}". Must satisfy ${NAME_PATTERN.source}.` + `Adding ${data.outputColumns.length} columns would exceed the maximum (${TABLE_LIMITS.MAX_COLUMNS_PER_TABLE}).` ) } - if (existingNames.has(col.name.toLowerCase())) { - throw new Error(`Column "${col.name}" already exists`) - } - } - - if (schema.columns.length + data.outputColumns.length > TABLE_LIMITS.MAX_COLUMNS_PER_TABLE) { - throw new Error( - `Adding ${data.outputColumns.length} columns would exceed the maximum (${TABLE_LIMITS.MAX_COLUMNS_PER_TABLE}).` - ) - } - const updatedSchema: TableSchema = { - ...schema, - columns: [...schema.columns, ...data.outputColumns], - workflowGroups: [...groups, data.group], - } + const updatedSchema: TableSchema = { + ...schema, + columns: [...schema.columns, ...data.outputColumns], + workflowGroups: [...groups, data.group], + } - // Keep `metadata.columnOrder` in sync — see `addTableColumn` for the - // invariant. New output columns get appended in the order the caller - // supplied (matches their position in `schema.columns`). - const existingOrder = table.metadata?.columnOrder - let updatedMetadata = table.metadata - if (existingOrder && existingOrder.length > 0) { - const known = new Set(existingOrder) - const append = data.outputColumns.map((c) => c.name).filter((n) => !known.has(n)) - if (append.length > 0) { - updatedMetadata = { ...table.metadata, columnOrder: [...existingOrder, ...append] } + // Keep `metadata.columnOrder` in sync — see `addTableColumn` for the + // invariant. New output columns get appended in the order the caller + // supplied (matches their position in `schema.columns`). + const existingOrder = table.metadata?.columnOrder + let updatedMetadata = table.metadata + if (existingOrder && existingOrder.length > 0) { + const known = new Set(existingOrder) + const append = data.outputColumns.map((c) => c.name).filter((n) => !known.has(n)) + if (append.length > 0) { + updatedMetadata = { ...table.metadata, columnOrder: [...existingOrder, ...append] } + } } - } - assertValidSchema(updatedSchema, updatedMetadata?.columnOrder) + assertValidSchema(updatedSchema, updatedMetadata?.columnOrder) - const now = new Date() - await db - .update(userTableDefinitions) - .set({ schema: updatedSchema, metadata: updatedMetadata, updatedAt: now }) - .where(eq(userTableDefinitions.id, data.tableId)) + const now = new Date() + await trx + .update(userTableDefinitions) + .set({ schema: updatedSchema, metadata: updatedMetadata, updatedAt: now }) + .where(eq(userTableDefinitions.id, data.tableId)) - logger.info( - `[${requestId}] Added workflow group "${data.group.id}" with ${data.outputColumns.length} output column(s) to table ${data.tableId}` - ) + logger.info( + `[${requestId}] Added workflow group "${data.group.id}" with ${data.outputColumns.length} output column(s) to table ${data.tableId}` + ) - const updatedTable: TableDefinition = { - ...table, - schema: updatedSchema, - metadata: updatedMetadata, - updatedAt: now, - } + return { + ...table, + schema: updatedSchema, + metadata: updatedMetadata, + updatedAt: now, + } + }) // Auto-fire existing rows whose deps are already met for the new group. // Fire-and-forget — the dispatcher bounds queue depth (window of 20) and @@ -3210,239 +3227,252 @@ export async function updateWorkflowGroup( data: UpdateWorkflowGroupData, requestId: string ): Promise { - const table = await getTableById(data.tableId) - if (!table) { - throw new Error('Table not found') - } - - const schema = table.schema - const groups = schema.workflowGroups ?? [] - const groupIndex = groups.findIndex((g) => g.id === data.groupId) - if (groupIndex === -1) { - throw new Error(`Workflow group "${data.groupId}" not found`) - } - const group = groups[groupIndex] - - // Apply `mappingUpdates` first: each entry repoints an existing output's - // `(blockId, path)` while preserving the column. We patch the **old** view - // of outputs so the downstream `(blockId, path)`-keyed diff doesn't see the - // swap as a remove+add. The corresponding row data is cleared after the - // schema write so stale values from the old source don't linger. - const mappingUpdates = data.mappingUpdates ?? [] - const remappedColumnNames = new Set() - // Per-column type override resolved from the new mapping's leaf type. Only - // populated when a remap actually changes the column's type — keeps the - // schema patch a no-op when the user repoints to an output of the same - // type. Falls back to leaving the existing type alone if the workflow or - // its target output can't be resolved (workflow deleted, block removed). - const remappedColumnTypes = new Map() - let oldOutputs = group.outputs - if (mappingUpdates.length > 0) { - const updateByName = new Map(mappingUpdates.map((u) => [u.columnName, u])) - for (const u of mappingUpdates) { - const exists = oldOutputs.some((o) => o.columnName === u.columnName) - if (!exists) { - throw new Error( - `Mapping update for unknown column "${u.columnName}" (group ${data.groupId}).` - ) + const { updatedTable, added, remappedColumnNames, newOutputs, previousAutoRun } = + await withLockedTable(data.tableId, async (table, trx) => { + // Generous idle timeout: the leaf-type resolution below loads the + // workflow on a separate connection, leaving this transaction idle while + // it runs — the default 5s `idle_in_transaction_session_timeout` would + // kill the held lock on a large workflow. + await setTableTxTimeouts(trx, { statementMs: 60_000, idleMs: 30_000 }) + + const schema = table.schema + const groups = schema.workflowGroups ?? [] + const groupIndex = groups.findIndex((g) => g.id === data.groupId) + if (groupIndex === -1) { + throw new Error(`Workflow group "${data.groupId}" not found`) } - } - oldOutputs = oldOutputs.map((o) => { - const u = updateByName.get(o.columnName) - if (!u) return o - remappedColumnNames.add(o.columnName) - return { ...o, blockId: u.blockId, path: u.path } - }) - - // Resolve the new leaf type for each remap so the column's declared type - // matches what the new mapping produces. Without this, a string→number - // remap would keep `type: 'string'` and coerceRowToSchema would coerce - // every backfilled value toward the wrong type. - try { - const [ - { loadWorkflowFromNormalizedTables }, - { flattenWorkflowOutputs }, - { columnTypeForLeaf }, - ] = await Promise.all([ - import('@/lib/workflows/persistence/utils'), - import('@/lib/workflows/blocks/flatten-outputs'), - import('./column-naming'), - ]) - const targetWorkflowId = data.workflowId ?? group.workflowId - const normalized = await loadWorkflowFromNormalizedTables(targetWorkflowId) - if (normalized) { - const blocks = Object.values(normalized.blocks ?? {}).map((b) => ({ - id: b.id, - type: b.type, - name: b.name, - triggerMode: (b as { triggerMode?: boolean }).triggerMode, - subBlocks: b.subBlocks as Record | undefined, - })) - const flattened = flattenWorkflowOutputs(blocks, normalized.edges ?? []) - const flatByKey = new Map(flattened.map((f) => [`${f.blockId}::${f.path}`, f])) - const colByName = new Map(schema.columns.map((c) => [c.name, c])) + const group = groups[groupIndex] + + // Apply `mappingUpdates` first: each entry repoints an existing output's + // `(blockId, path)` while preserving the column. We patch the **old** view + // of outputs so the downstream `(blockId, path)`-keyed diff doesn't see the + // swap as a remove+add. The corresponding row data is cleared after the + // schema write so stale values from the old source don't linger. + const mappingUpdates = data.mappingUpdates ?? [] + const remappedColumnNames = new Set() + // Per-column type override resolved from the new mapping's leaf type. Only + // populated when a remap actually changes the column's type — keeps the + // schema patch a no-op when the user repoints to an output of the same + // type. Falls back to leaving the existing type alone if the workflow or + // its target output can't be resolved (workflow deleted, block removed). + const remappedColumnTypes = new Map() + let oldOutputs = group.outputs + if (mappingUpdates.length > 0) { + const updateByName = new Map(mappingUpdates.map((u) => [u.columnName, u])) for (const u of mappingUpdates) { - const match = flatByKey.get(`${u.blockId}::${u.path}`) - if (!match) continue - const newType = columnTypeForLeaf(match.leafType) - const oldType = colByName.get(u.columnName)?.type - if (newType && newType !== oldType) { - remappedColumnTypes.set(u.columnName, newType) + const exists = oldOutputs.some((o) => o.columnName === u.columnName) + if (!exists) { + throw new Error( + `Mapping update for unknown column "${u.columnName}" (group ${data.groupId}).` + ) } } + oldOutputs = oldOutputs.map((o) => { + const u = updateByName.get(o.columnName) + if (!u) return o + remappedColumnNames.add(o.columnName) + return { ...o, blockId: u.blockId, path: u.path } + }) + + // Resolve the new leaf type for each remap so the column's declared type + // matches what the new mapping produces. Without this, a string→number + // remap would keep `type: 'string'` and coerceRowToSchema would coerce + // every backfilled value toward the wrong type. + try { + const [ + { loadWorkflowFromNormalizedTables }, + { flattenWorkflowOutputs }, + { columnTypeForLeaf }, + ] = await Promise.all([ + import('@/lib/workflows/persistence/utils'), + import('@/lib/workflows/blocks/flatten-outputs'), + import('./column-naming'), + ]) + const targetWorkflowId = data.workflowId ?? group.workflowId + const normalized = await loadWorkflowFromNormalizedTables(targetWorkflowId) + if (normalized) { + const blocks = Object.values(normalized.blocks ?? {}).map((b) => ({ + id: b.id, + type: b.type, + name: b.name, + triggerMode: (b as { triggerMode?: boolean }).triggerMode, + subBlocks: b.subBlocks as Record | undefined, + })) + const flattened = flattenWorkflowOutputs(blocks, normalized.edges ?? []) + const flatByKey = new Map(flattened.map((f) => [`${f.blockId}::${f.path}`, f])) + const colByName = new Map(schema.columns.map((c) => [c.name, c])) + for (const u of mappingUpdates) { + const match = flatByKey.get(`${u.blockId}::${u.path}`) + if (!match) continue + const newType = columnTypeForLeaf(match.leafType) + const oldType = colByName.get(u.columnName)?.type + if (newType && newType !== oldType) { + remappedColumnTypes.set(u.columnName, newType) + } + } + } + } catch (err) { + logger.warn( + `[${requestId}] Could not resolve new leaf types for remap on group ${data.groupId}; leaving column types unchanged:`, + err + ) + } } - } catch (err) { - logger.warn( - `[${requestId}] Could not resolve new leaf types for remap on group ${data.groupId}; leaving column types unchanged:`, - err - ) - } - } - // If the caller passed `outputs`, that's the new full set. If only - // `mappingUpdates` was sent, the new set is the remapped old set. - const newOutputs = data.outputs ?? oldOutputs - // Enrichment outputs all share empty `blockId`/`path`, so keying on those - // alone collapses every sibling to one entry (dropping columns on diff). Key - // on the registry `outputId` when present; fall back to `blockId::path` for - // workflow outputs. - const oldKey = (o: WorkflowGroupOutput) => - o.outputId ? `out::${o.outputId}` : `${o.blockId}::${o.path}` - const oldByKey = new Map(oldOutputs.map((o) => [oldKey(o), o])) - const newByKey = new Map(newOutputs.map((o) => [oldKey(o), o])) + // If the caller passed `outputs`, that's the new full set. If only + // `mappingUpdates` was sent, the new set is the remapped old set. + const newOutputs = data.outputs ?? oldOutputs + // Enrichment outputs all share empty `blockId`/`path`, so keying on those + // alone collapses every sibling to one entry (dropping columns on diff). Key + // on the registry `outputId` when present; fall back to `blockId::path` for + // workflow outputs. + const oldKey = (o: WorkflowGroupOutput) => + o.outputId ? `out::${o.outputId}` : `${o.blockId}::${o.path}` + const oldByKey = new Map(oldOutputs.map((o) => [oldKey(o), o])) + const newByKey = new Map(newOutputs.map((o) => [oldKey(o), o])) + + const removed = oldOutputs.filter((o) => !newByKey.has(oldKey(o))) + const added = newOutputs.filter((o) => !oldByKey.has(oldKey(o))) + const newColDefs = data.newOutputColumns ?? [] + const newColByName = new Map(newColDefs.map((c) => [c.name, c])) + + for (const out of added) { + if (!newColByName.has(out.columnName)) { + throw new Error( + `Missing column definition for new output "${out.columnName}" (group ${data.groupId}).` + ) + } + } - const removed = oldOutputs.filter((o) => !newByKey.has(oldKey(o))) - const added = newOutputs.filter((o) => !oldByKey.has(oldKey(o))) - const newColDefs = data.newOutputColumns ?? [] - const newColByName = new Map(newColDefs.map((c) => [c.name, c])) + const removedColumnNames = new Set(removed.map((o) => o.columnName)) + let nextColumns = schema.columns + .filter((c) => !removedColumnNames.has(c.name)) + .map((c) => { + const newType = remappedColumnTypes.get(c.name) + return newType ? { ...c, type: newType } : c + }) + if (newColDefs.length > 0) { + // Splice the new column defs into the group's contiguous run rather than + // appending at the end. The desired in-group order is `newOutputs` (the + // sidebar's BFS-of-the-workflow ordering); we walk it, anchor at the first + // surviving sibling's index in `nextColumns`, and emit each output's + // column def in turn. + const groupColNames = new Set(newOutputs.map((o) => o.columnName)) + const firstGroupIdx = nextColumns.findIndex((c) => groupColNames.has(c.name)) + const anchorIdx = firstGroupIdx === -1 ? nextColumns.length : firstGroupIdx + const newColByLowerName = new Map(newColDefs.map((c) => [c.name.toLowerCase(), c])) + const orderedGroupCols: ColumnDefinition[] = [] + for (const out of newOutputs) { + const fresh = newColByLowerName.get(out.columnName.toLowerCase()) + if (fresh) { + orderedGroupCols.push(fresh) + } else { + const existing = nextColumns.find( + (c) => c.name.toLowerCase() === out.columnName.toLowerCase() + ) + if (existing) orderedGroupCols.push(existing) + } + } + const remaining = nextColumns.filter((c) => !groupColNames.has(c.name)) + nextColumns = [ + ...remaining.slice(0, anchorIdx), + ...orderedGroupCols, + ...remaining.slice(anchorIdx), + ] + } - for (const out of added) { - if (!newColByName.has(out.columnName)) { - throw new Error( - `Missing column definition for new output "${out.columnName}" (group ${data.groupId}).` - ) - } - } + const updatedGroup: WorkflowGroup = { + ...group, + workflowId: data.workflowId ?? group.workflowId, + name: data.name ?? group.name, + dependencies: data.dependencies ?? group.dependencies, + outputs: newOutputs, + ...(data.inputMappings !== undefined ? { inputMappings: data.inputMappings } : {}), + ...(data.type !== undefined ? { type: data.type } : {}), + ...(data.autoRun !== undefined ? { autoRun: data.autoRun } : {}), + } + // Removed outputs may be referenced as deps by sibling groups; strip those + // refs so we don't leave dangling-column deps that fail schema validation. + const nextGroups = groups + .map((g, i) => (i === groupIndex ? updatedGroup : g)) + .map((g) => (g.id === updatedGroup.id ? g : stripGroupDeps(g, removedColumnNames))) + const updatedSchema: TableSchema = { + ...schema, + columns: nextColumns, + workflowGroups: nextGroups, + } - const removedColumnNames = new Set(removed.map((o) => o.columnName)) - let nextColumns = schema.columns - .filter((c) => !removedColumnNames.has(c.name)) - .map((c) => { - const newType = remappedColumnTypes.get(c.name) - return newType ? { ...c, type: newType } : c - }) - if (newColDefs.length > 0) { - // Splice the new column defs into the group's contiguous run rather than - // appending at the end. The desired in-group order is `newOutputs` (the - // sidebar's BFS-of-the-workflow ordering); we walk it, anchor at the first - // surviving sibling's index in `nextColumns`, and emit each output's - // column def in turn. - const groupColNames = new Set(newOutputs.map((o) => o.columnName)) - const firstGroupIdx = nextColumns.findIndex((c) => groupColNames.has(c.name)) - const anchorIdx = firstGroupIdx === -1 ? nextColumns.length : firstGroupIdx - const newColByLowerName = new Map(newColDefs.map((c) => [c.name.toLowerCase(), c])) - const orderedGroupCols: ColumnDefinition[] = [] - for (const out of newOutputs) { - const fresh = newColByLowerName.get(out.columnName.toLowerCase()) - if (fresh) { - orderedGroupCols.push(fresh) - } else { - const existing = nextColumns.find( - (c) => c.name.toLowerCase() === out.columnName.toLowerCase() + // `columnOrder` mirrors the schema layout. Drop removed columns, then splice + // the new ones in at the same anchor as `nextColumns` so the table renders + // them inside the group's contiguous run instead of at the tail. + let updatedColumnOrder = table.metadata?.columnOrder?.filter( + (n) => !removedColumnNames.has(n) + ) + if (updatedColumnOrder && newColDefs.length > 0) { + const newColNamesLower = new Set(newColDefs.map((c) => c.name.toLowerCase())) + const orderWithoutNew = updatedColumnOrder.filter( + (n) => !newColNamesLower.has(n.toLowerCase()) ) - if (existing) orderedGroupCols.push(existing) + const groupColNames = new Set(newOutputs.map((o) => o.columnName)) + const orderedGroupNames = newOutputs.map((o) => o.columnName) + const firstGroupOrderIdx = orderWithoutNew.findIndex((n) => groupColNames.has(n)) + const anchorOrderIdx = + firstGroupOrderIdx === -1 ? orderWithoutNew.length : firstGroupOrderIdx + const remainingOrder = orderWithoutNew.filter((n) => !groupColNames.has(n)) + updatedColumnOrder = [ + ...remainingOrder.slice(0, anchorOrderIdx), + ...orderedGroupNames, + ...remainingOrder.slice(anchorOrderIdx), + ] } - } - const remaining = nextColumns.filter((c) => !groupColNames.has(c.name)) - nextColumns = [ - ...remaining.slice(0, anchorIdx), - ...orderedGroupCols, - ...remaining.slice(anchorIdx), - ] - } + assertValidSchema(updatedSchema, updatedColumnOrder) - const updatedGroup: WorkflowGroup = { - ...group, - workflowId: data.workflowId ?? group.workflowId, - name: data.name ?? group.name, - dependencies: data.dependencies ?? group.dependencies, - outputs: newOutputs, - ...(data.inputMappings !== undefined ? { inputMappings: data.inputMappings } : {}), - ...(data.type !== undefined ? { type: data.type } : {}), - ...(data.autoRun !== undefined ? { autoRun: data.autoRun } : {}), - } - // Removed outputs may be referenced as deps by sibling groups; strip those - // refs so we don't leave dangling-column deps that fail schema validation. - const nextGroups = groups - .map((g, i) => (i === groupIndex ? updatedGroup : g)) - .map((g) => (g.id === updatedGroup.id ? g : stripGroupDeps(g, removedColumnNames))) - const updatedSchema: TableSchema = { - ...schema, - columns: nextColumns, - workflowGroups: nextGroups, - } - - // `columnOrder` mirrors the schema layout. Drop removed columns, then splice - // the new ones in at the same anchor as `nextColumns` so the table renders - // them inside the group's contiguous run instead of at the tail. - let updatedColumnOrder = table.metadata?.columnOrder?.filter((n) => !removedColumnNames.has(n)) - if (updatedColumnOrder && newColDefs.length > 0) { - const newColNamesLower = new Set(newColDefs.map((c) => c.name.toLowerCase())) - const orderWithoutNew = updatedColumnOrder.filter((n) => !newColNamesLower.has(n.toLowerCase())) - const groupColNames = new Set(newOutputs.map((o) => o.columnName)) - const orderedGroupNames = newOutputs.map((o) => o.columnName) - const firstGroupOrderIdx = orderWithoutNew.findIndex((n) => groupColNames.has(n)) - const anchorOrderIdx = firstGroupOrderIdx === -1 ? orderWithoutNew.length : firstGroupOrderIdx - const remainingOrder = orderWithoutNew.filter((n) => !groupColNames.has(n)) - updatedColumnOrder = [ - ...remainingOrder.slice(0, anchorOrderIdx), - ...orderedGroupNames, - ...remainingOrder.slice(anchorOrderIdx), - ] - } - assertValidSchema(updatedSchema, updatedColumnOrder) + const updatedMetadata: TableMetadata | null = + updatedColumnOrder && table.metadata + ? { ...table.metadata, columnOrder: updatedColumnOrder } + : table.metadata + ? { ...table.metadata } + : null - const updatedMetadata: TableMetadata | null = - updatedColumnOrder && table.metadata - ? { ...table.metadata, columnOrder: updatedColumnOrder } - : table.metadata - ? { ...table.metadata } - : null + const now = new Date() + await trx + .update(userTableDefinitions) + .set({ schema: updatedSchema, metadata: updatedMetadata, updatedAt: now }) + .where(eq(userTableDefinitions.id, data.tableId)) + for (const name of removedColumnNames) { + await trx.execute( + sql`UPDATE user_table_rows SET data = data - ${name}::text WHERE table_id = ${data.tableId} AND data ? ${name}::text` + ) + } + // Remapped columns: clear stale values in-tx so rows the backfill can't + // repopulate (no log, no matching span output) end up empty rather than + // retaining the previous mapping's value. The backfill below then writes + // the new mapping's value into rows where it can find one. + for (const name of remappedColumnNames) { + if (removedColumnNames.has(name)) continue + await trx.execute( + sql`UPDATE user_table_rows SET data = data - ${name}::text WHERE table_id = ${data.tableId} AND data ? ${name}::text` + ) + } - const now = new Date() - await db.transaction(async (trx) => { - await setTableTxTimeouts(trx, { statementMs: 60_000 }) - await trx - .update(userTableDefinitions) - .set({ schema: updatedSchema, metadata: updatedMetadata, updatedAt: now }) - .where(eq(userTableDefinitions.id, data.tableId)) - for (const name of removedColumnNames) { - await trx.execute( - sql`UPDATE user_table_rows SET data = data - ${name}::text WHERE table_id = ${data.tableId} AND data ? ${name}::text` - ) - } - // Remapped columns: clear stale values in-tx so rows the backfill can't - // repopulate (no log, no matching span output) end up empty rather than - // retaining the previous mapping's value. The backfill below then writes - // the new mapping's value into rows where it can find one. - for (const name of remappedColumnNames) { - if (removedColumnNames.has(name)) continue - await trx.execute( - sql`UPDATE user_table_rows SET data = data - ${name}::text WHERE table_id = ${data.tableId} AND data ? ${name}::text` + logger.info( + `[${requestId}] Updated workflow group "${data.groupId}" in table ${data.tableId} (added=${added.length}, removed=${removed.length}, remapped=${remappedColumnNames.size})` ) - } - }) - - logger.info( - `[${requestId}] Updated workflow group "${data.groupId}" in table ${data.tableId} (added=${added.length}, removed=${removed.length}, remapped=${remappedColumnNames.size})` - ) - const updatedTable: TableDefinition = { - ...table, - schema: updatedSchema, - metadata: updatedMetadata, - updatedAt: now, - } + const updatedTable: TableDefinition = { + ...table, + schema: updatedSchema, + metadata: updatedMetadata, + updatedAt: now, + } + return { + updatedTable, + added, + remappedColumnNames, + newOutputs, + previousAutoRun: group.autoRun, + } + }) // Backfill from saved execution logs so already-completed group runs surface // the schema changes without re-running the workflow. Two passes: @@ -3490,7 +3520,7 @@ export async function updateWorkflowGroup( // autoRun toggled false → true: fire deps-satisfied rows now via the // dispatcher. Mirrors the post-add path so re-enabling auto-fire doesn't // require manual run clicks for rows that are already eligible. - if (group.autoRun === false && data.autoRun === true) { + if (previousAutoRun === false && data.autoRun === true) { void runWorkflowColumn({ tableId: updatedTable.id, workspaceId: updatedTable.workspaceId, @@ -3524,168 +3554,181 @@ export async function addWorkflowGroupOutput( }, requestId: string ): Promise { - const table = await getTableById(data.tableId) - if (!table) throw new Error('Table not found') + const { updatedTable, newOutput } = await withLockedTable(data.tableId, async (table, trx) => { + // Generous idle timeout: the workflow load below runs on a separate + // connection, leaving this transaction idle while it resolves the pickable + // output — the default 5s `idle_in_transaction_session_timeout` would kill + // the held lock on a large workflow. + await setTableTxTimeouts(trx, { idleMs: 30_000 }) + + const schema = table.schema + const groups = schema.workflowGroups ?? [] + const groupIndex = groups.findIndex((g) => g.id === data.groupId) + if (groupIndex === -1) { + throw new Error(`Workflow group "${data.groupId}" not found`) + } + const group = groups[groupIndex] - const schema = table.schema - const groups = schema.workflowGroups ?? [] - const groupIndex = groups.findIndex((g) => g.id === data.groupId) - if (groupIndex === -1) { - throw new Error(`Workflow group "${data.groupId}" not found`) - } - const group = groups[groupIndex] + if (group.outputs.some((o) => o.blockId === data.blockId && o.path === data.path)) { + throw new Error( + `Workflow group "${data.groupId}" already has an output at ${data.blockId}::${data.path}` + ) + } - if (group.outputs.some((o) => o.blockId === data.blockId && o.path === data.path)) { - throw new Error( - `Workflow group "${data.groupId}" already has an output at ${data.blockId}::${data.path}` - ) - } + const [ + { loadWorkflowFromNormalizedTables }, + { flattenWorkflowOutputs, getBlockExecutionOrder }, + { columnTypeForLeaf, deriveOutputColumnName }, + ] = await Promise.all([ + import('@/lib/workflows/persistence/utils'), + import('@/lib/workflows/blocks/flatten-outputs'), + import('./column-naming'), + ]) + const normalized = await loadWorkflowFromNormalizedTables(group.workflowId) + if (!normalized) { + throw new Error(`Workflow ${group.workflowId} not found`) + } + const blocks = Object.values(normalized.blocks ?? {}).map((b) => ({ + id: b.id, + type: b.type, + name: b.name, + triggerMode: (b as { triggerMode?: boolean }).triggerMode, + subBlocks: b.subBlocks as Record | undefined, + })) + const flattened = flattenWorkflowOutputs(blocks, normalized.edges ?? []) + const match = flattened.find((f) => f.blockId === data.blockId && f.path === data.path) + if (!match) { + throw new Error( + `Output ${data.blockId}::${data.path} is not a valid pickable output on workflow ${group.workflowId}` + ) + } + const distances = getBlockExecutionOrder(blocks, normalized.edges ?? []) + const flatIndex = new Map(flattened.map((f, i) => [`${f.blockId}::${f.path}`, i])) - const [ - { loadWorkflowFromNormalizedTables }, - { flattenWorkflowOutputs, getBlockExecutionOrder }, - { columnTypeForLeaf, deriveOutputColumnName }, - ] = await Promise.all([ - import('@/lib/workflows/persistence/utils'), - import('@/lib/workflows/blocks/flatten-outputs'), - import('./column-naming'), - ]) - const normalized = await loadWorkflowFromNormalizedTables(group.workflowId) - if (!normalized) { - throw new Error(`Workflow ${group.workflowId} not found`) - } - const blocks = Object.values(normalized.blocks ?? {}).map((b) => ({ - id: b.id, - type: b.type, - name: b.name, - triggerMode: (b as { triggerMode?: boolean }).triggerMode, - subBlocks: b.subBlocks as Record | undefined, - })) - const flattened = flattenWorkflowOutputs(blocks, normalized.edges ?? []) - const match = flattened.find((f) => f.blockId === data.blockId && f.path === data.path) - if (!match) { - throw new Error( - `Output ${data.blockId}::${data.path} is not a valid pickable output on workflow ${group.workflowId}` - ) - } - const distances = getBlockExecutionOrder(blocks, normalized.edges ?? []) - const flatIndex = new Map(flattened.map((f, i) => [`${f.blockId}::${f.path}`, i])) + const taken = new Set(schema.columns.map((c) => c.name)) + const columnName = data.columnName ?? deriveOutputColumnName(data.path, taken) + if (!NAME_PATTERN.test(columnName)) { + throw new Error(`Invalid column name "${columnName}". Must satisfy ${NAME_PATTERN.source}.`) + } + if (taken.has(columnName)) { + throw new Error(`Column "${columnName}" already exists`) + } + if (schema.columns.length + 1 > TABLE_LIMITS.MAX_COLUMNS_PER_TABLE) { + throw new Error( + `Adding a column would exceed the maximum (${TABLE_LIMITS.MAX_COLUMNS_PER_TABLE}).` + ) + } - const taken = new Set(schema.columns.map((c) => c.name)) - const columnName = data.columnName ?? deriveOutputColumnName(data.path, taken) - if (!NAME_PATTERN.test(columnName)) { - throw new Error(`Invalid column name "${columnName}". Must satisfy ${NAME_PATTERN.source}.`) - } - if (taken.has(columnName)) { - throw new Error(`Column "${columnName}" already exists`) - } - if (schema.columns.length + 1 > TABLE_LIMITS.MAX_COLUMNS_PER_TABLE) { - throw new Error( - `Adding a column would exceed the maximum (${TABLE_LIMITS.MAX_COLUMNS_PER_TABLE}).` - ) - } + const newColDef: ColumnDefinition = { + name: columnName, + type: columnTypeForLeaf(match.leafType), + required: false, + unique: false, + workflowGroupId: data.groupId, + } + const newOutput: WorkflowGroupOutput = { + blockId: data.blockId, + path: data.path, + columnName, + } + + // Sort all of the group's outputs (existing + new) in workflow execution + // order: BFS distance from the start block ASC, with discovery order as + // tiebreak. This matches what the column-sidebar does at create time, so + // columns from the same workflow always read in the order their blocks run + // — regardless of whether they were added at create time or one-by-one. + const groupColNamesBefore = new Set(group.outputs.map((o) => o.columnName)) + const orderKey = (o: { blockId: string; path: string }) => { + const d = distances[o.blockId] + const dist = d === undefined || d < 0 ? Number.POSITIVE_INFINITY : d + const idx = flatIndex.get(`${o.blockId}::${o.path}`) ?? Number.POSITIVE_INFINITY + return [dist, idx] as const + } + const allGroupOutputs = [...group.outputs, newOutput].sort((a, b) => { + const [da, ia] = orderKey(a) + const [db, ib] = orderKey(b) + return da !== db ? da - db : ia - ib + }) + const orderedGroupColNames = allGroupOutputs.map((o) => o.columnName) + const updatedGroup: WorkflowGroup = { + ...group, + outputs: allGroupOutputs, + } + const nextGroups = groups.map((g, i) => (i === groupIndex ? updatedGroup : g)) + + // Splice the new column run into nextColumns: keep the columns outside the + // group where they were, replace the group's contiguous run with the + // BFS-ordered list. Anchor at the position of the first existing sibling + // (or append if the group was empty). + const colByName = new Map(schema.columns.map((c) => [c.name, c])) + const orderedGroupCols: ColumnDefinition[] = orderedGroupColNames.map((name) => { + if (name === columnName) return newColDef + const existing = colByName.get(name) + if (!existing) { + throw new Error(`Internal: column "${name}" missing while splicing group outputs`) + } + return existing + }) + const remainingCols = schema.columns.filter((c) => !groupColNamesBefore.has(c.name)) + const firstGroupIdx = schema.columns.findIndex((c) => groupColNamesBefore.has(c.name)) + const colAnchor = firstGroupIdx === -1 ? remainingCols.length : firstGroupIdx + const nextColumns = [ + ...remainingCols.slice(0, colAnchor), + ...orderedGroupCols, + ...remainingCols.slice(colAnchor), + ] - const newColDef: ColumnDefinition = { - name: columnName, - type: columnTypeForLeaf(match.leafType), - required: false, - unique: false, - workflowGroupId: data.groupId, - } - const newOutput: WorkflowGroupOutput = { - blockId: data.blockId, - path: data.path, - columnName, - } - - // Sort all of the group's outputs (existing + new) in workflow execution - // order: BFS distance from the start block ASC, with discovery order as - // tiebreak. This matches what the column-sidebar does at create time, so - // columns from the same workflow always read in the order their blocks run - // — regardless of whether they were added at create time or one-by-one. - const groupColNamesBefore = new Set(group.outputs.map((o) => o.columnName)) - const orderKey = (o: { blockId: string; path: string }) => { - const d = distances[o.blockId] - const dist = d === undefined || d < 0 ? Number.POSITIVE_INFINITY : d - const idx = flatIndex.get(`${o.blockId}::${o.path}`) ?? Number.POSITIVE_INFINITY - return [dist, idx] as const - } - const allGroupOutputs = [...group.outputs, newOutput].sort((a, b) => { - const [da, ia] = orderKey(a) - const [db, ib] = orderKey(b) - return da !== db ? da - db : ia - ib - }) - const orderedGroupColNames = allGroupOutputs.map((o) => o.columnName) - const updatedGroup: WorkflowGroup = { - ...group, - outputs: allGroupOutputs, - } - const nextGroups = groups.map((g, i) => (i === groupIndex ? updatedGroup : g)) - - // Splice the new column run into nextColumns: keep the columns outside the - // group where they were, replace the group's contiguous run with the - // BFS-ordered list. Anchor at the position of the first existing sibling - // (or append if the group was empty). - const colByName = new Map(schema.columns.map((c) => [c.name, c])) - const orderedGroupCols: ColumnDefinition[] = orderedGroupColNames.map((name) => { - if (name === columnName) return newColDef - const existing = colByName.get(name) - if (!existing) { - throw new Error(`Internal: column "${name}" missing while splicing group outputs`) - } - return existing - }) - const remainingCols = schema.columns.filter((c) => !groupColNamesBefore.has(c.name)) - const firstGroupIdx = schema.columns.findIndex((c) => groupColNamesBefore.has(c.name)) - const colAnchor = firstGroupIdx === -1 ? remainingCols.length : firstGroupIdx - const nextColumns = [ - ...remainingCols.slice(0, colAnchor), - ...orderedGroupCols, - ...remainingCols.slice(colAnchor), - ] - - const updatedSchema: TableSchema = { - ...schema, - columns: nextColumns, - workflowGroups: nextGroups, - } - - const updatedColumnOrder = table.metadata?.columnOrder - ? (() => { - const orderWithoutGroup = table.metadata!.columnOrder!.filter( - (n) => !groupColNamesBefore.has(n) - ) - const firstGroupOrderIdx = table.metadata!.columnOrder!.findIndex((n) => - groupColNamesBefore.has(n) - ) - const orderAnchor = - firstGroupOrderIdx === -1 ? orderWithoutGroup.length : firstGroupOrderIdx - return [ - ...orderWithoutGroup.slice(0, orderAnchor), - ...orderedGroupColNames, - ...orderWithoutGroup.slice(orderAnchor), - ] - })() - : undefined + const updatedSchema: TableSchema = { + ...schema, + columns: nextColumns, + workflowGroups: nextGroups, + } - assertValidSchema(updatedSchema, updatedColumnOrder) + const updatedColumnOrder = table.metadata?.columnOrder + ? (() => { + const orderWithoutGroup = table.metadata!.columnOrder!.filter( + (n) => !groupColNamesBefore.has(n) + ) + const firstGroupOrderIdx = table.metadata!.columnOrder!.findIndex((n) => + groupColNamesBefore.has(n) + ) + const orderAnchor = + firstGroupOrderIdx === -1 ? orderWithoutGroup.length : firstGroupOrderIdx + return [ + ...orderWithoutGroup.slice(0, orderAnchor), + ...orderedGroupColNames, + ...orderWithoutGroup.slice(orderAnchor), + ] + })() + : undefined + + assertValidSchema(updatedSchema, updatedColumnOrder) + + const updatedMetadata: TableMetadata | null = + updatedColumnOrder && table.metadata + ? { ...table.metadata, columnOrder: updatedColumnOrder } + : table.metadata + ? { ...table.metadata } + : null - const updatedMetadata: TableMetadata | null = - updatedColumnOrder && table.metadata - ? { ...table.metadata, columnOrder: updatedColumnOrder } - : table.metadata - ? { ...table.metadata } - : null + const now = new Date() + await trx + .update(userTableDefinitions) + .set({ schema: updatedSchema, metadata: updatedMetadata, updatedAt: now }) + .where(eq(userTableDefinitions.id, data.tableId)) - const now = new Date() - await db - .update(userTableDefinitions) - .set({ schema: updatedSchema, metadata: updatedMetadata, updatedAt: now }) - .where(eq(userTableDefinitions.id, data.tableId)) + logger.info( + `[${requestId}] Added output "${columnName}" (${newColDef.type}) to workflow group "${data.groupId}" in table ${data.tableId}` + ) - logger.info( - `[${requestId}] Added output "${columnName}" (${newColDef.type}) to workflow group "${data.groupId}" in table ${data.tableId}` - ) + const updatedTable: TableDefinition = { + ...table, + schema: updatedSchema, + metadata: updatedMetadata, + updatedAt: now, + } + return { updatedTable, newOutput } + }) // Backfill from saved execution logs — same flow `updateWorkflowGroup` // uses for added outputs. Reads each row's saved trace spans for the @@ -3694,12 +3737,6 @@ export async function addWorkflowGroupOutput( // Cheap compared to re-running the workflow on every row, which is what // an earlier version of this code did — that mistakenly fanned out N // workflow-group-cell jobs and burned compute the user didn't ask for. - const updatedTable: TableDefinition = { - ...table, - schema: updatedSchema, - metadata: updatedMetadata, - updatedAt: now, - } try { await backfillGroupOutputsFromLogs({ table: updatedTable, @@ -3710,7 +3747,7 @@ export async function addWorkflowGroupOutput( }) } catch (err) { logger.warn( - `[${requestId}] Backfill from execution logs failed for ${data.tableId} group ${data.groupId} after adding output "${columnName}":`, + `[${requestId}] Backfill from execution logs failed for ${data.tableId} group ${data.groupId} after adding output "${newOutput.columnName}":`, err ) } @@ -3728,46 +3765,43 @@ export async function deleteWorkflowGroupOutput( data: { tableId: string; groupId: string; columnName: string }, requestId: string ): Promise { - const table = await getTableById(data.tableId) - if (!table) throw new Error('Table not found') - - const schema = table.schema - const groups = schema.workflowGroups ?? [] - const groupIndex = groups.findIndex((g) => g.id === data.groupId) - if (groupIndex === -1) { - throw new Error(`Workflow group "${data.groupId}" not found`) - } - const group = groups[groupIndex] - if (!group.outputs.some((o) => o.columnName === data.columnName)) { - throw new Error( - `Workflow group "${data.groupId}" has no output bound to column "${data.columnName}"` - ) - } + return withLockedTable(data.tableId, async (table, trx) => { + const schema = table.schema + const groups = schema.workflowGroups ?? [] + const groupIndex = groups.findIndex((g) => g.id === data.groupId) + if (groupIndex === -1) { + throw new Error(`Workflow group "${data.groupId}" not found`) + } + const group = groups[groupIndex] + if (!group.outputs.some((o) => o.columnName === data.columnName)) { + throw new Error( + `Workflow group "${data.groupId}" has no output bound to column "${data.columnName}"` + ) + } - const updatedGroup: WorkflowGroup = { - ...group, - outputs: group.outputs.filter((o) => o.columnName !== data.columnName), - } - const nextGroups = groups.map((g, i) => (i === groupIndex ? updatedGroup : g)) - const nextColumns = schema.columns.filter((c) => c.name !== data.columnName) - const updatedSchema: TableSchema = { - ...schema, - columns: nextColumns, - workflowGroups: nextGroups, - } + const updatedGroup: WorkflowGroup = { + ...group, + outputs: group.outputs.filter((o) => o.columnName !== data.columnName), + } + const nextGroups = groups.map((g, i) => (i === groupIndex ? updatedGroup : g)) + const nextColumns = schema.columns.filter((c) => c.name !== data.columnName) + const updatedSchema: TableSchema = { + ...schema, + columns: nextColumns, + workflowGroups: nextGroups, + } - const updatedColumnOrder = table.metadata?.columnOrder?.filter((n) => n !== data.columnName) - assertValidSchema(updatedSchema, updatedColumnOrder) + const updatedColumnOrder = table.metadata?.columnOrder?.filter((n) => n !== data.columnName) + assertValidSchema(updatedSchema, updatedColumnOrder) - const updatedMetadata: TableMetadata | null = - updatedColumnOrder && table.metadata - ? { ...table.metadata, columnOrder: updatedColumnOrder } - : table.metadata - ? { ...table.metadata } - : null + const updatedMetadata: TableMetadata | null = + updatedColumnOrder && table.metadata + ? { ...table.metadata, columnOrder: updatedColumnOrder } + : table.metadata + ? { ...table.metadata } + : null - const now = new Date() - await db.transaction(async (trx) => { + const now = new Date() await setTableTxTimeouts(trx, { statementMs: 60_000 }) await trx .update(userTableDefinitions) @@ -3776,13 +3810,13 @@ export async function deleteWorkflowGroupOutput( await trx.execute( sql`UPDATE user_table_rows SET data = data - ${data.columnName}::text WHERE table_id = ${data.tableId} AND data ? ${data.columnName}::text` ) - }) - logger.info( - `[${requestId}] Removed output "${data.columnName}" from workflow group "${data.groupId}" in table ${data.tableId}` - ) + logger.info( + `[${requestId}] Removed output "${data.columnName}" from workflow group "${data.groupId}" in table ${data.tableId}` + ) - return { ...table, schema: updatedSchema, metadata: updatedMetadata, updatedAt: now } + return { ...table, schema: updatedSchema, metadata: updatedMetadata, updatedAt: now } + }) } /** @@ -3793,41 +3827,38 @@ export async function deleteWorkflowGroup( data: DeleteWorkflowGroupData, requestId: string ): Promise { - const table = await getTableById(data.tableId) - if (!table) { - throw new Error('Table not found') - } + return withLockedTable(data.tableId, async (table, trx) => { + const schema = table.schema + const groups = schema.workflowGroups ?? [] + const group = groups.find((g) => g.id === data.groupId) + if (!group) { + throw new Error(`Workflow group "${data.groupId}" not found`) + } + + const removedColumnNames = new Set(group.outputs.map((o) => o.columnName)) + // Removed group's output columns may be referenced as deps by sibling groups. + // Strip those refs so we don't leave dangling-column deps behind. + const nextGroups = groups + .filter((g) => g.id !== data.groupId) + .map((g) => stripGroupDeps(g, removedColumnNames)) + const updatedSchema: TableSchema = { + ...schema, + columns: schema.columns.filter((c) => !removedColumnNames.has(c.name)), + workflowGroups: nextGroups, + } + const updatedColumnOrder = table.metadata?.columnOrder?.filter( + (n) => !removedColumnNames.has(n) + ) + assertValidSchema(updatedSchema, updatedColumnOrder) - const schema = table.schema - const groups = schema.workflowGroups ?? [] - const group = groups.find((g) => g.id === data.groupId) - if (!group) { - throw new Error(`Workflow group "${data.groupId}" not found`) - } - - const removedColumnNames = new Set(group.outputs.map((o) => o.columnName)) - // Removed group's output columns may be referenced as deps by sibling groups. - // Strip those refs so we don't leave dangling-column deps behind. - const nextGroups = groups - .filter((g) => g.id !== data.groupId) - .map((g) => stripGroupDeps(g, removedColumnNames)) - const updatedSchema: TableSchema = { - ...schema, - columns: schema.columns.filter((c) => !removedColumnNames.has(c.name)), - workflowGroups: nextGroups, - } - const updatedColumnOrder = table.metadata?.columnOrder?.filter((n) => !removedColumnNames.has(n)) - assertValidSchema(updatedSchema, updatedColumnOrder) - - const updatedMetadata: TableMetadata | null = - updatedColumnOrder && table.metadata - ? { ...table.metadata, columnOrder: updatedColumnOrder } - : table.metadata - ? { ...table.metadata } - : null + const updatedMetadata: TableMetadata | null = + updatedColumnOrder && table.metadata + ? { ...table.metadata, columnOrder: updatedColumnOrder } + : table.metadata + ? { ...table.metadata } + : null - const now = new Date() - await db.transaction(async (trx) => { + const now = new Date() await setTableTxTimeouts(trx, { statementMs: 60_000 }) await trx .update(userTableDefinitions) @@ -3839,16 +3870,18 @@ export async function deleteWorkflowGroup( ) } await stripGroupExecutions(trx, data.tableId, [data.groupId]) - }) - logger.info(`[${requestId}] Deleted workflow group "${data.groupId}" from table ${data.tableId}`) + logger.info( + `[${requestId}] Deleted workflow group "${data.groupId}" from table ${data.tableId}` + ) - return { - ...table, - schema: updatedSchema, - metadata: updatedMetadata, - updatedAt: now, - } + return { + ...table, + schema: updatedSchema, + metadata: updatedMetadata, + updatedAt: now, + } + }) } /** Minimal shape of a trace span we care about for backfill. */ From d05348dad6dd2b3ac3370e65fad5d41634acb01b Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Sat, 30 May 2026 11:56:50 -0700 Subject: [PATCH 3/6] fix(tables): load workflow outside schema lock; use DbOrTx for getTableById --- apps/sim/lib/table/service.ts | 206 +++++++++++++++++++--------------- 1 file changed, 117 insertions(+), 89 deletions(-) diff --git a/apps/sim/lib/table/service.ts b/apps/sim/lib/table/service.ts index 1db31037a63..925214cf48b 100644 --- a/apps/sim/lib/table/service.ts +++ b/apps/sim/lib/table/service.ts @@ -231,7 +231,7 @@ function applyColumnOrderToSchema( export async function getTableById( tableId: string, - options?: { includeArchived?: boolean; tx?: DbTransaction } + options?: { includeArchived?: boolean; tx?: DbOrTx } ): Promise { const { includeArchived = false, tx } = options ?? {} const executor = tx ?? db @@ -3227,13 +3227,60 @@ export async function updateWorkflowGroup( data: UpdateWorkflowGroupData, requestId: string ): Promise { + const mappingUpdates = data.mappingUpdates ?? [] + + // Phase 1 (no lock): when there are mapping updates, load the workflow once to + // resolve each remap's new leaf type. Kept OFF the advisory-lock critical + // section so concurrent group edits on the same table don't time out waiting + // on this DB load. Best-effort — a resolution failure leaves column types + // unchanged (workflow deleted, block removed). The result is applied against + // the fresh schema under the lock in phase 2. + const remapLeafTypeByColumn = new Map() + if (mappingUpdates.length > 0) { + try { + const preTable = await getTableById(data.tableId) + const preGroup = preTable?.schema.workflowGroups?.find((g) => g.id === data.groupId) + const targetWorkflowId = data.workflowId ?? preGroup?.workflowId + if (targetWorkflowId) { + const [ + { loadWorkflowFromNormalizedTables }, + { flattenWorkflowOutputs }, + { columnTypeForLeaf }, + ] = await Promise.all([ + import('@/lib/workflows/persistence/utils'), + import('@/lib/workflows/blocks/flatten-outputs'), + import('./column-naming'), + ]) + const normalized = await loadWorkflowFromNormalizedTables(targetWorkflowId) + if (normalized) { + const blocks = Object.values(normalized.blocks ?? {}).map((b) => ({ + id: b.id, + type: b.type, + name: b.name, + triggerMode: (b as { triggerMode?: boolean }).triggerMode, + subBlocks: b.subBlocks as Record | undefined, + })) + const flattened = flattenWorkflowOutputs(blocks, normalized.edges ?? []) + const flatByKey = new Map(flattened.map((f) => [`${f.blockId}::${f.path}`, f])) + for (const u of mappingUpdates) { + const match = flatByKey.get(`${u.blockId}::${u.path}`) + if (!match) continue + const newType = columnTypeForLeaf(match.leafType) + if (newType) remapLeafTypeByColumn.set(u.columnName, newType) + } + } + } + } catch (err) { + logger.warn( + `[${requestId}] Could not resolve new leaf types for remap on group ${data.groupId}; leaving column types unchanged:`, + err + ) + } + } + const { updatedTable, added, remappedColumnNames, newOutputs, previousAutoRun } = await withLockedTable(data.tableId, async (table, trx) => { - // Generous idle timeout: the leaf-type resolution below loads the - // workflow on a separate connection, leaving this transaction idle while - // it runs — the default 5s `idle_in_transaction_session_timeout` would - // kill the held lock on a large workflow. - await setTableTxTimeouts(trx, { statementMs: 60_000, idleMs: 30_000 }) + await setTableTxTimeouts(trx, { statementMs: 60_000 }) const schema = table.schema const groups = schema.workflowGroups ?? [] @@ -3248,13 +3295,11 @@ export async function updateWorkflowGroup( // of outputs so the downstream `(blockId, path)`-keyed diff doesn't see the // swap as a remove+add. The corresponding row data is cleared after the // schema write so stale values from the old source don't linger. - const mappingUpdates = data.mappingUpdates ?? [] const remappedColumnNames = new Set() - // Per-column type override resolved from the new mapping's leaf type. Only - // populated when a remap actually changes the column's type — keeps the - // schema patch a no-op when the user repoints to an output of the same - // type. Falls back to leaving the existing type alone if the workflow or - // its target output can't be resolved (workflow deleted, block removed). + // Per-column type override resolved (out-of-lock) from the new mapping's + // leaf type. Only populated when a remap actually changes the column's + // type against the fresh schema — keeps the schema patch a no-op when the + // user repoints to an output of the same type. const remappedColumnTypes = new Map() let oldOutputs = group.outputs if (mappingUpdates.length > 0) { @@ -3274,48 +3319,14 @@ export async function updateWorkflowGroup( return { ...o, blockId: u.blockId, path: u.path } }) - // Resolve the new leaf type for each remap so the column's declared type - // matches what the new mapping produces. Without this, a string→number - // remap would keep `type: 'string'` and coerceRowToSchema would coerce - // every backfilled value toward the wrong type. - try { - const [ - { loadWorkflowFromNormalizedTables }, - { flattenWorkflowOutputs }, - { columnTypeForLeaf }, - ] = await Promise.all([ - import('@/lib/workflows/persistence/utils'), - import('@/lib/workflows/blocks/flatten-outputs'), - import('./column-naming'), - ]) - const targetWorkflowId = data.workflowId ?? group.workflowId - const normalized = await loadWorkflowFromNormalizedTables(targetWorkflowId) - if (normalized) { - const blocks = Object.values(normalized.blocks ?? {}).map((b) => ({ - id: b.id, - type: b.type, - name: b.name, - triggerMode: (b as { triggerMode?: boolean }).triggerMode, - subBlocks: b.subBlocks as Record | undefined, - })) - const flattened = flattenWorkflowOutputs(blocks, normalized.edges ?? []) - const flatByKey = new Map(flattened.map((f) => [`${f.blockId}::${f.path}`, f])) - const colByName = new Map(schema.columns.map((c) => [c.name, c])) - for (const u of mappingUpdates) { - const match = flatByKey.get(`${u.blockId}::${u.path}`) - if (!match) continue - const newType = columnTypeForLeaf(match.leafType) - const oldType = colByName.get(u.columnName)?.type - if (newType && newType !== oldType) { - remappedColumnTypes.set(u.columnName, newType) - } - } + const colByName = new Map(schema.columns.map((c) => [c.name, c])) + for (const u of mappingUpdates) { + const newType = remapLeafTypeByColumn.get(u.columnName) + if (!newType) continue + const oldType = colByName.get(u.columnName)?.type + if (newType !== oldType) { + remappedColumnTypes.set(u.columnName, newType) } - } catch (err) { - logger.warn( - `[${requestId}] Could not resolve new leaf types for remap on group ${data.groupId}; leaving column types unchanged:`, - err - ) } } @@ -3554,13 +3565,55 @@ export async function addWorkflowGroupOutput( }, requestId: string ): Promise { - const { updatedTable, newOutput } = await withLockedTable(data.tableId, async (table, trx) => { - // Generous idle timeout: the workflow load below runs on a separate - // connection, leaving this transaction idle while it resolves the pickable - // output — the default 5s `idle_in_transaction_session_timeout` would kill - // the held lock on a large workflow. - await setTableTxTimeouts(trx, { idleMs: 30_000 }) + // Phase 1 (no lock): load the workflow and resolve the pickable output plus + // its execution-order index. This depends only on the workflow graph (which + // is stable), so it runs OFF the advisory-lock critical section — holding the + // lock during this DB load would make concurrent adders on the same table + // time out waiting (the Mothership fan-out this fix targets). Phase 2 + // re-validates that the group still maps to the same workflow under the lock. + const preTable = await getTableById(data.tableId) + if (!preTable) throw new Error('Table not found') + const preGroup = (preTable.schema.workflowGroups ?? []).find((g) => g.id === data.groupId) + if (!preGroup) { + throw new Error(`Workflow group "${data.groupId}" not found`) + } + const workflowId = preGroup.workflowId + + const [ + { loadWorkflowFromNormalizedTables }, + { flattenWorkflowOutputs, getBlockExecutionOrder }, + { columnTypeForLeaf, deriveOutputColumnName }, + ] = await Promise.all([ + import('@/lib/workflows/persistence/utils'), + import('@/lib/workflows/blocks/flatten-outputs'), + import('./column-naming'), + ]) + const normalized = await loadWorkflowFromNormalizedTables(workflowId) + if (!normalized) { + throw new Error(`Workflow ${workflowId} not found`) + } + const blocks = Object.values(normalized.blocks ?? {}).map((b) => ({ + id: b.id, + type: b.type, + name: b.name, + triggerMode: (b as { triggerMode?: boolean }).triggerMode, + subBlocks: b.subBlocks as Record | undefined, + })) + const flattened = flattenWorkflowOutputs(blocks, normalized.edges ?? []) + const match = flattened.find((f) => f.blockId === data.blockId && f.path === data.path) + if (!match) { + throw new Error( + `Output ${data.blockId}::${data.path} is not a valid pickable output on workflow ${workflowId}` + ) + } + const newColumnType = columnTypeForLeaf(match.leafType) + const distances = getBlockExecutionOrder(blocks, normalized.edges ?? []) + const flatIndex = new Map(flattened.map((f, i) => [`${f.blockId}::${f.path}`, i])) + // Phase 2 (locked): re-read fresh, validate against the current schema, and + // write. The critical section holds no I/O — just the in-memory splice + the + // schema UPDATE — so concurrent adders queue behind it quickly. + const { updatedTable, newOutput } = await withLockedTable(data.tableId, async (table, trx) => { const schema = table.schema const groups = schema.workflowGroups ?? [] const groupIndex = groups.findIndex((g) => g.id === data.groupId) @@ -3568,42 +3621,17 @@ export async function addWorkflowGroupOutput( throw new Error(`Workflow group "${data.groupId}" not found`) } const group = groups[groupIndex] - - if (group.outputs.some((o) => o.blockId === data.blockId && o.path === data.path)) { + if (group.workflowId !== workflowId) { throw new Error( - `Workflow group "${data.groupId}" already has an output at ${data.blockId}::${data.path}` + `Workflow group "${data.groupId}" was remapped to a different workflow concurrently; retry the add.` ) } - const [ - { loadWorkflowFromNormalizedTables }, - { flattenWorkflowOutputs, getBlockExecutionOrder }, - { columnTypeForLeaf, deriveOutputColumnName }, - ] = await Promise.all([ - import('@/lib/workflows/persistence/utils'), - import('@/lib/workflows/blocks/flatten-outputs'), - import('./column-naming'), - ]) - const normalized = await loadWorkflowFromNormalizedTables(group.workflowId) - if (!normalized) { - throw new Error(`Workflow ${group.workflowId} not found`) - } - const blocks = Object.values(normalized.blocks ?? {}).map((b) => ({ - id: b.id, - type: b.type, - name: b.name, - triggerMode: (b as { triggerMode?: boolean }).triggerMode, - subBlocks: b.subBlocks as Record | undefined, - })) - const flattened = flattenWorkflowOutputs(blocks, normalized.edges ?? []) - const match = flattened.find((f) => f.blockId === data.blockId && f.path === data.path) - if (!match) { + if (group.outputs.some((o) => o.blockId === data.blockId && o.path === data.path)) { throw new Error( - `Output ${data.blockId}::${data.path} is not a valid pickable output on workflow ${group.workflowId}` + `Workflow group "${data.groupId}" already has an output at ${data.blockId}::${data.path}` ) } - const distances = getBlockExecutionOrder(blocks, normalized.edges ?? []) - const flatIndex = new Map(flattened.map((f, i) => [`${f.blockId}::${f.path}`, i])) const taken = new Set(schema.columns.map((c) => c.name)) const columnName = data.columnName ?? deriveOutputColumnName(data.path, taken) @@ -3621,7 +3649,7 @@ export async function addWorkflowGroupOutput( const newColDef: ColumnDefinition = { name: columnName, - type: columnTypeForLeaf(match.leafType), + type: newColumnType, required: false, unique: false, workflowGroupId: data.groupId, From 9cbcfcc13ac0e70beb5541e5130d081e49a90b20 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Sat, 30 May 2026 12:06:52 -0700 Subject: [PATCH 4/6] fix(tables): scale idle timeout in updateColumnType to avoid aborting large type changes --- apps/sim/lib/table/service.ts | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/apps/sim/lib/table/service.ts b/apps/sim/lib/table/service.ts index 925214cf48b..40add9c5fd0 100644 --- a/apps/sim/lib/table/service.ts +++ b/apps/sim/lib/table/service.ts @@ -2970,9 +2970,16 @@ export async function updateColumnType( requestId: string ): Promise { return withLockedTable(data.tableId, async (table, trx) => { - await setTableTxTimeouts(trx, { - statementMs: scaledStatementTimeoutMs(table.rowCount ?? 0, { baseMs: 60_000, perRowMs: 2 }), + // Scale both statement and idle timeouts to row count: the compatibility + // check below iterates every row in Node between the row SELECT and the + // schema UPDATE, leaving the transaction idle for that gap. The default 5s + // `idle_in_transaction_session_timeout` would abort a valid type change on + // a large table. + const timeoutMs = scaledStatementTimeoutMs(table.rowCount ?? 0, { + baseMs: 60_000, + perRowMs: 2, }) + await setTableTxTimeouts(trx, { statementMs: timeoutMs, idleMs: timeoutMs }) if (!(COLUMN_TYPES as readonly string[]).includes(data.newType)) { throw new Error( From dd6b0772941f48d1aea61fcb897337405fbea2ba Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Sat, 30 May 2026 12:14:48 -0700 Subject: [PATCH 5/6] fix(tables): skip stale remap types when workflowId changes concurrently --- apps/sim/lib/table/service.ts | 31 ++++++++++++++++++++++++------- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/apps/sim/lib/table/service.ts b/apps/sim/lib/table/service.ts index 40add9c5fd0..ce51579a8f4 100644 --- a/apps/sim/lib/table/service.ts +++ b/apps/sim/lib/table/service.ts @@ -3243,12 +3243,17 @@ export async function updateWorkflowGroup( // unchanged (workflow deleted, block removed). The result is applied against // the fresh schema under the lock in phase 2. const remapLeafTypeByColumn = new Map() + // The workflow id the leaf types above were resolved against. Phase 2 only + // applies the resolved types if the group still points at this workflow under + // the lock — a concurrent `workflowId` change would make them stale. + let resolvedForWorkflowId: string | undefined if (mappingUpdates.length > 0) { try { const preTable = await getTableById(data.tableId) const preGroup = preTable?.schema.workflowGroups?.find((g) => g.id === data.groupId) const targetWorkflowId = data.workflowId ?? preGroup?.workflowId if (targetWorkflowId) { + resolvedForWorkflowId = targetWorkflowId const [ { loadWorkflowFromNormalizedTables }, { flattenWorkflowOutputs }, @@ -3326,13 +3331,25 @@ export async function updateWorkflowGroup( return { ...o, blockId: u.blockId, path: u.path } }) - const colByName = new Map(schema.columns.map((c) => [c.name, c])) - for (const u of mappingUpdates) { - const newType = remapLeafTypeByColumn.get(u.columnName) - if (!newType) continue - const oldType = colByName.get(u.columnName)?.type - if (newType !== oldType) { - remappedColumnTypes.set(u.columnName, newType) + // Only apply the out-of-lock leaf-type resolution if the group still + // points at the workflow we resolved against. If a concurrent writer + // changed `workflowId` between phase 1 and now, those types are stale — + // leave column types unchanged (best-effort, same as a resolution + // failure) rather than stamping types from the old workflow. + const finalWorkflowId = data.workflowId ?? group.workflowId + if (remapLeafTypeByColumn.size > 0 && resolvedForWorkflowId !== finalWorkflowId) { + logger.warn( + `[${requestId}] Workflow group "${data.groupId}" workflowId changed between leaf-type resolution and apply; leaving remapped column types unchanged.` + ) + } else { + const colByName = new Map(schema.columns.map((c) => [c.name, c])) + for (const u of mappingUpdates) { + const newType = remapLeafTypeByColumn.get(u.columnName) + if (!newType) continue + const oldType = colByName.get(u.columnName)?.type + if (newType !== oldType) { + remappedColumnTypes.set(u.columnName, newType) + } } } } From 704944bb03560acf46134be96e717be2141d0f5a Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Sat, 30 May 2026 12:24:13 -0700 Subject: [PATCH 6/6] fix(tables): scale idle timeout in updateColumnConstraints for large tables --- apps/sim/lib/table/service.ts | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/apps/sim/lib/table/service.ts b/apps/sim/lib/table/service.ts index ce51579a8f4..8e3697e2575 100644 --- a/apps/sim/lib/table/service.ts +++ b/apps/sim/lib/table/service.ts @@ -3061,9 +3061,16 @@ export async function updateColumnConstraints( requestId: string ): Promise { return withLockedTable(data.tableId, async (table, trx) => { - await setTableTxTimeouts(trx, { - statementMs: scaledStatementTimeoutMs(table.rowCount ?? 0, { baseMs: 60_000, perRowMs: 2 }), + // Scale both statement and idle timeouts to row count: the required/unique + // validation runs between separate queries inside this transaction, leaving + // it briefly idle. Match `updateColumnType` so the default 5s + // `idle_in_transaction_session_timeout` can't abort a valid change on a + // large table. + const timeoutMs = scaledStatementTimeoutMs(table.rowCount ?? 0, { + baseMs: 60_000, + perRowMs: 2, }) + await setTableTxTimeouts(trx, { statementMs: timeoutMs, idleMs: timeoutMs }) const schema = table.schema const columnIndex = schema.columns.findIndex(