Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions apps/sim/lib/table/dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,13 @@ export async function insertDispatch(input: {
*
* Hits the `(table_id, status)` partial index on table_row_executions. */
export async function countRunningCells(
tableId: string
tableId: string,
opts?: { includeUnclaimedPreStamps?: boolean }
): Promise<{ total: number; byRowId: Record<string, number> }> {
// `pending` + null-executionId rows are unclaimed pre-stamps. With an active
// dispatch they're real queued work (include); with none they're abandoned
// orphans that would pin the badge above zero forever (exclude).
const excludeOrphanPreStamps = !opts?.includeUnclaimedPreStamps
const rows = await db
.select({
rowId: tableRowExecutions.rowId,
Expand All @@ -209,9 +214,9 @@ export async function countRunningCells(
and(
eq(tableRowExecutions.tableId, tableId),
inArray(tableRowExecutions.status, ['queued', 'running', 'pending']),
// Exclude orphan pre-stamps (`pending` + null executionId). De Morgan of
// NOT(pending AND null) — `status` is NOT NULL so `ne` is well-defined.
or(ne(tableRowExecutions.status, 'pending'), isNotNull(tableRowExecutions.executionId))
excludeOrphanPreStamps
? or(ne(tableRowExecutions.status, 'pending'), isNotNull(tableRowExecutions.executionId))
: undefined
)
)
.groupBy(tableRowExecutions.rowId)
Expand Down Expand Up @@ -261,9 +266,10 @@ export async function countActiveRunCells(
return rowsAhead * groupCount
}

// One round-trip per dispatch + the sidecar count, all in parallel.
// Include pre-stamps so `byRowId` matches the live SSE count (which counts
// `pending`); otherwise the badge flickers 20→0 on each refetch.
const [sidecar, perDispatch] = await Promise.all([
countRunningCells(tableId),
countRunningCells(tableId, { includeUnclaimedPreStamps: true }),
Promise.all(active.map(countRowsAhead)),
])
const total = perDispatch.reduce((sum, n) => sum + n, 0)
Expand Down
Loading