Skip to content
Merged
169 changes: 169 additions & 0 deletions src/advisory-lock.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
import { describe, expect } from "vitest";
import { createPool } from "slonik";

import { pgsliceTest as test } from "./testing/index.js";
import { AdvisoryLock, AdvisoryLockError } from "./advisory-lock.js";
import { Table } from "./table.js";

// Covers AdvisoryLock.withLock: result pass-through, release after a failing
// handler, and contention between two separate database sessions.
describe("AdvisoryLock.withLock", () => {
  test("executes handler and returns result", async ({ transaction }) => {
    const target = Table.parse("test_table");
    const handler = async () => "success";

    const outcome = await AdvisoryLock.withLock(
      transaction,
      target,
      "test_op",
      handler,
    );

    expect(outcome).toBe("success");
  });

  test("releases lock even if handler throws", async ({ transaction }) => {
    const target = Table.parse("test_table");
    const failingHandler = async (): Promise<never> => {
      throw new Error("handler error");
    };

    const failedAttempt = AdvisoryLock.withLock(
      transaction,
      target,
      "test_op",
      failingHandler,
    );
    await expect(failedAttempt).rejects.toThrow("handler error");

    // A second acquisition only succeeds if the first one released the lock.
    const secondOutcome = await AdvisoryLock.withLock(
      transaction,
      target,
      "test_op",
      async () => "acquired again",
    );
    expect(secondOutcome).toBe("acquired again");
  });

  test("throws AdvisoryLockError when lock is held by another session", async ({
    databaseUrl,
  }) => {
    const target = Table.parse("test_table");
    const lockedOperation = "test_op";

    // Each pool is capped at one connection, so each one stands for its own session.
    const openSingleSessionPool = () =>
      createPool(databaseUrl.toString(), {
        maximumPoolSize: 1,
        queryRetryLimit: 0,
      });
    const holderPool = await openSingleSessionPool();
    const contenderPool = await openSingleSessionPool();

    try {
      // The transaction keeps the holder's connection open while the lock is held.
      await holderPool.transaction(async (holderTx) => {
        const unlock = await AdvisoryLock.acquire(
          holderTx,
          target,
          lockedOperation,
        );

        // The second session asks for the very same lock and must be refused.
        await contenderPool.transaction(async (contenderTx) => {
          const contendingAttempt = AdvisoryLock.acquire(
            contenderTx,
            target,
            lockedOperation,
          );
          await expect(contendingAttempt).rejects.toThrow(AdvisoryLockError);
        });

        await unlock();
      });
    } finally {
      await holderPool.end();
      await contenderPool.end();
    }
  });
});

// Covers AdvisoryLock.acquire: the shape of its return value and the fact that
// the lock identity depends on both the table and the operation name.
describe("AdvisoryLock.acquire", () => {
  test("returns a release function", async ({ transaction }) => {
    const target = Table.parse("test_table");

    const unlock = await AdvisoryLock.acquire(transaction, target, "test_op");

    expect(typeof unlock).toBe("function");
    await unlock();
  });

  test("same table + different operation = different locks", async ({
    databaseUrl,
  }) => {
    const target = Table.parse("test_table");

    // Each pool is capped at one connection, so each one stands for its own session.
    const openSingleSessionPool = () =>
      createPool(databaseUrl.toString(), {
        maximumPoolSize: 1,
        queryRetryLimit: 0,
      });
    const firstPool = await openSingleSessionPool();
    const secondPool = await openSingleSessionPool();

    try {
      // Transactions keep the connections open while the locks are held.
      await firstPool.transaction(async (firstTx) => {
        const unlockFirst = await AdvisoryLock.acquire(
          firstTx,
          target,
          "operation1",
        );

        // A different operation on the same table must not be blocked.
        await secondPool.transaction(async (secondTx) => {
          const unlockSecond = await AdvisoryLock.acquire(
            secondTx,
            target,
            "operation2",
          );
          await unlockSecond();
        });

        await unlockFirst();
      });
    } finally {
      await firstPool.end();
      await secondPool.end();
    }
  });

  test("different table + same operation = different locks", async ({
    databaseUrl,
  }) => {
    const firstTable = Table.parse("table_one");
    const secondTable = Table.parse("table_two");

    // Each pool is capped at one connection, so each one stands for its own session.
    const openSingleSessionPool = () =>
      createPool(databaseUrl.toString(), {
        maximumPoolSize: 1,
        queryRetryLimit: 0,
      });
    const firstPool = await openSingleSessionPool();
    const secondPool = await openSingleSessionPool();

    try {
      // Transactions keep the connections open while the locks are held.
      await firstPool.transaction(async (firstTx) => {
        const unlockFirst = await AdvisoryLock.acquire(
          firstTx,
          firstTable,
          "same_op",
        );

        // The same operation on a different table must not be blocked.
        await secondPool.transaction(async (secondTx) => {
          const unlockSecond = await AdvisoryLock.acquire(
            secondTx,
            secondTable,
            "same_op",
          );
          await unlockSecond();
        });

        await unlockFirst();
      });
    } finally {
      await firstPool.end();
      await secondPool.end();
    }
  });
});

// Checks that the error names the operation and the table, carries the hint
// about a concurrent pgslice run, and reports its own class name.
describe("AdvisoryLockError", () => {
  test("has descriptive error message", () => {
    const target = Table.parse("my_schema.my_table");
    const { message, name } = new AdvisoryLockError(target, "prep");

    const expectedFragments = [
      "prep",
      "my_schema.my_table",
      "Another pgslice operation",
    ];
    for (const fragment of expectedFragments) {
      expect(message).toContain(fragment);
    }
    expect(name).toBe("AdvisoryLockError");
  });
});
96 changes: 96 additions & 0 deletions src/advisory-lock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import { CommonQueryMethods } from "slonik";
import { z } from "zod";
import { Table } from "./table.js";
import { sql } from "./sql-utils.js";

/**
 * Error raised when the advisory lock for a table/operation pair could not be
 * obtained.
 */
export class AdvisoryLockError extends Error {
  override name = "AdvisoryLockError";

  /**
   * @param table - Table the lock was requested for; rendered with `toString()`.
   * @param operation - Name of the operation that requested the lock.
   */
  constructor(table: Table, operation: string) {
    super(
      [
        `Could not acquire advisory lock for "${operation}" on table "${table.toString()}".`,
        `Another pgslice operation may be in progress.`,
      ].join(" "),
    );
  }
}

/**
 * Static helpers that guard pgslice operations with PostgreSQL advisory locks,
 * keyed on a table/operation pair.
 *
 * The lock is taken with `pg_try_advisory_lock` and dropped with
 * `pg_advisory_unlock`, so both queries have to reach the same database
 * session. Callers should pass a `connection` that stays bound to a single
 * session for as long as the lock is held (for example a transaction).
 *
 * Design note: row-based locks on a dedicated table were considered and might
 * be easier to debug, but advisory locks keep pgslice from depending on a
 * specific schema.
 */
export abstract class AdvisoryLock {
  /**
   * Executes a handler while holding an advisory lock.
   * The lock is automatically released when the handler completes or throws.
   *
   * @param connection - Query interface used for both locking and unlocking.
   * @param table - Table the lock applies to.
   * @param operation - Operation name; part of the lock identity.
   * @param handler - Work to run while the lock is held.
   * @returns Whatever `handler` resolves to.
   * @throws AdvisoryLockError when the lock is already held elsewhere.
   * @throws Whatever `handler` throws. If releasing the lock fails as well,
   *   the release error is the one that propagates.
   */
  static async withLock<T>(
    connection: CommonQueryMethods,
    table: Table,
    operation: string,
    handler: () => Promise<T>,
  ): Promise<T> {
    const release = await this.acquire(connection, table, operation);
    try {
      return await handler();
    } finally {
      await release();
    }
  }

  /**
   * Acquires an advisory lock and returns a release function.
   * Use this for generators that need to hold a lock across yields.
   *
   * @param connection - Query interface used for both locking and unlocking.
   * @param table - Table the lock applies to.
   * @param operation - Operation name; part of the lock identity.
   * @returns Async function that unlocks the key on the same `connection`.
   * @throws AdvisoryLockError when the lock could not be obtained.
   */
  static async acquire(
    connection: CommonQueryMethods,
    table: Table,
    operation: string,
  ): Promise<() => Promise<void>> {
    // Private statics are reached through the class name rather than `this`:
    // with `this` bound to a subclass (or lost entirely) the private lookup
    // would throw a TypeError.
    const key = await AdvisoryLock.#getKey(connection, table, operation);
    const acquired = await AdvisoryLock.#tryAcquire(connection, key);

    if (!acquired) {
      throw new AdvisoryLockError(table, operation);
    }

    return async () => {
      await AdvisoryLock.#release(connection, key);
    };
  }

  /**
   * Derives the numeric lock key by hashing `<table>:<operation>` in the
   * database with `hashtext` and casting the result to bigint.
   */
  static async #getKey(
    connection: CommonQueryMethods,
    table: Table,
    operation: string,
  ): Promise<bigint> {
    const lockName = `${table.toString()}:${operation}`;
    const result = await connection.one(
      sql.type(z.object({ key: z.coerce.bigint() }))`
        SELECT hashtext(${lockName})::bigint AS key
      `,
    );
    return result.key;
  }

  /**
   * Makes a single non-blocking attempt to take the lock.
   *
   * @returns `true` when the lock was obtained, `false` otherwise.
   */
  static async #tryAcquire(
    connection: CommonQueryMethods,
    key: bigint,
  ): Promise<boolean> {
    const result = await connection.one(
      sql.type(z.object({ acquired: z.boolean() }))`
        SELECT pg_try_advisory_lock(${key}) AS acquired
      `,
    );
    return result.acquired;
  }

  /**
   * Unlocks `key` on `connection`.
   *
   * @throws Error when the database reports that the lock was not held.
   */
  static async #release(
    connection: CommonQueryMethods,
    key: bigint,
  ): Promise<void> {
    const { acquired } = await connection.one(
      sql.type(
        z.object({ acquired: z.boolean() }),
      )`SELECT pg_advisory_unlock(${key}) AS acquired`,
    );
    if (!acquired) {
      throw new Error("Attempted to release lock that was never held.");
    }
  }
}
2 changes: 1 addition & 1 deletion src/commands/disable-mirroring.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export class DisableMirroringCommand extends BaseCommand {

override async perform(pgslice: Pgslice): Promise<void> {
await pgslice.start(async (tx) => {
await this.context.pgslice.disableMirroring(tx, { table: this.table });
await pgslice.disableMirroring(tx, { table: this.table });
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style/consistency change?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I should actually clean this up: There are two ways in a command to get access to the Pgslice instance (context or the perform parameter) but we should probably just have one.

I can look at that later.

this.context.stdout.write(
`Mirroring triggers disabled for ${this.table}\n`,
);
Expand Down
34 changes: 18 additions & 16 deletions src/commands/fill.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,25 +56,27 @@ export class FillCommand extends BaseCommand {
});

/**
 * Runs the fill inside a single `pgslice.start` callback, handing the
 * connection it provides to `pgslice.fill`.
 *
 * Writes a `batch N` marker to stdout for every batch, optionally sleeps
 * `this.sleep` seconds between batches, and writes a "nothing to fill"
 * marker when no batch was produced.
 *
 * @param pgslice - Pgslice instance that supplies the connection and performs the fill.
 */
async perform(pgslice: Pgslice) {
  await pgslice.start(async (conn) => {
    let hasBatches = false;
    for await (const batch of pgslice.fill(conn, {
      table: this.table,
      swapped: this.swapped,
      batchSize: this.batchSize,
      start: this.start,
    })) {
      hasBatches = true;

      this.context.stdout.write(`/* batch ${batch.batchNumber} */\n`);

      // Sleep between batches if requested
      if (this.sleep) {
        await sleep(this.sleep * 1000);
      }
    }

    if (!hasBatches) {
      this.context.stdout.write("/* nothing to fill */\n");
    }
  });
}
}
60 changes: 31 additions & 29 deletions src/commands/synchronize.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,36 +75,38 @@ export class SynchronizeCommand extends BaseCommand {
let targetName: string | null = null;
let headerPrinted = false;

for await (const batch of pgslice.synchronize({
table: this.table,
start: this.start,
windowSize: this.windowSize,
dryRun: this.dryRun,
})) {
// Print header on first batch (we need synchronizer to know table names)
if (!headerPrinted) {
// Get table names from the batch (inferred from the command options)
sourceName = this.table;
targetName = `${this.table}_intermediate`;
this.#printHeader(sourceName, targetName);
headerPrinted = true;
await pgslice.start(async (conn) => {
for await (const batch of pgslice.synchronize(conn, {
table: this.table,
start: this.start,
windowSize: this.windowSize,
dryRun: this.dryRun,
})) {
// Print header on first batch (we need synchronizer to know table names)
if (!headerPrinted) {
// Get table names from the batch (inferred from the command options)
sourceName = this.table;
targetName = `${this.table}_intermediate`;
this.#printHeader(sourceName, targetName);
headerPrinted = true;
}

stats.totalBatches++;
stats.totalRowsCompared += batch.rowsCompared;
stats.matchingRows += batch.matchingRows;
stats.rowsWithDifferences += batch.rowsUpdated;
stats.missingRows += batch.rowsInserted;
stats.extraRows += batch.rowsDeleted;

this.#printBatchResult(batch);

// Calculate and apply adaptive delay
const sleepTime = this.#calculateSleepTime(batch.batchDurationMs);
if (sleepTime > 0) {
await sleep(sleepTime * 1000);
}
}

stats.totalBatches++;
stats.totalRowsCompared += batch.rowsCompared;
stats.matchingRows += batch.matchingRows;
stats.rowsWithDifferences += batch.rowsUpdated;
stats.missingRows += batch.rowsInserted;
stats.extraRows += batch.rowsDeleted;

this.#printBatchResult(batch);

// Calculate and apply adaptive delay
const sleepTime = this.#calculateSleepTime(batch.batchDurationMs);
if (sleepTime > 0) {
await sleep(sleepTime * 1000);
}
}
});

// Print summary
this.#printSummary(stats);
Expand Down
Loading