From cc14a67a0d11a30dbe426b5704bdab34fb278a48 Mon Sep 17 00:00:00 2001 From: Yvette Carlisle Date: Tue, 30 Jun 2026 07:09:07 -0400 Subject: [PATCH 1/6] {"schema":"decodex/commit/1","summary":"Normalize live adapter module tree","authority":"manual"} --- .../dreaming_readback_artifacts.rs | 0 .../evidence_selection.rs | 8 ++-- .../bin/real_world_live_adapter/lightrag.rs | 12 +++--- .../main.rs} | 38 +++++++++---------- .../src/bin/real_world_live_adapter/model.rs | 12 +++--- .../consolidation_knowledge.rs | 5 ++- .../live_adapter_tasks.rs | 5 ++- 7 files changed, 40 insertions(+), 40 deletions(-) rename apps/elf-eval/src/bin/real_world_live_adapter/{ => dreaming_readback}/dreaming_readback_artifacts.rs (100%) rename apps/elf-eval/src/bin/{real_world_live_adapter.rs => real_world_live_adapter/main.rs} (79%) diff --git a/apps/elf-eval/src/bin/real_world_live_adapter/dreaming_readback_artifacts.rs b/apps/elf-eval/src/bin/real_world_live_adapter/dreaming_readback/dreaming_readback_artifacts.rs similarity index 100% rename from apps/elf-eval/src/bin/real_world_live_adapter/dreaming_readback_artifacts.rs rename to apps/elf-eval/src/bin/real_world_live_adapter/dreaming_readback/dreaming_readback_artifacts.rs diff --git a/apps/elf-eval/src/bin/real_world_live_adapter/evidence_selection.rs b/apps/elf-eval/src/bin/real_world_live_adapter/evidence_selection.rs index 6edd9749..2623c7f8 100644 --- a/apps/elf-eval/src/bin/real_world_live_adapter/evidence_selection.rs +++ b/apps/elf-eval/src/bin/real_world_live_adapter/evidence_selection.rs @@ -1,7 +1,7 @@ -#[path = "evidence_selection/claims.rs"] mod claims; -#[path = "evidence_selection/common.rs"] mod common; -#[path = "evidence_selection/required.rs"] mod required; -#[path = "evidence_selection/temporal.rs"] mod temporal; +mod claims; +mod common; +mod required; +mod temporal; use crate::{ BTreeSet, CorpusText, IngestedCorpus, LiveExpectedClaim, LiveMemoryEvolution, LoadedJob, diff --git a/apps/elf-eval/src/bin/real_world_live_adapter/lightrag.rs b/apps/elf-eval/src/bin/real_world_live_adapter/lightrag.rs index 268cfe41..f9053849 100644 --- a/apps/elf-eval/src/bin/real_world_live_adapter/lightrag.rs +++ b/apps/elf-eval/src/bin/real_world_live_adapter/lightrag.rs @@ -1,8 +1,8 @@ -#[path = "lightrag/api.rs"] mod api; -#[path = "lightrag/corpus.rs"] mod corpus; -#[path = "lightrag/mapping.rs"] mod mapping; -#[path = "lightrag/metadata.rs"] mod metadata; -#[path = "lightrag/runtime.rs"] mod runtime; -#[path = "lightrag/status.rs"] mod status; +mod api; +mod corpus; +mod mapping; +mod metadata; +mod runtime; +mod status; pub(super) use runtime::run_lightrag_async; diff --git a/apps/elf-eval/src/bin/real_world_live_adapter.rs b/apps/elf-eval/src/bin/real_world_live_adapter/main.rs similarity index 79% rename from apps/elf-eval/src/bin/real_world_live_adapter.rs rename to apps/elf-eval/src/bin/real_world_live_adapter/main.rs index 625c73f3..2b2e47c1 100644 --- a/apps/elf-eval/src/bin/real_world_live_adapter.rs +++ b/apps/elf-eval/src/bin/real_world_live_adapter/main.rs @@ -2,23 +2,23 @@ //! Live adapter materializer for the real-world job benchmark. -#[path = "real_world_live_adapter/capture.rs"] mod capture; -#[path = "real_world_live_adapter/consolidation_adapter.rs"] mod consolidation_adapter; -#[path = "real_world_live_adapter/dreaming_readback.rs"] mod dreaming_readback; -#[path = "real_world_live_adapter/elf_domain_materializers.rs"] mod elf_domain_materializers; -#[path = "real_world_live_adapter/elf_runtime.rs"] mod elf_runtime; -#[path = "real_world_live_adapter/evidence_selection.rs"] mod evidence_selection; -#[path = "real_world_live_adapter/fixtures.rs"] mod fixtures; -#[path = "real_world_live_adapter/ingestion.rs"] mod ingestion; -#[path = "real_world_live_adapter/knowledge_adapter.rs"] mod knowledge_adapter; -#[path = "real_world_live_adapter/lightrag.rs"] mod lightrag; -#[path = "real_world_live_adapter/materialization.rs"] mod materialization; -#[path = "real_world_live_adapter/model.rs"] mod model; -#[path = "real_world_live_adapter/operator_debug.rs"] mod operator_debug; -#[path = "real_world_live_adapter/output.rs"] mod output; -#[path = "real_world_live_adapter/qmd.rs"] mod qmd; -#[path = "real_world_live_adapter/runtime_support.rs"] mod runtime_support; -#[path = "real_world_live_adapter/service_runtime.rs"] mod service_runtime; +mod capture; +mod consolidation_adapter; +mod dreaming_readback; +mod elf_domain_materializers; +mod elf_runtime; +mod evidence_selection; +mod fixtures; +mod ingestion; +mod knowledge_adapter; +mod lightrag; +mod materialization; +mod model; +mod operator_debug; +mod output; +mod qmd; +mod runtime_support; +mod service_runtime; use std::{ collections::{BTreeSet, HashMap}, @@ -125,6 +125,4 @@ async fn main() -> Result<()> { } } -#[cfg(test)] -#[path = "real_world_live_adapter/tests.rs"] -mod tests; +#[cfg(test)] mod tests; diff --git a/apps/elf-eval/src/bin/real_world_live_adapter/model.rs b/apps/elf-eval/src/bin/real_world_live_adapter/model.rs index 9e870d53..ccb8d770 100644 --- a/apps/elf-eval/src/bin/real_world_live_adapter/model.rs +++ b/apps/elf-eval/src/bin/real_world_live_adapter/model.rs @@ -1,9 +1,9 @@ -#[path = "model/cli.rs"] mod cli; -#[path = "model/consolidation.rs"] mod consolidation; -#[path = "model/live.rs"] mod live; -#[path = "model/materialization.rs"] mod materialization; -#[path = "model/providers.rs"] mod providers; -#[path = "model/runtime.rs"] mod runtime; +mod cli; +mod consolidation; +mod live; +mod materialization; +mod providers; +mod runtime; pub(super) use self::{ cli::{Args, CommandArgs, ElfArgs, LightragArgs, QmdArgs}, diff --git a/apps/elf-eval/tests/real_world_job_benchmark/consolidation_knowledge.rs b/apps/elf-eval/tests/real_world_job_benchmark/consolidation_knowledge.rs index bc5684ab..3dcc727a 100644 --- a/apps/elf-eval/tests/real_world_job_benchmark/consolidation_knowledge.rs +++ b/apps/elf-eval/tests/real_world_job_benchmark/consolidation_knowledge.rs @@ -6,8 +6,9 @@ use serde_json::Value; use crate::support; fn real_world_live_adapter_sources(workspace: &Path) -> Result { - let mut source = - fs::read_to_string(workspace.join("apps/elf-eval/src/bin/real_world_live_adapter.rs"))?; + let mut source = fs::read_to_string( + workspace.join("apps/elf-eval/src/bin/real_world_live_adapter/main.rs"), + )?; append_rust_sources( workspace.join("apps/elf-eval/src/bin/real_world_live_adapter").as_path(), diff --git a/apps/elf-eval/tests/real_world_job_benchmark/live_adapter_tasks.rs b/apps/elf-eval/tests/real_world_job_benchmark/live_adapter_tasks.rs index 364d03d0..6138b039 100644 --- a/apps/elf-eval/tests/real_world_job_benchmark/live_adapter_tasks.rs +++ b/apps/elf-eval/tests/real_world_job_benchmark/live_adapter_tasks.rs @@ -6,8 +6,9 @@ use serde_json::Value; use crate::support; fn real_world_live_adapter_sources(workspace: &Path) -> Result { - let mut source = - fs::read_to_string(workspace.join("apps/elf-eval/src/bin/real_world_live_adapter.rs"))?; + let mut source = fs::read_to_string( + workspace.join("apps/elf-eval/src/bin/real_world_live_adapter/main.rs"), + )?; append_rust_sources( workspace.join("apps/elf-eval/src/bin/real_world_live_adapter").as_path(), From 735e524c05177edf8e54472d87cb95b43b3fcfea Mon Sep 17 00:00:00 2001 From: Yvette Carlisle Date: Tue, 30 Jun 2026 07:16:37 -0400 Subject: [PATCH 2/6] {"schema":"decodex/commit/1","summary":"Split live consolidation adapter modules","authority":"manual"} --- .../consolidation_adapter.rs | 342 +----------------- .../consolidation_adapter/evidence.rs | 37 ++ .../consolidation_adapter/fixture.rs | 19 + .../consolidation_adapter/refs.rs | 80 ++++ .../consolidation_adapter/review.rs | 71 ++++ .../consolidation_adapter/run.rs | 177 +++++++++ 6 files changed, 401 insertions(+), 325 deletions(-) create mode 100644 apps/elf-eval/src/bin/real_world_live_adapter/consolidation_adapter/evidence.rs create mode 100644 apps/elf-eval/src/bin/real_world_live_adapter/consolidation_adapter/fixture.rs create mode 100644 apps/elf-eval/src/bin/real_world_live_adapter/consolidation_adapter/refs.rs create mode 100644 apps/elf-eval/src/bin/real_world_live_adapter/consolidation_adapter/review.rs create mode 100644 apps/elf-eval/src/bin/real_world_live_adapter/consolidation_adapter/run.rs diff --git a/apps/elf-eval/src/bin/real_world_live_adapter/consolidation_adapter.rs b/apps/elf-eval/src/bin/real_world_live_adapter/consolidation_adapter.rs index 66878b5e..f88d829a 100644 --- a/apps/elf-eval/src/bin/real_world_live_adapter/consolidation_adapter.rs +++ b/apps/elf-eval/src/bin/real_world_live_adapter/consolidation_adapter.rs @@ -1,27 +1,17 @@ +mod evidence; +mod fixture; +mod refs; +mod review; +mod run; + use crate::{ - ConsolidationApplyIntent, ConsolidationInputRef, ConsolidationLineage, ConsolidationMarker, - ConsolidationMarkerSeverity, ConsolidationMarkers, ConsolidationMaterializationEvidence, - ConsolidationProposalDiff, ConsolidationProposalInput, ConsolidationProposalResponse, - ConsolidationReviewAction, ConsolidationSourceKind, ConsolidationSourceSnapshot, - ConsolidationUnsupportedClaimFlag, CorpusText, IngestedCorpus, LiveConsolidationFixture, - LiveConsolidationProposal, LoadedJob, OffsetDateTime, PreparedConsolidationRun, Result, Uuid, - eyre, serde_json, + ConsolidationInputRef, ConsolidationMaterializationEvidence, ConsolidationProposalResponse, + ConsolidationReviewAction, CorpusText, IngestedCorpus, LiveConsolidationFixture, LoadedJob, + PreparedConsolidationRun, Result, Uuid, serde_json::Value, }; pub(super) fn live_consolidation_fixture(loaded: &LoadedJob) -> Result { - let value = - loaded.value.pointer("/corpus/adapter_response/consolidation").cloned().ok_or_else( - || { - eyre::eyre!( - "{} does not contain adapter_response.consolidation.", - loaded.path.display() - ) - }, - )?; - - serde_json::from_value(value).map_err(|err| { - eyre::eyre!("Failed to parse consolidation fixture {}: {err}", loaded.path.display()) - }) + fixture::live_consolidation_fixture(loaded) } pub(super) fn prepare_consolidation_run( @@ -31,38 +21,7 @@ pub(super) fn prepare_consolidation_run( fixture: &LiveConsolidationFixture, corpus: &[CorpusText], ) -> Result { - let mut input_refs = Vec::new(); - let mut proposals = Vec::new(); - - for proposal in &fixture.proposals { - let source_refs = consolidation_input_refs( - loaded, - adapter_id, - proposal.source_refs.as_slice(), - ingested, - corpus, - )?; - - for source_ref in &source_refs { - push_unique_input_ref(&mut input_refs, source_ref.clone()); - } - - proposals.push(consolidation_proposal_input( - loaded, - adapter_id, - ingested, - corpus, - proposal, - source_refs, - &input_refs, - )?); - } - - if proposals.is_empty() { - return Err(eyre::eyre!("{} has no consolidation proposals.", loaded.job.job_id)); - } - - Ok(PreparedConsolidationRun { input_refs, proposals }) + run::prepare_consolidation_run(loaded, adapter_id, ingested, fixture, corpus) } pub(super) fn validate_reviewed_consolidation_count( @@ -70,16 +29,7 @@ pub(super) fn validate_reviewed_consolidation_count( fixture: &LiveConsolidationFixture, reviewed: &[ConsolidationProposalResponse], ) -> Result<()> { - if reviewed.len() == fixture.proposals.len() { - return Ok(()); - } - - Err(eyre::eyre!( - "ELF consolidation materialized {} proposals for {} fixture proposals in {}.", - reviewed.len(), - fixture.proposals.len(), - loaded.job.job_id - )) + review::validate_reviewed_consolidation_count(loaded, fixture, reviewed) } pub(super) fn consolidation_materialization_evidence( @@ -88,278 +38,20 @@ pub(super) fn consolidation_materialization_evidence( input_refs: &[ConsolidationInputRef], reviewed: &[ConsolidationProposalResponse], ) -> ConsolidationMaterializationEvidence { - let review_actions = reviewed - .iter() - .flat_map(|proposal| proposal.review_events.iter().map(|event| event.action.clone())) - .collect::>(); - let final_review_states = - reviewed.iter().map(|proposal| proposal.review_state.clone()).collect::>(); - let unsupported_claim_flag_count = fixture - .proposals - .iter() - .map(|proposal| { - proposal.unsupported_claim_count.max(proposal.unsupported_claim_flags.len()) - }) - .sum(); - let review_event_count = - reviewed.iter().map(|proposal| proposal.review_events.len()).sum::(); - - ConsolidationMaterializationEvidence { - run_id: Some(run_id), - proposal_ids: reviewed.iter().map(|proposal| proposal.proposal_id).collect(), - source_lineage_count: input_refs.len(), - unsupported_claim_flag_count, - review_event_count, - review_actions, - final_review_states, - } + evidence::consolidation_materialization_evidence(run_id, fixture, input_refs, reviewed) } pub(super) fn consolidation_review_action(raw: &str) -> Result { - match raw { - "apply" => Ok(ConsolidationReviewAction::Apply), - "discard" => Ok(ConsolidationReviewAction::Discard), - "defer" => Ok(ConsolidationReviewAction::Defer), - "approve" => Ok(ConsolidationReviewAction::Approve), - _ => Err(eyre::eyre!("Unknown consolidation review action {raw}.")), - } + review::consolidation_review_action(raw) } pub(super) fn live_consolidation_response( fixture: &LiveConsolidationFixture, reviewed: &[ConsolidationProposalResponse], -) -> Result { - let proposals = fixture - .proposals - .iter() - .zip(reviewed) - .map(|(fixture_proposal, reviewed_proposal)| { - serde_json::json!({ - "proposal_id": reviewed_proposal.proposal_id.to_string(), - "proposal_kind": fixture_proposal.proposal_kind.clone(), - "source_refs": fixture_proposal.source_refs.clone(), - "expected_source_refs": if fixture_proposal.expected_source_refs.is_empty() { - fixture_proposal.source_refs.clone() - } else { - fixture_proposal.expected_source_refs.clone() - }, - "usefulness_score": fixture_proposal.usefulness_score, - "min_usefulness_score": fixture_proposal.min_usefulness_score, - "expected_review_action": fixture_proposal.expected_review_action.clone(), - "actual_review_action": fixture_proposal.actual_review_action.clone(), - "source_mutations": fixture_proposal.source_mutations.clone(), - "unsupported_claim_count": fixture_proposal - .unsupported_claim_count - .max(fixture_proposal.unsupported_claim_flags.len()), - "unsupported_claim_flags": fixture_proposal.unsupported_claim_flags.clone(), - "diff": fixture_proposal.diff.clone(), - "live_review_state": reviewed_proposal.review_state.clone(), - "live_review_event_count": reviewed_proposal.review_events.len() - }) - }) - .collect::>(); - - Ok(serde_json::json!({ "proposals": proposals, "executable_gaps": [] })) +) -> Result { + review::live_consolidation_response(fixture, reviewed) } pub(super) fn live_note_ids(ingested: &IngestedCorpus) -> Vec { - let mut note_ids = Vec::new(); - - for ids in ingested.note_ids_by_evidence.values() { - for note_id in ids { - if !note_ids.iter().any(|existing| existing == note_id) { - note_ids.push(*note_id); - } - } - } - - note_ids -} - -fn consolidation_proposal_input( - loaded: &LoadedJob, - adapter_id: &str, - ingested: &IngestedCorpus, - corpus: &[CorpusText], - proposal: &LiveConsolidationProposal, - source_refs: Vec, - input_refs: &[ConsolidationInputRef], -) -> Result { - let unsupported_claim_flags = - consolidation_unsupported_claim_flags(loaded, adapter_id, proposal, ingested, corpus)?; - let diff = consolidation_diff(proposal.diff.clone())?; - let proposed_payload = object_or_empty(diff.after.clone()); - let lineage = ConsolidationLineage { - source_refs: source_refs.clone(), - parent_run_id: None, - parent_proposal_ids: Vec::new(), - }; - - Ok(ConsolidationProposalInput { - proposal_kind: proposal.proposal_kind.clone(), - apply_intent: consolidation_apply_intent(proposal.actual_review_action.as_str()), - source_refs, - source_snapshot: serde_json::json!({ - "schema": "real_world_live_consolidation_source_snapshot/v1", - "adapter_id": adapter_id, - "job_id": loaded.job.job_id, - "proposal_id": proposal.proposal_id - }), - lineage, - confidence: proposal.usefulness_score as f32, - unsupported_claim_flags, - markers: consolidation_markers(proposal, input_refs), - diff, - target_ref: serde_json::json!({ - "schema": "real_world_live_consolidation_target/v1", - "proposal_id": proposal.proposal_id - }), - proposed_payload, - }) -} - -fn consolidation_input_refs( - loaded: &LoadedJob, - adapter_id: &str, - evidence_ids: &[String], - ingested: &IngestedCorpus, - corpus: &[CorpusText], -) -> Result> { - evidence_ids - .iter() - .map(|evidence_id| { - let note_id = ingested - .note_ids_by_evidence - .get(evidence_id) - .and_then(|ids| ids.first().copied()) - .ok_or_else(|| { - eyre::eyre!( - "No live note id mapped for consolidation evidence {} in {}.", - evidence_id, - loaded.job.job_id - ) - })?; - let text = corpus - .iter() - .find(|item| item.evidence_id == *evidence_id) - .map(|item| item.text.as_str()) - .unwrap_or(evidence_id.as_str()); - let content_hash = format!("blake3:{}", blake3::hash(text.as_bytes()).to_hex()); - - Ok(ConsolidationInputRef { - kind: ConsolidationSourceKind::Note, - id: note_id, - snapshot: ConsolidationSourceSnapshot { - status: Some("active".to_string()), - updated_at: Some(OffsetDateTime::now_utc()), - content_hash: Some(content_hash), - embedding_version: None, - trace_version: None, - source_ref: serde_json::json!({ - "schema": "real_world_live_adapter/v1", - "adapter": adapter_id, - "job_id": loaded.job.job_id, - "evidence_id": evidence_id - }), - metadata: serde_json::json!({ - "evidence_id": evidence_id, - "source": "memory_notes" - }), - }, - }) - }) - .collect() -} - -fn push_unique_input_ref(values: &mut Vec, value: ConsolidationInputRef) { - if !values.iter().any(|existing| existing.id == value.id) { - values.push(value); - } -} - -fn consolidation_unsupported_claim_flags( - loaded: &LoadedJob, - adapter_id: &str, - proposal: &LiveConsolidationProposal, - ingested: &IngestedCorpus, - corpus: &[CorpusText], -) -> Result> { - proposal - .unsupported_claim_flags - .iter() - .map(|flag| { - let source = flag - .source_ref - .as_deref() - .map(|source_ref| { - consolidation_input_refs( - loaded, - adapter_id, - &[source_ref.to_string()], - ingested, - corpus, - ) - .and_then(|refs| { - refs.into_iter().next().ok_or_else(|| { - eyre::eyre!( - "Unsupported claim source {} did not map to a live source.", - source_ref - ) - }) - }) - }) - .transpose()?; - - Ok(ConsolidationUnsupportedClaimFlag { - claim_id: flag.claim_id.clone(), - message: flag.message.clone(), - source, - }) - }) - .collect() -} - -fn consolidation_diff(value: serde_json::Value) -> Result { - let summary = value - .get("summary") - .and_then(serde_json::Value::as_str) - .unwrap_or("Live consolidation proposal.") - .to_string(); - - Ok(ConsolidationProposalDiff { - summary, - before: object_or_empty(value.get("before").cloned().unwrap_or(serde_json::Value::Null)), - after: object_or_empty(value.get("after").cloned().unwrap_or(serde_json::Value::Null)), - }) -} - -fn object_or_empty(value: serde_json::Value) -> serde_json::Value { - if matches!(value, serde_json::Value::Object(_)) { value } else { serde_json::json!({}) } -} - -fn consolidation_apply_intent(action: &str) -> ConsolidationApplyIntent { - if action == "apply" { - ConsolidationApplyIntent::CreateDerivedNote - } else { - ConsolidationApplyIntent::NoOp - } -} - -fn consolidation_markers( - proposal: &LiveConsolidationProposal, - input_refs: &[ConsolidationInputRef], -) -> ConsolidationMarkers { - if !proposal.proposal_kind.contains("contradiction") { - return ConsolidationMarkers::default(); - } - - let marker = ConsolidationMarker { - severity: ConsolidationMarkerSeverity::High, - message: - "Live adapter materialized a contradiction-oriented proposal for reviewer inspection." - .to_string(), - source: input_refs.first().cloned(), - }; - - ConsolidationMarkers { contradictions: vec![marker], staleness: Vec::new() } + refs::live_note_ids(ingested) } diff --git a/apps/elf-eval/src/bin/real_world_live_adapter/consolidation_adapter/evidence.rs b/apps/elf-eval/src/bin/real_world_live_adapter/consolidation_adapter/evidence.rs new file mode 100644 index 00000000..5b994aca --- /dev/null +++ b/apps/elf-eval/src/bin/real_world_live_adapter/consolidation_adapter/evidence.rs @@ -0,0 +1,37 @@ +use crate::{ + ConsolidationInputRef, ConsolidationMaterializationEvidence, ConsolidationProposalResponse, + LiveConsolidationFixture, Uuid, +}; + +pub(in crate::consolidation_adapter) fn consolidation_materialization_evidence( + run_id: Uuid, + fixture: &LiveConsolidationFixture, + input_refs: &[ConsolidationInputRef], + reviewed: &[ConsolidationProposalResponse], +) -> ConsolidationMaterializationEvidence { + let review_actions = reviewed + .iter() + .flat_map(|proposal| proposal.review_events.iter().map(|event| event.action.clone())) + .collect::>(); + let final_review_states = + reviewed.iter().map(|proposal| proposal.review_state.clone()).collect::>(); + let unsupported_claim_flag_count = fixture + .proposals + .iter() + .map(|proposal| { + proposal.unsupported_claim_count.max(proposal.unsupported_claim_flags.len()) + }) + .sum(); + let review_event_count = + reviewed.iter().map(|proposal| proposal.review_events.len()).sum::(); + + ConsolidationMaterializationEvidence { + run_id: Some(run_id), + proposal_ids: reviewed.iter().map(|proposal| proposal.proposal_id).collect(), + source_lineage_count: input_refs.len(), + unsupported_claim_flag_count, + review_event_count, + review_actions, + final_review_states, + } +} diff --git a/apps/elf-eval/src/bin/real_world_live_adapter/consolidation_adapter/fixture.rs b/apps/elf-eval/src/bin/real_world_live_adapter/consolidation_adapter/fixture.rs new file mode 100644 index 00000000..f58f2cf9 --- /dev/null +++ b/apps/elf-eval/src/bin/real_world_live_adapter/consolidation_adapter/fixture.rs @@ -0,0 +1,19 @@ +use crate::{LiveConsolidationFixture, LoadedJob, Result, eyre, serde_json}; + +pub(in crate::consolidation_adapter) fn live_consolidation_fixture( + loaded: &LoadedJob, +) -> Result { + let value = + loaded.value.pointer("/corpus/adapter_response/consolidation").cloned().ok_or_else( + || { + eyre::eyre!( + "{} does not contain adapter_response.consolidation.", + loaded.path.display() + ) + }, + )?; + + serde_json::from_value(value).map_err(|err| { + eyre::eyre!("Failed to parse consolidation fixture {}: {err}", loaded.path.display()) + }) +} diff --git a/apps/elf-eval/src/bin/real_world_live_adapter/consolidation_adapter/refs.rs b/apps/elf-eval/src/bin/real_world_live_adapter/consolidation_adapter/refs.rs new file mode 100644 index 00000000..e8748f69 --- /dev/null +++ b/apps/elf-eval/src/bin/real_world_live_adapter/consolidation_adapter/refs.rs @@ -0,0 +1,80 @@ +use crate::{ + ConsolidationInputRef, ConsolidationSourceKind, ConsolidationSourceSnapshot, CorpusText, + IngestedCorpus, LoadedJob, OffsetDateTime, Result, Uuid, eyre, serde_json, +}; + +pub(in crate::consolidation_adapter) fn live_note_ids(ingested: &IngestedCorpus) -> Vec { + let mut note_ids = Vec::new(); + + for ids in ingested.note_ids_by_evidence.values() { + for note_id in ids { + if !note_ids.iter().any(|existing| existing == note_id) { + note_ids.push(*note_id); + } + } + } + + note_ids +} + +pub(in crate::consolidation_adapter) fn consolidation_input_refs( + loaded: &LoadedJob, + adapter_id: &str, + evidence_ids: &[String], + ingested: &IngestedCorpus, + corpus: &[CorpusText], +) -> Result> { + evidence_ids + .iter() + .map(|evidence_id| { + let note_id = ingested + .note_ids_by_evidence + .get(evidence_id) + .and_then(|ids| ids.first().copied()) + .ok_or_else(|| { + eyre::eyre!( + "No live note id mapped for consolidation evidence {} in {}.", + evidence_id, + loaded.job.job_id + ) + })?; + let text = corpus + .iter() + .find(|item| item.evidence_id == *evidence_id) + .map(|item| item.text.as_str()) + .unwrap_or(evidence_id.as_str()); + let content_hash = format!("blake3:{}", blake3::hash(text.as_bytes()).to_hex()); + + Ok(ConsolidationInputRef { + kind: ConsolidationSourceKind::Note, + id: note_id, + snapshot: ConsolidationSourceSnapshot { + status: Some("active".to_string()), + updated_at: Some(OffsetDateTime::now_utc()), + content_hash: Some(content_hash), + embedding_version: None, + trace_version: None, + source_ref: serde_json::json!({ + "schema": "real_world_live_adapter/v1", + "adapter": adapter_id, + "job_id": loaded.job.job_id, + "evidence_id": evidence_id + }), + metadata: serde_json::json!({ + "evidence_id": evidence_id, + "source": "memory_notes" + }), + }, + }) + }) + .collect() +} + +pub(in crate::consolidation_adapter) fn push_unique_input_ref( + values: &mut Vec, + value: ConsolidationInputRef, +) { + if !values.iter().any(|existing| existing.id == value.id) { + values.push(value); + } +} diff --git a/apps/elf-eval/src/bin/real_world_live_adapter/consolidation_adapter/review.rs b/apps/elf-eval/src/bin/real_world_live_adapter/consolidation_adapter/review.rs new file mode 100644 index 00000000..1cf713df --- /dev/null +++ b/apps/elf-eval/src/bin/real_world_live_adapter/consolidation_adapter/review.rs @@ -0,0 +1,71 @@ +use crate::{ + ConsolidationProposalResponse, ConsolidationReviewAction, LiveConsolidationFixture, LoadedJob, + Result, eyre, + serde_json::{self, Value}, +}; + +pub(in crate::consolidation_adapter) fn validate_reviewed_consolidation_count( + loaded: &LoadedJob, + fixture: &LiveConsolidationFixture, + reviewed: &[ConsolidationProposalResponse], +) -> Result<()> { + if reviewed.len() == fixture.proposals.len() { + return Ok(()); + } + + Err(eyre::eyre!( + "ELF consolidation materialized {} proposals for {} fixture proposals in {}.", + reviewed.len(), + fixture.proposals.len(), + loaded.job.job_id + )) +} + +pub(in crate::consolidation_adapter) fn consolidation_review_action( + raw: &str, +) -> Result { + match raw { + "apply" => Ok(ConsolidationReviewAction::Apply), + "discard" => Ok(ConsolidationReviewAction::Discard), + "defer" => Ok(ConsolidationReviewAction::Defer), + "approve" => Ok(ConsolidationReviewAction::Approve), + _ => Err(eyre::eyre!("Unknown consolidation review action {raw}.")), + } +} + +pub(in crate::consolidation_adapter) fn live_consolidation_response( + fixture: &LiveConsolidationFixture, + reviewed: &[ConsolidationProposalResponse], +) -> Result { + let proposals = fixture + .proposals + .iter() + .zip(reviewed) + .map(|(fixture_proposal, reviewed_proposal)| { + serde_json::json!({ + "proposal_id": reviewed_proposal.proposal_id.to_string(), + "proposal_kind": fixture_proposal.proposal_kind.clone(), + "source_refs": fixture_proposal.source_refs.clone(), + "expected_source_refs": if fixture_proposal.expected_source_refs.is_empty() { + fixture_proposal.source_refs.clone() + } else { + fixture_proposal.expected_source_refs.clone() + }, + "usefulness_score": fixture_proposal.usefulness_score, + "min_usefulness_score": fixture_proposal.min_usefulness_score, + "expected_review_action": fixture_proposal.expected_review_action.clone(), + "actual_review_action": fixture_proposal.actual_review_action.clone(), + "source_mutations": fixture_proposal.source_mutations.clone(), + "unsupported_claim_count": fixture_proposal + .unsupported_claim_count + .max(fixture_proposal.unsupported_claim_flags.len()), + "unsupported_claim_flags": fixture_proposal.unsupported_claim_flags.clone(), + "diff": fixture_proposal.diff.clone(), + "live_review_state": reviewed_proposal.review_state.clone(), + "live_review_event_count": reviewed_proposal.review_events.len() + }) + }) + .collect::>(); + + Ok(serde_json::json!({ "proposals": proposals, "executable_gaps": [] })) +} diff --git a/apps/elf-eval/src/bin/real_world_live_adapter/consolidation_adapter/run.rs b/apps/elf-eval/src/bin/real_world_live_adapter/consolidation_adapter/run.rs new file mode 100644 index 00000000..be71e4f3 --- /dev/null +++ b/apps/elf-eval/src/bin/real_world_live_adapter/consolidation_adapter/run.rs @@ -0,0 +1,177 @@ +use crate::{ + ConsolidationApplyIntent, ConsolidationInputRef, ConsolidationLineage, ConsolidationMarker, + ConsolidationMarkerSeverity, ConsolidationMarkers, ConsolidationProposalDiff, + ConsolidationProposalInput, ConsolidationUnsupportedClaimFlag, CorpusText, IngestedCorpus, + LiveConsolidationFixture, LiveConsolidationProposal, LoadedJob, PreparedConsolidationRun, + Result, consolidation_adapter::refs, eyre, serde_json, +}; + +pub(in crate::consolidation_adapter) fn prepare_consolidation_run( + loaded: &LoadedJob, + adapter_id: &str, + ingested: &IngestedCorpus, + fixture: &LiveConsolidationFixture, + corpus: &[CorpusText], +) -> Result { + let mut input_refs = Vec::new(); + let mut proposals = Vec::new(); + + for proposal in &fixture.proposals { + let source_refs = refs::consolidation_input_refs( + loaded, + adapter_id, + proposal.source_refs.as_slice(), + ingested, + corpus, + )?; + + for source_ref in &source_refs { + refs::push_unique_input_ref(&mut input_refs, source_ref.clone()); + } + + proposals.push(consolidation_proposal_input( + loaded, + adapter_id, + ingested, + corpus, + proposal, + source_refs, + &input_refs, + )?); + } + + if proposals.is_empty() { + return Err(eyre::eyre!("{} has no consolidation proposals.", loaded.job.job_id)); + } + + Ok(PreparedConsolidationRun { input_refs, proposals }) +} + +fn consolidation_proposal_input( + loaded: &LoadedJob, + adapter_id: &str, + ingested: &IngestedCorpus, + corpus: &[CorpusText], + proposal: &LiveConsolidationProposal, + source_refs: Vec, + input_refs: &[ConsolidationInputRef], +) -> Result { + let unsupported_claim_flags = + consolidation_unsupported_claim_flags(loaded, adapter_id, proposal, ingested, corpus)?; + let diff = consolidation_diff(proposal.diff.clone())?; + let proposed_payload = object_or_empty(diff.after.clone()); + let lineage = ConsolidationLineage { + source_refs: source_refs.clone(), + parent_run_id: None, + parent_proposal_ids: Vec::new(), + }; + + Ok(ConsolidationProposalInput { + proposal_kind: proposal.proposal_kind.clone(), + apply_intent: consolidation_apply_intent(proposal.actual_review_action.as_str()), + source_refs, + source_snapshot: serde_json::json!({ + "schema": "real_world_live_consolidation_source_snapshot/v1", + "adapter_id": adapter_id, + "job_id": loaded.job.job_id, + "proposal_id": proposal.proposal_id + }), + lineage, + confidence: proposal.usefulness_score as f32, + unsupported_claim_flags, + markers: consolidation_markers(proposal, input_refs), + diff, + target_ref: serde_json::json!({ + "schema": "real_world_live_consolidation_target/v1", + "proposal_id": proposal.proposal_id + }), + proposed_payload, + }) +} + +fn consolidation_unsupported_claim_flags( + loaded: &LoadedJob, + adapter_id: &str, + proposal: &LiveConsolidationProposal, + ingested: &IngestedCorpus, + corpus: &[CorpusText], +) -> Result> { + proposal + .unsupported_claim_flags + .iter() + .map(|flag| { + let source = flag + .source_ref + .as_deref() + .map(|source_ref| { + refs::consolidation_input_refs( + loaded, + adapter_id, + &[source_ref.to_string()], + ingested, + corpus, + ) + .and_then(|refs| { + refs.into_iter().next().ok_or_else(|| { + eyre::eyre!( + "Unsupported claim source {} did not map to a live source.", + source_ref + ) + }) + }) + }) + .transpose()?; + + Ok(ConsolidationUnsupportedClaimFlag { + claim_id: flag.claim_id.clone(), + message: flag.message.clone(), + source, + }) + }) + .collect() +} + +fn consolidation_diff(value: serde_json::Value) -> Result { + let summary = value + .get("summary") + .and_then(serde_json::Value::as_str) + .unwrap_or("Live consolidation proposal.") + .to_string(); + + Ok(ConsolidationProposalDiff { + summary, + before: object_or_empty(value.get("before").cloned().unwrap_or(serde_json::Value::Null)), + after: object_or_empty(value.get("after").cloned().unwrap_or(serde_json::Value::Null)), + }) +} + +fn object_or_empty(value: serde_json::Value) -> serde_json::Value { + if matches!(value, serde_json::Value::Object(_)) { value } else { serde_json::json!({}) } +} + +fn consolidation_apply_intent(action: &str) -> ConsolidationApplyIntent { + if action == "apply" { + ConsolidationApplyIntent::CreateDerivedNote + } else { + ConsolidationApplyIntent::NoOp + } +} + +fn consolidation_markers( + proposal: &LiveConsolidationProposal, + input_refs: &[ConsolidationInputRef], +) -> ConsolidationMarkers { + if !proposal.proposal_kind.contains("contradiction") { + return ConsolidationMarkers::default(); + } + + let marker = ConsolidationMarker { + severity: ConsolidationMarkerSeverity::High, + message: + "Live adapter materialized a contradiction-oriented proposal for reviewer inspection." + .to_string(), + source: input_refs.first().cloned(), + }; + + ConsolidationMarkers { contradictions: vec![marker], staleness: Vec::new() } +} From 0ffd030b255a8a312dfae86673338e8ae283a9b4 Mon Sep 17 00:00:00 2001 From: Yvette Carlisle Date: Tue, 30 Jun 2026 07:22:15 -0400 Subject: [PATCH 3/6] {"schema":"decodex/commit/1","summary":"Split live dreaming readback artifacts","authority":"manual"} --- .../dreaming_readback_artifacts.rs | 222 +----------------- .../dreaming_readback_artifacts/content.rs | 50 ++++ .../dreaming_readback_artifacts/scoring.rs | 46 ++++ .../source_refs.rs | 36 +++ .../dreaming_readback_artifacts/stamp.rs | 41 ++++ .../dreaming_readback_artifacts/template.rs | 28 +++ .../dreaming_readback_artifacts/trace.rs | 29 +++ 7 files changed, 241 insertions(+), 211 deletions(-) create mode 100644 apps/elf-eval/src/bin/real_world_live_adapter/dreaming_readback/dreaming_readback_artifacts/content.rs create mode 100644 apps/elf-eval/src/bin/real_world_live_adapter/dreaming_readback/dreaming_readback_artifacts/scoring.rs create mode 100644 apps/elf-eval/src/bin/real_world_live_adapter/dreaming_readback/dreaming_readback_artifacts/source_refs.rs create mode 100644 apps/elf-eval/src/bin/real_world_live_adapter/dreaming_readback/dreaming_readback_artifacts/stamp.rs create mode 100644 apps/elf-eval/src/bin/real_world_live_adapter/dreaming_readback/dreaming_readback_artifacts/template.rs create mode 100644 apps/elf-eval/src/bin/real_world_live_adapter/dreaming_readback/dreaming_readback_artifacts/trace.rs diff --git a/apps/elf-eval/src/bin/real_world_live_adapter/dreaming_readback/dreaming_readback_artifacts.rs b/apps/elf-eval/src/bin/real_world_live_adapter/dreaming_readback/dreaming_readback_artifacts.rs index bacbb047..e7310ccc 100644 --- a/apps/elf-eval/src/bin/real_world_live_adapter/dreaming_readback/dreaming_readback_artifacts.rs +++ b/apps/elf-eval/src/bin/real_world_live_adapter/dreaming_readback/dreaming_readback_artifacts.rs @@ -1,216 +1,16 @@ -use crate::{ - AGENT_ID, BTreeSet, DreamingReadbackMaterializationEvidence, LoadedJob, Result, TENANT_ID, - TraceStageOutput, Uuid, Value, eyre, serde_json, +mod content; +mod scoring; +mod source_refs; +mod stamp; +mod template; +mod trace; + +pub(super) use self::{ + content::dreaming_readback_content, scoring::dreaming_readback_scoring_evidence_ids, + source_refs::collect_dreaming_artifact_source_refs, stamp::stamp_dreaming_readback_artifact, + template::dreaming_readback_template_artifacts, trace::dreaming_readback_trace_stages, }; -pub(super) fn dreaming_readback_template_artifacts(loaded: &LoadedJob) -> Result> { - let pointer = match loaded.job.suite.as_str() { - "memory_summary" => "/corpus/adapter_response/answer/memory_summaries", - "proactive_brief" => "/corpus/adapter_response/answer/proactive_briefs", - "scheduled_memory" => "/corpus/adapter_response/answer/scheduled_tasks", - _ => return Ok(Vec::new()), - }; - let artifacts = - loaded.value.pointer(pointer).and_then(Value::as_array).cloned().ok_or_else(|| { - eyre::eyre!( - "{} missing service-native readback template at {pointer}.", - loaded.job.job_id - ) - })?; - - if artifacts.is_empty() { - return Err(eyre::eyre!( - "{} has no service-native readback template artifacts.", - loaded.job.job_id - )); - } - - Ok(artifacts) -} - -pub(super) fn dreaming_readback_scoring_evidence_ids( - loaded: &LoadedJob, - service_evidence_ids: &[String], -) -> Vec { - let selected = service_evidence_ids.iter().map(String::as_str).collect::>(); - let trap_ids = negative_trap_evidence_ids(loaded); - let mut evidence_ids = Vec::new(); - - for evidence in &loaded.job.required_evidence { - if selected.contains(evidence.evidence_id.as_str()) - && !trap_ids.contains(evidence.evidence_id.as_str()) - { - crate::push_unique(&mut evidence_ids, evidence.evidence_id.clone()); - } - } - - if evidence_ids.is_empty() { - for evidence_id in service_evidence_ids { - if !trap_ids.contains(evidence_id.as_str()) { - crate::push_unique(&mut evidence_ids, evidence_id.clone()); - } - } - } - - evidence_ids -} - -pub(super) fn stamp_dreaming_readback_artifact( - artifact: &mut Value, - loaded: &LoadedJob, - project_id: &str, - trace_id: Uuid, - generated_at: &str, -) { - artifact["generated_at"] = serde_json::json!(generated_at); - artifact["tenant_id"] = serde_json::json!(TENANT_ID); - artifact["project_id"] = serde_json::json!(project_id); - artifact["agent_id"] = serde_json::json!(AGENT_ID); - artifact["read_profile"] = serde_json::json!("private_only"); - artifact["service_readback"] = serde_json::json!({ - "schema": "elf.service_native_dreaming_readback/v1", - "job_id": loaded.job.job_id, - "suite": loaded.job.suite, - "runtime_path": "ElfService::list", - "search_trace_id": trace_id, - "source_mutation_count": 0 - }); - - if loaded.job.suite == "scheduled_memory" { - let trace = artifact - .as_object_mut() - .map(|object| object.entry("execution_trace").or_insert_with(|| serde_json::json!({}))); - - if let Some(trace) = trace { - trace["trace_id"] = serde_json::json!(format!("service-native-{trace_id}")); - trace["trigger_kind"] = serde_json::json!("service_native_readback"); - trace["status"] = serde_json::json!("completed"); - } - - artifact["source_mutations"] = serde_json::json!([]); - } -} - -pub(super) fn collect_dreaming_artifact_source_refs(value: &Value, refs: &mut Vec) { - match value { - Value::Array(items) => - for item in items { - collect_dreaming_artifact_source_refs(item, refs); - }, - Value::Object(map) => - for (key, value) in map { - if matches!(key.as_str(), "source_refs" | "evidence_refs" | "evidence_ids") - && let Some(items) = value.as_array() - { - for item in items { - if let Some(source_ref) = item.as_str() { - crate::push_unique(refs, source_ref.to_string()); - } - } - } - if key == "evidence_id" - && let Some(source_ref) = value.as_str() - { - crate::push_unique(refs, source_ref.to_string()); - } - - collect_dreaming_artifact_source_refs(value, refs); - }, - _ => {}, - } -} - -pub(super) fn dreaming_readback_content(suite: &str, artifacts: &[Value]) -> String { - let mut parts = Vec::new(); - - for artifact in artifacts { - match suite { - "memory_summary" => { - for entry in artifact.get("entries").and_then(Value::as_array).into_iter().flatten() - { - if let Some(text) = entry.get("text").and_then(Value::as_str) { - parts.push(text.to_string()); - } - } - }, - "proactive_brief" => { - for suggestion in - artifact.get("suggestions").and_then(Value::as_array).into_iter().flatten() - { - if let Some(title) = suggestion.get("title").and_then(Value::as_str) { - parts.push(title.to_string()); - } - if let Some(body) = suggestion.get("body").and_then(Value::as_str) { - parts.push(body.to_string()); - } - } - }, - "scheduled_memory" => { - for output in - artifact.get("outputs").and_then(Value::as_array).into_iter().flatten() - { - if let Some(text) = output.get("text").and_then(Value::as_str) { - parts.push(text.to_string()); - } - } - }, - _ => {}, - } - } - - if parts.is_empty() { - "Service-native Dreaming readback produced no artifact text.".to_string() - } else { - parts.join(" ") - } -} - -pub(super) fn dreaming_readback_trace_stages( - loaded: &LoadedJob, - evidence: &DreamingReadbackMaterializationEvidence, -) -> Vec { - vec![ - TraceStageOutput { - stage_name: "dreaming_readback.service_list".to_string(), - kept_evidence: evidence.selected_source_refs.clone(), - dropped_evidence: evidence.missing_source_refs.clone(), - demoted_evidence: Vec::new(), - distractor_evidence: Vec::new(), - notes: format!( - "Read {} source refs from ElfService::list for {}.", - evidence.selected_source_refs.len(), - loaded.job.suite - ), - }, - TraceStageOutput { - stage_name: "dreaming_readback.source_mutation_guard".to_string(), - kept_evidence: evidence.selected_source_refs.clone(), - dropped_evidence: Vec::new(), - demoted_evidence: Vec::new(), - distractor_evidence: Vec::new(), - notes: "Generated readback artifacts without mutating source notes.".to_string(), - }, - ] -} - -fn negative_trap_evidence_ids(loaded: &LoadedJob) -> BTreeSet<&str> { - loaded - .value - .get("negative_traps") - .and_then(Value::as_array) - .into_iter() - .flatten() - .filter(|trap| trap.get("failure_if_used").and_then(Value::as_bool).unwrap_or(false)) - .flat_map(|trap| { - trap.get("evidence_ids") - .and_then(Value::as_array) - .into_iter() - .flatten() - .filter_map(Value::as_str) - }) - .collect() -} - #[cfg(test)] mod tests { use std::path::PathBuf; diff --git a/apps/elf-eval/src/bin/real_world_live_adapter/dreaming_readback/dreaming_readback_artifacts/content.rs b/apps/elf-eval/src/bin/real_world_live_adapter/dreaming_readback/dreaming_readback_artifacts/content.rs new file mode 100644 index 00000000..820d0ff4 --- /dev/null +++ b/apps/elf-eval/src/bin/real_world_live_adapter/dreaming_readback/dreaming_readback_artifacts/content.rs @@ -0,0 +1,50 @@ +use crate::Value; + +pub(in crate::dreaming_readback) fn dreaming_readback_content( + suite: &str, + artifacts: &[Value], +) -> String { + let mut parts = Vec::new(); + + for artifact in artifacts { + match suite { + "memory_summary" => push_memory_summary_text(artifact, &mut parts), + "proactive_brief" => push_proactive_brief_text(artifact, &mut parts), + "scheduled_memory" => push_scheduled_memory_text(artifact, &mut parts), + _ => {}, + } + } + + if parts.is_empty() { + "Service-native Dreaming readback produced no artifact text.".to_string() + } else { + parts.join(" ") + } +} + +fn push_memory_summary_text(artifact: &Value, parts: &mut Vec) { + for entry in artifact.get("entries").and_then(Value::as_array).into_iter().flatten() { + if let Some(text) = entry.get("text").and_then(Value::as_str) { + parts.push(text.to_string()); + } + } +} + +fn push_proactive_brief_text(artifact: &Value, parts: &mut Vec) { + for suggestion in artifact.get("suggestions").and_then(Value::as_array).into_iter().flatten() { + if let Some(title) = suggestion.get("title").and_then(Value::as_str) { + parts.push(title.to_string()); + } + if let Some(body) = suggestion.get("body").and_then(Value::as_str) { + parts.push(body.to_string()); + } + } +} + +fn push_scheduled_memory_text(artifact: &Value, parts: &mut Vec) { + for output in artifact.get("outputs").and_then(Value::as_array).into_iter().flatten() { + if let Some(text) = output.get("text").and_then(Value::as_str) { + parts.push(text.to_string()); + } + } +} diff --git a/apps/elf-eval/src/bin/real_world_live_adapter/dreaming_readback/dreaming_readback_artifacts/scoring.rs b/apps/elf-eval/src/bin/real_world_live_adapter/dreaming_readback/dreaming_readback_artifacts/scoring.rs new file mode 100644 index 00000000..8bbb540d --- /dev/null +++ b/apps/elf-eval/src/bin/real_world_live_adapter/dreaming_readback/dreaming_readback_artifacts/scoring.rs @@ -0,0 +1,46 @@ +use crate::{BTreeSet, LoadedJob, Value}; + +pub(in crate::dreaming_readback) fn dreaming_readback_scoring_evidence_ids( + loaded: &LoadedJob, + service_evidence_ids: &[String], +) -> Vec { + let selected = service_evidence_ids.iter().map(String::as_str).collect::>(); + let trap_ids = negative_trap_evidence_ids(loaded); + let mut evidence_ids = Vec::new(); + + for evidence in &loaded.job.required_evidence { + if selected.contains(evidence.evidence_id.as_str()) + && !trap_ids.contains(evidence.evidence_id.as_str()) + { + crate::push_unique(&mut evidence_ids, evidence.evidence_id.clone()); + } + } + + if evidence_ids.is_empty() { + for evidence_id in service_evidence_ids { + if !trap_ids.contains(evidence_id.as_str()) { + crate::push_unique(&mut evidence_ids, evidence_id.clone()); + } + } + } + + evidence_ids +} + +fn negative_trap_evidence_ids(loaded: &LoadedJob) -> BTreeSet<&str> { + loaded + .value + .get("negative_traps") + .and_then(Value::as_array) + .into_iter() + .flatten() + .filter(|trap| trap.get("failure_if_used").and_then(Value::as_bool).unwrap_or(false)) + .flat_map(|trap| { + trap.get("evidence_ids") + .and_then(Value::as_array) + .into_iter() + .flatten() + .filter_map(Value::as_str) + }) + .collect() +} diff --git a/apps/elf-eval/src/bin/real_world_live_adapter/dreaming_readback/dreaming_readback_artifacts/source_refs.rs b/apps/elf-eval/src/bin/real_world_live_adapter/dreaming_readback/dreaming_readback_artifacts/source_refs.rs new file mode 100644 index 00000000..415ac9dc --- /dev/null +++ b/apps/elf-eval/src/bin/real_world_live_adapter/dreaming_readback/dreaming_readback_artifacts/source_refs.rs @@ -0,0 +1,36 @@ +use crate::Value; + +pub(in crate::dreaming_readback) fn collect_dreaming_artifact_source_refs( + value: &Value, + refs: &mut Vec, +) { + match value { + Value::Array(items) => + for item in items { + collect_dreaming_artifact_source_refs(item, refs); + }, + Value::Object(map) => + for (key, value) in map { + collect_named_source_refs(key, value, refs); + collect_dreaming_artifact_source_refs(value, refs); + }, + _ => {}, + } +} + +fn collect_named_source_refs(key: &str, value: &Value, refs: &mut Vec) { + if matches!(key, "source_refs" | "evidence_refs" | "evidence_ids") + && let Some(items) = value.as_array() + { + for item in items { + if let Some(source_ref) = item.as_str() { + crate::push_unique(refs, source_ref.to_string()); + } + } + } + if key == "evidence_id" + && let Some(source_ref) = value.as_str() + { + crate::push_unique(refs, source_ref.to_string()); + } +} diff --git a/apps/elf-eval/src/bin/real_world_live_adapter/dreaming_readback/dreaming_readback_artifacts/stamp.rs b/apps/elf-eval/src/bin/real_world_live_adapter/dreaming_readback/dreaming_readback_artifacts/stamp.rs new file mode 100644 index 00000000..0ac71b11 --- /dev/null +++ b/apps/elf-eval/src/bin/real_world_live_adapter/dreaming_readback/dreaming_readback_artifacts/stamp.rs @@ -0,0 +1,41 @@ +use crate::{AGENT_ID, LoadedJob, TENANT_ID, Uuid, Value, serde_json}; + +pub(in crate::dreaming_readback) fn stamp_dreaming_readback_artifact( + artifact: &mut Value, + loaded: &LoadedJob, + project_id: &str, + trace_id: Uuid, + generated_at: &str, +) { + artifact["generated_at"] = serde_json::json!(generated_at); + artifact["tenant_id"] = serde_json::json!(TENANT_ID); + artifact["project_id"] = serde_json::json!(project_id); + artifact["agent_id"] = serde_json::json!(AGENT_ID); + artifact["read_profile"] = serde_json::json!("private_only"); + artifact["service_readback"] = serde_json::json!({ + "schema": "elf.service_native_dreaming_readback/v1", + "job_id": loaded.job.job_id, + "suite": loaded.job.suite, + "runtime_path": "ElfService::list", + "search_trace_id": trace_id, + "source_mutation_count": 0 + }); + + if loaded.job.suite == "scheduled_memory" { + stamp_scheduled_memory_trace(artifact, trace_id); + + artifact["source_mutations"] = serde_json::json!([]); + } +} + +fn stamp_scheduled_memory_trace(artifact: &mut Value, trace_id: Uuid) { + let trace = artifact + .as_object_mut() + .map(|object| object.entry("execution_trace").or_insert_with(|| serde_json::json!({}))); + + if let Some(trace) = trace { + trace["trace_id"] = serde_json::json!(format!("service-native-{trace_id}")); + trace["trigger_kind"] = serde_json::json!("service_native_readback"); + trace["status"] = serde_json::json!("completed"); + } +} diff --git a/apps/elf-eval/src/bin/real_world_live_adapter/dreaming_readback/dreaming_readback_artifacts/template.rs b/apps/elf-eval/src/bin/real_world_live_adapter/dreaming_readback/dreaming_readback_artifacts/template.rs new file mode 100644 index 00000000..e6f8cb78 --- /dev/null +++ b/apps/elf-eval/src/bin/real_world_live_adapter/dreaming_readback/dreaming_readback_artifacts/template.rs @@ -0,0 +1,28 @@ +use crate::{LoadedJob, Result, Value, eyre}; + +pub(in crate::dreaming_readback) fn dreaming_readback_template_artifacts( + loaded: &LoadedJob, +) -> Result> { + let pointer = match loaded.job.suite.as_str() { + "memory_summary" => "/corpus/adapter_response/answer/memory_summaries", + "proactive_brief" => "/corpus/adapter_response/answer/proactive_briefs", + "scheduled_memory" => "/corpus/adapter_response/answer/scheduled_tasks", + _ => return Ok(Vec::new()), + }; + let artifacts = + loaded.value.pointer(pointer).and_then(Value::as_array).cloned().ok_or_else(|| { + eyre::eyre!( + "{} missing service-native readback template at {pointer}.", + loaded.job.job_id + ) + })?; + + if artifacts.is_empty() { + return Err(eyre::eyre!( + "{} has no service-native readback template artifacts.", + loaded.job.job_id + )); + } + + Ok(artifacts) +} diff --git a/apps/elf-eval/src/bin/real_world_live_adapter/dreaming_readback/dreaming_readback_artifacts/trace.rs b/apps/elf-eval/src/bin/real_world_live_adapter/dreaming_readback/dreaming_readback_artifacts/trace.rs new file mode 100644 index 00000000..4bec19e0 --- /dev/null +++ b/apps/elf-eval/src/bin/real_world_live_adapter/dreaming_readback/dreaming_readback_artifacts/trace.rs @@ -0,0 +1,29 @@ +use crate::{DreamingReadbackMaterializationEvidence, LoadedJob, TraceStageOutput}; + +pub(in crate::dreaming_readback) fn dreaming_readback_trace_stages( + loaded: &LoadedJob, + evidence: &DreamingReadbackMaterializationEvidence, +) -> Vec { + vec![ + TraceStageOutput { + stage_name: "dreaming_readback.service_list".to_string(), + kept_evidence: evidence.selected_source_refs.clone(), + dropped_evidence: evidence.missing_source_refs.clone(), + demoted_evidence: Vec::new(), + distractor_evidence: Vec::new(), + notes: format!( + "Read {} source refs from ElfService::list for {}.", + evidence.selected_source_refs.len(), + loaded.job.suite + ), + }, + TraceStageOutput { + stage_name: "dreaming_readback.source_mutation_guard".to_string(), + kept_evidence: evidence.selected_source_refs.clone(), + dropped_evidence: Vec::new(), + demoted_evidence: Vec::new(), + distractor_evidence: Vec::new(), + notes: "Generated readback artifacts without mutating source notes.".to_string(), + }, + ] +} From 20150bd6560445b671ebdc8aba3cc5e06243ba06 Mon Sep 17 00:00:00 2001 From: Yvette Carlisle Date: Tue, 30 Jun 2026 07:25:44 -0400 Subject: [PATCH 4/6] {"schema":"decodex/commit/1","summary":"Split live materialization helpers","authority":"manual"} --- .../materialization.rs | 266 +----------------- .../materialization/declared.rs | 160 +++++++++++ .../materialization/job.rs | 114 ++++++++ .../materialization/support.rs | 43 +++ 4 files changed, 327 insertions(+), 256 deletions(-) create mode 100644 apps/elf-eval/src/bin/real_world_live_adapter/materialization/declared.rs create mode 100644 apps/elf-eval/src/bin/real_world_live_adapter/materialization/job.rs create mode 100644 apps/elf-eval/src/bin/real_world_live_adapter/materialization/support.rs diff --git a/apps/elf-eval/src/bin/real_world_live_adapter/materialization.rs b/apps/elf-eval/src/bin/real_world_live_adapter/materialization.rs index 708e9793..3bed1c94 100644 --- a/apps/elf-eval/src/bin/real_world_live_adapter/materialization.rs +++ b/apps/elf-eval/src/bin/real_world_live_adapter/materialization.rs @@ -1,163 +1,30 @@ -use crate::{ - AdapterResponseOutput, AnswerOutput, CostOutput, LoadedJob, MaterializationStatus, - MaterializedJob, MaterializedJobEvidence, MaterializedJobInput, TraceExplainabilityOutput, - TraceStageOutput, -}; +mod declared; +mod job; +mod support; + +use crate::{LoadedJob, MaterializationStatus, MaterializedJob, MaterializedJobInput}; pub(super) fn materialized_job( loaded: &LoadedJob, adapter_id: &str, input: MaterializedJobInput, ) -> MaterializedJob { - let capture_failure = input.capture_failure.clone(); - let required_evidence_satisfied = capture_failure.is_none() - && crate::required_evidence_satisfied(loaded, &input.evidence_ids); - let status = if input.failure.is_some() { - MaterializationStatus::Incomplete - } else if !required_evidence_satisfied { - MaterializationStatus::WrongResult - } else { - MaterializationStatus::Pass - }; - let failure_stage = if input.failure.is_some() { - Some("live_adapter.retrieve".to_string()) - } else if capture_failure.is_some() { - Some("live_adapter.capture_policy".to_string()) - } else { - None - }; - let failure_reason = input.failure.clone().or(capture_failure); - let stage_notes = if let Some(reason) = &failure_reason { - reason.clone() - } else if !required_evidence_satisfied { - "Adapter did not return all required mapped evidence for this job.".to_string() - } else { - "Adapter returned mapped evidence through its live retrieval path.".to_string() - }; - let trace_stages = input.trace_stages.unwrap_or_else(|| { - vec![TraceStageOutput { - stage_name: failure_stage - .clone() - .unwrap_or_else(|| "live_adapter.retrieve".to_string()), - kept_evidence: input.evidence_ids.clone(), - dropped_evidence: Vec::new(), - demoted_evidence: Vec::new(), - distractor_evidence: Vec::new(), - notes: stage_notes, - }] - }); - - MaterializedJob { - response: AdapterResponseOutput { - adapter_id: adapter_id.to_string(), - answer: AnswerOutput { - content: input.content, - evidence_ids: input.evidence_ids.clone(), - claims: crate::answer_claims(loaded, &input.evidence_ids), - pages: input.pages, - memory_summaries: input.memory_summaries, - proactive_briefs: input.proactive_briefs, - scheduled_tasks: input.scheduled_tasks, - latency_ms: input.latency_ms, - cost: CostOutput { - currency: "USD".to_string(), - amount: 0.0, - input_tokens: 0, - output_tokens: 0, - }, - trace_explainability: TraceExplainabilityOutput { - trace_id: input.trace_id.map(|id| id.to_string()), - failure_stage: failure_stage.clone(), - failure_reason: failure_reason.clone(), - stages: trace_stages, - }, - }, - consolidation: input.consolidation_response, - }, - operator_debug: input.operator_debug, - evidence: MaterializedJobEvidence { - job_id: loaded.job.job_id.clone(), - suite: loaded.job.suite.clone(), - title: loaded.job.title.clone(), - status, - query: loaded.job.prompt.content.clone(), - evidence_ids: input.evidence_ids, - returned_count: input.returned_count, - indexing_latency_ms: input.indexing_latency_ms, - latency_ms: input.latency_ms, - trace_id: input.trace_id, - failure: failure_reason, - source_mappings: input.source_mappings, - operator_debug: input.operator_debug_evidence, - capture: input.capture, - consolidation: input.consolidation, - knowledge: input.knowledge, - temporal_reconciliation: input.temporal_reconciliation, - dreaming_readback: input.dreaming_readback, - }, - } + job::materialized_job(loaded, adapter_id, input) } pub(super) fn declared_encoding_job( adapter_id: &str, loaded: &LoadedJob, ) -> Option { - if is_operator_debug_live_adapter(adapter_id, loaded.job.suite.as_str()) { - return None; - } - if is_elf_consolidation_live_adapter(adapter_id, loaded.job.suite.as_str()) { - return None; - } - if is_elf_knowledge_live_adapter(adapter_id, loaded.job.suite.as_str()) { - return None; - } - if is_elf_capture_live_adapter(adapter_id, loaded.job.suite.as_str()) { - return None; - } - - let status = loaded.job.encoding.status?; - let reason = loaded.job.encoding.reason.clone().unwrap_or_else(|| { - format!("Fixture declares {} for this live adapter job.", status.as_str()) - }); - - Some(materialized_declared_status_job( - adapter_id, - loaded, - status.materialization_status(), - reason, - )) + declared::declared_encoding_job(adapter_id, loaded) } pub(super) fn not_encoded_job(adapter_id: &str, loaded: &LoadedJob) -> Option { - if is_operator_debug_live_adapter(adapter_id, loaded.job.suite.as_str()) { - return None; - } - if is_elf_consolidation_live_adapter(adapter_id, loaded.job.suite.as_str()) { - return None; - } - if is_elf_knowledge_live_adapter(adapter_id, loaded.job.suite.as_str()) { - return None; - } - if is_elf_capture_live_adapter(adapter_id, loaded.job.suite.as_str()) { - return None; - } - if is_elf_dreaming_readback_live_adapter(adapter_id, loaded.job.suite.as_str()) { - return None; - } - - not_encoded_reason(loaded.job.suite.as_str()).map(|reason| { - materialized_declared_status_job( - adapter_id, - loaded, - MaterializationStatus::NotEncoded, - reason.to_string(), - ) - }) + declared::not_encoded_job(adapter_id, loaded) } pub(super) fn is_elf_dreaming_readback_live_adapter(adapter_id: &str, suite: &str) -> bool { - matches!(suite, "memory_summary" | "proactive_brief" | "scheduled_memory") - && matches!(adapter_id, "elf_service_native_dreaming" | "elf_live_real_world") + support::is_elf_dreaming_readback_live_adapter(adapter_id, suite) } pub(super) fn materialized_declared_status_job( @@ -166,118 +33,5 @@ pub(super) fn materialized_declared_status_job( status: MaterializationStatus, reason: String, ) -> MaterializedJob { - let failure = match status { - MaterializationStatus::Pass | MaterializationStatus::WrongResult => None, - MaterializationStatus::Blocked - | MaterializationStatus::Incomplete - | MaterializationStatus::NotEncoded => Some(reason.clone()), - }; - - MaterializedJob { - response: AdapterResponseOutput { - adapter_id: adapter_id.to_string(), - answer: AnswerOutput { - content: String::new(), - evidence_ids: Vec::new(), - claims: Vec::new(), - pages: Vec::new(), - memory_summaries: Vec::new(), - proactive_briefs: Vec::new(), - scheduled_tasks: Vec::new(), - latency_ms: 0.0, - cost: CostOutput { - currency: "USD".to_string(), - amount: 0.0, - input_tokens: 0, - output_tokens: 0, - }, - trace_explainability: TraceExplainabilityOutput { - trace_id: None, - failure_stage: Some("live_adapter.suite_support".to_string()), - failure_reason: failure.clone(), - stages: vec![TraceStageOutput { - stage_name: "live_adapter.suite_support".to_string(), - kept_evidence: Vec::new(), - dropped_evidence: Vec::new(), - demoted_evidence: Vec::new(), - distractor_evidence: Vec::new(), - notes: reason.clone(), - }], - }, - }, - consolidation: None, - }, - evidence: MaterializedJobEvidence { - job_id: loaded.job.job_id.clone(), - suite: loaded.job.suite.clone(), - title: loaded.job.title.clone(), - status, - query: loaded.job.prompt.content.clone(), - evidence_ids: Vec::new(), - returned_count: 0, - indexing_latency_ms: None, - latency_ms: 0.0, - trace_id: None, - failure, - source_mappings: Vec::new(), - operator_debug: None, - capture: None, - consolidation: None, - knowledge: None, - temporal_reconciliation: None, - dreaming_readback: None, - }, - operator_debug: None, - } -} - -fn is_operator_debug_live_adapter(adapter_id: &str, suite: &str) -> bool { - suite == "operator_debugging_ux" - && matches!( - adapter_id, - "elf_live_real_world" - | "qmd_live_real_world" - | "elf_operator_debug_live" - | "qmd_operator_debug_live" - ) -} - -fn is_elf_consolidation_live_adapter(adapter_id: &str, suite: &str) -> bool { - suite == "consolidation" && adapter_id == "elf_live_real_world" -} - -fn is_elf_knowledge_live_adapter(adapter_id: &str, suite: &str) -> bool { - suite == "knowledge_compilation" && adapter_id == "elf_live_real_world" -} - -fn is_elf_capture_live_adapter(adapter_id: &str, suite: &str) -> bool { - suite == "capture_integration" - && matches!(adapter_id, "elf_live_real_world" | "elf_capture_write_policy_live") -} - -fn not_encoded_reason(suite: &str) -> Option<&'static str> { - match suite { - "trust_source_of_truth" - | "work_resume" - | "project_decisions" - | "retrieval" - | "memory_evolution" - | "personalization" => None, - "consolidation" => Some( - "The live adapter sweep retrieves evidence-linked answers but does not generate or review consolidation proposals.", - ), - "knowledge_compilation" => Some( - "The live adapter sweep retrieves evidence-linked answers but does not generate derived knowledge pages.", - ), - "operator_debugging_ux" => Some( - "The full live adapter sweep keeps operator trace/viewer diagnostics in a focused operator-debug slice.", - ), - "capture_integration" => Some( - "The live adapter sweep does not exercise capture integrations or write-policy redaction boundaries.", - ), - "production_ops" => Some( - "The live adapter sweep does not run backup/restore, private corpus, provider credential, or backfill operations.", - ), - _ => Some("The live adapter sweep has no encoded runtime path for this suite."), - } + declared::materialized_declared_status_job(adapter_id, loaded, status, reason) } diff --git a/apps/elf-eval/src/bin/real_world_live_adapter/materialization/declared.rs b/apps/elf-eval/src/bin/real_world_live_adapter/materialization/declared.rs new file mode 100644 index 00000000..91e9c198 --- /dev/null +++ b/apps/elf-eval/src/bin/real_world_live_adapter/materialization/declared.rs @@ -0,0 +1,160 @@ +use crate::{ + AdapterResponseOutput, AnswerOutput, CostOutput, LoadedJob, MaterializationStatus, + MaterializedJob, MaterializedJobEvidence, TraceExplainabilityOutput, TraceStageOutput, + materialization::support, +}; + +pub(super) fn declared_encoding_job( + adapter_id: &str, + loaded: &LoadedJob, +) -> Option { + if support::is_operator_debug_live_adapter(adapter_id, loaded.job.suite.as_str()) { + return None; + } + if support::is_elf_consolidation_live_adapter(adapter_id, loaded.job.suite.as_str()) { + return None; + } + if support::is_elf_knowledge_live_adapter(adapter_id, loaded.job.suite.as_str()) { + return None; + } + if support::is_elf_capture_live_adapter(adapter_id, loaded.job.suite.as_str()) { + return None; + } + + let status = loaded.job.encoding.status?; + let reason = loaded.job.encoding.reason.clone().unwrap_or_else(|| { + format!("Fixture declares {} for this live adapter job.", status.as_str()) + }); + + Some(materialized_declared_status_job( + adapter_id, + loaded, + status.materialization_status(), + reason, + )) +} + +pub(super) fn not_encoded_job(adapter_id: &str, loaded: &LoadedJob) -> Option { + if support::is_operator_debug_live_adapter(adapter_id, loaded.job.suite.as_str()) { + return None; + } + if support::is_elf_consolidation_live_adapter(adapter_id, loaded.job.suite.as_str()) { + return None; + } + if support::is_elf_knowledge_live_adapter(adapter_id, loaded.job.suite.as_str()) { + return None; + } + if support::is_elf_capture_live_adapter(adapter_id, loaded.job.suite.as_str()) { + return None; + } + if support::is_elf_dreaming_readback_live_adapter(adapter_id, loaded.job.suite.as_str()) { + return None; + } + + not_encoded_reason(loaded.job.suite.as_str()).map(|reason| { + materialized_declared_status_job( + adapter_id, + loaded, + MaterializationStatus::NotEncoded, + reason.to_string(), + ) + }) +} + +pub(super) fn materialized_declared_status_job( + adapter_id: &str, + loaded: &LoadedJob, + status: MaterializationStatus, + reason: String, +) -> MaterializedJob { + let failure = match status { + MaterializationStatus::Pass | MaterializationStatus::WrongResult => None, + MaterializationStatus::Blocked + | MaterializationStatus::Incomplete + | MaterializationStatus::NotEncoded => Some(reason.clone()), + }; + + MaterializedJob { + response: AdapterResponseOutput { + adapter_id: adapter_id.to_string(), + answer: AnswerOutput { + content: String::new(), + evidence_ids: Vec::new(), + claims: Vec::new(), + pages: Vec::new(), + memory_summaries: Vec::new(), + proactive_briefs: Vec::new(), + scheduled_tasks: Vec::new(), + latency_ms: 0.0, + cost: CostOutput { + currency: "USD".to_string(), + amount: 0.0, + input_tokens: 0, + output_tokens: 0, + }, + trace_explainability: TraceExplainabilityOutput { + trace_id: None, + failure_stage: Some("live_adapter.suite_support".to_string()), + failure_reason: failure.clone(), + stages: vec![TraceStageOutput { + stage_name: "live_adapter.suite_support".to_string(), + kept_evidence: Vec::new(), + dropped_evidence: Vec::new(), + demoted_evidence: Vec::new(), + distractor_evidence: Vec::new(), + notes: reason.clone(), + }], + }, + }, + consolidation: None, + }, + evidence: MaterializedJobEvidence { + job_id: loaded.job.job_id.clone(), + suite: loaded.job.suite.clone(), + title: loaded.job.title.clone(), + status, + query: loaded.job.prompt.content.clone(), + evidence_ids: Vec::new(), + returned_count: 0, + indexing_latency_ms: None, + latency_ms: 0.0, + trace_id: None, + failure, + source_mappings: Vec::new(), + operator_debug: None, + capture: None, + consolidation: None, + knowledge: None, + temporal_reconciliation: None, + dreaming_readback: None, + }, + operator_debug: None, + } +} + +fn not_encoded_reason(suite: &str) -> Option<&'static str> { + match suite { + "trust_source_of_truth" + | "work_resume" + | "project_decisions" + | "retrieval" + | "memory_evolution" + | "personalization" => None, + "consolidation" => Some( + "The live adapter sweep retrieves evidence-linked answers but does not generate or review consolidation proposals.", + ), + "knowledge_compilation" => Some( + "The live adapter sweep retrieves evidence-linked answers but does not generate derived knowledge pages.", + ), + "operator_debugging_ux" => Some( + "The full live adapter sweep keeps operator trace/viewer diagnostics in a focused operator-debug slice.", + ), + "capture_integration" => Some( + "The live adapter sweep does not exercise capture integrations or write-policy redaction boundaries.", + ), + "production_ops" => Some( + "The live adapter sweep does not run backup/restore, private corpus, provider credential, or backfill operations.", + ), + _ => Some("The live adapter sweep has no encoded runtime path for this suite."), + } +} diff --git a/apps/elf-eval/src/bin/real_world_live_adapter/materialization/job.rs b/apps/elf-eval/src/bin/real_world_live_adapter/materialization/job.rs new file mode 100644 index 00000000..3b6cacb8 --- /dev/null +++ b/apps/elf-eval/src/bin/real_world_live_adapter/materialization/job.rs @@ -0,0 +1,114 @@ +use crate::{ + AdapterResponseOutput, AnswerOutput, CostOutput, LoadedJob, MaterializationStatus, + MaterializedJob, MaterializedJobEvidence, MaterializedJobInput, TraceExplainabilityOutput, + TraceStageOutput, +}; + +pub(super) fn materialized_job( + loaded: &LoadedJob, + adapter_id: &str, + input: MaterializedJobInput, +) -> MaterializedJob { + let capture_failure = input.capture_failure.clone(); + let required_evidence_satisfied = capture_failure.is_none() + && crate::required_evidence_satisfied(loaded, &input.evidence_ids); + let status = materialization_status(input.failure.is_some(), required_evidence_satisfied); + let failure_stage = failure_stage(input.failure.is_some(), capture_failure.is_some()); + let failure_reason = input.failure.clone().or(capture_failure); + let stage_notes = stage_notes(&failure_reason, required_evidence_satisfied); + let trace_stages = input.trace_stages.unwrap_or_else(|| { + vec![TraceStageOutput { + stage_name: failure_stage + .clone() + .unwrap_or_else(|| "live_adapter.retrieve".to_string()), + kept_evidence: input.evidence_ids.clone(), + dropped_evidence: Vec::new(), + demoted_evidence: Vec::new(), + distractor_evidence: Vec::new(), + notes: stage_notes, + }] + }); + + MaterializedJob { + response: AdapterResponseOutput { + adapter_id: adapter_id.to_string(), + answer: AnswerOutput { + content: input.content, + evidence_ids: input.evidence_ids.clone(), + claims: crate::answer_claims(loaded, &input.evidence_ids), + pages: input.pages, + memory_summaries: input.memory_summaries, + proactive_briefs: input.proactive_briefs, + scheduled_tasks: input.scheduled_tasks, + latency_ms: input.latency_ms, + cost: CostOutput { + currency: "USD".to_string(), + amount: 0.0, + input_tokens: 0, + output_tokens: 0, + }, + trace_explainability: TraceExplainabilityOutput { + trace_id: input.trace_id.map(|id| id.to_string()), + failure_stage: failure_stage.clone(), + failure_reason: failure_reason.clone(), + stages: trace_stages, + }, + }, + consolidation: input.consolidation_response, + }, + operator_debug: input.operator_debug, + evidence: MaterializedJobEvidence { + job_id: loaded.job.job_id.clone(), + suite: loaded.job.suite.clone(), + title: loaded.job.title.clone(), + status, + query: loaded.job.prompt.content.clone(), + evidence_ids: input.evidence_ids, + returned_count: input.returned_count, + indexing_latency_ms: input.indexing_latency_ms, + latency_ms: input.latency_ms, + trace_id: input.trace_id, + failure: failure_reason, + source_mappings: input.source_mappings, + operator_debug: input.operator_debug_evidence, + capture: input.capture, + consolidation: input.consolidation, + knowledge: input.knowledge, + temporal_reconciliation: input.temporal_reconciliation, + dreaming_readback: input.dreaming_readback, + }, + } +} + +fn materialization_status( + has_retrieval_failure: bool, + required_evidence_satisfied: bool, +) -> MaterializationStatus { + if has_retrieval_failure { + MaterializationStatus::Incomplete + } else if !required_evidence_satisfied { + MaterializationStatus::WrongResult + } else { + MaterializationStatus::Pass + } +} + +fn failure_stage(has_retrieval_failure: bool, has_capture_failure: bool) -> Option { + if has_retrieval_failure { + Some("live_adapter.retrieve".to_string()) + } else if has_capture_failure { + Some("live_adapter.capture_policy".to_string()) + } else { + None + } +} + +fn stage_notes(failure_reason: &Option, required_evidence_satisfied: bool) -> String { + if let Some(reason) = failure_reason { + reason.clone() + } else if !required_evidence_satisfied { + "Adapter did not return all required mapped evidence for this job.".to_string() + } else { + "Adapter returned mapped evidence through its live retrieval path.".to_string() + } +} diff --git a/apps/elf-eval/src/bin/real_world_live_adapter/materialization/support.rs b/apps/elf-eval/src/bin/real_world_live_adapter/materialization/support.rs new file mode 100644 index 00000000..7a9bd23d --- /dev/null +++ b/apps/elf-eval/src/bin/real_world_live_adapter/materialization/support.rs @@ -0,0 +1,43 @@ +pub(in crate::materialization) fn is_elf_dreaming_readback_live_adapter( + adapter_id: &str, + suite: &str, +) -> bool { + matches!(suite, "memory_summary" | "proactive_brief" | "scheduled_memory") + && matches!(adapter_id, "elf_service_native_dreaming" | "elf_live_real_world") +} + +pub(in crate::materialization) fn is_operator_debug_live_adapter( + adapter_id: &str, + suite: &str, +) -> bool { + suite == "operator_debugging_ux" + && matches!( + adapter_id, + "elf_live_real_world" + | "qmd_live_real_world" + | "elf_operator_debug_live" + | "qmd_operator_debug_live" + ) +} + +pub(in crate::materialization) fn is_elf_consolidation_live_adapter( + adapter_id: &str, + suite: &str, +) -> bool { + suite == "consolidation" && adapter_id == "elf_live_real_world" +} + +pub(in crate::materialization) fn is_elf_knowledge_live_adapter( + adapter_id: &str, + suite: &str, +) -> bool { + suite == "knowledge_compilation" && adapter_id == "elf_live_real_world" +} + +pub(in crate::materialization) fn is_elf_capture_live_adapter( + adapter_id: &str, + suite: &str, +) -> bool { + suite == "capture_integration" + && matches!(adapter_id, "elf_live_real_world" | "elf_capture_write_policy_live") +} From 126c1a6d135640da94fa06e494b8075bc0a0c6b7 Mon Sep 17 00:00:00 2001 From: Yvette Carlisle Date: Tue, 30 Jun 2026 07:32:58 -0400 Subject: [PATCH 5/6] {"schema":"decodex/commit/1","summary":"Split live runtime support modules","authority":"manual"} --- .../runtime_support.rs | 284 +----------------- .../runtime_support/chunks.rs | 68 +++++ .../runtime_support/commands.rs | 82 +++++ .../runtime_support/config.rs | 35 +++ .../runtime_support/embedding.rs | 48 +++ .../runtime_support/ids.rs | 42 +++ 6 files changed, 287 insertions(+), 272 deletions(-) create mode 100644 apps/elf-eval/src/bin/real_world_live_adapter/runtime_support/chunks.rs create mode 100644 apps/elf-eval/src/bin/real_world_live_adapter/runtime_support/commands.rs create mode 100644 apps/elf-eval/src/bin/real_world_live_adapter/runtime_support/config.rs create mode 100644 apps/elf-eval/src/bin/real_world_live_adapter/runtime_support/embedding.rs create mode 100644 apps/elf-eval/src/bin/real_world_live_adapter/runtime_support/ids.rs diff --git a/apps/elf-eval/src/bin/real_world_live_adapter/runtime_support.rs b/apps/elf-eval/src/bin/real_world_live_adapter/runtime_support.rs index 524b4f47..8f35f049 100644 --- a/apps/elf-eval/src/bin/real_world_live_adapter/runtime_support.rs +++ b/apps/elf-eval/src/bin/real_world_live_adapter/runtime_support.rs @@ -1,273 +1,13 @@ -use std::{ - collections::BTreeSet, - fs::{self, OpenOptions}, - io::Write as _, - path::Path, - process::{Command, Stdio}, - sync::Arc, +mod chunks; +mod commands; +mod config; +mod embedding; +mod ids; + +pub(super) use self::{ + chunks::note_text_chunks, + commands::{run_logged_command, run_logged_shell, run_qmd_command}, + config::{deterministic_providers, runtime_config}, + embedding::{embed_text, normalize_ascii_alnum_lowercase, terms}, + ids::{project_id_for_job, push_unique, short_hash, slug}, }; - -use blake3::Hasher; -use color_eyre::{Result, eyre}; - -use crate::{ - BaselineRuntime, DeterministicEmbedding, ELF_NOTE_CHUNK_CHARS, NoopExtractor, QmdArgs, - TokenOverlapRerank, -}; -use elf_config::Config; -use elf_service::Providers; - -pub(super) fn runtime_config(runtime: &BaselineRuntime) -> Result { - let mut cfg = elf_config::load(&runtime.config_path)?; - - cfg.storage.postgres.dsn = runtime.dsn.clone(); - cfg.storage.postgres.pool_max_conns = 12; - cfg.storage.qdrant.url = runtime.qdrant_url.clone(); - cfg.storage.qdrant.collection = runtime.collection.clone(); - cfg.storage.qdrant.docs_collection = runtime.docs_collection.clone(); - cfg.providers.embedding.provider_id = "local".to_string(); - cfg.providers.embedding.model = "local-hash".to_string(); - cfg.providers.embedding.dimensions = cfg.storage.qdrant.vector_dim; - cfg.providers.rerank.provider_id = "local".to_string(); - cfg.providers.rerank.model = "local-token-overlap".to_string(); - cfg.providers.llm_extractor.provider_id = "disabled".to_string(); - cfg.providers.llm_extractor.model = "disabled".to_string(); - cfg.context = None; - - Ok(cfg) -} - -pub(super) fn deterministic_providers(vector_dim: u32) -> Providers { - Providers::new( - Arc::new(DeterministicEmbedding { vector_dim }), - Arc::new(TokenOverlapRerank), - Arc::new(NoopExtractor), - ) -} - -pub(super) fn run_qmd_command( - label: &str, - args: &QmdArgs, - home_dir: &Path, - qmd_args: &[&str], - log_path: &Path, -) -> Result { - let mut command = Command::new("npx"); - - command - .current_dir(&args.qmd_dir) - .env("HOME", home_dir) - .env("XDG_CACHE_HOME", "/root/.cache") - .env("QMD_FORCE_CPU", "1") - .arg("tsx") - .arg("src/cli/qmd.ts"); - - for arg in qmd_args { - command.arg(arg); - } - - run_logged_command(label, &mut command, log_path) -} - -pub(super) fn run_logged_shell( - label: &str, - cwd: &Path, - script: &str, - log_path: &Path, -) -> Result<()> { - let mut command = Command::new("bash"); - - command.current_dir(cwd).arg("-lc").arg(script); - - run_logged_command(label, &mut command, log_path).map(|_| ()) -} - -pub(super) fn run_logged_command( - label: &str, - command: &mut Command, - log_path: &Path, -) -> Result { - if let Some(parent) = log_path.parent() { - fs::create_dir_all(parent)?; - } - - let command_debug = format!("{command:?}"); - let output = command.stdout(Stdio::piped()).stderr(Stdio::piped()).output()?; - let stdout = String::from_utf8_lossy(&output.stdout).to_string(); - let stderr = String::from_utf8_lossy(&output.stderr).to_string(); - let mut log = OpenOptions::new().create(true).append(true).open(log_path)?; - - writeln!(log, "## {label}")?; - writeln!(log, "$ {command_debug}")?; - - if !stdout.trim().is_empty() { - writeln!(log, "\nstdout:\n{stdout}")?; - } - if !stderr.trim().is_empty() { - writeln!(log, "\nstderr:\n{stderr}")?; - } - if !output.status.success() { - return Err(eyre::eyre!( - "{label} failed with status {}. Inspect {}.", - output.status, - log_path.display() - )); - } - - Ok(stdout) -} - -pub(super) fn project_id_for_job(job_id: &str) -> String { - format!("job-{}", slug(job_id)) -} - -pub(super) fn slug(value: &str) -> String { - let mut out = String::new(); - let mut last_dash = false; - - for ch in value.chars() { - if ch.is_ascii_alphanumeric() { - out.push(ch.to_ascii_lowercase()); - - last_dash = false; - } else if !last_dash && !out.is_empty() { - out.push('-'); - - last_dash = true; - } - } - - while out.ends_with('-') { - out.pop(); - } - - if out.is_empty() { "item".to_string() } else { out } -} - -pub(super) fn short_hash(value: &str) -> String { - let mut hasher = Hasher::new(); - - hasher.update(value.as_bytes()); - - hasher.finalize().to_hex().chars().take(12).collect() -} - -pub(super) fn push_unique(values: &mut Vec, value: String) { - if !values.iter().any(|existing| existing == &value) { - values.push(value); - } -} - -pub(super) fn embed_text(text: &str, vector_dim: u32) -> Vec { - let dim = vector_dim as usize; - let mut vector = vec![0.0_f32; dim]; - - if dim == 0 { - return vector; - } - - let normalized = normalize_ascii_alnum_lowercase(text); - - for term in normalized.split_whitespace() { - if term.len() < 2 { - continue; - } - - let hash = blake3::hash(term.as_bytes()); - let bytes = hash.as_bytes(); - let idx = (u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) as usize) % dim; - - vector[idx] += 1.0; - } - - let norm = vector.iter().map(|value| value * value).sum::().sqrt(); - - if norm > 0.0 { - for value in &mut vector { - *value /= norm; - } - } - - vector -} - -pub(super) fn terms(text: &str) -> BTreeSet { - normalize_ascii_alnum_lowercase(text) - .split_whitespace() - .filter(|term| term.len() >= 2) - .map(ToString::to_string) - .collect() -} - -pub(super) fn normalize_ascii_alnum_lowercase(text: &str) -> String { - text.chars() - .map(|ch| if ch.is_ascii_alphanumeric() { ch.to_ascii_lowercase() } else { ' ' }) - .collect() -} - -pub(super) fn note_text_chunks(text: &str) -> Vec { - let normalized = text.split_whitespace().collect::>().join(" "); - - if normalized.chars().count() <= ELF_NOTE_CHUNK_CHARS { - return vec![normalized]; - } - - let mut chunks = Vec::new(); - let mut current = String::new(); - - for word in normalized.split_whitespace() { - if word.chars().count() > ELF_NOTE_CHUNK_CHARS { - if !current.is_empty() { - chunks.push(current); - - current = String::new(); - } - - chunks.extend(split_long_token(word)); - - continue; - } - - let separator = usize::from(!current.is_empty()); - - if current.chars().count() + separator + word.chars().count() > ELF_NOTE_CHUNK_CHARS - && !current.is_empty() - { - chunks.push(current); - - current = String::new(); - } - if !current.is_empty() { - current.push(' '); - } - - current.push_str(word); - } - - if !current.is_empty() { - chunks.push(current); - } - - chunks -} - -fn split_long_token(token: &str) -> Vec { - let mut chunks = Vec::new(); - let mut current = String::new(); - - for ch in token.chars() { - if current.chars().count() >= ELF_NOTE_CHUNK_CHARS { - chunks.push(current); - - current = String::new(); - } - - current.push(ch); - } - - if !current.is_empty() { - chunks.push(current); - } - - chunks -} diff --git a/apps/elf-eval/src/bin/real_world_live_adapter/runtime_support/chunks.rs b/apps/elf-eval/src/bin/real_world_live_adapter/runtime_support/chunks.rs new file mode 100644 index 00000000..880843d7 --- /dev/null +++ b/apps/elf-eval/src/bin/real_world_live_adapter/runtime_support/chunks.rs @@ -0,0 +1,68 @@ +use crate::ELF_NOTE_CHUNK_CHARS; + +pub(crate) fn note_text_chunks(text: &str) -> Vec { + let normalized = text.split_whitespace().collect::>().join(" "); + + if normalized.chars().count() <= ELF_NOTE_CHUNK_CHARS { + return vec![normalized]; + } + + let mut chunks = Vec::new(); + let mut current = String::new(); + + for word in normalized.split_whitespace() { + if word.chars().count() > ELF_NOTE_CHUNK_CHARS { + if !current.is_empty() { + chunks.push(current); + + current = String::new(); + } + + chunks.extend(split_long_token(word)); + + continue; + } + + let separator = usize::from(!current.is_empty()); + + if current.chars().count() + separator + word.chars().count() > ELF_NOTE_CHUNK_CHARS + && !current.is_empty() + { + chunks.push(current); + + current = String::new(); + } + if !current.is_empty() { + current.push(' '); + } + + current.push_str(word); + } + + if !current.is_empty() { + chunks.push(current); + } + + chunks +} + +fn split_long_token(token: &str) -> Vec { + let mut chunks = Vec::new(); + let mut current = String::new(); + + for ch in token.chars() { + if current.chars().count() >= ELF_NOTE_CHUNK_CHARS { + chunks.push(current); + + current = String::new(); + } + + current.push(ch); + } + + if !current.is_empty() { + chunks.push(current); + } + + chunks +} diff --git a/apps/elf-eval/src/bin/real_world_live_adapter/runtime_support/commands.rs b/apps/elf-eval/src/bin/real_world_live_adapter/runtime_support/commands.rs new file mode 100644 index 00000000..c04109dc --- /dev/null +++ b/apps/elf-eval/src/bin/real_world_live_adapter/runtime_support/commands.rs @@ -0,0 +1,82 @@ +use std::{ + fs::{self, OpenOptions}, + io::Write as _, + path::Path, + process::{Command, Stdio}, +}; + +use color_eyre::{Result, eyre}; + +use crate::QmdArgs; + +pub(crate) fn run_qmd_command( + label: &str, + args: &QmdArgs, + home_dir: &Path, + qmd_args: &[&str], + log_path: &Path, +) -> Result { + let mut command = Command::new("npx"); + + command + .current_dir(&args.qmd_dir) + .env("HOME", home_dir) + .env("XDG_CACHE_HOME", "/root/.cache") + .env("QMD_FORCE_CPU", "1") + .arg("tsx") + .arg("src/cli/qmd.ts"); + + for arg in qmd_args { + command.arg(arg); + } + + run_logged_command(label, &mut command, log_path) +} + +pub(crate) fn run_logged_shell( + label: &str, + cwd: &Path, + script: &str, + log_path: &Path, +) -> Result<()> { + let mut command = Command::new("bash"); + + command.current_dir(cwd).arg("-lc").arg(script); + + run_logged_command(label, &mut command, log_path).map(|_| ()) +} + +pub(crate) fn run_logged_command( + label: &str, + command: &mut Command, + log_path: &Path, +) -> Result { + if let Some(parent) = log_path.parent() { + fs::create_dir_all(parent)?; + } + + let command_debug = format!("{command:?}"); + let output = command.stdout(Stdio::piped()).stderr(Stdio::piped()).output()?; + let stdout = String::from_utf8_lossy(&output.stdout).to_string(); + let stderr = String::from_utf8_lossy(&output.stderr).to_string(); + let mut log = OpenOptions::new().create(true).append(true).open(log_path)?; + + writeln!(log, "## {label}")?; + writeln!(log, "$ {command_debug}")?; + + if !stdout.trim().is_empty() { + writeln!(log, "\nstdout:\n{stdout}")?; + } + if !stderr.trim().is_empty() { + writeln!(log, "\nstderr:\n{stderr}")?; + } + if !output.status.success() { + return Err(eyre::eyre!( + "{label} failed with status {}. Inspect {}.", + output.status, + log_path.display() + )); + } + + Ok(stdout) +} diff --git a/apps/elf-eval/src/bin/real_world_live_adapter/runtime_support/config.rs b/apps/elf-eval/src/bin/real_world_live_adapter/runtime_support/config.rs new file mode 100644 index 00000000..9d3cf7ea --- /dev/null +++ b/apps/elf-eval/src/bin/real_world_live_adapter/runtime_support/config.rs @@ -0,0 +1,35 @@ +use std::sync::Arc; + +use color_eyre::Result; + +use crate::{BaselineRuntime, DeterministicEmbedding, NoopExtractor, TokenOverlapRerank}; +use elf_config::Config; +use elf_service::Providers; + +pub(crate) fn runtime_config(runtime: &BaselineRuntime) -> Result { + let mut cfg = elf_config::load(&runtime.config_path)?; + + cfg.storage.postgres.dsn = runtime.dsn.clone(); + cfg.storage.postgres.pool_max_conns = 12; + cfg.storage.qdrant.url = runtime.qdrant_url.clone(); + cfg.storage.qdrant.collection = runtime.collection.clone(); + cfg.storage.qdrant.docs_collection = runtime.docs_collection.clone(); + cfg.providers.embedding.provider_id = "local".to_string(); + cfg.providers.embedding.model = "local-hash".to_string(); + cfg.providers.embedding.dimensions = cfg.storage.qdrant.vector_dim; + cfg.providers.rerank.provider_id = "local".to_string(); + cfg.providers.rerank.model = "local-token-overlap".to_string(); + cfg.providers.llm_extractor.provider_id = "disabled".to_string(); + cfg.providers.llm_extractor.model = "disabled".to_string(); + cfg.context = None; + + Ok(cfg) +} + +pub(crate) fn deterministic_providers(vector_dim: u32) -> Providers { + Providers::new( + Arc::new(DeterministicEmbedding { vector_dim }), + Arc::new(TokenOverlapRerank), + Arc::new(NoopExtractor), + ) +} diff --git a/apps/elf-eval/src/bin/real_world_live_adapter/runtime_support/embedding.rs b/apps/elf-eval/src/bin/real_world_live_adapter/runtime_support/embedding.rs new file mode 100644 index 00000000..ab0e0f74 --- /dev/null +++ b/apps/elf-eval/src/bin/real_world_live_adapter/runtime_support/embedding.rs @@ -0,0 +1,48 @@ +use std::collections::BTreeSet; + +pub(crate) fn embed_text(text: &str, vector_dim: u32) -> Vec { + let dim = vector_dim as usize; + let mut vector = vec![0.0_f32; dim]; + + if dim == 0 { + return vector; + } + + let normalized = normalize_ascii_alnum_lowercase(text); + + for term in normalized.split_whitespace() { + if term.len() < 2 { + continue; + } + + let hash = blake3::hash(term.as_bytes()); + let bytes = hash.as_bytes(); + let idx = (u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) as usize) % dim; + + vector[idx] += 1.0; + } + + let norm = vector.iter().map(|value| value * value).sum::().sqrt(); + + if norm > 0.0 { + for value in &mut vector { + *value /= norm; + } + } + + vector +} + +pub(crate) fn terms(text: &str) -> BTreeSet { + normalize_ascii_alnum_lowercase(text) + .split_whitespace() + .filter(|term| term.len() >= 2) + .map(ToString::to_string) + .collect() +} + +pub(crate) fn normalize_ascii_alnum_lowercase(text: &str) -> String { + text.chars() + .map(|ch| if ch.is_ascii_alphanumeric() { ch.to_ascii_lowercase() } else { ' ' }) + .collect() +} diff --git a/apps/elf-eval/src/bin/real_world_live_adapter/runtime_support/ids.rs b/apps/elf-eval/src/bin/real_world_live_adapter/runtime_support/ids.rs new file mode 100644 index 00000000..eed22a8c --- /dev/null +++ b/apps/elf-eval/src/bin/real_world_live_adapter/runtime_support/ids.rs @@ -0,0 +1,42 @@ +use blake3::Hasher; + +pub(crate) fn project_id_for_job(job_id: &str) -> String { + format!("job-{}", slug(job_id)) +} + +pub(crate) fn slug(value: &str) -> String { + let mut out = String::new(); + let mut last_dash = false; + + for ch in value.chars() { + if ch.is_ascii_alphanumeric() { + out.push(ch.to_ascii_lowercase()); + + last_dash = false; + } else if !last_dash && !out.is_empty() { + out.push('-'); + + last_dash = true; + } + } + + while out.ends_with('-') { + out.pop(); + } + + if out.is_empty() { "item".to_string() } else { out } +} + +pub(crate) fn short_hash(value: &str) -> String { + let mut hasher = Hasher::new(); + + hasher.update(value.as_bytes()); + + hasher.finalize().to_hex().chars().take(12).collect() +} + +pub(crate) fn push_unique(values: &mut Vec, value: String) { + if !values.iter().any(|existing| existing == &value) { + values.push(value); + } +} From 0d0171cd54609cf65f6b1d2808a2fdaab8519c45 Mon Sep 17 00:00:00 2001 From: Yvette Carlisle Date: Tue, 30 Jun 2026 07:42:12 -0400 Subject: [PATCH 6/6] {"schema":"decodex/commit/1","summary":"Split live capture adapter modules","authority":"manual"} --- .../bin/real_world_live_adapter/capture.rs | 282 +----------------- .../capture/fixture.rs | 33 ++ .../real_world_live_adapter/capture/policy.rs | 48 +++ .../capture/runtime.rs | 73 +++++ .../capture/validation.rs | 193 ++++++++++++ 5 files changed, 358 insertions(+), 271 deletions(-) create mode 100644 apps/elf-eval/src/bin/real_world_live_adapter/capture/fixture.rs create mode 100644 apps/elf-eval/src/bin/real_world_live_adapter/capture/policy.rs create mode 100644 apps/elf-eval/src/bin/real_world_live_adapter/capture/runtime.rs create mode 100644 apps/elf-eval/src/bin/real_world_live_adapter/capture/validation.rs diff --git a/apps/elf-eval/src/bin/real_world_live_adapter/capture.rs b/apps/elf-eval/src/bin/real_world_live_adapter/capture.rs index 5e34fb20..1d8a159c 100644 --- a/apps/elf-eval/src/bin/real_world_live_adapter/capture.rs +++ b/apps/elf-eval/src/bin/real_world_live_adapter/capture.rs @@ -1,272 +1,12 @@ -use crate::{ - CaptureMaterializationEvidence, CaptureRuntimeEvidence, CaptureRuntimeEvidenceItem, - CaptureRuntimeSourceRefEvidence, CorpusText, LiveCaptureAction, LoadedJob, Result, SearchItem, - eyre, serde_json, +mod fixture; +mod policy; +mod runtime; +mod validation; + +#[cfg(test)] pub(super) use self::runtime::capture_runtime_evidence_from_source_refs; +pub(super) use self::{ + fixture::{apply_capture_runtime_source_refs, capture_for_job}, + policy::{capture_action_str, elf_stored_corpus_texts, write_policy_from_value}, + runtime::{capture_runtime_evidence_from_search_items, capture_with_runtime_source_refs}, + validation::validate_capture_runtime_evidence, }; -use elf_domain::writegate::{self, WritePolicy}; - -pub(super) fn capture_runtime_evidence_from_search_items( - items: &[SearchItem], -) -> CaptureRuntimeEvidence { - let source_refs = items.iter().map(|item| &item.source_ref); - - capture_runtime_evidence_from_source_refs(source_refs) -} - -pub(super) fn capture_runtime_evidence_from_source_refs<'a>( - source_refs: impl IntoIterator, -) -> CaptureRuntimeEvidence { - let mut runtime = CaptureRuntimeEvidence::default(); - - for source_ref in source_refs { - let Some(evidence_id) = source_ref.get("evidence_id").and_then(serde_json::Value::as_str) - else { - continue; - }; - - if runtime.items.iter().any(|item| item.evidence_id == evidence_id) { - continue; - } - - runtime.items.push(CaptureRuntimeEvidenceItem { - evidence_id: evidence_id.to_string(), - source_id: source_ref - .get("source_id") - .and_then(serde_json::Value::as_str) - .map(ToString::to_string), - evidence_binding: source_ref - .get("evidence_binding") - .and_then(serde_json::Value::as_str) - .map(ToString::to_string), - write_policy_applied: source_ref - .get("write_policy_applied") - .and_then(serde_json::Value::as_bool) - .unwrap_or(false), - capture_action: source_ref - .get("capture_action") - .and_then(serde_json::Value::as_str) - .map(ToString::to_string), - source_ref: source_ref.clone(), - }); - } - - runtime -} - -pub(super) fn capture_with_runtime_source_refs( - mut capture: CaptureMaterializationEvidence, - runtime: &CaptureRuntimeEvidence, -) -> CaptureMaterializationEvidence { - capture.source_ids.clear(); - capture.runtime_source_refs.clear(); - - for item in &runtime.items { - if let Some(source_id) = item.source_id.as_deref() { - crate::push_unique(&mut capture.source_ids, source_id.to_string()); - } - - capture.runtime_source_refs.push(CaptureRuntimeSourceRefEvidence { - evidence_id: item.evidence_id.clone(), - source_ref: item.source_ref.clone(), - }); - } - - capture -} - -pub(super) fn validate_capture_runtime_evidence( - suite: &str, - corpus: &[CorpusText], - capture: &CaptureMaterializationEvidence, - runtime: &CaptureRuntimeEvidence, -) -> Option { - if suite != "capture_integration" { - return None; - } - - let mut failures = Vec::new(); - let mut expected_redactions = 0_usize; - let mut expected_exclusions = 0_usize; - - for item in corpus { - match item.capture.action { - LiveCaptureAction::Exclude => { - if runtime.item_for(item.evidence_id.as_str()).is_some() { - failures.push(format!( - "excluded evidence {} was returned by live search", - item.evidence_id - )); - } - if capture.stored_evidence_ids.iter().any(|id| id == &item.evidence_id) { - failures.push(format!( - "excluded evidence {} was stored by live ingestion", - item.evidence_id - )); - } - if !capture.excluded_evidence_ids.iter().any(|id| id == &item.evidence_id) { - failures.push(format!( - "excluded evidence {} was not recorded as excluded", - item.evidence_id - )); - } - }, - LiveCaptureAction::Store => { - let runtime_item = runtime.item_for(item.evidence_id.as_str()); - - if let Some(expected_source_id) = item.capture.source_id.as_deref() { - match runtime_item.and_then(|observed| observed.source_id.as_deref()) { - Some(observed) if observed == expected_source_id => {}, - Some(observed) => failures.push(format!( - "evidence {} returned source_id {observed}, expected {expected_source_id}", - item.evidence_id - )), - None => failures.push(format!( - "evidence {} did not return expected source_id {expected_source_id}", - item.evidence_id - )), - } - } - if let Some(expected_binding) = item.capture.evidence_binding.as_deref() { - match runtime_item.and_then(|observed| observed.evidence_binding.as_deref()) { - Some(observed) if observed == expected_binding => {}, - Some(observed) => failures.push(format!( - "evidence {} returned evidence_binding {observed}, expected {expected_binding}", - item.evidence_id - )), - None => failures.push(format!( - "evidence {} did not return expected evidence_binding {expected_binding}", - item.evidence_id - )), - } - } - if let Some(policy_value) = &item.capture.write_policy { - match write_policy_from_value(policy_value, item.evidence_id.as_str()) { - Ok(policy) => { - expected_exclusions += policy.exclusions.len(); - expected_redactions += policy.redactions.len(); - }, - Err(err) => failures.push(err.to_string()), - } - - if !runtime_item.is_some_and(|observed| observed.write_policy_applied) { - failures.push(format!( - "evidence {} did not return write_policy_applied=true", - item.evidence_id - )); - } - } - if let Some(observed) = - runtime_item.and_then(|observed| observed.capture_action.as_deref()) - && observed != capture_action_str(item.capture.action) - { - failures.push(format!( - "evidence {} returned capture_action {observed}, expected {}", - item.evidence_id, - capture_action_str(item.capture.action) - )); - } - }, - } - } - - if capture.write_policy_exclusion_count < expected_exclusions { - failures.push(format!( - "write-policy exclusion count {} was below expected {expected_exclusions}", - capture.write_policy_exclusion_count - )); - } - if capture.write_policy_redaction_count < expected_redactions { - failures.push(format!( - "write-policy redaction count {} was below expected {expected_redactions}", - capture.write_policy_redaction_count - )); - } - if expected_exclusions + expected_redactions > 0 && capture.write_policy_audit_count == 0 { - failures - .push("write-policy audit count was zero despite expected policy effects".to_string()); - } - if failures.is_empty() { - None - } else { - Some(format!("Capture runtime validation failed: {}", failures.join("; "))) - } -} - -pub(super) fn elf_stored_corpus_texts(corpus: &[CorpusText]) -> Result> { - let mut stored = Vec::new(); - - for item in corpus { - if item.capture.action == LiveCaptureAction::Exclude { - continue; - } - - stored.push(CorpusText { - evidence_id: item.evidence_id.clone(), - text: transformed_capture_text(item)?.trim().to_string(), - capture: item.capture.clone(), - }); - } - - Ok(stored) -} - -pub(super) fn write_policy_from_value( - value: &serde_json::Value, - evidence_id: &str, -) -> Result { - serde_json::from_value::(value.clone()).map_err(|err| { - eyre::eyre!("Failed to parse write_policy for evidence {evidence_id}: {err}") - }) -} - -pub(super) fn apply_capture_runtime_source_refs( - value: &mut serde_json::Value, - capture: &CaptureMaterializationEvidence, -) { - let Some(items) = value.pointer_mut("/corpus/items").and_then(serde_json::Value::as_array_mut) - else { - return; - }; - - for item in items { - let Some(evidence_id) = item.get("evidence_id").and_then(serde_json::Value::as_str) else { - continue; - }; - let Some(source_ref) = capture - .runtime_source_refs - .iter() - .find(|source_ref| source_ref.evidence_id == evidence_id) - else { - continue; - }; - - item["source_ref"] = source_ref.source_ref.clone(); - } -} - -pub(super) fn capture_for_job( - loaded: &LoadedJob, - capture: CaptureMaterializationEvidence, -) -> Option { - if loaded.job.suite == "capture_integration" { Some(capture) } else { None } -} - -pub(super) fn capture_action_str(action: LiveCaptureAction) -> &'static str { - match action { - LiveCaptureAction::Store => "store", - LiveCaptureAction::Exclude => "exclude", - } -} - -fn transformed_capture_text(item: &CorpusText) -> Result { - let Some(policy_value) = &item.capture.write_policy else { - return Ok(item.text.clone()); - }; - let policy = write_policy_from_value(policy_value, item.evidence_id.as_str())?; - let result = - writegate::apply_write_policy(item.text.as_str(), Some(&policy)).map_err(|err| { - eyre::eyre!("Invalid write_policy for evidence {}: {err:?}", item.evidence_id) - })?; - - Ok(result.transformed) -} diff --git a/apps/elf-eval/src/bin/real_world_live_adapter/capture/fixture.rs b/apps/elf-eval/src/bin/real_world_live_adapter/capture/fixture.rs new file mode 100644 index 00000000..30bc8de1 --- /dev/null +++ b/apps/elf-eval/src/bin/real_world_live_adapter/capture/fixture.rs @@ -0,0 +1,33 @@ +use crate::{CaptureMaterializationEvidence, LoadedJob}; + +pub(crate) fn apply_capture_runtime_source_refs( + value: &mut serde_json::Value, + capture: &CaptureMaterializationEvidence, +) { + let Some(items) = value.pointer_mut("/corpus/items").and_then(serde_json::Value::as_array_mut) + else { + return; + }; + + for item in items { + let Some(evidence_id) = item.get("evidence_id").and_then(serde_json::Value::as_str) else { + continue; + }; + let Some(source_ref) = capture + .runtime_source_refs + .iter() + .find(|source_ref| source_ref.evidence_id == evidence_id) + else { + continue; + }; + + item["source_ref"] = source_ref.source_ref.clone(); + } +} + +pub(crate) fn capture_for_job( + loaded: &LoadedJob, + capture: CaptureMaterializationEvidence, +) -> Option { + if loaded.job.suite == "capture_integration" { Some(capture) } else { None } +} diff --git a/apps/elf-eval/src/bin/real_world_live_adapter/capture/policy.rs b/apps/elf-eval/src/bin/real_world_live_adapter/capture/policy.rs new file mode 100644 index 00000000..96ce409c --- /dev/null +++ b/apps/elf-eval/src/bin/real_world_live_adapter/capture/policy.rs @@ -0,0 +1,48 @@ +use serde_json::Value; + +use crate::{CorpusText, LiveCaptureAction, Result, eyre}; +use elf_domain::writegate::{self, WritePolicy}; + +pub(crate) fn elf_stored_corpus_texts(corpus: &[CorpusText]) -> Result> { + let mut stored = Vec::new(); + + for item in corpus { + if item.capture.action == LiveCaptureAction::Exclude { + continue; + } + + stored.push(CorpusText { + evidence_id: item.evidence_id.clone(), + text: transformed_capture_text(item)?.trim().to_string(), + capture: item.capture.clone(), + }); + } + + Ok(stored) +} + +pub(crate) fn write_policy_from_value(value: &Value, evidence_id: &str) -> Result { + serde_json::from_value::(value.clone()).map_err(|err| { + eyre::eyre!("Failed to parse write_policy for evidence {evidence_id}: {err}") + }) +} + +pub(crate) fn capture_action_str(action: LiveCaptureAction) -> &'static str { + match action { + LiveCaptureAction::Store => "store", + LiveCaptureAction::Exclude => "exclude", + } +} + +fn transformed_capture_text(item: &CorpusText) -> Result { + let Some(policy_value) = &item.capture.write_policy else { + return Ok(item.text.clone()); + }; + let policy = write_policy_from_value(policy_value, item.evidence_id.as_str())?; + let result = + writegate::apply_write_policy(item.text.as_str(), Some(&policy)).map_err(|err| { + eyre::eyre!("Invalid write_policy for evidence {}: {err:?}", item.evidence_id) + })?; + + Ok(result.transformed) +} diff --git a/apps/elf-eval/src/bin/real_world_live_adapter/capture/runtime.rs b/apps/elf-eval/src/bin/real_world_live_adapter/capture/runtime.rs new file mode 100644 index 00000000..c627e001 --- /dev/null +++ b/apps/elf-eval/src/bin/real_world_live_adapter/capture/runtime.rs @@ -0,0 +1,73 @@ +use crate::{ + CaptureMaterializationEvidence, CaptureRuntimeEvidence, CaptureRuntimeEvidenceItem, + CaptureRuntimeSourceRefEvidence, SearchItem, +}; + +pub(crate) fn capture_runtime_evidence_from_search_items( + items: &[SearchItem], +) -> CaptureRuntimeEvidence { + let source_refs = items.iter().map(|item| &item.source_ref); + + capture_runtime_evidence_from_source_refs(source_refs) +} + +pub(crate) fn capture_runtime_evidence_from_source_refs<'a>( + source_refs: impl IntoIterator, +) -> CaptureRuntimeEvidence { + let mut runtime = CaptureRuntimeEvidence::default(); + + for source_ref in source_refs { + let Some(evidence_id) = source_ref.get("evidence_id").and_then(serde_json::Value::as_str) + else { + continue; + }; + + if runtime.items.iter().any(|item| item.evidence_id == evidence_id) { + continue; + } + + runtime.items.push(CaptureRuntimeEvidenceItem { + evidence_id: evidence_id.to_string(), + source_id: source_ref + .get("source_id") + .and_then(serde_json::Value::as_str) + .map(ToString::to_string), + evidence_binding: source_ref + .get("evidence_binding") + .and_then(serde_json::Value::as_str) + .map(ToString::to_string), + write_policy_applied: source_ref + .get("write_policy_applied") + .and_then(serde_json::Value::as_bool) + .unwrap_or(false), + capture_action: source_ref + .get("capture_action") + .and_then(serde_json::Value::as_str) + .map(ToString::to_string), + source_ref: source_ref.clone(), + }); + } + + runtime +} + +pub(crate) fn capture_with_runtime_source_refs( + mut capture: CaptureMaterializationEvidence, + runtime: &CaptureRuntimeEvidence, +) -> CaptureMaterializationEvidence { + capture.source_ids.clear(); + capture.runtime_source_refs.clear(); + + for item in &runtime.items { + if let Some(source_id) = item.source_id.as_deref() { + crate::push_unique(&mut capture.source_ids, source_id.to_string()); + } + + capture.runtime_source_refs.push(CaptureRuntimeSourceRefEvidence { + evidence_id: item.evidence_id.clone(), + source_ref: item.source_ref.clone(), + }); + } + + capture +} diff --git a/apps/elf-eval/src/bin/real_world_live_adapter/capture/validation.rs b/apps/elf-eval/src/bin/real_world_live_adapter/capture/validation.rs new file mode 100644 index 00000000..32f5f989 --- /dev/null +++ b/apps/elf-eval/src/bin/real_world_live_adapter/capture/validation.rs @@ -0,0 +1,193 @@ +use crate::{ + CaptureMaterializationEvidence, CaptureRuntimeEvidence, CaptureRuntimeEvidenceItem, CorpusText, + LiveCaptureAction, capture::policy, +}; + +#[derive(Default)] +struct CaptureRuntimeValidator { + failures: Vec, + expected_redactions: usize, + expected_exclusions: usize, +} +impl CaptureRuntimeValidator { + fn validate_corpus_item( + &mut self, + item: &CorpusText, + capture: &CaptureMaterializationEvidence, + runtime: &CaptureRuntimeEvidence, + ) { + match item.capture.action { + LiveCaptureAction::Exclude => self.validate_excluded_item(item, capture, runtime), + LiveCaptureAction::Store => self.validate_stored_item(item, runtime), + } + } + + fn validate_excluded_item( + &mut self, + item: &CorpusText, + capture: &CaptureMaterializationEvidence, + runtime: &CaptureRuntimeEvidence, + ) { + if runtime.item_for(item.evidence_id.as_str()).is_some() { + self.failures.push(format!( + "excluded evidence {} was returned by live search", + item.evidence_id + )); + } + if capture.stored_evidence_ids.iter().any(|id| id == &item.evidence_id) { + self.failures.push(format!( + "excluded evidence {} was stored by live ingestion", + item.evidence_id + )); + } + if !capture.excluded_evidence_ids.iter().any(|id| id == &item.evidence_id) { + self.failures.push(format!( + "excluded evidence {} was not recorded as excluded", + item.evidence_id + )); + } + } + + fn validate_stored_item(&mut self, item: &CorpusText, runtime: &CaptureRuntimeEvidence) { + let runtime_item = runtime.item_for(item.evidence_id.as_str()); + + self.validate_source_id(item, runtime_item); + self.validate_evidence_binding(item, runtime_item); + self.validate_write_policy(item, runtime_item); + self.validate_capture_action(item, runtime_item); + } + + fn validate_source_id( + &mut self, + item: &CorpusText, + runtime_item: Option<&CaptureRuntimeEvidenceItem>, + ) { + let Some(expected_source_id) = item.capture.source_id.as_deref() else { + return; + }; + + match runtime_item.and_then(|observed| observed.source_id.as_deref()) { + Some(observed) if observed == expected_source_id => {}, + Some(observed) => self.failures.push(format!( + "evidence {} returned source_id {observed}, expected {expected_source_id}", + item.evidence_id + )), + None => self.failures.push(format!( + "evidence {} did not return expected source_id {expected_source_id}", + item.evidence_id + )), + } + } + + fn validate_evidence_binding( + &mut self, + item: &CorpusText, + runtime_item: Option<&CaptureRuntimeEvidenceItem>, + ) { + let Some(expected_binding) = item.capture.evidence_binding.as_deref() else { + return; + }; + + match runtime_item.and_then(|observed| observed.evidence_binding.as_deref()) { + Some(observed) if observed == expected_binding => {}, + Some(observed) => self.failures.push(format!( + "evidence {} returned evidence_binding {observed}, expected {expected_binding}", + item.evidence_id + )), + None => self.failures.push(format!( + "evidence {} did not return expected evidence_binding {expected_binding}", + item.evidence_id + )), + } + } + + fn validate_write_policy( + &mut self, + item: &CorpusText, + runtime_item: Option<&CaptureRuntimeEvidenceItem>, + ) { + let Some(policy_value) = &item.capture.write_policy else { + return; + }; + + match policy::write_policy_from_value(policy_value, item.evidence_id.as_str()) { + Ok(policy) => { + self.expected_exclusions += policy.exclusions.len(); + self.expected_redactions += policy.redactions.len(); + }, + Err(err) => self.failures.push(err.to_string()), + } + + if !runtime_item.is_some_and(|observed| observed.write_policy_applied) { + self.failures.push(format!( + "evidence {} did not return write_policy_applied=true", + item.evidence_id + )); + } + } + + fn validate_capture_action( + &mut self, + item: &CorpusText, + runtime_item: Option<&CaptureRuntimeEvidenceItem>, + ) { + if let Some(observed) = runtime_item.and_then(|observed| observed.capture_action.as_deref()) + && observed != policy::capture_action_str(item.capture.action) + { + self.failures.push(format!( + "evidence {} returned capture_action {observed}, expected {}", + item.evidence_id, + policy::capture_action_str(item.capture.action) + )); + } + } + + fn validate_write_policy_totals( + mut self, + capture: &CaptureMaterializationEvidence, + ) -> Option { + if capture.write_policy_exclusion_count < self.expected_exclusions { + self.failures.push(format!( + "write-policy exclusion count {} was below expected {}", + capture.write_policy_exclusion_count, self.expected_exclusions + )); + } + if capture.write_policy_redaction_count < self.expected_redactions { + self.failures.push(format!( + "write-policy redaction count {} was below expected {}", + capture.write_policy_redaction_count, self.expected_redactions + )); + } + if self.expected_exclusions + self.expected_redactions > 0 + && capture.write_policy_audit_count == 0 + { + self.failures.push( + "write-policy audit count was zero despite expected policy effects".to_string(), + ); + } + if self.failures.is_empty() { + None + } else { + Some(format!("Capture runtime validation failed: {}", self.failures.join("; "))) + } + } +} + +pub(crate) fn validate_capture_runtime_evidence( + suite: &str, + corpus: &[CorpusText], + capture: &CaptureMaterializationEvidence, + runtime: &CaptureRuntimeEvidence, +) -> Option { + if suite != "capture_integration" { + return None; + } + + let mut validator = CaptureRuntimeValidator::default(); + + for item in corpus { + validator.validate_corpus_item(item, capture, runtime); + } + + validator.validate_write_policy_totals(capture) +}