-
Notifications
You must be signed in to change notification settings - Fork 0
Use advisory locks to guard against concurrent pgslice invocations
#16
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
773a739
5594ebe
a04f7b9
b4f1e35
bac9be9
0faf50b
ac46f33
a5c36bf
bf0980b
b8d9d50
79f5c80
525b35a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,169 @@ | ||
| import { describe, expect } from "vitest"; | ||
| import { createPool } from "slonik"; | ||
|
|
||
| import { pgsliceTest as test } from "./testing/index.js"; | ||
| import { AdvisoryLock, AdvisoryLockError } from "./advisory-lock.js"; | ||
| import { Table } from "./table.js"; | ||
|
|
||
| describe("AdvisoryLock.withLock", () => { | ||
| test("executes handler and returns result", async ({ transaction }) => { | ||
| const table = Table.parse("test_table"); | ||
| const result = await AdvisoryLock.withLock( | ||
| transaction, | ||
| table, | ||
| "test_op", | ||
| async () => { | ||
| return "success"; | ||
| }, | ||
| ); | ||
|
|
||
| expect(result).toBe("success"); | ||
| }); | ||
|
|
||
| test("releases lock even if handler throws", async ({ transaction }) => { | ||
| const table = Table.parse("test_table"); | ||
|
|
||
| await expect( | ||
| AdvisoryLock.withLock(transaction, table, "test_op", async () => { | ||
| throw new Error("handler error"); | ||
| }), | ||
| ).rejects.toThrow("handler error"); | ||
|
|
||
| // Should be able to acquire the lock again since it was released | ||
| const result = await AdvisoryLock.withLock( | ||
| transaction, | ||
| table, | ||
| "test_op", | ||
| async () => "acquired again", | ||
| ); | ||
| expect(result).toBe("acquired again"); | ||
| }); | ||
|
|
||
| test("throws AdvisoryLockError when lock is held by another session", async ({ | ||
| databaseUrl, | ||
| }) => { | ||
| const table = Table.parse("test_table"); | ||
| const operation = "test_op"; | ||
|
|
||
| // Create two separate pools - each will hold a separate session | ||
| const pool1 = await createPool(databaseUrl.toString(), { | ||
| maximumPoolSize: 1, | ||
| queryRetryLimit: 0, | ||
| }); | ||
| const pool2 = await createPool(databaseUrl.toString(), { | ||
| maximumPoolSize: 1, | ||
| queryRetryLimit: 0, | ||
| }); | ||
|
|
||
| try { | ||
| // Use a transaction in pool1 to hold the connection open while we hold the lock | ||
| await pool1.transaction(async (tx1) => { | ||
| // Acquire lock in the first session | ||
| const release = await AdvisoryLock.acquire(tx1, table, operation); | ||
|
|
||
| // Try to acquire the same lock in the second session | ||
| await pool2.transaction(async (tx2) => { | ||
| await expect( | ||
| AdvisoryLock.acquire(tx2, table, operation), | ||
| ).rejects.toThrow(AdvisoryLockError); | ||
| }); | ||
|
|
||
| await release(); | ||
| }); | ||
| } finally { | ||
| await pool1.end(); | ||
| await pool2.end(); | ||
| } | ||
| }); | ||
| }); | ||
|
|
||
| describe("AdvisoryLock.acquire", () => { | ||
| test("returns a release function", async ({ transaction }) => { | ||
| const table = Table.parse("test_table"); | ||
| const release = await AdvisoryLock.acquire(transaction, table, "test_op"); | ||
|
|
||
| expect(typeof release).toBe("function"); | ||
| await release(); | ||
| }); | ||
|
|
||
| test("same table + different operation = different locks", async ({ | ||
| databaseUrl, | ||
| }) => { | ||
| const table = Table.parse("test_table"); | ||
|
|
||
| const pool1 = await createPool(databaseUrl.toString(), { | ||
| maximumPoolSize: 1, | ||
| queryRetryLimit: 0, | ||
| }); | ||
| const pool2 = await createPool(databaseUrl.toString(), { | ||
| maximumPoolSize: 1, | ||
| queryRetryLimit: 0, | ||
| }); | ||
|
|
||
| try { | ||
| // Use transactions to hold connections open | ||
| await pool1.transaction(async (tx1) => { | ||
| // Acquire lock for operation1 | ||
| const release1 = await AdvisoryLock.acquire(tx1, table, "operation1"); | ||
|
|
||
| // Should be able to acquire lock for operation2 on same table in different session | ||
| await pool2.transaction(async (tx2) => { | ||
| const release2 = await AdvisoryLock.acquire(tx2, table, "operation2"); | ||
| await release2(); | ||
| }); | ||
|
|
||
| await release1(); | ||
| }); | ||
| } finally { | ||
| await pool1.end(); | ||
| await pool2.end(); | ||
| } | ||
| }); | ||
|
|
||
| test("different table + same operation = different locks", async ({ | ||
| databaseUrl, | ||
| }) => { | ||
| const table1 = Table.parse("table_one"); | ||
| const table2 = Table.parse("table_two"); | ||
|
|
||
| const pool1 = await createPool(databaseUrl.toString(), { | ||
| maximumPoolSize: 1, | ||
| queryRetryLimit: 0, | ||
| }); | ||
| const pool2 = await createPool(databaseUrl.toString(), { | ||
| maximumPoolSize: 1, | ||
| queryRetryLimit: 0, | ||
| }); | ||
|
|
||
| try { | ||
| // Use transactions to hold connections open | ||
| await pool1.transaction(async (tx1) => { | ||
| // Acquire lock for table1 | ||
| const release1 = await AdvisoryLock.acquire(tx1, table1, "same_op"); | ||
|
|
||
| // Should be able to acquire lock for table2 with same operation | ||
| await pool2.transaction(async (tx2) => { | ||
| const release2 = await AdvisoryLock.acquire(tx2, table2, "same_op"); | ||
| await release2(); | ||
| }); | ||
|
|
||
| await release1(); | ||
| }); | ||
| } finally { | ||
| await pool1.end(); | ||
| await pool2.end(); | ||
| } | ||
| }); | ||
| }); | ||
|
|
||
| describe("AdvisoryLockError", () => { | ||
| test("has descriptive error message", () => { | ||
| const table = Table.parse("my_schema.my_table"); | ||
| const error = new AdvisoryLockError(table, "prep"); | ||
|
|
||
| expect(error.message).toContain("prep"); | ||
| expect(error.message).toContain("my_schema.my_table"); | ||
| expect(error.message).toContain("Another pgslice operation"); | ||
| expect(error.name).toBe("AdvisoryLockError"); | ||
| }); | ||
| }); |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,96 @@ | ||
| import { CommonQueryMethods } from "slonik"; | ||
| import { z } from "zod"; | ||
| import { Table } from "./table.js"; | ||
| import { sql } from "./sql-utils.js"; | ||
|
|
||
| export class AdvisoryLockError extends Error { | ||
| override name = "AdvisoryLockError"; | ||
|
|
||
| constructor(table: Table, operation: string) { | ||
| super( | ||
| `Could not acquire advisory lock for "${operation}" on table "${table.toString()}". ` + | ||
| `Another pgslice operation may be in progress.`, | ||
| ); | ||
| } | ||
| } | ||
|
|
||
| export abstract class AdvisoryLock { | ||
| /** | ||
| * Executes a handler while holding an advisory lock. | ||
| * The lock is automatically released when the handler completes or throws. | ||
| */ | ||
| static async withLock<T>( | ||
| connection: CommonQueryMethods, | ||
| table: Table, | ||
| operation: string, | ||
| handler: () => Promise<T>, | ||
| ): Promise<T> { | ||
| const release = await this.acquire(connection, table, operation); | ||
| try { | ||
| return await handler(); | ||
| } finally { | ||
| await release(); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Acquires an advisory lock and returns a release function. | ||
| * Use this for generators that need to hold a lock across yields. | ||
| */ | ||
| static async acquire( | ||
| connection: CommonQueryMethods, | ||
| table: Table, | ||
| operation: string, | ||
| ): Promise<() => Promise<void>> { | ||
| const key = await this.#getKey(connection, table, operation); | ||
| const acquired = await this.#tryAcquire(connection, key); | ||
|
|
||
| if (!acquired) { | ||
| throw new AdvisoryLockError(table, operation); | ||
| } | ||
|
|
||
| return async () => { | ||
| await this.#release(connection, key); | ||
| }; | ||
| } | ||
|
|
||
| static async #getKey( | ||
| connection: CommonQueryMethods, | ||
| table: Table, | ||
| operation: string, | ||
| ): Promise<bigint> { | ||
| const lockName = `${table.toString()}:${operation}`; | ||
| const result = await connection.one( | ||
| sql.type(z.object({ key: z.coerce.bigint() }))` | ||
| SELECT hashtext(${lockName})::bigint AS key | ||
| `, | ||
| ); | ||
| return result.key; | ||
| } | ||
|
|
||
| static async #tryAcquire( | ||
| connection: CommonQueryMethods, | ||
| key: bigint, | ||
| ): Promise<boolean> { | ||
| const result = await connection.one( | ||
| sql.type(z.object({ acquired: z.boolean() }))` | ||
| SELECT pg_try_advisory_lock(${key}) AS acquired | ||
| `, | ||
| ); | ||
| return result.acquired; | ||
| } | ||
|
|
||
| static async #release( | ||
| connection: CommonQueryMethods, | ||
| key: bigint, | ||
| ): Promise<void> { | ||
| const { acquired } = await connection.one( | ||
| sql.type( | ||
| z.object({ acquired: z.boolean() }), | ||
| )`SELECT pg_advisory_unlock(${key}) AS acquired`, | ||
| ); | ||
| if (!acquired) { | ||
mthadley marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| throw new Error("Attempted to release lock that was never held."); | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,7 +25,7 @@ export class DisableMirroringCommand extends BaseCommand { | |
|
|
||
| override async perform(pgslice: Pgslice): Promise<void> { | ||
| await pgslice.start(async (tx) => { | ||
| await this.context.pgslice.disableMirroring(tx, { table: this.table }); | ||
| await pgslice.disableMirroring(tx, { table: this.table }); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Style/consistency change?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, I should actually clean this up: There's two ways in a command to get access to the I can look at that later. |
||
| this.context.stdout.write( | ||
| `Mirroring triggers disabled for ${this.table}\n`, | ||
| ); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
An alternative would be row based locks on a dedicated table which might have easier debugging, but I can see why session-level advisory locks make more sense given
fill's batch processing.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True, and if we end up having issues with this advisory locking strategy, maybe we even do that. But it's nice to have
pgslicenot depend on a specific schema in that way.