Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/config/DefaultDI.ts
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,11 @@ export const DefaultDI = () => {
"canonical_sponsor_family",
CanonicalSponsorFamilySchema,
CanonicalSponsorFamilyPrimaryKeyNames,
[],
// (resolver_version, normalized_name) is the family natural key — must be
// enforced at the storage layer so two processes racing to mint the same
// family converge on one row. Without this, the family-tier identity
// tables silently forked under multi-process load.
[["resolver_version", "normalized_name"]]
)
);
Expand Down Expand Up @@ -837,6 +842,11 @@ export const DefaultDI = () => {
"canonical_underwriter_family",
CanonicalUnderwriterFamilySchema,
CanonicalUnderwriterFamilyPrimaryKeyNames,
[],
// (resolver_version, normalized_name) is the family natural key — must be
// enforced at the storage layer so two processes racing to mint the same
// family converge on one row. Without this, the family-tier identity
// tables silently forked under multi-process load.
[["resolver_version", "normalized_name"]]
)
);
Expand Down
12 changes: 12 additions & 0 deletions src/config/TestingDI.ts
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,12 @@ export function resetDependencyInjectionsForTesting() {
new InMemoryTabularStorage(
CanonicalSponsorFamilySchema,
CanonicalSponsorFamilyPrimaryKeyNames,
[],
undefined,
undefined,
undefined,
// (resolver_version, normalized_name) is the family natural key — see
// DefaultDI for the multi-process race rationale.
[["resolver_version", "normalized_name"]]
)
);
Expand Down Expand Up @@ -745,6 +751,12 @@ export function resetDependencyInjectionsForTesting() {
new InMemoryTabularStorage(
CanonicalUnderwriterFamilySchema,
CanonicalUnderwriterFamilyPrimaryKeyNames,
[],
undefined,
undefined,
undefined,
// (resolver_version, normalized_name) is the family natural key — see
// DefaultDI for the multi-process race rationale.
[["resolver_version", "normalized_name"]]
)
);
Expand Down
153 changes: 108 additions & 45 deletions src/resolver/CompanyResolver.race.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,28 @@ import {
} from "../storage/canonical/CanonicalAliasSchemas";
import type { CompanyObservation } from "../storage/observation/CompanyObservationSchema";
import { CompanyResolver } from "./CompanyResolver";
import { isUniqueConstraintError } from "./isUniqueConstraintError";

type ErrorShape = "sqlite" | "pg";
type CompanyKey = "cik" | "crd";

function synthesizeUniqueError(shape: ErrorShape, key: CompanyKey): Error {
if (shape === "sqlite") {
const cols =
key === "cik"
? "canonical_company.resolver_version, canonical_company.cik"
: "canonical_company.resolver_version, canonical_company.crd_number";
return new Error(`UNIQUE constraint failed: ${cols}`);
}
const constraintName =
key === "cik"
? "canonical_company_uniq_resolver_version_cik"
: "canonical_company_uniq_resolver_version_crd_number";
return Object.assign(
new Error(`duplicate key value violates unique constraint "${constraintName}"`),
{ code: "23505" }
);
}

function makeRepos() {
const canonStorage = new InMemoryTabularStorage<
Expand Down Expand Up @@ -180,63 +202,104 @@ async function storageEnforcesCompanyUniqueness(): Promise<boolean> {
// keeping twin canonical rows from being minted, and the resolver's
// UNIQUE-rejection retry path is what makes the loser converge on the
// winner's canonical id instead of failing.
// See PersonResolver.race.test.ts for the rationale. We additionally
// parameterise over the key kind because the canonical_company table has
// UNIQUE indexes on BOTH (resolver_version, cik) and
// (resolver_version, crd_number). Postgres includes the constraint name
// in the error message, so the test exercises both constraint-name forms.
async function runMultiProcessRace(opts: {
errorShape: ErrorShape;
key: CompanyKey;
}): Promise<{ uniqueRejections: number; ids: ReadonlySet<string> }> {
const setup = makeRepos();
let uniqueRejections = 0;
const originalPut = setup.canonStorage.put.bind(setup.canonStorage);
setup.canonStorage.put = async (value) => {
try {
return await originalPut(value);
} catch (err) {
if (isUniqueConstraintError(err)) {
uniqueRejections += 1;
throw synthesizeUniqueError(opts.errorShape, opts.key);
}
throw err;
}
};

const resolverA = new CompanyResolver({
canonicalCompanyRepo: setup.canonRepo,
canonicalCompanyAliasRepo: setup.aliasRepo,
activeResolverVersion: "1.0.0",
});
const resolverB = new CompanyResolver({
canonicalCompanyRepo: setup.canonRepo,
canonicalCompanyAliasRepo: setup.aliasRepo,
activeResolverVersion: "1.0.0",
});
const fanout = 50;
const results = await Promise.all(
Array.from({ length: fanout }, (_, i) => {
const r = i % 2 === 0 ? resolverA : resolverB;
const claim =
opts.key === "cik"
? obs({ cik: 5555, observation_id: i + 1 })
: obs({ crd_number: "CRD-RACE-1", observation_id: i + 1 });
return r.resolve(claim);
})
);
const rows = await setup.canonStorage.getAll();
expect(rows.length).toBe(1);
return { uniqueRejections, ids: new Set(results) };
}

describe("CompanyResolver multi-process race (storage-level UNIQUE constraint)", () => {
beforeEach(async () => {
// Fail loud if a future workglow regression silently drops UNIQUE
// enforcement — the rest of the suite assumes the storage layer is the
// backstop when the in-process AsyncMutex is bypassed.
const enforces = await storageEnforcesCompanyUniqueness();
expect(enforces).toBe(true);
// Sanity check: the resolver's helper still recognises the
// SQLite/InMemory shape the storage actually throws today.
const sample = new Error(
"UNIQUE constraint failed: canonical_company.resolver_version, canonical_company.cik"
);
expect(isUniqueConstraintError(sample)).toBe(true);
});

it("twin resolver instances racing the same CIK still collapse to one canonical row", async () => {
const setup = makeRepos();
// Count UNIQUE rejections at the storage layer so we can assert the
// constraint actually fired — proving the test exercises the
// storage backstop, not just an accidental id match.
let uniqueRejections = 0;
const originalPut = setup.canonStorage.put.bind(setup.canonStorage);
setup.canonStorage.put = async (value) => {
try {
return await originalPut(value);
} catch (err) {
if (
err !== null &&
typeof err === "object" &&
typeof (err as { message?: unknown }).message === "string" &&
((err as { message: string }).message).startsWith(
"UNIQUE constraint failed"
)
) {
uniqueRejections += 1;
}
throw err;
}
};
it("twin resolver instances racing the same CIK collapse to one row (SQLite/InMemory error shape)", async () => {
const { uniqueRejections, ids } = await runMultiProcessRace({
errorShape: "sqlite",
key: "cik",
});
expect(ids.size).toBe(1);
expect(uniqueRejections).toBeGreaterThanOrEqual(1);
});

const resolverA = new CompanyResolver({
canonicalCompanyRepo: setup.canonRepo,
canonicalCompanyAliasRepo: setup.aliasRepo,
activeResolverVersion: "1.0.0",
it("twin resolver instances racing the same CIK collapse to one row (Postgres error shape)", async () => {
const { uniqueRejections, ids } = await runMultiProcessRace({
errorShape: "pg",
key: "cik",
});
const resolverB = new CompanyResolver({
canonicalCompanyRepo: setup.canonRepo,
canonicalCompanyAliasRepo: setup.aliasRepo,
activeResolverVersion: "1.0.0",
expect(ids.size).toBe(1);
expect(uniqueRejections).toBeGreaterThanOrEqual(1);
});

it("twin resolver instances racing the same CRD collapse to one row (SQLite/InMemory error shape)", async () => {
const { uniqueRejections, ids } = await runMultiProcessRace({
errorShape: "sqlite",
key: "crd",
});
const fanout = 50;
const results = await Promise.all(
Array.from({ length: fanout }, (_, i) => {
const r = i % 2 === 0 ? resolverA : resolverB;
return r.resolve(obs({ cik: 5555, observation_id: i + 1 }));
})
);
expect(new Set(results).size).toBe(1);
const rows = await setup.canonStorage.getAll();
expect(rows.length).toBe(1);
// At least one storage-level UNIQUE rejection must have fired —
// otherwise the two instances accidentally never raced and the test
// doesn't actually exercise the multi-process backstop.
expect(ids.size).toBe(1);
expect(uniqueRejections).toBeGreaterThanOrEqual(1);
});

it("twin resolver instances racing the same CRD collapse to one row (Postgres error shape)", async () => {
const { uniqueRejections, ids } = await runMultiProcessRace({
errorShape: "pg",
key: "crd",
});
expect(ids.size).toBe(1);
expect(uniqueRejections).toBeGreaterThanOrEqual(1);
});

Expand Down
Loading