diff --git a/src/resolver/CompanyResolver.race.test.ts b/src/resolver/CompanyResolver.race.test.ts index 66d9692..b8d04f5 100644 --- a/src/resolver/CompanyResolver.race.test.ts +++ b/src/resolver/CompanyResolver.race.test.ts @@ -20,6 +20,28 @@ import { } from "../storage/canonical/CanonicalAliasSchemas"; import type { CompanyObservation } from "../storage/observation/CompanyObservationSchema"; import { CompanyResolver } from "./CompanyResolver"; +import { isUniqueConstraintError } from "./isUniqueConstraintError"; + +type ErrorShape = "sqlite" | "pg"; +type CompanyKey = "cik" | "crd"; + +function synthesizeUniqueError(shape: ErrorShape, key: CompanyKey): Error { + if (shape === "sqlite") { + const cols = + key === "cik" + ? "canonical_company.resolver_version, canonical_company.cik" + : "canonical_company.resolver_version, canonical_company.crd_number"; + return new Error(`UNIQUE constraint failed: ${cols}`); + } + const constraintName = + key === "cik" + ? "canonical_company_uniq_resolver_version_cik" + : "canonical_company_uniq_resolver_version_crd_number"; + return Object.assign( + new Error(`duplicate key value violates unique constraint "${constraintName}"`), + { code: "23505" } + ); +} function makeRepos() { const canonStorage = new InMemoryTabularStorage< @@ -180,6 +202,56 @@ async function storageEnforcesCompanyUniqueness(): Promise { // keeping twin canonical rows from being minted, and the resolver's // UNIQUE-rejection retry path is what makes the loser converge on the // winner's canonical id instead of failing. +// See PersonResolver.race.test.ts for the rationale. We additionally +// parameterise over the key kind because the canonical_company table has +// UNIQUE indexes on BOTH (resolver_version, cik) and +// (resolver_version, crd_number). Postgres includes the constraint name +// in the error message, so the test exercises both constraint-name forms. +async function runMultiProcessRace(opts: { + errorShape: ErrorShape; + key: CompanyKey; +}): Promise<{ uniqueRejections: number; ids: ReadonlySet }> { + const setup = makeRepos(); + let uniqueRejections = 0; + const originalPut = setup.canonStorage.put.bind(setup.canonStorage); + setup.canonStorage.put = async (value) => { + try { + return await originalPut(value); + } catch (err) { + if (isUniqueConstraintError(err)) { + uniqueRejections += 1; + throw synthesizeUniqueError(opts.errorShape, opts.key); + } + throw err; + } + }; + + const resolverA = new CompanyResolver({ + canonicalCompanyRepo: setup.canonRepo, + canonicalCompanyAliasRepo: setup.aliasRepo, + activeResolverVersion: "1.0.0", + }); + const resolverB = new CompanyResolver({ + canonicalCompanyRepo: setup.canonRepo, + canonicalCompanyAliasRepo: setup.aliasRepo, + activeResolverVersion: "1.0.0", + }); + const fanout = 50; + const results = await Promise.all( + Array.from({ length: fanout }, (_, i) => { + const r = i % 2 === 0 ? resolverA : resolverB; + const claim = + opts.key === "cik" + ? obs({ cik: 5555, observation_id: i + 1 }) + : obs({ crd_number: "CRD-RACE-1", observation_id: i + 1 }); + return r.resolve(claim); + }) + ); + const rows = await setup.canonStorage.getAll(); + expect(rows.length).toBe(1); + return { uniqueRejections, ids: new Set(results) }; +} + describe("CompanyResolver multi-process race (storage-level UNIQUE constraint)", () => { beforeEach(async () => { // Fail loud if a future workglow regression silently drops UNIQUE @@ -187,56 +259,47 @@ describe("CompanyResolver multi-process race (storage-level UNIQUE constraint)", // backstop when the in-process AsyncMutex is bypassed. const enforces = await storageEnforcesCompanyUniqueness(); expect(enforces).toBe(true); + // Sanity check: the resolver's helper still recognises the + // SQLite/InMemory shape the storage actually throws today. + const sample = new Error( + "UNIQUE constraint failed: canonical_company.resolver_version, canonical_company.cik" + ); + expect(isUniqueConstraintError(sample)).toBe(true); }); - it("twin resolver instances racing the same CIK still collapse to one canonical row", async () => { - const setup = makeRepos(); - // Count UNIQUE rejections at the storage layer so we can assert the - // constraint actually fired — proving the test exercises the - // storage backstop, not just an accidental id match. - let uniqueRejections = 0; - const originalPut = setup.canonStorage.put.bind(setup.canonStorage); - setup.canonStorage.put = async (value) => { - try { - return await originalPut(value); - } catch (err) { - if ( - err !== null && - typeof err === "object" && - typeof (err as { message?: unknown }).message === "string" && - ((err as { message: string }).message).startsWith( - "UNIQUE constraint failed" - ) - ) { - uniqueRejections += 1; - } - throw err; - } - }; + it("twin resolver instances racing the same CIK collapse to one row (SQLite/InMemory error shape)", async () => { + const { uniqueRejections, ids } = await runMultiProcessRace({ + errorShape: "sqlite", + key: "cik", + }); + expect(ids.size).toBe(1); + expect(uniqueRejections).toBeGreaterThanOrEqual(1); + }); - const resolverA = new CompanyResolver({ - canonicalCompanyRepo: setup.canonRepo, - canonicalCompanyAliasRepo: setup.aliasRepo, - activeResolverVersion: "1.0.0", + it("twin resolver instances racing the same CIK collapse to one row (Postgres error shape)", async () => { + const { uniqueRejections, ids } = await runMultiProcessRace({ + errorShape: "pg", + key: "cik", }); - const resolverB = new CompanyResolver({ - canonicalCompanyRepo: setup.canonRepo, - canonicalCompanyAliasRepo: setup.aliasRepo, - activeResolverVersion: "1.0.0", + expect(ids.size).toBe(1); + expect(uniqueRejections).toBeGreaterThanOrEqual(1); + }); + + it("twin resolver instances racing the same CRD collapse to one row (SQLite/InMemory error shape)", async () => { + const { uniqueRejections, ids } = await runMultiProcessRace({ + errorShape: "sqlite", + key: "crd", }); - const fanout = 50; - const results = await Promise.all( - Array.from({ length: fanout }, (_, i) => { - const r = i % 2 === 0 ? resolverA : resolverB; - return r.resolve(obs({ cik: 5555, observation_id: i + 1 })); - }) - ); - expect(new Set(results).size).toBe(1); - const rows = await setup.canonStorage.getAll(); - expect(rows.length).toBe(1); - // At least one storage-level UNIQUE rejection must have fired — - // otherwise the two instances accidentally never raced and the test - // doesn't actually exercise the multi-process backstop. + expect(ids.size).toBe(1); + expect(uniqueRejections).toBeGreaterThanOrEqual(1); + }); + + it("twin resolver instances racing the same CRD collapse to one row (Postgres error shape)", async () => { + const { uniqueRejections, ids } = await runMultiProcessRace({ + errorShape: "pg", + key: "crd", + }); + expect(ids.size).toBe(1); expect(uniqueRejections).toBeGreaterThanOrEqual(1); }); diff --git a/src/resolver/PersonResolver.race.test.ts b/src/resolver/PersonResolver.race.test.ts index f7d20a9..2be92c1 100644 --- a/src/resolver/PersonResolver.race.test.ts +++ b/src/resolver/PersonResolver.race.test.ts @@ -20,6 +20,23 @@ import { } from "../storage/canonical/CanonicalAliasSchemas"; import type { PersonObservation } from "../storage/observation/PersonObservationSchema"; import { PersonResolver } from "./PersonResolver"; +import { isUniqueConstraintError } from "./isUniqueConstraintError"; + +type ErrorShape = "sqlite" | "pg"; + +function synthesizeUniqueError(shape: ErrorShape): Error { + if (shape === "sqlite") { + return new Error( + "UNIQUE constraint failed: canonical_person.resolver_version, canonical_person.cik" + ); + } + return Object.assign( + new Error( + 'duplicate key value violates unique constraint "canonical_person_uniq_resolver_version_cik"' + ), + { code: "23505" } + ); +} function makeRepos() { const canonStorage = new InMemoryTabularStorage< @@ -197,6 +214,55 @@ async function storageEnforcesPersonUniqueness(): Promise { // minted, and PersonResolver.resolve()'s UNIQUE-rejection retry path is // what makes the loser converge on the winner's canonical id instead of // failing. +// Drives the multi-process race scenario under a chosen storage-error +// shape. We monkey-patch `put` to translate the storage's natural UNIQUE +// rejection into the chosen error shape — proving the resolver's retry +// path recognises both the SQLite/InMemory error shape AND the raw +// Postgres `pg.DatabaseError` shape (`code: "23505"` + the +// "duplicate key value violates unique constraint" message) without +// requiring an actual Postgres connection in the unit suite. The winner +// row stays in place so the loser's re-query converges on it. +async function runMultiProcessRace(opts: { + errorShape: ErrorShape; +}): Promise<{ uniqueRejections: number; ids: ReadonlySet }> { + const setup = makeRepos(); + let uniqueRejections = 0; + const originalPut = setup.canonStorage.put.bind(setup.canonStorage); + setup.canonStorage.put = async (value) => { + try { + return await originalPut(value); + } catch (err) { + if (isUniqueConstraintError(err)) { + uniqueRejections += 1; + throw synthesizeUniqueError(opts.errorShape); + } + throw err; + } + }; + + const resolverA = new PersonResolver({ + canonicalPersonRepo: setup.canonRepo, + canonicalPersonAliasRepo: setup.aliasRepo, + activeResolverVersion: "1.0.0", + }); + const resolverB = new PersonResolver({ + canonicalPersonRepo: setup.canonRepo, + canonicalPersonAliasRepo: setup.aliasRepo, + activeResolverVersion: "1.0.0", + }); + + const fanout = 50; + const results = await Promise.all( + Array.from({ length: fanout }, (_, i) => { + const r = i % 2 === 0 ? resolverA : resolverB; + return r.resolve(obs({ cik: 5555, observation_id: i + 1 })); + }) + ); + const rows = await setup.canonStorage.getAll(); + expect(rows.length).toBe(1); + return { uniqueRejections, ids: new Set(results) }; +} + describe("PersonResolver multi-process race (storage-level UNIQUE constraint)", () => { beforeEach(async () => { // Fail loud if a future workglow regression silently drops UNIQUE @@ -204,57 +270,29 @@ describe("PersonResolver multi-process race (storage-level UNIQUE constraint)", // backstop when the in-process AsyncMutex is bypassed. const enforces = await storageEnforcesPersonUniqueness(); expect(enforces).toBe(true); + // Also sanity-check that the SQLite/InMemory shape the storage layer + // actually throws is still recognised by the resolver's helper. If + // workglow ever changes the wording, this asserts loudly here + // instead of silently breaking the retry path in production. + const sample = new Error( + "UNIQUE constraint failed: canonical_person.resolver_version, canonical_person.cik" + ); + expect(isUniqueConstraintError(sample)).toBe(true); }); - it("twin resolver instances racing the same CIK still collapse to one canonical row", async () => { - const setup = makeRepos(); - // Count UNIQUE rejections at the storage layer so we can assert the - // constraint actually fired — proving the test exercises the - // storage backstop, not just an accidental id match. - let uniqueRejections = 0; - const originalPut = setup.canonStorage.put.bind(setup.canonStorage); - setup.canonStorage.put = async (value) => { - try { - return await originalPut(value); - } catch (err) { - if ( - err !== null && - typeof err === "object" && - typeof (err as { message?: unknown }).message === "string" && - ((err as { message: string }).message).startsWith( - "UNIQUE constraint failed" - ) - ) { - uniqueRejections += 1; - } - throw err; - } - }; - - const resolverA = new PersonResolver({ - canonicalPersonRepo: setup.canonRepo, - canonicalPersonAliasRepo: setup.aliasRepo, - activeResolverVersion: "1.0.0", - }); - const resolverB = new PersonResolver({ - canonicalPersonRepo: setup.canonRepo, - canonicalPersonAliasRepo: setup.aliasRepo, - activeResolverVersion: "1.0.0", + it("twin resolver instances racing the same CIK collapse to one row (SQLite/InMemory error shape)", async () => { + const { uniqueRejections, ids } = await runMultiProcessRace({ + errorShape: "sqlite", }); + expect(ids.size).toBe(1); + expect(uniqueRejections).toBeGreaterThanOrEqual(1); + }); - const fanout = 50; - const results = await Promise.all( - Array.from({ length: fanout }, (_, i) => { - const r = i % 2 === 0 ? resolverA : resolverB; - return r.resolve(obs({ cik: 5555, observation_id: i + 1 })); - }) - ); - expect(new Set(results).size).toBe(1); - const rows = await setup.canonStorage.getAll(); - expect(rows.length).toBe(1); - // At least one storage-level UNIQUE rejection must have fired — - // otherwise the two instances accidentally never raced and the test - // doesn't actually exercise the multi-process backstop. + it("twin resolver instances racing the same CIK collapse to one row (Postgres error shape)", async () => { + const { uniqueRejections, ids } = await runMultiProcessRace({ + errorShape: "pg", + }); + expect(ids.size).toBe(1); expect(uniqueRejections).toBeGreaterThanOrEqual(1); }); diff --git a/src/resolver/isUniqueConstraintError.test.ts b/src/resolver/isUniqueConstraintError.test.ts new file mode 100644 index 0000000..bc24bef --- /dev/null +++ b/src/resolver/isUniqueConstraintError.test.ts @@ -0,0 +1,110 @@ +/** + * @license + * Copyright 2026 Steven Roussey + * SPDX-License-Identifier: Apache-2.0 + */ + +import { describe, expect, it } from "bun:test"; +import { isUniqueConstraintError } from "./isUniqueConstraintError"; + +describe("isUniqueConstraintError", () => { + describe("SQLite / InMemory", () => { + it("matches the canonical SQLite/InMemory message", () => { + expect( + isUniqueConstraintError( + new Error( + "UNIQUE constraint failed: canonical_person.resolver_version, canonical_person.cik" + ) + ) + ).toBe(true); + }); + + it("matches the SQLite native error code", () => { + expect(isUniqueConstraintError({ code: "SQLITE_CONSTRAINT_UNIQUE" })).toBe(true); + }); + + it("is case-insensitive on the SQLite/InMemory message", () => { + expect(isUniqueConstraintError(new Error("unique constraint failed: foo"))).toBe( + true + ); + expect(isUniqueConstraintError(new Error("Unique Constraint Failed: foo"))).toBe( + true + ); + }); + }); + + describe("Postgres", () => { + it("matches a Postgres error by SQLSTATE code alone (no message)", () => { + expect(isUniqueConstraintError({ code: "23505" })).toBe(true); + }); + + it("matches a Postgres error by message alone (no code)", () => { + expect( + isUniqueConstraintError( + new Error( + 'duplicate key value violates unique constraint "canonical_company_uniq_resolver_version_cik"' + ) + ) + ).toBe(true); + }); + + it("matches a Postgres error with both code and message", () => { + const pgError = Object.assign( + new Error( + 'duplicate key value violates unique constraint "canonical_company_uniq_resolver_version_crd_number"' + ), + { code: "23505" } + ); + expect(isUniqueConstraintError(pgError)).toBe(true); + }); + + it("is case-insensitive on the Postgres message", () => { + expect( + isUniqueConstraintError( + new Error("DUPLICATE KEY VALUE VIOLATES UNIQUE CONSTRAINT \"foo\"") + ) + ).toBe(true); + }); + + it("matches the Postgres message when embedded mid-string", () => { + expect( + isUniqueConstraintError( + new Error( + 'ERROR: duplicate key value violates unique constraint "x"\nDETAIL: Key (a)=(1) already exists.' + ) + ) + ).toBe(true); + }); + }); + + describe("rejects unrelated errors", () => { + it("rejects unrelated Postgres SQLSTATE codes", () => { + expect(isUniqueConstraintError({ code: "23503" })).toBe(false); // FK violation + expect(isUniqueConstraintError({ code: "23502" })).toBe(false); // NOT NULL violation + expect(isUniqueConstraintError({ code: "23514" })).toBe(false); // CHECK violation + }); + + it("rejects unrelated error messages", () => { + expect(isUniqueConstraintError(new Error("connection refused"))).toBe(false); + expect(isUniqueConstraintError(new Error(""))).toBe(false); + expect(isUniqueConstraintError(new Error("unique"))).toBe(false); + expect(isUniqueConstraintError(new Error("duplicate key"))).toBe(false); + }); + + it("rejects an Error without a recognised code or message", () => { + const e = new Error("something else broke"); + expect(isUniqueConstraintError(e)).toBe(false); + }); + + it("rejects non-Error inputs", () => { + expect(isUniqueConstraintError(null)).toBe(false); + expect(isUniqueConstraintError(undefined)).toBe(false); + expect(isUniqueConstraintError("UNIQUE constraint failed")).toBe(false); + expect(isUniqueConstraintError(23505)).toBe(false); + expect(isUniqueConstraintError(true)).toBe(false); + expect(isUniqueConstraintError({})).toBe(false); + expect(isUniqueConstraintError({ message: 42 })).toBe(false); + expect(isUniqueConstraintError({ code: 23505 })).toBe(false); // number, not string + }); + }); +}); diff --git a/src/resolver/isUniqueConstraintError.ts b/src/resolver/isUniqueConstraintError.ts index 04b999e..ddd36fd 100644 --- a/src/resolver/isUniqueConstraintError.ts +++ b/src/resolver/isUniqueConstraintError.ts @@ -7,14 +7,31 @@ /** * Detects a UNIQUE-index violation thrown by `@workglow/storage` backends. * - * All three backends (InMemory, SQLite, Postgres) surface the violation as - * a `StorageError` whose message starts with `"UNIQUE constraint failed"`. - * We match on the message rather than `instanceof StorageError` so that - * future wrappers / re-thrown errors continue to be recognised; the - * message prefix is stable across backends. + * Three backends in production today: + * - InMemory / SQLite — surface the violation as an `Error` whose message + * starts (case-insensitively) with `"UNIQUE constraint failed"`. SQLite + * additionally carries `code: "SQLITE_CONSTRAINT_UNIQUE"` on the native + * `better-sqlite3` error. + * - Postgres — propagates the raw `pg.DatabaseError` unmodified through + * `PostgresTabularStorage._putInternal`. It carries `code: "23505"` + * (SQLSTATE `unique_violation`) and a message of the form + * `"duplicate key value violates unique constraint \"\""`. We match + * BOTH signals so the helper still fires if a future wrapper layer + * strips the SQLSTATE but preserves the message (or vice versa). + * + * We deliberately avoid `instanceof pg.DatabaseError` / `instanceof SqliteError` + * — neither `pg` nor `better-sqlite3` is a direct dependency of `@workglow/sec`, + * and string/code matching is robust to wrapped or re-thrown errors. */ export function isUniqueConstraintError(err: unknown): boolean { if (err === null || typeof err !== "object") return false; + const code = (err as { code?: unknown }).code; + if (code === "23505" || code === "SQLITE_CONSTRAINT_UNIQUE") return true; const msg = (err as { message?: unknown }).message; - return typeof msg === "string" && msg.startsWith("UNIQUE constraint failed"); + if (typeof msg !== "string") return false; + const lower = msg.toLowerCase(); + return ( + lower.startsWith("unique constraint failed") || + lower.includes("duplicate key value violates unique constraint") + ); }