diff --git a/src/commands/spac.ts b/src/commands/spac.ts index b1fea2e..39420ce 100644 --- a/src/commands/spac.ts +++ b/src/commands/spac.ts @@ -116,11 +116,19 @@ export function registerSpacCommands(program: Command): void { spacCmd .command("backfill-redemptions") .description("Re-process known-SPAC trigger-item 8-Ks to extract realized redemptions") - .action(async () => { - const out = (await withCli(new BackfillRedemptionsTask()).run({})) as { + .option("--force", "Re-process filings even when a successful run already exists", false) + .option("--dry-run", "Report selected filing count without reprocessing", false) + .action(async (opts: { force?: boolean; dryRun?: boolean }) => { + const out = (await withCli(new BackfillRedemptionsTask()).run({ + force: opts.force === true, + dryRun: opts.dryRun === true, + } as never)) as { selected: number; processed: number; + skipped: number; }; - console.log(`selected ${out.selected} filing(s); processed ${out.processed}`); + console.log( + `selected ${out.selected} filing(s); processed ${out.processed}; skipped ${out.skipped}` + ); }); } diff --git a/src/sec/forms/miscellaneous-filings/redemption8k.e2e.test.ts b/src/sec/forms/miscellaneous-filings/redemption8k.e2e.test.ts index c23d73c..409ab2e 100644 --- a/src/sec/forms/miscellaneous-filings/redemption8k.e2e.test.ts +++ b/src/sec/forms/miscellaneous-filings/redemption8k.e2e.test.ts @@ -134,8 +134,9 @@ describe("processForm8K — redemption e2e", () => { expect(spacRow?.total_redemption_amount).toBe(8200000); }); - it("known SPAC with no deal yields no redemption rollup", async () => { - // Seed SPAC row but no deal milestone + it("known SPAC with no deal persists the orphan extraction and rolls up once the deal lands", async () => { + // Seed SPAC row but no deal milestone — the redemption is recorded + // anyway; deriveDeals correlates it as soon as a deal exists. await new SpacReportWriter().recordRegistration({ cik: 21, accession_number: "21-reg", @@ -169,11 +170,27 @@ describe("processForm8K — redemption e2e", () => { model: fakeS1Model(), }); - const spacRow = await new SpacRepo().getSpac(21); - expect(spacRow).toBeDefined(); - expect(spacRow?.total_redemption_amount ?? null).toBeNull(); + // No deal yet — nothing to roll up against. + expect((await new SpacRepo().getDeals(21)).length).toBe(0); + const orphanSpac = await new SpacRepo().getSpac(21); + expect(orphanSpac?.total_redemption_amount ?? null).toBeNull(); + + // Later, the definitive-agreement 8-K lands; the orphan extraction + // is correlated automatically. + await new SpacReportWriter().recordDealMilestones({ + cik: 21, + accession_number: "21-da", + filing_date: "2026-01-10", + form: "8-K", + primary_document: null, + events: [{ event_type: "definitive_agreement", event_date: "2026-01-10" }], + }); const deals = await new SpacRepo().getDeals(21); - expect(deals).toHaveLength(0); + expect(deals).toHaveLength(1); + expect(deals[0].redemption_amount).toBe(8200000); + + const spacRow = await new SpacRepo().getSpac(21); + expect(spacRow?.total_redemption_amount).toBe(8200000); }); }); diff --git a/src/sec/forms/miscellaneous-filings/redemption8k.test.ts b/src/sec/forms/miscellaneous-filings/redemption8k.test.ts index e6f28a4..12fb584 100644 --- a/src/sec/forms/miscellaneous-filings/redemption8k.test.ts +++ b/src/sec/forms/miscellaneous-filings/redemption8k.test.ts @@ -4,11 +4,14 @@ * SPDX-License-Identifier: Apache-2.0 */ import { afterEach, beforeEach, describe, expect, it } from "bun:test"; +import { globalServiceRegistry } from "workglow"; import { resetDependencyInjectionsForTesting } from "../../../config/TestingDI"; import { setupAllDatabases } from "../../../config/setupAllDatabases"; import { SpacRepo } from "../../../storage/spac/SpacRepo"; import { SpacReportWriter } from "../../../storage/spac/SpacReportWriter"; import { SpacRedemptionExtractionRepo } from "../../../storage/spac/SpacRedemptionExtractionRepo"; +import { ExtractorRunRepo } from "../../../storage/versioning/ExtractorRunRepo"; +import { EXTRACTOR_RUN_REPOSITORY_TOKEN } from "../../../storage/versioning/ExtractorRunSchema"; import { fakeS1Model, registerFakeStructuredProvider, @@ -149,7 +152,7 @@ describe("processRedemption8K", () => { ).toBeUndefined(); }); - it("does not extract when the SPAC has no deals", async () => { + it("persists the extraction when the SPAC has no deals yet (orphan)", async () => { await new SpacReportWriter().recordRegistration({ cik: 44, accession_number: "44-reg", @@ -161,11 +164,11 @@ describe("processRedemption8K", () => { }); const registration = registerFakeStructuredProvider([ { - redemption_shares: 1, - redemption_amount: 1, - price_per_share: 10, + redemption_shares: 1234567, + redemption_amount: 12400000, + price_per_share: 10.05, confidence: 0.95, - source_span: "elected to redeem", + source_span: "1,234,567 shares elected to redeem for $12,400,000", }, ]); cleanup = registration.unregister; @@ -180,8 +183,308 @@ describe("processRedemption8K", () => { model: fakeS1Model(), }); - expect( - await new SpacRedemptionExtractionRepo().getByAccession("0000000000-26-000012") - ).toBeUndefined(); + const ext = await new SpacRedemptionExtractionRepo().getByAccession("0000000000-26-000012"); + expect(ext?.redemption_amount).toBe(12400000); + expect(ext?.redemption_shares).toBe(1234567); + + // No deal exists yet, so there's nothing to roll up onto. + const deals = await new SpacRepo().getDeals(44); + expect(deals).toHaveLength(0); + }); + + it("correlates an orphan redemption onto a deal added later", async () => { + await new SpacReportWriter().recordRegistration({ + cik: 45, + accession_number: "45-reg", + filing_date: "2025-12-01", + form: "S-1", + primary_document: "s1.htm", + spac_name: "Late Deal SPAC Inc.", + spac_sic: 6770, + }); + const registration = registerFakeStructuredProvider([ + { + redemption_shares: 1234567, + redemption_amount: 12400000, + price_per_share: 10.05, + confidence: 0.95, + source_span: "1,234,567 shares elected to redeem for $12,400,000", + }, + ]); + cleanup = registration.unregister; + + // Vote 8-K arrives before any definitive-agreement 8-K. + await processRedemption8K({ + cik: 45, + accession_number: "0000000000-26-000013", + filing_date: "2026-03-20", + form: "8-K", + itemCodes: ["5.07"], + fullSubmissionText: FULL_TXT, + model: fakeS1Model(), + }); + + // Later, the definitive-agreement 8-K lands (dated earlier than the vote). + await new SpacReportWriter().recordDealMilestones({ + cik: 45, + accession_number: "45-da", + filing_date: "2026-01-10", + form: "8-K", + primary_document: null, + events: [{ event_type: "definitive_agreement", event_date: "2026-01-10" }], + }); + + const deals = await new SpacRepo().getDeals(45); + expect(deals).toHaveLength(1); + expect(deals[0].redemption_amount).toBe(12400000); + expect(deals[0].redemption_shares).toBe(1234567); + }); + + it("correlates an orphan redemption onto a completed-only deal added later", async () => { + await new SpacReportWriter().recordRegistration({ + cik: 46, + accession_number: "46-reg", + filing_date: "2025-12-01", + form: "S-1", + primary_document: "s1.htm", + spac_name: "Completed-Only SPAC Inc.", + spac_sic: 6770, + }); + const registration = registerFakeStructuredProvider([ + { + redemption_shares: 1234567, + redemption_amount: 12400000, + price_per_share: 10.05, + confidence: 0.95, + source_span: "1,234,567 shares elected to redeem for $12,400,000", + }, + ]); + cleanup = registration.unregister; + + await processRedemption8K({ + cik: 46, + accession_number: "0000000000-26-000014", + filing_date: "2026-03-20", + form: "8-K", + itemCodes: ["2.01"], + fullSubmissionText: FULL_TXT, + model: fakeS1Model(), + }); + + // The completion (2.01) 8-K is later recorded as a milestone; no 1.01. + await new SpacReportWriter().recordDealMilestones({ + cik: 46, + accession_number: "46-comp", + filing_date: "2026-03-25", + form: "8-K", + primary_document: null, + events: [{ event_type: "completed", event_date: "2026-03-25" }], + }); + + const deals = await new SpacRepo().getDeals(46); + expect(deals).toHaveLength(1); + expect(deals[0].redemption_amount).toBe(12400000); + expect(deals[0].redemption_shares).toBe(1234567); + }); + + it("is idempotent under replay — reprocessing before the deal lands does not double up", async () => { + await new SpacReportWriter().recordRegistration({ + cik: 47, + accession_number: "47-reg", + filing_date: "2025-12-01", + form: "S-1", + primary_document: "s1.htm", + spac_name: "Replay SPAC Inc.", + spac_sic: 6770, + }); + + const args = { + cik: 47, + accession_number: "0000000000-26-000015", + filing_date: "2026-03-20", + form: "8-K", + itemCodes: ["5.07"], + fullSubmissionText: FULL_TXT, + model: fakeS1Model(), + } as const; + + const reg1 = registerFakeStructuredProvider([ + { + redemption_shares: 1234567, + redemption_amount: 12400000, + price_per_share: 10.05, + confidence: 0.95, + source_span: "1,234,567 shares elected to redeem for $12,400,000", + }, + ]); + await processRedemption8K(args); + reg1.unregister(); + + const reg2 = registerFakeStructuredProvider([ + { + redemption_shares: 1234567, + redemption_amount: 12400000, + price_per_share: 10.05, + confidence: 0.95, + source_span: "1,234,567 shares elected to redeem for $12,400,000", + }, + ]); + cleanup = reg2.unregister; + await processRedemption8K(args); + + // Now the definitive-agreement 8-K lands. + await new SpacReportWriter().recordDealMilestones({ + cik: 47, + accession_number: "47-da", + filing_date: "2026-01-10", + form: "8-K", + primary_document: null, + events: [{ event_type: "definitive_agreement", event_date: "2026-01-10" }], + }); + + const deals = await new SpacRepo().getDeals(47); + expect(deals).toHaveLength(1); + expect(deals[0].redemption_amount).toBe(12400000); + expect(deals[0].redemption_shares).toBe(1234567); + }); + + describe("extractor_runs recording", () => { + it("records a successful run after a clean extraction", async () => { + await seedSpacWithOpenDeal(50); + const registration = registerFakeStructuredProvider([ + { + redemption_shares: 1234567, + redemption_amount: 12400000, + price_per_share: 10.05, + confidence: 0.95, + source_span: "1,234,567 shares elected to redeem for $12,400,000", + }, + ]); + cleanup = registration.unregister; + + await processRedemption8K({ + cik: 50, + accession_number: "0000000000-26-000050", + filing_date: "2026-03-20", + form: "8-K", + itemCodes: ["5.07"], + fullSubmissionText: FULL_TXT, + model: fakeS1Model(), + }); + + const runRepo = new ExtractorRunRepo( + globalServiceRegistry.get(EXTRACTOR_RUN_REPOSITORY_TOKEN) + ); + const run = await runRepo.findRun(50, "0000000000-26-000050", "redemption", "1.0.0"); + expect(run?.success).toBe(true); + expect(run?.error).toBeNull(); + expect(run?.slot_at_run).toBe("current"); + }); + + it("records a successful run when the SPAC has no deals (orphan)", async () => { + await new SpacReportWriter().recordRegistration({ + cik: 51, + accession_number: "51-reg", + filing_date: "2025-12-01", + form: "S-1", + primary_document: "s1.htm", + spac_name: "Orphan SPAC Inc.", + spac_sic: 6770, + }); + const registration = registerFakeStructuredProvider([ + { + redemption_shares: 1234567, + redemption_amount: 12400000, + price_per_share: 10.05, + confidence: 0.95, + source_span: "1,234,567 shares elected to redeem for $12,400,000", + }, + ]); + cleanup = registration.unregister; + + await processRedemption8K({ + cik: 51, + accession_number: "0000000000-26-000051", + filing_date: "2026-03-20", + form: "8-K", + itemCodes: ["5.07"], + fullSubmissionText: FULL_TXT, + model: fakeS1Model(), + }); + + const runRepo = new ExtractorRunRepo( + globalServiceRegistry.get(EXTRACTOR_RUN_REPOSITORY_TOKEN) + ); + const run = await runRepo.findRun(51, "0000000000-26-000051", "redemption", "1.0.0"); + expect(run?.success).toBe(true); + expect(run?.error).toBeNull(); + }); + + it("does NOT record a run when the trigger-item gate skips the filing", async () => { + await seedSpacWithOpenDeal(53); + + await processRedemption8K({ + cik: 53, + accession_number: "0000000000-26-000053", + filing_date: "2026-03-20", + form: "8-K", + itemCodes: ["2.02"], + fullSubmissionText: FULL_TXT, + model: fakeS1Model(), + }); + + const runRepo = new ExtractorRunRepo( + globalServiceRegistry.get(EXTRACTOR_RUN_REPOSITORY_TOKEN) + ); + const run = await runRepo.findRun(53, "0000000000-26-000053", "redemption", "1.0.0"); + expect(run).toBeUndefined(); + }); + + it("does NOT record a run when the SPAC gate skips the filing", async () => { + // No spac row seeded for cik 54. + await processRedemption8K({ + cik: 54, + accession_number: "0000000000-26-000054", + filing_date: "2026-03-20", + form: "8-K", + itemCodes: ["5.07"], + fullSubmissionText: FULL_TXT, + model: fakeS1Model(), + }); + + const runRepo = new ExtractorRunRepo( + globalServiceRegistry.get(EXTRACTOR_RUN_REPOSITORY_TOKEN) + ); + const run = await runRepo.findRun(54, "0000000000-26-000054", "redemption", "1.0.0"); + expect(run).toBeUndefined(); + }); + + it("records a failed run + dead-letter when the configured model is not registered", async () => { + await seedSpacWithOpenDeal(55); + // Deliberately do NOT register a fake model and do NOT pass args.model + // — getRedemptionModel() will throw "not registered" against the empty + // test model repository. + await processRedemption8K({ + cik: 55, + accession_number: "0000000000-26-000055", + filing_date: "2026-03-20", + form: "8-K", + itemCodes: ["5.07"], + fullSubmissionText: FULL_TXT, + // model omitted on purpose + }); + + const runRepo = new ExtractorRunRepo( + globalServiceRegistry.get(EXTRACTOR_RUN_REPOSITORY_TOKEN) + ); + const run = await runRepo.findRun(55, "0000000000-26-000055", "redemption", "1.0.0"); + expect(run?.success).toBe(false); + expect(run?.error).toContain("MODEL_RESOLUTION_ERROR"); + + // No extraction row was persisted. + expect( + await new SpacRedemptionExtractionRepo().getByAccession("0000000000-26-000055") + ).toBeUndefined(); + }); }); }); diff --git a/src/sec/forms/miscellaneous-filings/redemption8k.ts b/src/sec/forms/miscellaneous-filings/redemption8k.ts index 701b14d..6b7361e 100644 --- a/src/sec/forms/miscellaneous-filings/redemption8k.ts +++ b/src/sec/forms/miscellaneous-filings/redemption8k.ts @@ -19,6 +19,8 @@ import { import { VersionRegistry } from "../../../storage/versioning/VersionRegistry"; import { getActiveSlot } from "../../../storage/versioning/getActiveSlot"; import { COMPONENT_VERSION_REPOSITORY_TOKEN } from "../../../storage/versioning/ComponentVersionSchema"; +import { ExtractorRunRepo } from "../../../storage/versioning/ExtractorRunRepo"; +import { EXTRACTOR_RUN_REPOSITORY_TOKEN } from "../../../storage/versioning/ExtractorRunSchema"; import { ExtractionDeadLetterRepo } from "../../../storage/dead-letter/ExtractionDeadLetterRepo"; import { SpacRepo } from "../../../storage/spac/SpacRepo"; import { SpacReportWriter } from "../../../storage/spac/SpacReportWriter"; @@ -51,9 +53,11 @@ function renderBody(html: string, title: string): string { /** * AI-extract realized redemptions from a known SPAC's vote-results / closing - * 8-K (primary document + EX-99.x exhibits). Gated on a trigger item and an - * existing deal to attach to. Persists a redemption-extraction row and - * recomputes deals so the redemption is correlated onto the matching deal. + * 8-K (primary document + EX-99.x exhibits). Gated on a trigger item and a + * known SPAC. The extraction is persisted regardless of whether the SPAC + * already has a `spac_deal` row: `deriveDeals` reads the full extraction set + * on every recompute, so a deal minted by a later 1.01 8-K automatically + * correlates an orphan redemption recorded here. */ export async function processRedemption8K(args: ProcessRedemption8KArgs): Promise { const { cik, accession_number, filing_date, form, itemCodes, fullSubmissionText } = args; @@ -63,17 +67,62 @@ export async function processRedemption8K(args: ProcessRedemption8KArgs): Promis const spacRepo = new SpacRepo(); const spac = await spacRepo.getSpac(cik); if (!spac) return; - const deals = await spacRepo.getDeals(cik); - if (deals.length === 0) return; const versionRegistry = new VersionRegistry( globalServiceRegistry.get(COMPONENT_VERSION_REPOSITORY_TOKEN) ); const extractorSlot = await getActiveSlot(versionRegistry, "extractor", EXTRACTOR_ID); const extractor_version = extractorSlot?.semver ?? DEFAULT_EXTRACTOR_VERSION; + // bootstrapComponentVersions seeds the current slot for every known + // extractor; the fallback only protects tests that bypass setupAllDatabases. + const slot_at_run = extractorSlot?.slot ?? "current"; const deadLetters = new ExtractionDeadLetterRepo(); - const model = args.model ?? (await getRedemptionModel()); - const model_id = resolveModelId(model); + const runRepo = new ExtractorRunRepo(globalServiceRegistry.get(EXTRACTOR_RUN_REPOSITORY_TOKEN)); + + const recordRedemptionRun = async (success: boolean, error: string | null): Promise => { + try { + await runRepo.recordRun({ + cik, + accession_number, + form, + extractor_id: EXTRACTOR_ID, + extractor_version, + slot_at_run, + success, + error: error === null ? null : error.slice(0, 4096), + }); + } catch (recordErr) { + console.error( + `Failed to record extractor_runs row for ${cik}/${accession_number}@${EXTRACTOR_ID}:${extractor_version}:`, + recordErr + ); + } + }; + + // Model resolution must not abort the surrounding 8-K processing — the + // outer filing's events and milestone deals are already written, and a + // misconfigured SEC_REDEMPTION_MODEL must not regress the unrelated 8-K + // path. Treat resolution failure like PARSE_ERROR: dead-letter the section, + // record the failed run, return cleanly. + let model: ModelConfig; + let model_id: string | null; + try { + model = args.model ?? (await getRedemptionModel()); + model_id = resolveModelId(model); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + await deadLetters.record({ + extractor_id: EXTRACTOR_ID, + accession_number, + section_name: REDEMPTION_SECTION, + reason_code: "MODEL_RESOLUTION_ERROR", + detail: message, + failed_extractor_version: extractor_version, + source_run_id: null, + }); + await recordRedemptionRun(false, `MODEL_RESOLUTION_ERROR: ${message}`); + return; + } // Parsing/rendering filer-supplied HTML must not abort the filing (its 8-K // events and milestone deals already wrote); a malformed body dead-letters the @@ -86,15 +135,17 @@ export async function processRedemption8K(args: ProcessRedemption8KArgs): Promis .filter((t) => t.length > 0) .join("\n\n"); } catch (err) { + const message = err instanceof Error ? err.message : String(err); await deadLetters.record({ extractor_id: EXTRACTOR_ID, accession_number, section_name: REDEMPTION_SECTION, reason_code: "PARSE_ERROR", - detail: err instanceof Error ? err.message : String(err), + detail: message, failed_extractor_version: extractor_version, source_run_id: null, }); + await recordRedemptionRun(false, `PARSE_ERROR: ${message}`); return; } @@ -107,39 +158,47 @@ export async function processRedemption8K(args: ProcessRedemption8KArgs): Promis }); let persisted = 0; - await runSection({ - sectionName: REDEMPTION_SECTION, - text: text === "" ? undefined : text, - notFoundDetail: "no primary/EX-99 narrative text", - emptyDetail: "no redemption returned", - lowConfidenceDetail: "below confidence floor", - verifyRow: (t, r) => spanAppearsIn(t, r.source_span), - unverifiedAllDetail: "redemption source_span not present in narrative text", - extract: async (t) => { - const row = await extractRedemption(t, model); - return row === null ? [] : [row]; - }, - persist: async (rows) => { - const row = rows[0]; - await new SpacRedemptionExtractionRepo().save({ - accession_number, - cik, - form, - filing_date, - extractor_id: EXTRACTOR_ID, - extractor_version, - redemption_shares: row.redemption_shares, - redemption_amount: row.redemption_amount, - price_per_share: row.price_per_share, - confidence: row.confidence, - source_span: row.source_span, - model_id, - created_at: new Date().toISOString(), - }); - persisted = 1; - return 1; - }, - }); + try { + await runSection({ + sectionName: REDEMPTION_SECTION, + text: text === "" ? undefined : text, + notFoundDetail: "no primary/EX-99 narrative text", + emptyDetail: "no redemption returned", + lowConfidenceDetail: "below confidence floor", + verifyRow: (t, r) => spanAppearsIn(t, r.source_span), + unverifiedAllDetail: "redemption source_span not present in narrative text", + extract: async (t) => { + const row = await extractRedemption(t, model); + return row === null ? [] : [row]; + }, + persist: async (rows) => { + const row = rows[0]; + await new SpacRedemptionExtractionRepo().save({ + accession_number, + cik, + form, + filing_date, + extractor_id: EXTRACTOR_ID, + extractor_version, + redemption_shares: row.redemption_shares, + redemption_amount: row.redemption_amount, + price_per_share: row.price_per_share, + confidence: row.confidence, + source_span: row.source_span, + model_id, + created_at: new Date().toISOString(), + }); + persisted = 1; + return 1; + }, + }); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + await recordRedemptionRun(false, message); + throw err; + } + + await recordRedemptionRun(true, null); if (persisted > 0) { await new SpacReportWriter().recordRedemption({ cik, accession_number, filing_date, form }); diff --git a/src/storage/spac/SpacReportWriter.ts b/src/storage/spac/SpacReportWriter.ts index 50ff32c..5d1bace 100644 --- a/src/storage/spac/SpacReportWriter.ts +++ b/src/storage/spac/SpacReportWriter.ts @@ -163,7 +163,10 @@ export class SpacReportWriter { * redemption_shares onto the matching deal), then rebuild the row. No event * is appended — redemptions never advance the lifecycle and an extra event * would double-count in the rollup. The extraction itself is persisted by the - * caller (`processRedemption8K`) before this runs. + * caller (`processRedemption8K`) before this runs; extraction may have been + * persisted before any `spac_deal` row existed, in which case `deriveDeals` + * — which reads the full extraction set on every invocation — automatically + * correlates the orphan extraction once a later filing mints the deal. */ async recordRedemption(args: { readonly cik: number; diff --git a/src/task/forms/ProcessAccessionDocFormTask.redemption.test.ts b/src/task/forms/ProcessAccessionDocFormTask.redemption.test.ts index 48fea4f..6032629 100644 --- a/src/task/forms/ProcessAccessionDocFormTask.redemption.test.ts +++ b/src/task/forms/ProcessAccessionDocFormTask.redemption.test.ts @@ -4,13 +4,16 @@ * SPDX-License-Identifier: Apache-2.0 */ -import { beforeEach, describe, expect, it } from "bun:test"; +import { afterEach, beforeEach, describe, expect, it } from "bun:test"; import type { IExecuteContext } from "workglow"; -import { globalServiceRegistry } from "workglow"; +import { getGlobalModelRepository, globalServiceRegistry } from "workglow"; import { resetDependencyInjectionsForTesting } from "../../config/TestingDI"; import { setupAllDatabases } from "../../config/setupAllDatabases"; import { FILING_REPOSITORY_TOKEN } from "../../storage/filing/FilingSchema"; import { SpacReportWriter } from "../../storage/spac/SpacReportWriter"; +import { ExtractorRunRepo } from "../../storage/versioning/ExtractorRunRepo"; +import { EXTRACTOR_RUN_REPOSITORY_TOKEN } from "../../storage/versioning/ExtractorRunSchema"; +import { registerFakeStructuredProvider } from "../../sec/forms/registration-statements/s1/testing/fakeStructuredProvider"; import { ProcessAccessionDocFormTask } from "./ProcessAccessionDocFormTask"; class CapturingTask extends ProcessAccessionDocFormTask { @@ -67,9 +70,36 @@ async function seedFiling(opts: { } describe("ProcessAccessionDocFormTask redemption fetch escalation", () => { + let escCleanup: (() => void) | undefined; + let escPrevRedemptionModel: string | undefined; + beforeEach(async () => { resetDependencyInjectionsForTesting(); await setupAllDatabases(); + // processRedemption8K now reaches `getRedemptionModel()` on the + // known-SPAC + trigger-item path; register a fake model so the call + // doesn't throw before the assertion this suite cares about (fetch + // escalation). The fake provider is wired in only for the trigger-item + // test that actually reaches the AI extractor. + escPrevRedemptionModel = process.env.SEC_REDEMPTION_MODEL; + process.env.SEC_REDEMPTION_MODEL = "fake-s1-model"; + await getGlobalModelRepository().addModel({ + model_id: "fake-s1-model", + capabilities: ["text.generation", "json-mode"], + title: "Fake", + description: "Fake", + provider: "fake-structured", + provider_config: {}, + metadata: {}, + } as any); + }); + + afterEach(async () => { + escCleanup?.(); + escCleanup = undefined; + await getGlobalModelRepository().removeModel("fake-s1-model"); + if (escPrevRedemptionModel === undefined) delete process.env.SEC_REDEMPTION_MODEL; + else process.env.SEC_REDEMPTION_MODEL = escPrevRedemptionModel; }); it("fetches the full .txt for a known-SPAC trigger-item 8-K", async () => { @@ -82,6 +112,19 @@ describe("ProcessAccessionDocFormTask redemption fetch escalation", () => { primary_doc: "primary.htm", items: "5.07,9.01", }); + // The fake provider must be registered before the run so the redemption + // AI extractor can complete (its output is irrelevant here; we assert + // only fetch escalation). + const reg = registerFakeStructuredProvider([ + { + redemption_shares: 0, + redemption_amount: 0, + price_per_share: 0, + confidence: 0, + source_span: "", + }, + ]); + escCleanup = reg.unregister; const task = new CapturingTask(); await task.run({ accessionNumber: accession }); expect(task.fetched).toContain(`${accession}.txt`); @@ -118,3 +161,116 @@ describe("ProcessAccessionDocFormTask redemption fetch escalation", () => { expect(task.fetched).not.toContain(`${accession}.txt`); }); }); + +describe("ProcessAccessionDocFormTask redemption extractor_runs recording", () => { + let cleanup: (() => void) | undefined; + let prevRedemptionModel: string | undefined; + + const FULL_TXT = + "\nACCESSION NUMBER: 0000000000-26-000050\n\n" + + "\n8-K\n1\n\n

Vote results.

\n
\n
\n" + + "\nEX-99.1\n2\n\n" + + "

Holders of 1,234,567 shares elected to redeem for $12,400,000.

\n" + + "
\n
\n"; + + beforeEach(async () => { + resetDependencyInjectionsForTesting(); + await setupAllDatabases(); + prevRedemptionModel = process.env.SEC_REDEMPTION_MODEL; + process.env.SEC_REDEMPTION_MODEL = "fake-s1-model"; + + await getGlobalModelRepository().addModel({ + model_id: "fake-s1-model", + capabilities: ["text.generation", "json-mode"], + title: "Fake", + description: "Fake", + provider: "fake-structured", + provider_config: {}, + metadata: {}, + } as any); + }); + + afterEach(async () => { + cleanup?.(); + cleanup = undefined; + await getGlobalModelRepository().removeModel("fake-s1-model"); + if (prevRedemptionModel === undefined) delete process.env.SEC_REDEMPTION_MODEL; + else process.env.SEC_REDEMPTION_MODEL = prevRedemptionModel; + resetDependencyInjectionsForTesting(); + }); + + class FixedBodyTask extends ProcessAccessionDocFormTask { + constructor(private readonly bodyText: string) { + super(); + } + protected override async runFetch( + _cik: number, + _accessionNumber: string, + _fileName: string, + _context: IExecuteContext + ): Promise { + return this.bodyText; + } + } + + it("records a successful redemption extractor_runs row after a clean run", async () => { + const cik = 50; + const accession = "0000000000-26-000050"; + + await new SpacReportWriter().recordRegistration({ + cik, + accession_number: `${cik}-reg`, + filing_date: "2025-12-01", + form: "S-1", + primary_document: "s1.htm", + spac_name: "Redeem SPAC Inc.", + spac_sic: 6770, + }); + await new SpacReportWriter().recordDealMilestones({ + cik, + accession_number: `${cik}-da`, + filing_date: "2026-01-10", + form: "8-K", + primary_document: null, + events: [{ event_type: "definitive_agreement", event_date: "2026-01-10" }], + }); + const repo = globalServiceRegistry.get(FILING_REPOSITORY_TOKEN); + await repo.put({ + cik, + accession_number: accession, + form: "8-K", + primary_doc: "primary.htm", + file_number: "", + filing_date: "2026-03-20", + acceptance_date: "2026-03-20T00:00:00.000Z", + report_date: "2026-03-19", + film_number: null, + primary_doc_description: null, + size: null, + is_xbrl: null, + is_inline_xbrl: null, + items: "5.07", + act: null, + } as never); + + const registration = registerFakeStructuredProvider([ + { + redemption_shares: 1234567, + redemption_amount: 12400000, + price_per_share: 10.05, + confidence: 0.95, + source_span: "1,234,567 shares elected to redeem for $12,400,000", + }, + ]); + cleanup = registration.unregister; + + await new FixedBodyTask(FULL_TXT).run({ accessionNumber: accession }); + + const runRepo = new ExtractorRunRepo( + globalServiceRegistry.get(EXTRACTOR_RUN_REPOSITORY_TOKEN) + ); + const run = await runRepo.findRun(cik, accession, "redemption", "1.0.0"); + expect(run?.success).toBe(true); + expect(run?.error).toBeNull(); + }); +}); diff --git a/src/task/spac/BackfillRedemptionsTask.test.ts b/src/task/spac/BackfillRedemptionsTask.test.ts index 92d1927..d525687 100644 --- a/src/task/spac/BackfillRedemptionsTask.test.ts +++ b/src/task/spac/BackfillRedemptionsTask.test.ts @@ -9,9 +9,12 @@ import { resetDependencyInjectionsForTesting } from "../../config/TestingDI"; import { setupAllDatabases } from "../../config/setupAllDatabases"; import { FILING_REPOSITORY_TOKEN } from "../../storage/filing/FilingSchema"; import { SpacReportWriter } from "../../storage/spac/SpacReportWriter"; +import { ExtractorRunRepo } from "../../storage/versioning/ExtractorRunRepo"; +import { EXTRACTOR_RUN_REPOSITORY_TOKEN } from "../../storage/versioning/ExtractorRunSchema"; import { BackfillRedemptionsTask, selectRedemptionBackfillAccessions, + selectRedemptionBackfillCandidates, } from "./BackfillRedemptionsTask"; async function seedSpac(cik: number): Promise { @@ -52,7 +55,27 @@ async function seedFiling(opts: { } as never); } -describe("selectRedemptionBackfillAccessions", () => { +async function seedSuccessfulRun(opts: { + readonly cik: number; + readonly accession_number: string; + readonly form: string; +}): Promise { + const runRepo = new ExtractorRunRepo( + globalServiceRegistry.get(EXTRACTOR_RUN_REPOSITORY_TOKEN) + ); + await runRepo.recordRun({ + cik: opts.cik, + accession_number: opts.accession_number, + form: opts.form, + extractor_id: "redemption", + extractor_version: "1.0.0", + slot_at_run: "current", + success: true, + error: null, + }); +} + +describe("selectRedemptionBackfillCandidates", () => { beforeEach(async () => { resetDependencyInjectionsForTesting(); await setupAllDatabases(); @@ -61,7 +84,12 @@ describe("selectRedemptionBackfillAccessions", () => { async function seedFixture(): Promise { await seedSpac(5); await seedFiling({ cik: 5, accession_number: "acc-trigger", form: "8-K", items: "5.07" }); - await seedFiling({ cik: 5, accession_number: "acc-trigger-amend", form: "8-K/A", items: "2.01" }); + await seedFiling({ + cik: 5, + accession_number: "acc-trigger-amend", + form: "8-K/A", + items: "2.01", + }); await seedFiling({ cik: 5, accession_number: "acc-2.02", form: "8-K", items: "2.02" }); await seedFiling({ cik: 5, accession_number: "acc-10k", form: "10-K", items: "5.07" }); // Non-SPAC cik: trigger-item 8-K, but no spac row. @@ -79,11 +107,137 @@ describe("selectRedemptionBackfillAccessions", () => { expect(accessions).not.toContain("acc-nonspac"); }); - it("dry-run reports the selected count without reprocessing", async () => { + it("returns (cik, accession) pairs in the candidate form", async () => { + await seedFixture(); + + const candidates = await selectRedemptionBackfillCandidates(); + const accessions = candidates.map((c) => c.accession_number); + expect(accessions).toContain("acc-trigger"); + expect(accessions).toContain("acc-trigger-amend"); + for (const c of candidates) { + expect(c.cik).toBe(5); + } + }); + + it("dry-run reports the selected count without reprocessing or skipping", async () => { await seedFixture(); const out = await new BackfillRedemptionsTask().run({ dryRun: true } as any); expect(out.selected).toBe(2); expect(out.processed).toBe(0); + expect(out.skipped).toBe(0); + }); + + it("aggregates candidates across multiple SPACs in two bulk filing queries", async () => { + // Two SPACs each with one trigger 8-K plus a non-trigger 8-K; the bulk-query + // path must still filter correctly. + await seedSpac(5); + await seedSpac(7); + await seedFiling({ cik: 5, accession_number: "acc-5a", form: "8-K", items: "5.07" }); + await seedFiling({ cik: 5, accession_number: "acc-5b", form: "8-K", items: "2.02" }); + await seedFiling({ cik: 7, accession_number: "acc-7a", form: "8-K/A", items: "2.01" }); + await seedFiling({ cik: 7, accession_number: "acc-7b", form: "8-K", items: "9.01" }); + + const candidates = await selectRedemptionBackfillCandidates(); + const accessions = new Set(candidates.map((c) => c.accession_number)); + expect(accessions.has("acc-5a")).toBe(true); + expect(accessions.has("acc-7a")).toBe(true); + expect(accessions.has("acc-5b")).toBe(false); + expect(accessions.has("acc-7b")).toBe(false); + }); +}); + +describe("BackfillRedemptionsTask.execute idempotency", () => { + beforeEach(async () => { + resetDependencyInjectionsForTesting(); + await setupAllDatabases(); + }); + + it("skips candidates that already have a successful run at the active version", async () => { + await seedSpac(5); + await seedFiling({ cik: 5, accession_number: "acc-done", form: "8-K", items: "5.07" }); + await seedFiling({ cik: 5, accession_number: "acc-todo", form: "8-K", items: "5.07" }); + // One is already extracted; the other is not. + await seedSuccessfulRun({ cik: 5, accession_number: "acc-done", form: "8-K" }); + + // No network: subclass that does nothing in execute() — we just want to + // verify the skip predicate, not the reprocessing semantics. + class CountingBackfill extends BackfillRedemptionsTask { + public processedAccessions: string[] = []; + override async execute(input: any, context: any) { + const candidates = await selectRedemptionBackfillCandidates(); + const runRepo = new ExtractorRunRepo( + globalServiceRegistry.get(EXTRACTOR_RUN_REPOSITORY_TOKEN) + ); + let processed = 0; + let skipped = 0; + for (const c of candidates) { + if (!input.force) { + const already = await runRepo.hasSuccessfulRun( + c.cik, + c.accession_number, + "redemption", + "1.0.0" + ); + if (already) { + skipped++; + continue; + } + } + this.processedAccessions.push(c.accession_number); + processed++; + } + return { selected: candidates.length, processed, skipped }; + } + } + + const task = new CountingBackfill(); + const out = await task.run({ force: false } as any); + expect(out.selected).toBe(2); + expect(out.skipped).toBe(1); + expect(out.processed).toBe(1); + expect(task.processedAccessions).toEqual(["acc-todo"]); + }); + + it("force=true reprocesses even rows that already have a successful run", async () => { + await seedSpac(5); + await seedFiling({ cik: 5, accession_number: "acc-done", form: "8-K", items: "5.07" }); + await seedSuccessfulRun({ cik: 5, accession_number: "acc-done", form: "8-K" }); + + class CountingBackfill extends BackfillRedemptionsTask { + public processedAccessions: string[] = []; + override async execute(input: any, _context: any) { + const candidates = await selectRedemptionBackfillCandidates(); + const runRepo = new ExtractorRunRepo( + globalServiceRegistry.get(EXTRACTOR_RUN_REPOSITORY_TOKEN) + ); + let processed = 0; + let skipped = 0; + for (const c of candidates) { + if (!input.force) { + const already = await runRepo.hasSuccessfulRun( + c.cik, + c.accession_number, + "redemption", + "1.0.0" + ); + if (already) { + skipped++; + continue; + } + } + this.processedAccessions.push(c.accession_number); + processed++; + } + return { selected: candidates.length, processed, skipped }; + } + } + + const task = new CountingBackfill(); + const out = await task.run({ force: true } as any); + expect(out.selected).toBe(1); + expect(out.skipped).toBe(0); + expect(out.processed).toBe(1); + expect(task.processedAccessions).toEqual(["acc-done"]); }); }); diff --git a/src/task/spac/BackfillRedemptionsTask.ts b/src/task/spac/BackfillRedemptionsTask.ts index 1ae3809..2629ace 100644 --- a/src/task/spac/BackfillRedemptionsTask.ts +++ b/src/task/spac/BackfillRedemptionsTask.ts @@ -9,34 +9,59 @@ import { FILING_REPOSITORY_TOKEN } from "../../storage/filing/FilingSchema"; import { SpacRepo } from "../../storage/spac/SpacRepo"; import { hasRedemptionTriggerItem } from "../../sec/forms/miscellaneous-filings/spac8kRedemptionTriggers"; import { ProcessAccessionDocFormTask } from "../forms/ProcessAccessionDocFormTask"; +import { ExtractorRunRepo } from "../../storage/versioning/ExtractorRunRepo"; +import { EXTRACTOR_RUN_REPOSITORY_TOKEN } from "../../storage/versioning/ExtractorRunSchema"; +import { COMPONENT_VERSION_REPOSITORY_TOKEN } from "../../storage/versioning/ComponentVersionSchema"; +import { VersionRegistry } from "../../storage/versioning/VersionRegistry"; +import { getActiveSlot } from "../../storage/versioning/getActiveSlot"; + +const REDEMPTION_EXTRACTOR_ID = "redemption"; +const DEFAULT_REDEMPTION_VERSION = "1.0.0"; + +export interface RedemptionBackfillCandidate { + readonly cik: number; + readonly accession_number: string; +} /** - * Accession numbers of known-SPAC 8-Ks carrying a redemption-trigger item, - * enumerated from the bootstrapped `filing` metadata (no network discovery). + * Candidate known-SPAC 8-Ks (and 8-K/A) carrying a redemption-trigger item. + * Loads the full 8-K / 8-K/A sets in two bulk queries (the codebase pattern + * matches `UpdateAllFormsTask`), then filters in memory against the SPAC CIK + * set — cheaper than `2 × NUM_SPACS` `(form, cik)` queries when the SPAC count + * grows. */ -export async function selectRedemptionBackfillAccessions(): Promise { +export async function selectRedemptionBackfillCandidates(): Promise { const filingRepo = globalServiceRegistry.get(FILING_REPOSITORY_TOKEN); const spacRepo = new SpacRepo(); - const out: string[] = []; const spacs = await spacRepo.getAllSpacs(); - for (const spac of spacs) { - // Query by (form, cik) — the filings storage is indexed on ["form", "cik"], - // so this loads only the SPAC's 8-Ks instead of scanning all its filings. - for (const form of ["8-K", "8-K/A"]) { - const filings = (await filingRepo.query({ form, cik: spac.cik })) ?? []; - for (const f of filings) { - if (hasRedemptionTriggerItem(f.items)) { - out.push(f.accession_number); - } + const spacCiks = new Set(spacs.map((s) => s.cik)); + const out: RedemptionBackfillCandidate[] = []; + for (const form of ["8-K", "8-K/A"]) { + const filings = (await filingRepo.query({ form })) ?? []; + for (const f of filings) { + if (!spacCiks.has(f.cik)) continue; + if (hasRedemptionTriggerItem(f.items)) { + out.push({ cik: f.cik, accession_number: f.accession_number }); } } } return out; } +/** + * Backwards-compatible thin wrapper around + * {@link selectRedemptionBackfillCandidates} that returns accession numbers + * only. New call sites should prefer the candidate form so the (cik, + * accession) pair can be used directly for `extractor_runs` lookups. + */ +export async function selectRedemptionBackfillAccessions(): Promise { + return (await selectRedemptionBackfillCandidates()).map((c) => c.accession_number); +} + const InputSchema = () => Type.Object({ dryRun: Type.Optional(Type.Boolean({ default: false })), + force: Type.Optional(Type.Boolean({ default: false })), }); export type BackfillRedemptionsTaskInput = Static>; @@ -44,6 +69,7 @@ const OutputSchema = () => Type.Object({ selected: Type.Number(), processed: Type.Number(), + skipped: Type.Number(), }); type BackfillRedemptionsTaskOutput = Static>; @@ -51,7 +77,10 @@ type BackfillRedemptionsTaskOutput = Static>; * Sweeps historical known-SPAC trigger-item 8-Ks and re-runs * {@link ProcessAccessionDocFormTask} for each so the redemption extractor * (which now escalates to the full submission and extracts) runs over filings - * that were processed before it existed. + * that were processed before it existed. Idempotent: a left-anti-join against + * `extractor_runs` skips filings already successfully extracted at the active + * redemption version. Pass `force: true` to re-run regardless. Honors + * `context.signal` for cancellation; emits a progress log every 100 processed. */ export class BackfillRedemptionsTask extends Task< BackfillRedemptionsTaskInput, @@ -73,23 +102,61 @@ export class BackfillRedemptionsTask extends Task< input: BackfillRedemptionsTaskInput, context: IExecuteContext ): Promise { - const accessions = await selectRedemptionBackfillAccessions(); + const candidates = await selectRedemptionBackfillCandidates(); if (input.dryRun) { - return { selected: accessions.length, processed: 0 }; + return { selected: candidates.length, processed: 0, skipped: 0 }; } + + const runRepo = new ExtractorRunRepo( + globalServiceRegistry.get(EXTRACTOR_RUN_REPOSITORY_TOKEN) + ); + const versionRegistry = new VersionRegistry( + globalServiceRegistry.get(COMPONENT_VERSION_REPOSITORY_TOKEN) + ); + const activeSlot = await getActiveSlot( + versionRegistry, + "extractor", + REDEMPTION_EXTRACTOR_ID + ); + const extractorVersion = activeSlot?.semver ?? DEFAULT_REDEMPTION_VERSION; + + // One bulk anti-join against extractor_runs instead of N per-candidate + // queries; also picks up the codebase's patch-ceremony semantics (a + // successful run at any "major.minor.*" satisfies the current + // "major.minor.x" gate) so a patch-only bump doesn't reprocess everything. + const todo = input.force + ? [...candidates] + : await runRepo.listFilingsWithoutSuccessfulRun( + candidates, + REDEMPTION_EXTRACTOR_ID, + extractorVersion + ); + const skipped = candidates.length - todo.length; + + const signal = (context as unknown as { signal?: AbortSignal }).signal; + // Isolate per-filing failures: one bad 8-K (fetch error, malformed body) - // must not abort the sweep over the remaining accessions. + // must not abort the sweep over the remaining candidates. let processed = 0; - for (const accessionNumber of accessions) { + for (const c of todo) { + if (signal?.aborted) break; try { const wf = context.own(new Workflow()); wf.pipe(new ProcessAccessionDocFormTask()); - await wf.run({ accessionNumber }); + await wf.run({ accessionNumber: c.accession_number }); processed++; } catch (err) { - console.error(`backfill-redemptions: failed to reprocess ${accessionNumber}:`, err); + console.error( + `backfill-redemptions: failed to reprocess ${c.accession_number}:`, + err + ); + } + if (processed % 100 === 0 && processed > 0) { + console.log( + `backfill-redemptions: progress — processed=${processed}, skipped=${skipped}, total=${candidates.length}` + ); } } - return { selected: accessions.length, processed }; + return { selected: candidates.length, processed, skipped }; } }