Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ Options:
- `--start`: Primary key value to start from (numeric or ULID)
- `--where`: Additional WHERE conditions to filter rows

By default, `fill` auto-detects a starting ID by scanning for the smallest
primary key (bounded to the partition time range for partitioned tables). This
lookup can be slow on very large tables without a supporting index. If you're
resuming or want to skip the startup scan, pass `--start` instead (the batch
output includes `endId` for easy resuming).

To sync data across different databases, check out [pgsync](https://github.com/ankane/pgsync).

7. Analyze tables
Expand Down
12 changes: 6 additions & 6 deletions src/commands/fill.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ describe("FillCommand", () => {
expect(expect(exitCode).toBe(0));
const output = commandContext.stdout.read()?.toString();
expect(output).toBeDefined();
expect(output).toContain("/* batch 1 */");
expect(output).toContain("/* batch 2 */");
expect(output).toContain("/* batch 1:");
expect(output).toContain("/* batch 2:");
});

test("outputs nothing to fill when source is empty", async ({
Expand Down Expand Up @@ -194,8 +194,8 @@ describe("FillCommand", () => {
expect(expect(exitCode).toBe(0));
const output = commandContext.stdout.read()?.toString();
// ULID batches show "batch N" without total
expect(output).toContain("/* batch 1 */");
expect(output).toContain("/* batch 2 */");
expect(output).toContain("/* batch 1:");
expect(output).toContain("/* batch 2:");
});
});

Expand Down Expand Up @@ -249,8 +249,8 @@ describe("FillCommand", () => {

const output = (commandContext.stdout as PassThrough).read()?.toString();
expect(output).toBeDefined();
expect(output).toContain("/* batch 1 */");
expect(output).toContain("/* batch 2 */");
expect(output).toContain("/* batch 1:");
expect(output).toContain("/* batch 2:");
});
});
});
4 changes: 3 additions & 1 deletion src/commands/fill.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ export class FillCommand extends BaseCommand {
})) {
hasBatches = true;

this.context.stdout.write(`/* batch ${batch.batchNumber} */\n`);
this.context.stdout.write(
`/* batch ${batch.batchNumber}: ${batch.rowsInserted} rows inserted, endId=${batch.endId} */\n`,
);

// Sleep between batches if requested
if (this.sleep) {
Expand Down
220 changes: 180 additions & 40 deletions src/fill.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,42 @@ describe("Filler", () => {
);
expect(existing.name).toBe("existing");
});

test("copies remaining rows when first batch is all duplicates", async ({
transaction,
}) => {
await transaction.query(sql.unsafe`
CREATE TABLE posts (id BIGSERIAL PRIMARY KEY, name TEXT)
`);
await transaction.query(sql.unsafe`
CREATE TABLE posts_intermediate (id BIGSERIAL PRIMARY KEY, name TEXT)
`);
await transaction.query(sql.unsafe`
INSERT INTO posts (name)
SELECT 'item_' || i FROM generate_series(1, 25) AS i
`);
// Simulate a partial fill: first 10 rows already in dest
await transaction.query(sql.unsafe`
INSERT INTO posts_intermediate (id, name)
SELECT id, name FROM posts WHERE id <= 10
`);

const filler = await Filler.init(transaction, {
table: "posts",
batchSize: 10,
});

for await (const _batch of filler.fill(transaction)) {
// consume batches
}

const count = await transaction.one(
sql.type(z.object({ count: z.coerce.number() }))`
SELECT COUNT(*)::int FROM posts_intermediate
`,
);
expect(count.count).toBe(25);
});
});
});

Expand Down Expand Up @@ -283,51 +319,95 @@ describe("Pgslice.fill", () => {
pgslice,
transaction,
}) => {
vi.useFakeTimers();
vi.setSystemTime(new Date(Date.UTC(2026, 0, 15)));
await transaction.query(sql.unsafe`
CREATE TABLE posts (
id BIGSERIAL PRIMARY KEY,
created_at DATE NOT NULL,
name TEXT
)
`);
await transaction.query(sql.unsafe`
INSERT INTO posts (created_at, name) VALUES
('2026-01-10', 'in-range'),
('2025-12-15', 'out-of-range')
`);

try {
await transaction.query(sql.unsafe`
CREATE TABLE posts (
id BIGSERIAL PRIMARY KEY,
created_at DATE NOT NULL,
name TEXT
)
`);
await transaction.query(sql.unsafe`
INSERT INTO posts (created_at, name) VALUES
('2026-01-10', 'in-range'),
('2025-12-15', 'out-of-range')
`);
await pgslice.prep(transaction, {
table: "posts",
column: "created_at",
period: "month",
partition: true,
});
await pgslice.addPartitions(transaction, {
table: "posts",
intermediate: true,
past: 0,
future: 0,
});

await pgslice.prep(transaction, {
table: "posts",
column: "created_at",
period: "month",
partition: true,
});
await pgslice.addPartitions(transaction, {
table: "posts",
intermediate: true,
past: 0,
future: 0,
});
for await (const _batch of pgslice.fill(transaction, {
table: "posts",
})) {
// consume batches
}

for await (const _batch of pgslice.fill(transaction, {
table: "posts",
})) {
// consume batches
}
const rows = await transaction.any(
sql.type(z.object({ name: z.string() }))`
SELECT name FROM posts_intermediate ORDER BY id ASC
`,
);
expect(rows.map((row) => row.name)).toEqual(["in-range"]);
});

const rows = await transaction.any(
sql.type(z.object({ name: z.string() }))`
SELECT name FROM posts_intermediate ORDER BY id ASC
`,
);
expect(rows.map((row) => row.name)).toEqual(["in-range"]);
} finally {
vi.useRealTimers();
test("fills rows with non-monotonic PK and created_at", async ({
  pgslice,
  transaction,
}) => {
  // PK order doesn't match created_at order: id=1 has a later date than id=2.
  // If we chose the start ID from the earliest created_at row (id=2),
  // we'd skip id=1 even though it's in-range. This mirrors ULID/time skew.
  await transaction.query(sql.unsafe`
    CREATE TABLE posts (
      id BIGSERIAL PRIMARY KEY,
      created_at DATE NOT NULL,
      name TEXT
    )
  `);
  await transaction.query(sql.unsafe`
    INSERT INTO posts (id, created_at, name) VALUES
    (1, '2026-01-20', 'later-date-lower-pk'),
    (2, '2026-01-10', 'earlier-date-higher-pk')
  `);

  // Partition the intermediate table on created_at with a single
  // current-month partition (past: 0, future: 0).
  await pgslice.prep(transaction, {
    table: "posts",
    column: "created_at",
    period: "month",
    partition: true,
  });
  await pgslice.addPartitions(transaction, {
    table: "posts",
    intermediate: true,
    past: 0,
    future: 0,
  });

  // Drain the fill generator; assertions run on the resulting table.
  for await (const _ignored of pgslice.fill(transaction, {
    table: "posts",
  })) {
    // consume batches
  }

  const filled = await transaction.any(
    sql.type(z.object({ name: z.string() }))`
      SELECT name FROM posts_intermediate ORDER BY id ASC
    `,
  );
  // Both rows should be filled regardless of PK/time ordering.
  const names = filled.map((row) => row.name);
  expect(names).toEqual(["later-date-lower-pk", "earlier-date-higher-pk"]);
});

test("returns nothing to fill for empty source", async ({
Expand Down Expand Up @@ -421,6 +501,66 @@ describe("Pgslice.fill", () => {
expect(error.message).toBe("Table not found: public.posts_intermediate");
});

test("fills old rows when dest has recent mirrored row", async ({
  pgslice,
  transaction,
}) => {
  // Create source table with data spanning the partition range
  await transaction.query(sql.unsafe`
    CREATE TABLE posts (
      id BIGSERIAL PRIMARY KEY,
      created_at DATE NOT NULL,
      name TEXT
    )
  `);
  await transaction.query(sql.unsafe`
    INSERT INTO posts (created_at, name) VALUES
    ('2026-01-01', 'old-1'),
    ('2026-01-02', 'old-2'),
    ('2026-01-03', 'old-3'),
    ('2026-01-10', 'recent-mirrored')
  `);

  // Setup partitioned intermediate table
  await pgslice.prep(transaction, {
    table: "posts",
    column: "created_at",
    period: "month",
    partition: true,
  });
  await pgslice.addPartitions(transaction, {
    table: "posts",
    intermediate: true,
    past: 0,
    future: 0,
  });

  // Simulate mirroring: a recent row was copied to dest by the trigger
  // BEFORE fill runs. This sets the dest's max ID to 4.
  await transaction.query(sql.unsafe`
    INSERT INTO posts_intermediate (id, created_at, name)
    VALUES (4, '2026-01-10', 'recent-mirrored')
  `);

  // Regression guard: run fill WITHOUT --start. Before the fix, fill
  // derived its starting point from the dest max ID (4), queried
  // "WHERE id > 4", found nothing, and never copied old rows 1-3.
  // Consume the generator like the sibling tests; only the final table
  // state matters here.
  for await (const _batch of pgslice.fill(transaction, {
    table: "posts",
  })) {
    // consume batches
  }

  // All 4 source rows should be in the dest
  const count = await transaction.one(
    sql.type(z.object({ count: z.coerce.number() }))`
      SELECT COUNT(*)::int FROM posts_intermediate
    `,
  );
  expect(count.count).toBe(4);
});

test("throws for table without primary key", async ({
pgslice,
transaction,
Expand Down
45 changes: 34 additions & 11 deletions src/filler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,20 @@ export class Filler {
});
startingId = destMaxId ?? undefined;
} else {
// Get max from dest - resume from where we left off (exclusive)
const destMaxId = await destTable.maxId(tx);
startingId = destMaxId ?? undefined;
// Start from the source's min PK (inclusive) so we don't skip old rows
// when the dest already has mirrored data with a high ID.
const minId = await sourceTable.minId(
tx,
timeFilter
? {
column: timeFilter.column,
cast: timeFilter.cast,
startingTime: timeFilter.startingTime,
}
: undefined,
);
startingId = minId ?? undefined;
includeStart = true;
}

return new Filler({
Expand Down Expand Up @@ -150,8 +161,8 @@ export class Filler {
currentId = result.endId;
}

// Stop when no rows were inserted (source exhausted)
if (result.rowsInserted === 0) {
// Stop when no source rows remain
if (result.sourceCount === 0) {
break;
}

Expand All @@ -167,6 +178,7 @@ export class Filler {
currentId: IdValue | null,
includeStart: boolean,
): Promise<{
sourceCount: number;
rowsInserted: number;
startId: IdValue | null;
endId: IdValue | null;
Expand Down Expand Up @@ -214,31 +226,42 @@ export class Filler {
sql.fragment`, `,
);

// Build and execute the CTE-based INSERT query
// Use a two-CTE approach: source_batch tracks progress independently of
// inserted rows. This prevents early termination when an entire batch
// consists of rows already in dest (e.g. from mirroring or a prior partial fill).
const result = await connection.one(
sql.type(
z.object({
max_id: idValueSchema,
source_max_id: idValueSchema,
source_count: z.coerce.number(),
count: z.coerce.number(),
}),
)`
WITH batch AS (
INSERT INTO ${this.#dest.sqlIdentifier} (${columnList})
WITH source_batch AS (
SELECT ${columnList}
FROM ${this.#source.sqlIdentifier}
WHERE ${whereClause}
ORDER BY ${pkCol}
LIMIT ${this.#batchSize}
),
inserted AS (
INSERT INTO ${this.#dest.sqlIdentifier} (${columnList})
SELECT ${columnList}
FROM source_batch
ON CONFLICT DO NOTHING
RETURNING ${pkCol}
)
SELECT MAX(${pkCol}) AS max_id, COUNT(*)::int AS count FROM batch
SELECT
(SELECT MAX(${pkCol}) FROM source_batch) AS source_max_id,
(SELECT COUNT(*)::int FROM source_batch) AS source_count,
(SELECT COUNT(*)::int FROM inserted) AS count
`,
);

const endId = transformIdValue(result.max_id);
const endId = transformIdValue(result.source_max_id);

return {
sourceCount: result.source_count,
rowsInserted: result.count,
startId,
endId,
Expand Down
Loading