diff --git a/README.md b/README.md index b9a2c25..5bb432c 100644 --- a/README.md +++ b/README.md @@ -85,6 +85,12 @@ Options: - `--start`: Primary key value to start from (numeric or ULID) - `--where`: Additional WHERE conditions to filter rows +By default, `fill` auto-detects a starting ID by scanning for the smallest +primary key (bounded to the partition time range for partitioned tables). This +lookup can be slow on very large tables without a supporting index. If you're +resuming or want to skip the startup scan, pass `--start` instead (the batch +output includes `endId` for easy resuming). + To sync data across different databases, check out [pgsync](https://github.com/ankane/pgsync). 7. Analyze tables diff --git a/src/commands/fill.test.ts b/src/commands/fill.test.ts index e78cae4..7bbafc3 100644 --- a/src/commands/fill.test.ts +++ b/src/commands/fill.test.ts @@ -125,8 +125,8 @@ describe("FillCommand", () => { expect(expect(exitCode).toBe(0)); const output = commandContext.stdout.read()?.toString(); expect(output).toBeDefined(); - expect(output).toContain("/* batch 1 */"); - expect(output).toContain("/* batch 2 */"); + expect(output).toContain("/* batch 1:"); + expect(output).toContain("/* batch 2:"); }); test("outputs nothing to fill when source is empty", async ({ @@ -194,8 +194,8 @@ describe("FillCommand", () => { expect(expect(exitCode).toBe(0)); const output = commandContext.stdout.read()?.toString(); // ULID batches show "batch N" without total - expect(output).toContain("/* batch 1 */"); - expect(output).toContain("/* batch 2 */"); + expect(output).toContain("/* batch 1:"); + expect(output).toContain("/* batch 2:"); }); }); @@ -249,8 +249,8 @@ describe("FillCommand", () => { const output = (commandContext.stdout as PassThrough).read()?.toString(); expect(output).toBeDefined(); - expect(output).toContain("/* batch 1 */"); - expect(output).toContain("/* batch 2 */"); + expect(output).toContain("/* batch 1:"); + expect(output).toContain("/* batch 2:"); }); }); }); diff --git a/src/commands/fill.ts b/src/commands/fill.ts index 9dd560e..3fd024c 100644 --- a/src/commands/fill.ts +++ b/src/commands/fill.ts @@ -66,7 +66,9 @@ export class FillCommand extends BaseCommand { })) { hasBatches = true; - this.context.stdout.write(`/* batch ${batch.batchNumber} */\n`); + this.context.stdout.write( + `/* batch ${batch.batchNumber}: ${batch.rowsInserted} rows inserted, endId=${batch.endId} */\n`, + ); // Sleep between batches if requested if (this.sleep) { diff --git a/src/fill.test.ts b/src/fill.test.ts index b177813..3458e63 100644 --- a/src/fill.test.ts +++ b/src/fill.test.ts @@ -178,6 +178,42 @@ describe("Filler", () => { ); expect(existing.name).toBe("existing"); }); + + test("copies remaining rows when first batch is all duplicates", async ({ + transaction, + }) => { + await transaction.query(sql.unsafe` + CREATE TABLE posts (id BIGSERIAL PRIMARY KEY, name TEXT) + `); + await transaction.query(sql.unsafe` + CREATE TABLE posts_intermediate (id BIGSERIAL PRIMARY KEY, name TEXT) + `); + await transaction.query(sql.unsafe` + INSERT INTO posts (name) + SELECT 'item_' || i FROM generate_series(1, 25) AS i + `); + // Simulate a partial fill: first 10 rows already in dest + await transaction.query(sql.unsafe` + INSERT INTO posts_intermediate (id, name) + SELECT id, name FROM posts WHERE id <= 10 + `); + + const filler = await Filler.init(transaction, { + table: "posts", + batchSize: 10, + }); + + for await (const _batch of filler.fill(transaction)) { + // consume batches + } + + const count = await transaction.one( + sql.type(z.object({ count: z.coerce.number() }))` + SELECT COUNT(*)::int FROM posts_intermediate + `, + ); + expect(count.count).toBe(25); + }); }); }); @@ -283,51 +319,95 @@ describe("Pgslice.fill", () => { pgslice, transaction, }) => { - vi.useFakeTimers(); - vi.setSystemTime(new Date(Date.UTC(2026, 0, 15))); + await transaction.query(sql.unsafe` + CREATE TABLE posts ( + id BIGSERIAL PRIMARY KEY, + created_at DATE NOT NULL, + name TEXT + ) + `); + await transaction.query(sql.unsafe` + INSERT INTO posts (created_at, name) VALUES + ('2026-01-10', 'in-range'), + ('2025-12-15', 'out-of-range') + `); - try { - await transaction.query(sql.unsafe` - CREATE TABLE posts ( - id BIGSERIAL PRIMARY KEY, - created_at DATE NOT NULL, - name TEXT - ) - `); - await transaction.query(sql.unsafe` - INSERT INTO posts (created_at, name) VALUES - ('2026-01-10', 'in-range'), - ('2025-12-15', 'out-of-range') - `); + await pgslice.prep(transaction, { + table: "posts", + column: "created_at", + period: "month", + partition: true, + }); + await pgslice.addPartitions(transaction, { + table: "posts", + intermediate: true, + past: 0, + future: 0, + }); - await pgslice.prep(transaction, { - table: "posts", - column: "created_at", - period: "month", - partition: true, - }); - await pgslice.addPartitions(transaction, { - table: "posts", - intermediate: true, - past: 0, - future: 0, - }); + for await (const _batch of pgslice.fill(transaction, { + table: "posts", + })) { + // consume batches + } - for await (const _batch of pgslice.fill(transaction, { - table: "posts", - })) { - // consume batches - } + const rows = await transaction.any( + sql.type(z.object({ name: z.string() }))` + SELECT name FROM posts_intermediate ORDER BY id ASC + `, + ); + expect(rows.map((row) => row.name)).toEqual(["in-range"]); + }); - const rows = await transaction.any( - sql.type(z.object({ name: z.string() }))` - SELECT name FROM posts_intermediate ORDER BY id ASC - `, - ); - expect(rows.map((row) => row.name)).toEqual(["in-range"]); - } finally { - vi.useRealTimers(); + test("fills rows with non-monotonic PK and created_at", async ({ + pgslice, + transaction, + }) => { + // PK order doesn't match created_at order: id=1 has a later date than id=2. + // If we chose the start ID from the earliest created_at row (id=2), + // we'd skip id=1 even though it's in-range. This mirrors ULID/time skew. + await transaction.query(sql.unsafe` + CREATE TABLE posts ( + id BIGSERIAL PRIMARY KEY, + created_at DATE NOT NULL, + name TEXT + ) + `); + await transaction.query(sql.unsafe` + INSERT INTO posts (id, created_at, name) VALUES + (1, '2026-01-20', 'later-date-lower-pk'), + (2, '2026-01-10', 'earlier-date-higher-pk') + `); + + await pgslice.prep(transaction, { + table: "posts", + column: "created_at", + period: "month", + partition: true, + }); + await pgslice.addPartitions(transaction, { + table: "posts", + intermediate: true, + past: 0, + future: 0, + }); + + for await (const _batch of pgslice.fill(transaction, { + table: "posts", + })) { + // consume batches } + + const rows = await transaction.any( + sql.type(z.object({ name: z.string() }))` + SELECT name FROM posts_intermediate ORDER BY id ASC + `, + ); + // Both rows should be filled regardless of PK/time ordering. + expect(rows.map((row) => row.name)).toEqual([ + "later-date-lower-pk", + "earlier-date-higher-pk", + ]); }); test("returns nothing to fill for empty source", async ({ @@ -421,6 +501,66 @@ describe("Pgslice.fill", () => { expect(error.message).toBe("Table not found: public.posts_intermediate"); }); + test("fills old rows when dest has recent mirrored row", async ({ + pgslice, + transaction, + }) => { + // Create source table with data spanning the partition range + await transaction.query(sql.unsafe` + CREATE TABLE posts ( + id BIGSERIAL PRIMARY KEY, + created_at DATE NOT NULL, + name TEXT + ) + `); + await transaction.query(sql.unsafe` + INSERT INTO posts (created_at, name) VALUES + ('2026-01-01', 'old-1'), + ('2026-01-02', 'old-2'), + ('2026-01-03', 'old-3'), + ('2026-01-10', 'recent-mirrored') + `); + + // Setup partitioned intermediate table + await pgslice.prep(transaction, { + table: "posts", + column: "created_at", + period: "month", + partition: true, + }); + await pgslice.addPartitions(transaction, { + table: "posts", + intermediate: true, + past: 0, + future: 0, + }); + + // Simulate mirroring: a recent row was copied to dest by the trigger + // BEFORE fill runs. This sets destMaxId to 4. + await transaction.query(sql.unsafe` + INSERT INTO posts_intermediate (id, created_at, name) + VALUES (4, '2026-01-10', 'recent-mirrored') + `); + + // Run fill WITHOUT --start. + // Bug: fill sees destMaxId=4, queries "WHERE id > 4", finds nothing, + // returns "nothing to fill". Old rows (id 1,2,3) are never copied. + const batches = []; + for await (const batch of pgslice.fill(transaction, { + table: "posts", + })) { + batches.push(batch); + } + + // All 4 source rows should be in the dest + const count = await transaction.one( + sql.type(z.object({ count: z.coerce.number() }))` + SELECT COUNT(*)::int FROM posts_intermediate + `, + ); + expect(count.count).toBe(4); + }); + test("throws for table without primary key", async ({ pgslice, transaction, diff --git a/src/filler.ts b/src/filler.ts index 9ed4843..5bf8349 100644 --- a/src/filler.ts +++ b/src/filler.ts @@ -108,9 +108,20 @@ export class Filler { }); startingId = destMaxId ?? undefined; } else { - // Get max from dest - resume from where we left off (exclusive) - const destMaxId = await destTable.maxId(tx); - startingId = destMaxId ?? undefined; + // Start from the source's min PK (inclusive) so we don't skip old rows + // when the dest already has mirrored data with a high ID. + const minId = await sourceTable.minId( + tx, + timeFilter + ? { + column: timeFilter.column, + cast: timeFilter.cast, + startingTime: timeFilter.startingTime, + } + : undefined, + ); + startingId = minId ?? undefined; + includeStart = true; } return new Filler({ @@ -150,8 +161,8 @@ export class Filler { currentId = result.endId; } - // Stop when no rows were inserted (source exhausted) - if (result.rowsInserted === 0) { + // Stop when no source rows remain + if (result.sourceCount === 0) { break; } @@ -167,6 +178,7 @@ export class Filler { currentId: IdValue | null, includeStart: boolean, ): Promise<{ + sourceCount: number; rowsInserted: number; startId: IdValue | null; endId: IdValue | null; @@ -214,31 +226,42 @@ export class Filler { sql.fragment`, `, ); - // Build and execute the CTE-based INSERT query + // Use a two-CTE approach: source_batch tracks progress independently of + // inserted rows. This prevents early termination when an entire batch + // consists of rows already in dest (e.g. from mirroring or a prior partial fill). const result = await connection.one( sql.type( z.object({ - max_id: idValueSchema, + source_max_id: idValueSchema, + source_count: z.coerce.number(), count: z.coerce.number(), }), )` - WITH batch AS ( - INSERT INTO ${this.#dest.sqlIdentifier} (${columnList}) + WITH source_batch AS ( SELECT ${columnList} FROM ${this.#source.sqlIdentifier} WHERE ${whereClause} ORDER BY ${pkCol} LIMIT ${this.#batchSize} + ), + inserted AS ( + INSERT INTO ${this.#dest.sqlIdentifier} (${columnList}) + SELECT ${columnList} + FROM source_batch ON CONFLICT DO NOTHING RETURNING ${pkCol} ) - SELECT MAX(${pkCol}) AS max_id, COUNT(*)::int AS count FROM batch + SELECT + (SELECT MAX(${pkCol}) FROM source_batch) AS source_max_id, + (SELECT COUNT(*)::int FROM source_batch) AS source_count, + (SELECT COUNT(*)::int FROM inserted) AS count `, ); - const endId = transformIdValue(result.max_id); + const endId = transformIdValue(result.source_max_id); return { + sourceCount: result.source_count, rowsInserted: result.count, startId, endId, diff --git a/src/pgslice.ts b/src/pgslice.ts index c286051..eb3ba51 100644 --- a/src/pgslice.ts +++ b/src/pgslice.ts @@ -420,7 +420,9 @@ export class Pgslice { ); try { - const filler = await Filler.init(connection, options); + const filler = await connection.transaction((tx) => + Filler.init(tx, options), + ); for await (const batch of filler.fill(connection)) { yield batch; @@ -447,7 +449,9 @@ export class Pgslice { "synchronize", ); try { - const synchronizer = await Synchronizer.init(connection, options); + const synchronizer = await connection.transaction((tx) => + Synchronizer.init(tx, options), + ); for await (const batch of synchronizer.synchronize(connection)) { yield batch; diff --git a/src/table.ts b/src/table.ts index 8213a4d..e7eb98a 100644 --- a/src/table.ts +++ b/src/table.ts @@ -437,22 +437,26 @@ export class Table { const col = sql.identifier([await this.primaryKey(tx)]); let whereClause = sql.fragment`1 = 1`; - const orderByClauses = [sql.fragment`${col} ASC`]; if (options?.column && options.cast && options.startingTime) { const timeCol = sql.identifier([options.column]); const startDate = formatDateForSql(options.startingTime, options.cast); whereClause = sql.fragment`${timeCol} >= ${startDate}`; - orderByClauses.unshift(sql.fragment`${timeCol} ASC`); } + // We want the smallest PK within the time range, so we order by PK and take + // the first row. Ordering by time (created_at) is faster, but only correct + // if PK order is monotonic with the partition column. With ULIDs, clock + // skew, backfills, or manual timestamps can yield smaller PKs with later + // timestamps, which would be skipped. This path favors correctness; it can + // be slower on large tables without a supporting index. const result = await tx.maybeOne( sql.type(z.object({ min_id: idValueSchema }))` SELECT ${col} AS min_id FROM ${this.sqlIdentifier} WHERE ${whereClause} - ORDER BY ${sql.join(orderByClauses, sql.fragment`, `)} + ORDER BY ${col} ASC LIMIT 1 `, ); diff --git a/src/types.ts b/src/types.ts index 8a0826e..4a2d130 100644 --- a/src/types.ts +++ b/src/types.ts @@ -116,6 +116,7 @@ export interface FillOptions { */ export interface FillBatchResult { batchNumber: number; + sourceCount: number; rowsInserted: number; startId: IdValue | null; endId: IdValue | null;