From c4d307071a53981df728ce00cda6ec6e8f59dcfb Mon Sep 17 00:00:00 2001 From: Stefan Budeanu Date: Thu, 19 Feb 2026 17:51:24 -0500 Subject: [PATCH 1/4] Fix fill skipping old rows when dest has mirrored data When enable_mirroring runs before fill, the mirroring trigger copies recent rows into the intermediate table. Fill without --start called destTable.maxId() which returned a high ID from the mirrored row, causing WHERE id > destMaxId to find nothing and skip all old rows. - Start from sourceTable.minId() instead of destTable.maxId() - Restructure batch CTE to track source progress independently, preventing early termination on all-duplicate batches - Fix minId() to order by PK only (not time+PK) for non-monotonic PKs - Add endId to CLI batch log output for resume support Co-Authored-By: Claude Opus 4.6 --- src/commands/fill.test.ts | 12 +-- src/commands/fill.ts | 4 +- src/fill.test.ts | 159 ++++++++++++++++++++++++++++++++++++++ src/filler.ts | 45 ++++++++--- src/table.ts | 8 +- src/types.ts | 1 + 6 files changed, 208 insertions(+), 21 deletions(-) diff --git a/src/commands/fill.test.ts b/src/commands/fill.test.ts index e78cae4..7bbafc3 100644 --- a/src/commands/fill.test.ts +++ b/src/commands/fill.test.ts @@ -125,8 +125,8 @@ describe("FillCommand", () => { expect(expect(exitCode).toBe(0)); const output = commandContext.stdout.read()?.toString(); expect(output).toBeDefined(); - expect(output).toContain("/* batch 1 */"); - expect(output).toContain("/* batch 2 */"); + expect(output).toContain("/* batch 1:"); + expect(output).toContain("/* batch 2:"); }); test("outputs nothing to fill when source is empty", async ({ @@ -194,8 +194,8 @@ describe("FillCommand", () => { expect(expect(exitCode).toBe(0)); const output = commandContext.stdout.read()?.toString(); // ULID batches show "batch N" without total - expect(output).toContain("/* batch 1 */"); - expect(output).toContain("/* batch 2 */"); + expect(output).toContain("/* batch 1:"); + expect(output).toContain("/* batch 2:"); }); 
}); @@ -249,8 +249,8 @@ describe("FillCommand", () => { const output = (commandContext.stdout as PassThrough).read()?.toString(); expect(output).toBeDefined(); - expect(output).toContain("/* batch 1 */"); - expect(output).toContain("/* batch 2 */"); + expect(output).toContain("/* batch 1:"); + expect(output).toContain("/* batch 2:"); }); }); }); diff --git a/src/commands/fill.ts b/src/commands/fill.ts index 9dd560e..3fd024c 100644 --- a/src/commands/fill.ts +++ b/src/commands/fill.ts @@ -66,7 +66,9 @@ export class FillCommand extends BaseCommand { })) { hasBatches = true; - this.context.stdout.write(`/* batch ${batch.batchNumber} */\n`); + this.context.stdout.write( + `/* batch ${batch.batchNumber}: ${batch.rowsInserted} rows inserted, endId=${batch.endId} */\n`, + ); // Sleep between batches if requested if (this.sleep) { diff --git a/src/fill.test.ts b/src/fill.test.ts index b177813..512aad9 100644 --- a/src/fill.test.ts +++ b/src/fill.test.ts @@ -178,6 +178,42 @@ describe("Filler", () => { ); expect(existing.name).toBe("existing"); }); + + test("copies remaining rows when first batch is all duplicates", async ({ + transaction, + }) => { + await transaction.query(sql.unsafe` + CREATE TABLE posts (id BIGSERIAL PRIMARY KEY, name TEXT) + `); + await transaction.query(sql.unsafe` + CREATE TABLE posts_intermediate (id BIGSERIAL PRIMARY KEY, name TEXT) + `); + await transaction.query(sql.unsafe` + INSERT INTO posts (name) + SELECT 'item_' || i FROM generate_series(1, 25) AS i + `); + // Simulate a partial fill: first 10 rows already in dest + await transaction.query(sql.unsafe` + INSERT INTO posts_intermediate (id, name) + SELECT id, name FROM posts WHERE id <= 10 + `); + + const filler = await Filler.init(transaction, { + table: "posts", + batchSize: 10, + }); + + for await (const _batch of filler.fill(transaction)) { + // consume batches + } + + const count = await transaction.one( + sql.type(z.object({ count: z.coerce.number() }))` + SELECT COUNT(*)::int FROM 
posts_intermediate + `, + ); + expect(count.count).toBe(25); + }); }); }); @@ -330,6 +366,62 @@ describe("Pgslice.fill", () => { } }); + test("fills rows with non-monotonic PK and created_at", async ({ + pgslice, + transaction, + }) => { + vi.useFakeTimers(); + vi.setSystemTime(new Date(Date.UTC(2026, 0, 15))); + + try { + // PK order doesn't match created_at order: id=1 has a later date than id=2 + await transaction.query(sql.unsafe` + CREATE TABLE posts ( + id BIGSERIAL PRIMARY KEY, + created_at DATE NOT NULL, + name TEXT + ) + `); + await transaction.query(sql.unsafe` + INSERT INTO posts (id, created_at, name) VALUES + (1, '2026-01-20', 'later-date-lower-pk'), + (2, '2026-01-10', 'earlier-date-higher-pk') + `); + + await pgslice.prep(transaction, { + table: "posts", + column: "created_at", + period: "month", + partition: true, + }); + await pgslice.addPartitions(transaction, { + table: "posts", + intermediate: true, + past: 0, + future: 0, + }); + + for await (const _batch of pgslice.fill(transaction, { + table: "posts", + })) { + // consume batches + } + + const rows = await transaction.any( + sql.type(z.object({ name: z.string() }))` + SELECT name FROM posts_intermediate ORDER BY id ASC + `, + ); + // Both rows should be filled regardless of PK/time ordering + expect(rows.map((row) => row.name)).toEqual([ + "later-date-lower-pk", + "earlier-date-higher-pk", + ]); + } finally { + vi.useRealTimers(); + } + }); + test("returns nothing to fill for empty source", async ({ pgslice, transaction, @@ -421,6 +513,73 @@ describe("Pgslice.fill", () => { expect(error.message).toBe("Table not found: public.posts_intermediate"); }); + test("fills old rows when dest has recent mirrored row", async ({ + pgslice, + transaction, + }) => { + vi.useFakeTimers(); + vi.setSystemTime(new Date(Date.UTC(2026, 0, 15))); + + try { + // Create source table with data spanning the partition range + await transaction.query(sql.unsafe` + CREATE TABLE posts ( + id BIGSERIAL PRIMARY KEY, + 
created_at DATE NOT NULL, + name TEXT + ) + `); + await transaction.query(sql.unsafe` + INSERT INTO posts (created_at, name) VALUES + ('2026-01-01', 'old-1'), + ('2026-01-02', 'old-2'), + ('2026-01-03', 'old-3'), + ('2026-01-10', 'recent-mirrored') + `); + + // Setup partitioned intermediate table + await pgslice.prep(transaction, { + table: "posts", + column: "created_at", + period: "month", + partition: true, + }); + await pgslice.addPartitions(transaction, { + table: "posts", + intermediate: true, + past: 0, + future: 0, + }); + + // Simulate mirroring: a recent row was copied to dest by the trigger + // BEFORE fill runs. This sets destMaxId to 4. + await transaction.query(sql.unsafe` + INSERT INTO posts_intermediate (id, created_at, name) + VALUES (4, '2026-01-10', 'recent-mirrored') + `); + + // Run fill WITHOUT --start. + // Bug: fill sees destMaxId=4, queries "WHERE id > 4", finds nothing, + // returns "nothing to fill". Old rows (id 1,2,3) are never copied. + const batches = []; + for await (const batch of pgslice.fill(transaction, { + table: "posts", + })) { + batches.push(batch); + } + + // All 4 source rows should be in the dest + const count = await transaction.one( + sql.type(z.object({ count: z.coerce.number() }))` + SELECT COUNT(*)::int FROM posts_intermediate + `, + ); + expect(count.count).toBe(4); + } finally { + vi.useRealTimers(); + } + }); + test("throws for table without primary key", async ({ pgslice, transaction, diff --git a/src/filler.ts b/src/filler.ts index 9ed4843..5bf8349 100644 --- a/src/filler.ts +++ b/src/filler.ts @@ -108,9 +108,20 @@ export class Filler { }); startingId = destMaxId ?? undefined; } else { - // Get max from dest - resume from where we left off (exclusive) - const destMaxId = await destTable.maxId(tx); - startingId = destMaxId ?? undefined; + // Start from the source's min PK (inclusive) so we don't skip old rows + // when the dest already has mirrored data with a high ID. 
+ const minId = await sourceTable.minId( + tx, + timeFilter + ? { + column: timeFilter.column, + cast: timeFilter.cast, + startingTime: timeFilter.startingTime, + } + : undefined, + ); + startingId = minId ?? undefined; + includeStart = true; } return new Filler({ @@ -150,8 +161,8 @@ export class Filler { currentId = result.endId; } - // Stop when no rows were inserted (source exhausted) - if (result.rowsInserted === 0) { + // Stop when no source rows remain + if (result.sourceCount === 0) { break; } @@ -167,6 +178,7 @@ export class Filler { currentId: IdValue | null, includeStart: boolean, ): Promise<{ + sourceCount: number; rowsInserted: number; startId: IdValue | null; endId: IdValue | null; @@ -214,31 +226,42 @@ export class Filler { sql.fragment`, `, ); - // Build and execute the CTE-based INSERT query + // Use a two-CTE approach: source_batch tracks progress independently of + // inserted rows. This prevents early termination when an entire batch + // consists of rows already in dest (e.g. from mirroring or a prior partial fill). 
const result = await connection.one( sql.type( z.object({ - max_id: idValueSchema, + source_max_id: idValueSchema, + source_count: z.coerce.number(), count: z.coerce.number(), }), )` - WITH batch AS ( - INSERT INTO ${this.#dest.sqlIdentifier} (${columnList}) + WITH source_batch AS ( SELECT ${columnList} FROM ${this.#source.sqlIdentifier} WHERE ${whereClause} ORDER BY ${pkCol} LIMIT ${this.#batchSize} + ), + inserted AS ( + INSERT INTO ${this.#dest.sqlIdentifier} (${columnList}) + SELECT ${columnList} + FROM source_batch ON CONFLICT DO NOTHING RETURNING ${pkCol} ) - SELECT MAX(${pkCol}) AS max_id, COUNT(*)::int AS count FROM batch + SELECT + (SELECT MAX(${pkCol}) FROM source_batch) AS source_max_id, + (SELECT COUNT(*)::int FROM source_batch) AS source_count, + (SELECT COUNT(*)::int FROM inserted) AS count `, ); - const endId = transformIdValue(result.max_id); + const endId = transformIdValue(result.source_max_id); return { + sourceCount: result.source_count, rowsInserted: result.count, startId, endId, diff --git a/src/table.ts b/src/table.ts index 8213a4d..596ef4b 100644 --- a/src/table.ts +++ b/src/table.ts @@ -437,22 +437,24 @@ export class Table { const col = sql.identifier([await this.primaryKey(tx)]); let whereClause = sql.fragment`1 = 1`; - const orderByClauses = [sql.fragment`${col} ASC`]; if (options?.column && options.cast && options.startingTime) { const timeCol = sql.identifier([options.column]); const startDate = formatDateForSql(options.startingTime, options.cast); whereClause = sql.fragment`${timeCol} >= ${startDate}`; - orderByClauses.unshift(sql.fragment`${timeCol} ASC`); } + // Order by PK only (not time column) so we get the smallest PK in the + // time range. Ordering by time first would miss rows with a smaller PK + // but later timestamp when PKs aren't monotonic with the partition column. + // LIMIT 1 lets Postgres use an index scan on the PK. 
const result = await tx.maybeOne( sql.type(z.object({ min_id: idValueSchema }))` SELECT ${col} AS min_id FROM ${this.sqlIdentifier} WHERE ${whereClause} - ORDER BY ${sql.join(orderByClauses, sql.fragment`, `)} + ORDER BY ${col} ASC LIMIT 1 `, ); diff --git a/src/types.ts b/src/types.ts index 8a0826e..4a2d130 100644 --- a/src/types.ts +++ b/src/types.ts @@ -116,6 +116,7 @@ export interface FillOptions { */ export interface FillBatchResult { batchNumber: number; + sourceCount: number; rowsInserted: number; startId: IdValue | null; endId: IdValue | null; From 7ec7ab882ce6e1f5a5d18357164a92ede92e2342 Mon Sep 17 00:00:00 2001 From: Stefan Budeanu Date: Fri, 20 Feb 2026 10:03:03 -0500 Subject: [PATCH 2/4] Raise startup statement timeout for minId lookups. This avoids 30s timeouts on large table startup scans and documents resume guidance. Co-authored-by: Cursor --- README.md | 10 +++++++++ src/fill.test.ts | 6 +++-- src/filler.ts | 27 ++++++++++++++-------- src/pgslice.ts | 8 +++++-- src/sql-utils.ts | 55 +++++++++++++++++++++++++++++++++++++++++++++ src/synchronizer.ts | 29 ++++++++++++++++-------- src/table.ts | 10 +++++---- 7 files changed, 119 insertions(+), 26 deletions(-) diff --git a/README.md b/README.md index b9a2c25..b6f50ab 100644 --- a/README.md +++ b/README.md @@ -85,6 +85,16 @@ Options: - `--start`: Primary key value to start from (numeric or ULID) - `--where`: Additional WHERE conditions to filter rows +By default, `fill` auto-detects a starting ID by scanning for the smallest +primary key (bounded to the partition time range for partitioned tables). This +lookup can be slow on very large tables without a supporting index. If you're +resuming or want to skip the startup scan, pass `--start` instead (the batch +output includes `endId` for easy resuming). + +To avoid timing out during this startup scan, pgslice temporarily raises the +session `statement_timeout` to 5 minutes for the initial lookup (fill and +synchronize) and then restores the previous value. 
+ To sync data across different databases, check out [pgsync](https://github.com/ankane/pgsync). 7. Analyze tables diff --git a/src/fill.test.ts b/src/fill.test.ts index 512aad9..d17ef6a 100644 --- a/src/fill.test.ts +++ b/src/fill.test.ts @@ -374,7 +374,9 @@ describe("Pgslice.fill", () => { vi.setSystemTime(new Date(Date.UTC(2026, 0, 15))); try { - // PK order doesn't match created_at order: id=1 has a later date than id=2 + // PK order doesn't match created_at order: id=1 has a later date than id=2. + // If we chose the start ID from the earliest created_at row (id=2), + // we'd skip id=1 even though it's in-range. This mirrors ULID/time skew. await transaction.query(sql.unsafe` CREATE TABLE posts ( id BIGSERIAL PRIMARY KEY, @@ -412,7 +414,7 @@ describe("Pgslice.fill", () => { SELECT name FROM posts_intermediate ORDER BY id ASC `, ); - // Both rows should be filled regardless of PK/time ordering + // Both rows should be filled regardless of PK/time ordering. expect(rows.map((row) => row.name)).toEqual([ "later-date-lower-pk", "earlier-date-higher-pk", diff --git a/src/filler.ts b/src/filler.ts index 5bf8349..81084f5 100644 --- a/src/filler.ts +++ b/src/filler.ts @@ -7,7 +7,11 @@ import type { IdValue, TimeFilter, } from "./types.js"; -import { formatDateForSql } from "./sql-utils.js"; +import { + formatDateForSql, + STARTUP_STATEMENT_TIMEOUT_MS, + withStatementTimeout, +} from "./sql-utils.js"; /** * Zod schema for validating ID values from the database. @@ -110,15 +114,20 @@ export class Filler { } else { // Start from the source's min PK (inclusive) so we don't skip old rows // when the dest already has mirrored data with a high ID. - const minId = await sourceTable.minId( + const minId = await withStatementTimeout( tx, - timeFilter - ? { - column: timeFilter.column, - cast: timeFilter.cast, - startingTime: timeFilter.startingTime, - } - : undefined, + STARTUP_STATEMENT_TIMEOUT_MS, + () => + sourceTable.minId( + tx, + timeFilter + ? 
{ + column: timeFilter.column, + cast: timeFilter.cast, + startingTime: timeFilter.startingTime, + } + : undefined, + ), ); startingId = minId ?? undefined; includeStart = true; diff --git a/src/pgslice.ts b/src/pgslice.ts index c286051..eb3ba51 100644 --- a/src/pgslice.ts +++ b/src/pgslice.ts @@ -420,7 +420,9 @@ export class Pgslice { ); try { - const filler = await Filler.init(connection, options); + const filler = await connection.transaction((tx) => + Filler.init(tx, options), + ); for await (const batch of filler.fill(connection)) { yield batch; @@ -447,7 +449,9 @@ export class Pgslice { "synchronize", ); try { - const synchronizer = await Synchronizer.init(connection, options); + const synchronizer = await connection.transaction((tx) => + Synchronizer.init(tx, options), + ); for await (const batch of synchronizer.synchronize(connection)) { yield batch; diff --git a/src/sql-utils.ts b/src/sql-utils.ts index 4e12b99..7f68db9 100644 --- a/src/sql-utils.ts +++ b/src/sql-utils.ts @@ -1,5 +1,6 @@ import { createSqlTag, + type CommonQueryMethods, type PrimitiveValueExpression, type SerializableValue, } from "slonik"; @@ -13,6 +14,8 @@ export const sql = createSqlTag({ }, }); +export const STARTUP_STATEMENT_TIMEOUT_MS = 5 * 60 * 1000; + /** * Creates a SQL fragment from a raw string. 
* This is useful for dynamically building SQL queries where the content @@ -86,3 +89,55 @@ export function valueToSql(val: unknown, dataType: string) { return sql.fragment`${val as PrimitiveValueExpression}`; } + +const statementTimeoutSchema = z.object({ + statement_timeout: z.string(), + statement_timeout_ms: z.coerce.number(), +}); + +export async function withStatementTimeout( + connection: CommonQueryMethods, + minTimeoutMs: number | undefined, + handler: () => Promise, +): Promise { + if (!minTimeoutMs || minTimeoutMs <= 0) { + return handler(); + } + + const settings = await connection.one( + sql.type(statementTimeoutSchema)` + SELECT + current_setting('statement_timeout') AS statement_timeout, + CASE + WHEN current_setting('statement_timeout') IN ('0', '0ms') THEN 0 + ELSE (EXTRACT(EPOCH FROM current_setting('statement_timeout')::interval) * 1000)::bigint + END AS statement_timeout_ms + `, + ); + + if ( + settings.statement_timeout_ms === 0 || + settings.statement_timeout_ms >= minTimeoutMs + ) { + return handler(); + } + + await connection.query( + sql.typeAlias( + "void", + )`SELECT set_config('statement_timeout', ${String(minTimeoutMs)}, true)`, + ); + try { + return await handler(); + } finally { + try { + await connection.query( + sql.typeAlias( + "void", + )`SELECT set_config('statement_timeout', ${settings.statement_timeout}, true)`, + ); + } catch { + // Ignore errors to avoid masking the original failure. 
+ } + } +} diff --git a/src/synchronizer.ts b/src/synchronizer.ts index 6e94ae2..5c5fd5f 100644 --- a/src/synchronizer.ts +++ b/src/synchronizer.ts @@ -2,7 +2,13 @@ import { CommonQueryMethods } from "slonik"; import { z } from "zod"; import { Table, transformIdValue } from "./table.js"; -import { formatDateForSql, sql, valueToSql } from "./sql-utils.js"; +import { + formatDateForSql, + sql, + STARTUP_STATEMENT_TIMEOUT_MS, + valueToSql, + withStatementTimeout, +} from "./sql-utils.js"; import type { ColumnInfo, IdValue, @@ -109,15 +115,20 @@ export class Synchronizer { if (options.start !== undefined) { startingId = transformIdValue(options.start); } else { - const minId = await sourceTable.minId( + const minId = await withStatementTimeout( tx, - timeFilter - ? { - column: timeFilter.column, - cast: timeFilter.cast, - startingTime: timeFilter.startingTime, - } - : undefined, + STARTUP_STATEMENT_TIMEOUT_MS, + () => + sourceTable.minId( + tx, + timeFilter + ? { + column: timeFilter.column, + cast: timeFilter.cast, + startingTime: timeFilter.startingTime, + } + : undefined, + ), ); if (minId === null) { throw new Error("No rows found in source table"); diff --git a/src/table.ts b/src/table.ts index 596ef4b..e7eb98a 100644 --- a/src/table.ts +++ b/src/table.ts @@ -445,10 +445,12 @@ export class Table { whereClause = sql.fragment`${timeCol} >= ${startDate}`; } - // Order by PK only (not time column) so we get the smallest PK in the - // time range. Ordering by time first would miss rows with a smaller PK - // but later timestamp when PKs aren't monotonic with the partition column. - // LIMIT 1 lets Postgres use an index scan on the PK. + // We want the smallest PK within the time range, so we order by PK and take + // the first row. Ordering by time (created_at) is faster, but only correct + // if PK order is monotonic with the partition column. 
With ULIDs, clock + // skew, backfills, or manual timestamps can yield smaller PKs with later + // timestamps, which would be skipped. This path favors correctness; it can + // be slower on large tables without a supporting index. const result = await tx.maybeOne( sql.type(z.object({ min_id: idValueSchema }))` SELECT ${col} AS min_id From 8064b02706b0e08dea6123afd2854a95e5a9742d Mon Sep 17 00:00:00 2001 From: Stefan Budeanu Date: Fri, 20 Feb 2026 13:57:42 -0500 Subject: [PATCH 3/4] Remove startup statement timeout override Let fill and synchronize minId lookups use the session timeout and drop the README note about temporarily raising statement_timeout. Co-authored-by: Cursor --- README.md | 4 ---- src/filler.ts | 27 +++++++++------------------ src/sql-utils.ts | 2 -- src/synchronizer.ts | 29 +++++++++-------------------- 4 files changed, 18 insertions(+), 44 deletions(-) diff --git a/README.md b/README.md index b6f50ab..5bb432c 100644 --- a/README.md +++ b/README.md @@ -91,10 +91,6 @@ lookup can be slow on very large tables without a supporting index. If you're resuming or want to skip the startup scan, pass `--start` instead (the batch output includes `endId` for easy resuming). -To avoid timing out during this startup scan, pgslice temporarily raises the -session `statement_timeout` to 5 minutes for the initial lookup (fill and -synchronize) and then restores the previous value. - To sync data across different databases, check out [pgsync](https://github.com/ankane/pgsync). 7. Analyze tables diff --git a/src/filler.ts b/src/filler.ts index 81084f5..5bf8349 100644 --- a/src/filler.ts +++ b/src/filler.ts @@ -7,11 +7,7 @@ import type { IdValue, TimeFilter, } from "./types.js"; -import { - formatDateForSql, - STARTUP_STATEMENT_TIMEOUT_MS, - withStatementTimeout, -} from "./sql-utils.js"; +import { formatDateForSql } from "./sql-utils.js"; /** * Zod schema for validating ID values from the database. 
@@ -114,20 +110,15 @@ export class Filler { } else { // Start from the source's min PK (inclusive) so we don't skip old rows // when the dest already has mirrored data with a high ID. - const minId = await withStatementTimeout( + const minId = await sourceTable.minId( tx, - STARTUP_STATEMENT_TIMEOUT_MS, - () => - sourceTable.minId( - tx, - timeFilter - ? { - column: timeFilter.column, - cast: timeFilter.cast, - startingTime: timeFilter.startingTime, - } - : undefined, - ), + timeFilter + ? { + column: timeFilter.column, + cast: timeFilter.cast, + startingTime: timeFilter.startingTime, + } + : undefined, ); startingId = minId ?? undefined; includeStart = true; diff --git a/src/sql-utils.ts b/src/sql-utils.ts index 7f68db9..e8610d0 100644 --- a/src/sql-utils.ts +++ b/src/sql-utils.ts @@ -14,8 +14,6 @@ export const sql = createSqlTag({ }, }); -export const STARTUP_STATEMENT_TIMEOUT_MS = 5 * 60 * 1000; - /** * Creates a SQL fragment from a raw string. * This is useful for dynamically building SQL queries where the content diff --git a/src/synchronizer.ts b/src/synchronizer.ts index 5c5fd5f..6e94ae2 100644 --- a/src/synchronizer.ts +++ b/src/synchronizer.ts @@ -2,13 +2,7 @@ import { CommonQueryMethods } from "slonik"; import { z } from "zod"; import { Table, transformIdValue } from "./table.js"; -import { - formatDateForSql, - sql, - STARTUP_STATEMENT_TIMEOUT_MS, - valueToSql, - withStatementTimeout, -} from "./sql-utils.js"; +import { formatDateForSql, sql, valueToSql } from "./sql-utils.js"; import type { ColumnInfo, IdValue, @@ -115,20 +109,15 @@ export class Synchronizer { if (options.start !== undefined) { startingId = transformIdValue(options.start); } else { - const minId = await withStatementTimeout( + const minId = await sourceTable.minId( tx, - STARTUP_STATEMENT_TIMEOUT_MS, - () => - sourceTable.minId( - tx, - timeFilter - ? { - column: timeFilter.column, - cast: timeFilter.cast, - startingTime: timeFilter.startingTime, - } - : undefined, - ), + timeFilter + ? 
{ + column: timeFilter.column, + cast: timeFilter.cast, + startingTime: timeFilter.startingTime, + } + : undefined, ); if (minId === null) { throw new Error("No rows found in source table"); From 5e929a8af265acd85dd857ebeeb2f05acad72ab6 Mon Sep 17 00:00:00 2001 From: Stefan Budeanu Date: Fri, 20 Feb 2026 15:00:01 -0500 Subject: [PATCH 4/4] review comments --- src/fill.test.ts | 275 ++++++++++++++++++++++------------------------- src/sql-utils.ts | 53 --------- 2 files changed, 127 insertions(+), 201 deletions(-) diff --git a/src/fill.test.ts b/src/fill.test.ts index d17ef6a..3458e63 100644 --- a/src/fill.test.ts +++ b/src/fill.test.ts @@ -319,109 +319,95 @@ describe("Pgslice.fill", () => { pgslice, transaction, }) => { - vi.useFakeTimers(); - vi.setSystemTime(new Date(Date.UTC(2026, 0, 15))); - - try { - await transaction.query(sql.unsafe` - CREATE TABLE posts ( - id BIGSERIAL PRIMARY KEY, - created_at DATE NOT NULL, - name TEXT - ) - `); - await transaction.query(sql.unsafe` - INSERT INTO posts (created_at, name) VALUES - ('2026-01-10', 'in-range'), - ('2025-12-15', 'out-of-range') - `); - - await pgslice.prep(transaction, { - table: "posts", - column: "created_at", - period: "month", - partition: true, - }); - await pgslice.addPartitions(transaction, { - table: "posts", - intermediate: true, - past: 0, - future: 0, - }); + await transaction.query(sql.unsafe` + CREATE TABLE posts ( + id BIGSERIAL PRIMARY KEY, + created_at DATE NOT NULL, + name TEXT + ) + `); + await transaction.query(sql.unsafe` + INSERT INTO posts (created_at, name) VALUES + ('2026-01-10', 'in-range'), + ('2025-12-15', 'out-of-range') + `); - for await (const _batch of pgslice.fill(transaction, { - table: "posts", - })) { - // consume batches - } + await pgslice.prep(transaction, { + table: "posts", + column: "created_at", + period: "month", + partition: true, + }); + await pgslice.addPartitions(transaction, { + table: "posts", + intermediate: true, + past: 0, + future: 0, + }); - const rows = 
await transaction.any( - sql.type(z.object({ name: z.string() }))` - SELECT name FROM posts_intermediate ORDER BY id ASC - `, - ); - expect(rows.map((row) => row.name)).toEqual(["in-range"]); - } finally { - vi.useRealTimers(); + for await (const _batch of pgslice.fill(transaction, { + table: "posts", + })) { + // consume batches } + + const rows = await transaction.any( + sql.type(z.object({ name: z.string() }))` + SELECT name FROM posts_intermediate ORDER BY id ASC + `, + ); + expect(rows.map((row) => row.name)).toEqual(["in-range"]); }); test("fills rows with non-monotonic PK and created_at", async ({ pgslice, transaction, }) => { - vi.useFakeTimers(); - vi.setSystemTime(new Date(Date.UTC(2026, 0, 15))); - - try { - // PK order doesn't match created_at order: id=1 has a later date than id=2. - // If we chose the start ID from the earliest created_at row (id=2), - // we'd skip id=1 even though it's in-range. This mirrors ULID/time skew. - await transaction.query(sql.unsafe` - CREATE TABLE posts ( - id BIGSERIAL PRIMARY KEY, - created_at DATE NOT NULL, - name TEXT - ) - `); - await transaction.query(sql.unsafe` - INSERT INTO posts (id, created_at, name) VALUES - (1, '2026-01-20', 'later-date-lower-pk'), - (2, '2026-01-10', 'earlier-date-higher-pk') - `); - - await pgslice.prep(transaction, { - table: "posts", - column: "created_at", - period: "month", - partition: true, - }); - await pgslice.addPartitions(transaction, { - table: "posts", - intermediate: true, - past: 0, - future: 0, - }); + // PK order doesn't match created_at order: id=1 has a later date than id=2. + // If we chose the start ID from the earliest created_at row (id=2), + // we'd skip id=1 even though it's in-range. This mirrors ULID/time skew. 
+ await transaction.query(sql.unsafe` + CREATE TABLE posts ( + id BIGSERIAL PRIMARY KEY, + created_at DATE NOT NULL, + name TEXT + ) + `); + await transaction.query(sql.unsafe` + INSERT INTO posts (id, created_at, name) VALUES + (1, '2026-01-20', 'later-date-lower-pk'), + (2, '2026-01-10', 'earlier-date-higher-pk') + `); - for await (const _batch of pgslice.fill(transaction, { - table: "posts", - })) { - // consume batches - } + await pgslice.prep(transaction, { + table: "posts", + column: "created_at", + period: "month", + partition: true, + }); + await pgslice.addPartitions(transaction, { + table: "posts", + intermediate: true, + past: 0, + future: 0, + }); - const rows = await transaction.any( - sql.type(z.object({ name: z.string() }))` - SELECT name FROM posts_intermediate ORDER BY id ASC - `, - ); - // Both rows should be filled regardless of PK/time ordering. - expect(rows.map((row) => row.name)).toEqual([ - "later-date-lower-pk", - "earlier-date-higher-pk", - ]); - } finally { - vi.useRealTimers(); + for await (const _batch of pgslice.fill(transaction, { + table: "posts", + })) { + // consume batches } + + const rows = await transaction.any( + sql.type(z.object({ name: z.string() }))` + SELECT name FROM posts_intermediate ORDER BY id ASC + `, + ); + // Both rows should be filled regardless of PK/time ordering. 
+ expect(rows.map((row) => row.name)).toEqual([ + "later-date-lower-pk", + "earlier-date-higher-pk", + ]); }); test("returns nothing to fill for empty source", async ({ @@ -519,67 +505,60 @@ describe("Pgslice.fill", () => { pgslice, transaction, }) => { - vi.useFakeTimers(); - vi.setSystemTime(new Date(Date.UTC(2026, 0, 15))); - - try { - // Create source table with data spanning the partition range - await transaction.query(sql.unsafe` - CREATE TABLE posts ( - id BIGSERIAL PRIMARY KEY, - created_at DATE NOT NULL, - name TEXT - ) - `); - await transaction.query(sql.unsafe` - INSERT INTO posts (created_at, name) VALUES - ('2026-01-01', 'old-1'), - ('2026-01-02', 'old-2'), - ('2026-01-03', 'old-3'), - ('2026-01-10', 'recent-mirrored') - `); - - // Setup partitioned intermediate table - await pgslice.prep(transaction, { - table: "posts", - column: "created_at", - period: "month", - partition: true, - }); - await pgslice.addPartitions(transaction, { - table: "posts", - intermediate: true, - past: 0, - future: 0, - }); + // Create source table with data spanning the partition range + await transaction.query(sql.unsafe` + CREATE TABLE posts ( + id BIGSERIAL PRIMARY KEY, + created_at DATE NOT NULL, + name TEXT + ) + `); + await transaction.query(sql.unsafe` + INSERT INTO posts (created_at, name) VALUES + ('2026-01-01', 'old-1'), + ('2026-01-02', 'old-2'), + ('2026-01-03', 'old-3'), + ('2026-01-10', 'recent-mirrored') + `); - // Simulate mirroring: a recent row was copied to dest by the trigger - // BEFORE fill runs. This sets destMaxId to 4. 
- await transaction.query(sql.unsafe` - INSERT INTO posts_intermediate (id, created_at, name) - VALUES (4, '2026-01-10', 'recent-mirrored') - `); + // Setup partitioned intermediate table + await pgslice.prep(transaction, { + table: "posts", + column: "created_at", + period: "month", + partition: true, + }); + await pgslice.addPartitions(transaction, { + table: "posts", + intermediate: true, + past: 0, + future: 0, + }); - // Run fill WITHOUT --start. - // Bug: fill sees destMaxId=4, queries "WHERE id > 4", finds nothing, - // returns "nothing to fill". Old rows (id 1,2,3) are never copied. - const batches = []; - for await (const batch of pgslice.fill(transaction, { - table: "posts", - })) { - batches.push(batch); - } + // Simulate mirroring: a recent row was copied to dest by the trigger + // BEFORE fill runs. This sets destMaxId to 4. + await transaction.query(sql.unsafe` + INSERT INTO posts_intermediate (id, created_at, name) + VALUES (4, '2026-01-10', 'recent-mirrored') + `); - // All 4 source rows should be in the dest - const count = await transaction.one( - sql.type(z.object({ count: z.coerce.number() }))` - SELECT COUNT(*)::int FROM posts_intermediate - `, - ); - expect(count.count).toBe(4); - } finally { - vi.useRealTimers(); + // Run fill WITHOUT --start. + // Bug: fill sees destMaxId=4, queries "WHERE id > 4", finds nothing, + // returns "nothing to fill". Old rows (id 1,2,3) are never copied. 
+ const batches = []; + for await (const batch of pgslice.fill(transaction, { + table: "posts", + })) { + batches.push(batch); } + + // All 4 source rows should be in the dest + const count = await transaction.one( + sql.type(z.object({ count: z.coerce.number() }))` + SELECT COUNT(*)::int FROM posts_intermediate + `, + ); + expect(count.count).toBe(4); }); test("throws for table without primary key", async ({ diff --git a/src/sql-utils.ts b/src/sql-utils.ts index e8610d0..4e12b99 100644 --- a/src/sql-utils.ts +++ b/src/sql-utils.ts @@ -1,6 +1,5 @@ import { createSqlTag, - type CommonQueryMethods, type PrimitiveValueExpression, type SerializableValue, } from "slonik"; @@ -87,55 +86,3 @@ export function valueToSql(val: unknown, dataType: string) { return sql.fragment`${val as PrimitiveValueExpression}`; } - -const statementTimeoutSchema = z.object({ - statement_timeout: z.string(), - statement_timeout_ms: z.coerce.number(), -}); - -export async function withStatementTimeout( - connection: CommonQueryMethods, - minTimeoutMs: number | undefined, - handler: () => Promise, -): Promise { - if (!minTimeoutMs || minTimeoutMs <= 0) { - return handler(); - } - - const settings = await connection.one( - sql.type(statementTimeoutSchema)` - SELECT - current_setting('statement_timeout') AS statement_timeout, - CASE - WHEN current_setting('statement_timeout') IN ('0', '0ms') THEN 0 - ELSE (EXTRACT(EPOCH FROM current_setting('statement_timeout')::interval) * 1000)::bigint - END AS statement_timeout_ms - `, - ); - - if ( - settings.statement_timeout_ms === 0 || - settings.statement_timeout_ms >= minTimeoutMs - ) { - return handler(); - } - - await connection.query( - sql.typeAlias( - "void", - )`SELECT set_config('statement_timeout', ${String(minTimeoutMs)}, true)`, - ); - try { - return await handler(); - } finally { - try { - await connection.query( - sql.typeAlias( - "void", - )`SELECT set_config('statement_timeout', ${settings.statement_timeout}, true)`, - ); - } catch { - // 
Ignore errors to avoid masking the original failure. - } - } -}