Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/config/DefaultDI.ts
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,11 @@ export const DefaultDI = () => {
"canonical_sponsor_family",
CanonicalSponsorFamilySchema,
CanonicalSponsorFamilyPrimaryKeyNames,
[],
// (resolver_version, normalized_name) is the family natural key — must be
// enforced at the storage layer so two processes racing to mint the same
// family converge on one row. Without this, the family-tier identity
// tables silently forked under multi-process load.
[["resolver_version", "normalized_name"]]
)
);
Expand Down Expand Up @@ -837,6 +842,11 @@ export const DefaultDI = () => {
"canonical_underwriter_family",
CanonicalUnderwriterFamilySchema,
CanonicalUnderwriterFamilyPrimaryKeyNames,
[],
// (resolver_version, normalized_name) is the family natural key — must be
// enforced at the storage layer so two processes racing to mint the same
// family converge on one row. Without this, the family-tier identity
// tables silently forked under multi-process load.
[["resolver_version", "normalized_name"]]
)
);
Expand Down
12 changes: 12 additions & 0 deletions src/config/TestingDI.ts
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,12 @@ export function resetDependencyInjectionsForTesting() {
new InMemoryTabularStorage(
CanonicalSponsorFamilySchema,
CanonicalSponsorFamilyPrimaryKeyNames,
[],
undefined,
undefined,
undefined,
// (resolver_version, normalized_name) is the family natural key — see
// DefaultDI for the multi-process race rationale.
[["resolver_version", "normalized_name"]]
)
);
Expand Down Expand Up @@ -745,6 +751,12 @@ export function resetDependencyInjectionsForTesting() {
new InMemoryTabularStorage(
CanonicalUnderwriterFamilySchema,
CanonicalUnderwriterFamilyPrimaryKeyNames,
[],
undefined,
undefined,
undefined,
// (resolver_version, normalized_name) is the family natural key — see
// DefaultDI for the multi-process race rationale.
[["resolver_version", "normalized_name"]]
)
);
Expand Down
265 changes: 265 additions & 0 deletions src/resolver/FamilyResolver.race.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@
/**
* @license
* Copyright 2026 Steven Roussey <sroussey@gmail.com>
* SPDX-License-Identifier: Apache-2.0
*/

import { describe, expect, it, beforeEach } from "bun:test";
import { InMemoryTabularStorage } from "workglow";
import { CanonicalSponsorFamilyRepo } from "../storage/canonical/CanonicalSponsorFamilyRepo";
import {
CanonicalSponsorFamilySchema,
CanonicalSponsorFamilyPrimaryKeyNames,
type CanonicalSponsorFamily,
} from "../storage/canonical/CanonicalSponsorFamilySchema";
import { CanonicalSponsorFamilyAliasRepo } from "../storage/canonical/CanonicalSponsorFamilyAliasRepo";
import {
CanonicalSponsorFamilyAliasSchema,
CanonicalSponsorFamilyAliasPrimaryKeyNames,
type CanonicalSponsorFamilyAlias,
CanonicalUnderwriterFamilyAliasSchema,
CanonicalUnderwriterFamilyAliasPrimaryKeyNames,
type CanonicalUnderwriterFamilyAlias,
} from "../storage/canonical/CanonicalAliasSchemas";
import { CanonicalUnderwriterFamilyRepo } from "../storage/canonical/CanonicalUnderwriterFamilyRepo";
import {
CanonicalUnderwriterFamilySchema,
CanonicalUnderwriterFamilyPrimaryKeyNames,
type CanonicalUnderwriterFamily,
} from "../storage/canonical/CanonicalUnderwriterFamilySchema";
import { CanonicalUnderwriterFamilyAliasRepo } from "../storage/canonical/CanonicalUnderwriterFamilyAliasRepo";
import { SponsorFamilyResolver } from "./SponsorFamilyResolver";
import { UnderwriterFamilyResolver } from "./UnderwriterFamilyResolver";

interface FamilyTestKit<Resolver> {
readonly kind: "sponsor" | "underwriter";
readonly canonStorage: InMemoryTabularStorage<any, any, any>;
readonly makeResolver: () => Resolver;
/** Resolve a single common name through the resolver under test. */
readonly resolve: (resolver: Resolver, name: string) => Promise<string>;
}

function makeSponsorKit(): FamilyTestKit<SponsorFamilyResolver> {
const canonStorage = new InMemoryTabularStorage<
typeof CanonicalSponsorFamilySchema,
typeof CanonicalSponsorFamilyPrimaryKeyNames,
CanonicalSponsorFamily
>(
CanonicalSponsorFamilySchema,
CanonicalSponsorFamilyPrimaryKeyNames,
[],
undefined,
undefined,
undefined,
// Mirrors DefaultDI / TestingDI post-fix wiring: (resolver_version,
// normalized_name) is the family natural key.
[["resolver_version", "normalized_name"]]
);
const aliasStorage = new InMemoryTabularStorage<
typeof CanonicalSponsorFamilyAliasSchema,
typeof CanonicalSponsorFamilyAliasPrimaryKeyNames,
CanonicalSponsorFamilyAlias
>(CanonicalSponsorFamilyAliasSchema, CanonicalSponsorFamilyAliasPrimaryKeyNames, []);
const canonRepo = new CanonicalSponsorFamilyRepo(canonStorage);
const aliasRepo = new CanonicalSponsorFamilyAliasRepo({ repository: aliasStorage });
return {
kind: "sponsor",
canonStorage,
makeResolver: () =>
new SponsorFamilyResolver({
canonicalSponsorFamilyRepo: canonRepo,
canonicalSponsorFamilyAliasRepo: aliasRepo,
activeResolverVersion: "1.0.0",
}),
resolve: (r, name) => r.resolve(name),
};
}

function makeUnderwriterKit(): FamilyTestKit<UnderwriterFamilyResolver> {
const canonStorage = new InMemoryTabularStorage<
typeof CanonicalUnderwriterFamilySchema,
typeof CanonicalUnderwriterFamilyPrimaryKeyNames,
CanonicalUnderwriterFamily
>(
CanonicalUnderwriterFamilySchema,
CanonicalUnderwriterFamilyPrimaryKeyNames,
[],
undefined,
undefined,
undefined,
[["resolver_version", "normalized_name"]]
);
const aliasStorage = new InMemoryTabularStorage<
typeof CanonicalUnderwriterFamilyAliasSchema,
typeof CanonicalUnderwriterFamilyAliasPrimaryKeyNames,
CanonicalUnderwriterFamilyAlias
>(CanonicalUnderwriterFamilyAliasSchema, CanonicalUnderwriterFamilyAliasPrimaryKeyNames, []);
const canonRepo = new CanonicalUnderwriterFamilyRepo(canonStorage);
const aliasRepo = new CanonicalUnderwriterFamilyAliasRepo({ repository: aliasStorage });
return {
kind: "underwriter",
canonStorage,
makeResolver: () =>
new UnderwriterFamilyResolver({
canonicalUnderwriterFamilyRepo: canonRepo,
canonicalUnderwriterFamilyAliasRepo: aliasRepo,
activeResolverVersion: "1.0.0",
}),
resolve: (r, name) => r.resolve(name),
};
}

/**
* Probes whether the underlying storage actually enforces the family natural
* key. This is the unit test that pins the DefaultDI / TestingDI fix — if a
* future refactor drops `uniqueIndexes` for family tables, this assertion
* fires before the multi-process race tests below.
*/
async function storageEnforcesFamilyUniqueness(
canonStorage: InMemoryTabularStorage<any, any, any>,
idField: string
): Promise<boolean> {
const a: Record<string, unknown> = {
[idField]: "11111111-1111-1111-1111-111111111111",
resolver_version: "1.0.0",
display_name: "Goldman Sachs",
normalized_name: "GOLDMAN SACHS",
created_at: "2026-05-22T00:00:00.000Z",
};
await canonStorage.put(a);
try {
await canonStorage.put({
...a,
[idField]: "22222222-2222-2222-2222-222222222222",
});
return false;
} catch {
return true;
}
}

// Synthesised error shapes per backend — both backends surface UNIQUE
// rejections through `@workglow/storage` with the same `StorageError`
// message prefix that `isUniqueConstraintError` matches.
const ERROR_SHAPES = {
sqlite: () =>
new Error(
"UNIQUE constraint failed: canonical_*_family.resolver_version, canonical_*_family.normalized_name"
),
pg: () =>
new Error(
"UNIQUE constraint failed (postgres unique_violation 23505): duplicate key on (resolver_version, normalized_name)"
),
} as const;

type ErrorShape = keyof typeof ERROR_SHAPES;

function describeFamilyRaces<R>(
label: string,
buildKit: () => FamilyTestKit<R>,
idField: string
): void {
describe(`${label} concurrent resolution`, () => {
let kit: FamilyTestKit<R>;
let resolver: R;

beforeEach(async () => {
kit = buildKit();
// Fail loud if a future workglow regression silently drops UNIQUE
// enforcement on the family table — the multi-process race tests
// assume the storage layer is the backstop when twin instance
// mutexes don't collapse contention.
const enforces = await storageEnforcesFamilyUniqueness(kit.canonStorage, idField);
expect(enforces).toBe(true);
// Rebuild for the actual race tests (the probe row would otherwise
// pollute getAll()).
kit = buildKit();
resolver = kit.makeResolver();
});

it("two parallel resolves on the same family name return one canonical id and create one row", async () => {
const [a, b] = await Promise.all([
kit.resolve(resolver, "Goldman Sachs"),
kit.resolve(resolver, "Goldman Sachs"),
]);
expect(a).toBe(b);
const rows = await kit.canonStorage.getAll();
expect(rows.length).toBe(1);
});

it("many parallel resolves on the same family name still produce one canonical row", async () => {
const fanout = 25;
const results = await Promise.all(
Array.from({ length: fanout }, () => kit.resolve(resolver, "Pershing Square Sponsor"))
);
expect(new Set(results).size).toBe(1);
const rows = await kit.canonStorage.getAll();
expect(rows.length).toBe(1);
});

function runMultiProcessRace({ errorShape }: { errorShape: ErrorShape }): void {
it(`twin resolver instances racing the same family name converge under ${errorShape} UNIQUE rejection`, async () => {
const localKit = buildKit();
// Count UNIQUE rejections at the storage layer so we assert the
// backstop actually fires — proves the test exercises the storage
// UNIQUE retry path, not just an accidental id match.
let uniqueRejections = 0;
const originalPut = localKit.canonStorage.put.bind(localKit.canonStorage);
localKit.canonStorage.put = async (value: any) => {
// Let the real put run; if it succeeds, we mimic a multi-process
// race only when the underlying storage actually rejected the
// call (i.e. another writer already inserted a row with the same
// natural key). The InMemory storage UNIQUE constraint surfaces
// the rejection naturally — we just re-throw it under the
// requested backend message shape so the consumer (which
// matches on the message prefix) treats both alike.
try {
return await originalPut(value);
} catch (err) {
const msg =
err !== null &&
typeof err === "object" &&
typeof (err as { message?: unknown }).message === "string"
? (err as { message: string }).message
: "";
if (msg.startsWith("UNIQUE constraint failed")) {
uniqueRejections += 1;
throw ERROR_SHAPES[errorShape]();
}
throw err;
}
};

const resolverA = localKit.makeResolver();
const resolverB = localKit.makeResolver();

const fanout = 20;
const results = await Promise.all(
Array.from({ length: fanout }, (_, i) => {
const r = i % 2 === 0 ? resolverA : resolverB;
return localKit.resolve(r, "Apollo Sponsor");
})
);

const ids = new Set(results);
expect(ids.size).toBe(1);
const rows = await localKit.canonStorage.getAll();
expect(rows.length).toBe(1);
// At least one storage-level UNIQUE rejection must have fired —
// otherwise the two instances accidentally never raced and the
// test doesn't exercise the multi-process backstop.
expect(uniqueRejections).toBeGreaterThanOrEqual(1);
});
}

runMultiProcessRace({ errorShape: "sqlite" });
runMultiProcessRace({ errorShape: "pg" });
});
}

describeFamilyRaces("SponsorFamilyResolver", makeSponsorKit, "canonical_sponsor_family_id");
describeFamilyRaces(
"UnderwriterFamilyResolver",
makeUnderwriterKit,
"canonical_underwriter_family_id"
);
39 changes: 30 additions & 9 deletions src/resolver/FamilyResolver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import { normalizeCompanyName } from "../storage/company/CompanyNormalization";
import { AsyncMutex } from "../util/AsyncMutex";
import { isUniqueConstraintError } from "./isUniqueConstraintError";

/**
* The single source of truth for a family natural key (sponsor or underwriter).
Expand Down Expand Up @@ -50,12 +51,17 @@ interface FamilyResolverOptions {
* returns the pre-alias id. Mirrors the {@link PersonResolver} /
* {@link CompanyResolver} fix.
*
* Multi-process callers (workers, separate `sec` invocations) still need a
* backend-level UNIQUE constraint to be race-free — single-process mutexes
* are not visible to other processes.
* The mutex map is instance-scoped (well, static-instance-scoped here): it
* collapses intra-process contention on a shared key. Multi-process /
* multi-instance contention is collapsed at the storage layer via the UNIQUE
* index on (resolver_version, normalized_name) wired in DefaultDI /
* TestingDI. When a concurrent writer in another process wins the UNIQUE
* race, the loser's `createFamily` rejects with a UNIQUE constraint error;
* the catch below re-queries `findIdByNormalizedName` and converges on the
* winner's id rather than failing the resolve.
*/
export class FamilyResolver {
private static readonly _keyMutexes = new Map<string, { mutex: AsyncMutex; refs: number }>();
private readonly _keyMutexes = new Map<string, { mutex: AsyncMutex; refs: number }>();

constructor(private opts: FamilyResolverOptions) {}

Expand All @@ -66,24 +72,39 @@ export class FamilyResolver {
}
const key = `${this.opts.activeResolverVersion}|${this.opts.kind}-family|${normalized}`;

let entry = FamilyResolver._keyMutexes.get(key);
let entry = this._keyMutexes.get(key);
if (entry === undefined) {
entry = { mutex: new AsyncMutex(), refs: 0 };
FamilyResolver._keyMutexes.set(key, entry);
this._keyMutexes.set(key, entry);
}
entry.refs += 1;

let resolvedId: string;
try {
resolvedId = await entry.mutex.lock(async () => {
const existing = await this.opts.findIdByNormalizedName(normalized);
const candidateId = existing ?? (await this.opts.createFamily(commonName, normalized));
let candidateId: string;
if (existing !== undefined) {
candidateId = existing;
} else {
try {
candidateId = await this.opts.createFamily(commonName, normalized);
} catch (err) {
// A concurrent writer in a different process / resolver instance
// won the UNIQUE constraint race. Re-query so we converge on the
// winner's canonical family id instead of failing.
if (!isUniqueConstraintError(err)) throw err;
const winner = await this.opts.findIdByNormalizedName(normalized);
if (winner === undefined) throw err;
candidateId = winner;
}
}
return await this.opts.resolveAlias(candidateId);
});
} finally {
entry.refs -= 1;
if (entry.refs === 0 && FamilyResolver._keyMutexes.get(key) === entry) {
FamilyResolver._keyMutexes.delete(key);
if (entry.refs === 0 && this._keyMutexes.get(key) === entry) {
this._keyMutexes.delete(key);
}
}

Expand Down
Loading