Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 108 additions & 45 deletions src/resolver/CompanyResolver.race.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,28 @@ import {
} from "../storage/canonical/CanonicalAliasSchemas";
import type { CompanyObservation } from "../storage/observation/CompanyObservationSchema";
import { CompanyResolver } from "./CompanyResolver";
import { isUniqueConstraintError } from "./isUniqueConstraintError";

/** Storage-error dialects the race tests can emulate. */
type ErrorShape = "sqlite" | "pg";
/** Which uniquely-indexed company identifier the race collides on. */
type CompanyKey = "cik" | "crd";

/**
 * Fabricates a unique-constraint violation in the requested dialect.
 *
 * @param shape - `"sqlite"` yields a plain `Error` whose message starts with
 *   `UNIQUE constraint failed:`; any other value yields the Postgres-style
 *   error carrying `code: "23505"`.
 * @param key - `"cik"` selects the CIK column/constraint name; any other
 *   value selects the CRD-number one.
 * @returns The synthesized error (never thrown here — the caller throws it).
 */
function synthesizeUniqueError(shape: ErrorShape, key: CompanyKey): Error {
  const collidesOnCik = key === "cik";

  if (shape !== "sqlite") {
    // Postgres dialect: the constraint name is quoted inside the message and
    // the SQLSTATE is attached as a `code` property.
    const violatedConstraint = collidesOnCik
      ? "canonical_company_uniq_resolver_version_cik"
      : "canonical_company_uniq_resolver_version_crd_number";
    const pgError = new Error(
      `duplicate key value violates unique constraint "${violatedConstraint}"`
    );
    return Object.assign(pgError, { code: "23505" });
  }

  // SQLite/InMemory dialect: message lists the offending columns, no `code`.
  const offendingColumns = collidesOnCik
    ? "canonical_company.resolver_version, canonical_company.cik"
    : "canonical_company.resolver_version, canonical_company.crd_number";
  return new Error(`UNIQUE constraint failed: ${offendingColumns}`);
}

function makeRepos() {
const canonStorage = new InMemoryTabularStorage<
Expand Down Expand Up @@ -180,63 +202,104 @@ async function storageEnforcesCompanyUniqueness(): Promise<boolean> {
// keeping twin canonical rows from being minted, and the resolver's
// UNIQUE-rejection retry path is what makes the loser converge on the
// winner's canonical id instead of failing.
// See PersonResolver.race.test.ts for the rationale. We additionally
// parameterise over the key kind because the canonical_company table has
// UNIQUE indexes on BOTH (resolver_version, cik) and
// (resolver_version, crd_number). Postgres includes the constraint name
// in the error message, so the test exercises both constraint-name forms.
/**
 * Runs 50 concurrent `resolve()` calls, alternating between two
 * `CompanyResolver` instances that share one set of repos, all claiming the
 * same company key.
 *
 * The storage's `put` is wrapped so that every failure recognised by
 * `isUniqueConstraintError` is counted and re-thrown in the dialect chosen by
 * `opts.errorShape`; any other failure propagates untouched.
 *
 * @param opts.errorShape - Dialect of the error the resolvers will observe.
 * @param opts.key - Whether the colliding observations share a CIK or a CRD number.
 * @returns The number of translated UNIQUE rejections and the distinct ids
 *   returned by the resolvers. Also asserts that storage holds exactly one row.
 */
async function runMultiProcessRace(opts: {
  errorShape: ErrorShape;
  key: CompanyKey;
}): Promise<{ uniqueRejections: number; ids: ReadonlySet<string> }> {
  const { errorShape, key } = opts;
  const setup = makeRepos();

  // Intercept writes so storage-level rejections can be counted and reshaped.
  let uniqueRejections = 0;
  const passthroughPut = setup.canonStorage.put.bind(setup.canonStorage);
  setup.canonStorage.put = async (row) => {
    try {
      return await passthroughPut(row);
    } catch (cause) {
      if (!isUniqueConstraintError(cause)) {
        throw cause;
      }
      uniqueRejections += 1;
      throw synthesizeUniqueError(errorShape, key);
    }
  };

  // Two independent resolver instances over the same repos.
  const spawnResolver = () =>
    new CompanyResolver({
      canonicalCompanyRepo: setup.canonRepo,
      canonicalCompanyAliasRepo: setup.aliasRepo,
      activeResolverVersion: "1.0.0",
    });
  const resolverA = spawnResolver();
  const resolverB = spawnResolver();

  const fanout = 50;
  const observationIds = Array.from({ length: fanout }, (_, i) => i + 1);
  const inFlight = observationIds.map((observationId) => {
    // Odd observation ids (even zero-based positions) go to resolver A.
    const contender = observationId % 2 === 1 ? resolverA : resolverB;
    const claim =
      key === "cik"
        ? obs({ cik: 5555, observation_id: observationId })
        : obs({ crd_number: "CRD-RACE-1", observation_id: observationId });
    return contender.resolve(claim);
  });
  const results = await Promise.all(inFlight);

  const rows = await setup.canonStorage.getAll();
  expect(rows.length).toBe(1);

  return { uniqueRejections, ids: new Set(results) };
}

describe("CompanyResolver multi-process race (storage-level UNIQUE constraint)", () => {
beforeEach(async () => {
// Fail loud if a future workglow regression silently drops UNIQUE
// enforcement — the rest of the suite assumes the storage layer is the
// backstop when the in-process AsyncMutex is bypassed.
const enforces = await storageEnforcesCompanyUniqueness();
expect(enforces).toBe(true);
// Sanity check: the resolver's helper still recognises the
// SQLite/InMemory shape the storage actually throws today.
const sample = new Error(
"UNIQUE constraint failed: canonical_company.resolver_version, canonical_company.cik"
);
expect(isUniqueConstraintError(sample)).toBe(true);
});

it("twin resolver instances racing the same CIK still collapse to one canonical row", async () => {
const setup = makeRepos();
// Count UNIQUE rejections at the storage layer so we can assert the
// constraint actually fired — proving the test exercises the
// storage backstop, not just an accidental id match.
let uniqueRejections = 0;
const originalPut = setup.canonStorage.put.bind(setup.canonStorage);
setup.canonStorage.put = async (value) => {
try {
return await originalPut(value);
} catch (err) {
if (
err !== null &&
typeof err === "object" &&
typeof (err as { message?: unknown }).message === "string" &&
((err as { message: string }).message).startsWith(
"UNIQUE constraint failed"
)
) {
uniqueRejections += 1;
}
throw err;
}
};
it("twin resolver instances racing the same CIK collapse to one row (SQLite/InMemory error shape)", async () => {
const { uniqueRejections, ids } = await runMultiProcessRace({
errorShape: "sqlite",
key: "cik",
});
expect(ids.size).toBe(1);
expect(uniqueRejections).toBeGreaterThanOrEqual(1);
});

const resolverA = new CompanyResolver({
canonicalCompanyRepo: setup.canonRepo,
canonicalCompanyAliasRepo: setup.aliasRepo,
activeResolverVersion: "1.0.0",
it("twin resolver instances racing the same CIK collapse to one row (Postgres error shape)", async () => {
const { uniqueRejections, ids } = await runMultiProcessRace({
errorShape: "pg",
key: "cik",
});
const resolverB = new CompanyResolver({
canonicalCompanyRepo: setup.canonRepo,
canonicalCompanyAliasRepo: setup.aliasRepo,
activeResolverVersion: "1.0.0",
expect(ids.size).toBe(1);
expect(uniqueRejections).toBeGreaterThanOrEqual(1);
});

it("twin resolver instances racing the same CRD collapse to one row (SQLite/InMemory error shape)", async () => {
const { uniqueRejections, ids } = await runMultiProcessRace({
errorShape: "sqlite",
key: "crd",
});
const fanout = 50;
const results = await Promise.all(
Array.from({ length: fanout }, (_, i) => {
const r = i % 2 === 0 ? resolverA : resolverB;
return r.resolve(obs({ cik: 5555, observation_id: i + 1 }));
})
);
expect(new Set(results).size).toBe(1);
const rows = await setup.canonStorage.getAll();
expect(rows.length).toBe(1);
// At least one storage-level UNIQUE rejection must have fired —
// otherwise the two instances accidentally never raced and the test
// doesn't actually exercise the multi-process backstop.
expect(ids.size).toBe(1);
expect(uniqueRejections).toBeGreaterThanOrEqual(1);
});

it("twin resolver instances racing the same CRD collapse to one row (Postgres error shape)", async () => {
const { uniqueRejections, ids } = await runMultiProcessRace({
errorShape: "pg",
key: "crd",
});
expect(ids.size).toBe(1);
expect(uniqueRejections).toBeGreaterThanOrEqual(1);
});

Expand Down
132 changes: 85 additions & 47 deletions src/resolver/PersonResolver.race.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,23 @@ import {
} from "../storage/canonical/CanonicalAliasSchemas";
import type { PersonObservation } from "../storage/observation/PersonObservationSchema";
import { PersonResolver } from "./PersonResolver";
import { isUniqueConstraintError } from "./isUniqueConstraintError";

/** Storage-error dialects the race tests can emulate. */
type ErrorShape = "sqlite" | "pg";

/**
 * Fabricates a unique-constraint violation for the canonical person table in
 * the requested dialect.
 *
 * @param shape - `"sqlite"` yields a plain `Error` with the
 *   `UNIQUE constraint failed:` message; any other value yields the
 *   Postgres-style error carrying `code: "23505"`.
 * @returns The synthesized error (never thrown here — the caller throws it).
 */
function synthesizeUniqueError(shape: ErrorShape): Error {
  switch (shape) {
    case "sqlite":
      return new Error(
        "UNIQUE constraint failed: canonical_person.resolver_version, canonical_person.cik"
      );
    default: {
      // Postgres dialect: quoted constraint name in the message plus the
      // SQLSTATE attached as a `code` property.
      const pgError = new Error(
        'duplicate key value violates unique constraint "canonical_person_uniq_resolver_version_cik"'
      );
      return Object.assign(pgError, { code: "23505" });
    }
  }
}

function makeRepos() {
const canonStorage = new InMemoryTabularStorage<
Expand Down Expand Up @@ -197,64 +214,85 @@ async function storageEnforcesPersonUniqueness(): Promise<boolean> {
// minted, and PersonResolver.resolve()'s UNIQUE-rejection retry path is
// what makes the loser converge on the winner's canonical id instead of
// failing.
// Drives the multi-process race scenario under a chosen storage-error
// shape. We monkey-patch `put` to translate the storage's natural UNIQUE
// rejection into the chosen error shape — proving the resolver's retry
// path recognises both the SQLite/InMemory error shape AND the raw
// Postgres `pg.DatabaseError` shape (`code: "23505"` + the
// "duplicate key value violates unique constraint" message) without
// requiring an actual Postgres connection in the unit suite. The winner
// row stays in place so the loser's re-query converges on it.
/**
 * Runs 50 concurrent `resolve()` calls, alternating between two
 * `PersonResolver` instances that share one set of repos, all claiming
 * CIK 5555.
 *
 * The storage's `put` is wrapped so that every failure recognised by
 * `isUniqueConstraintError` is counted and re-thrown in the dialect chosen by
 * `opts.errorShape`; any other failure propagates untouched.
 *
 * @param opts.errorShape - Dialect of the error the resolvers will observe.
 * @returns The number of translated UNIQUE rejections and the distinct ids
 *   returned by the resolvers. Also asserts that storage holds exactly one row.
 */
async function runMultiProcessRace(opts: {
  errorShape: ErrorShape;
}): Promise<{ uniqueRejections: number; ids: ReadonlySet<string> }> {
  const { errorShape } = opts;
  const setup = makeRepos();

  // Intercept writes so storage-level rejections can be counted and reshaped.
  let uniqueRejections = 0;
  const passthroughPut = setup.canonStorage.put.bind(setup.canonStorage);
  setup.canonStorage.put = async (row) => {
    try {
      return await passthroughPut(row);
    } catch (cause) {
      if (!isUniqueConstraintError(cause)) {
        throw cause;
      }
      uniqueRejections += 1;
      throw synthesizeUniqueError(errorShape);
    }
  };

  // Two independent resolver instances over the same repos.
  const spawnResolver = () =>
    new PersonResolver({
      canonicalPersonRepo: setup.canonRepo,
      canonicalPersonAliasRepo: setup.aliasRepo,
      activeResolverVersion: "1.0.0",
    });
  const resolverA = spawnResolver();
  const resolverB = spawnResolver();

  const fanout = 50;
  const observationIds = Array.from({ length: fanout }, (_, i) => i + 1);
  const inFlight = observationIds.map((observationId) => {
    // Odd observation ids (even zero-based positions) go to resolver A.
    const contender = observationId % 2 === 1 ? resolverA : resolverB;
    return contender.resolve(obs({ cik: 5555, observation_id: observationId }));
  });
  const results = await Promise.all(inFlight);

  const rows = await setup.canonStorage.getAll();
  expect(rows.length).toBe(1);

  return { uniqueRejections, ids: new Set(results) };
}

describe("PersonResolver multi-process race (storage-level UNIQUE constraint)", () => {
beforeEach(async () => {
// Fail loud if a future workglow regression silently drops UNIQUE
// enforcement — the rest of the suite assumes the storage layer is the
// backstop when the in-process AsyncMutex is bypassed.
const enforces = await storageEnforcesPersonUniqueness();
expect(enforces).toBe(true);
// Also sanity-check that the SQLite/InMemory shape the storage layer
// actually throws is still recognised by the resolver's helper. If
// workglow ever changes the wording, this asserts loudly here
// instead of silently breaking the retry path in production.
const sample = new Error(
"UNIQUE constraint failed: canonical_person.resolver_version, canonical_person.cik"
);
expect(isUniqueConstraintError(sample)).toBe(true);
});

it("twin resolver instances racing the same CIK still collapse to one canonical row", async () => {
const setup = makeRepos();
// Count UNIQUE rejections at the storage layer so we can assert the
// constraint actually fired — proving the test exercises the
// storage backstop, not just an accidental id match.
let uniqueRejections = 0;
const originalPut = setup.canonStorage.put.bind(setup.canonStorage);
setup.canonStorage.put = async (value) => {
try {
return await originalPut(value);
} catch (err) {
if (
err !== null &&
typeof err === "object" &&
typeof (err as { message?: unknown }).message === "string" &&
((err as { message: string }).message).startsWith(
"UNIQUE constraint failed"
)
) {
uniqueRejections += 1;
}
throw err;
}
};

const resolverA = new PersonResolver({
canonicalPersonRepo: setup.canonRepo,
canonicalPersonAliasRepo: setup.aliasRepo,
activeResolverVersion: "1.0.0",
});
const resolverB = new PersonResolver({
canonicalPersonRepo: setup.canonRepo,
canonicalPersonAliasRepo: setup.aliasRepo,
activeResolverVersion: "1.0.0",
it("twin resolver instances racing the same CIK collapse to one row (SQLite/InMemory error shape)", async () => {
const { uniqueRejections, ids } = await runMultiProcessRace({
errorShape: "sqlite",
});
expect(ids.size).toBe(1);
expect(uniqueRejections).toBeGreaterThanOrEqual(1);
});

const fanout = 50;
const results = await Promise.all(
Array.from({ length: fanout }, (_, i) => {
const r = i % 2 === 0 ? resolverA : resolverB;
return r.resolve(obs({ cik: 5555, observation_id: i + 1 }));
})
);
expect(new Set(results).size).toBe(1);
const rows = await setup.canonStorage.getAll();
expect(rows.length).toBe(1);
// At least one storage-level UNIQUE rejection must have fired —
// otherwise the two instances accidentally never raced and the test
// doesn't actually exercise the multi-process backstop.
it("twin resolver instances racing the same CIK collapse to one row (Postgres error shape)", async () => {
const { uniqueRejections, ids } = await runMultiProcessRace({
errorShape: "pg",
});
expect(ids.size).toBe(1);
expect(uniqueRejections).toBeGreaterThanOrEqual(1);
});

Expand Down
Loading