diff --git a/apps/elf-eval/src/bin/live_baseline_elf/backfill.rs b/apps/elf-eval/src/bin/live_baseline_elf/backfill.rs index 0769b218..cf316b32 100644 --- a/apps/elf-eval/src/bin/live_baseline_elf/backfill.rs +++ b/apps/elf-eval/src/bin/live_baseline_elf/backfill.rs @@ -1,344 +1,9 @@ mod backfill_checkpoint; +mod config; +mod notes; +mod run; -use color_eyre::Result; - -use crate::{ - AGENT_ID, AddNoteInput, AddNoteRequest, BTreeMap, BackfillAttemptEvidence, - BackfillCheckpointEntry, BackfillOutcome, BackfillReport, BackfillResumeReport, CorpusNote, - DuplicateSourceNote, ElfService, ExistingBackfillNote, Instant, NoteOp, PROJECT_ID, Path, - PathBuf, SCOPE, TENANT_ID, Uuid, env, eyre, +pub(super) use self::{ + backfill_checkpoint::backfill_checkpoint_path, config::worker_concurrency, + run::run_resumable_backfill, }; - -pub(super) fn backfill_checkpoint_path(out: &Path) -> PathBuf { - backfill_checkpoint::backfill_checkpoint_path(out) -} - -pub(super) fn backfill_batch_size() -> usize { - crate::parse_env_usize("ELF_BASELINE_BACKFILL_BATCH_SIZE").unwrap_or(32).max(1) -} - -pub(super) fn worker_concurrency() -> usize { - let default = match env::var("ELF_BASELINE_PROFILE").as_deref() { - Ok("backfill" | "large") => 4, - Ok("stress") => 4, - Ok("scale" | "full") => 2, - _ => 1, - }; - - crate::parse_env_usize("ELF_BASELINE_WORKER_CONCURRENCY").unwrap_or(default).clamp(1, 32) -} - -pub(super) fn backfill_resume_probe_enabled() -> bool { - env::var("ELF_BASELINE_BACKFILL_RESUME_PROBE") - .map(|value| value != "0" && !value.eq_ignore_ascii_case("false")) - .unwrap_or(true) -} - -pub(super) fn backfill_interrupt_after(source_count: usize) -> Option { - if !backfill_resume_probe_enabled() || source_count <= 1 { - return None; - } - - let configured = crate::parse_env_usize("ELF_BASELINE_BACKFILL_INTERRUPT_AFTER"); - let default = (source_count / 2).max(1); - - Some(configured.unwrap_or(default).clamp(1, source_count.saturating_sub(1))) -} - -pub(super) fn note_input(note: &CorpusNote) -> AddNoteInput { - let hash = backfill_checkpoint::source_hash(note); - - AddNoteInput { - r#type: "fact".to_string(), - key: Some(note.key.clone()), - text: note.text.clone(), - structured: None, - importance: 0.9, - confidence: 0.95, - ttl_days: None, - source_ref: serde_json::json!({ - "source": "ELF live baseline corpus", - "title": note.title, - "document": note.source_doc, - "source_hash": hash, - }), - write_policy: None, - } -} - -pub(super) fn note_op_string(op: NoteOp) -> Result { - let value = serde_json::to_value(op)?; - - value - .as_str() - .map(ToString::to_string) - .ok_or_else(|| eyre::eyre!("Serialized note op was not a string.")) -} - -pub(super) async fn load_existing_backfill_notes( - service: &ElfService, -) -> Result> { - let rows = sqlx::query_as::<_, (Uuid, String, Option)>( - "\ -SELECT note_id, source_ref->>'document' AS source_doc, source_ref->>'source_hash' AS source_hash -FROM memory_notes -WHERE tenant_id = $1 - AND project_id = $2 - AND agent_id = $3 - AND scope = $4 - AND status = 'active' - AND source_ref->>'source' = 'ELF live baseline corpus' - AND source_ref->>'document' IS NOT NULL -ORDER BY updated_at DESC", - ) - .bind(TENANT_ID) - .bind(PROJECT_ID) - .bind(AGENT_ID) - .bind(SCOPE) - .fetch_all(&service.db.pool) - .await?; - let mut out = BTreeMap::new(); - - for (note_id, source_doc, hash) in rows { - out.entry(source_doc).or_insert(ExistingBackfillNote { note_id, source_hash: hash }); - } - - Ok(out) -} - -pub(super) async fn duplicate_source_notes( - service: &ElfService, -) -> Result> { - let rows = sqlx::query_as::<_, (String, i64, Vec)>( - "\ -SELECT - source_ref->>'document' AS source_doc, - COUNT(*)::bigint AS count, - array_agg(note_id ORDER BY note_id)::uuid[] AS note_ids -FROM memory_notes -WHERE tenant_id = $1 - AND project_id = $2 - AND agent_id = $3 - AND scope = $4 - AND status = 'active' - AND source_ref->>'source' = 'ELF live baseline corpus' - AND source_ref->>'document' IS NOT NULL -GROUP BY source_ref->>'document' -HAVING COUNT(*) > 1 -ORDER BY source_doc", - ) - .bind(TENANT_ID) - .bind(PROJECT_ID) - .bind(AGENT_ID) - .bind(SCOPE) - .fetch_all(&service.db.pool) - .await?; - - Ok(rows - .into_iter() - .map(|(source_doc, count, note_ids)| DuplicateSourceNote { source_doc, count, note_ids }) - .collect()) -} - -pub(super) async fn run_resumable_backfill( - service: &ElfService, - notes: &[CorpusNote], - checkpoint_path: &Path, -) -> Result { - let started_at = Instant::now(); - let corpus_hash = backfill_checkpoint::corpus_hash(notes); - let batch_size = backfill_batch_size(); - let interrupt_after = backfill_interrupt_after(notes.len()); - let first_attempt = run_backfill_attempt( - service, - notes, - checkpoint_path, - &corpus_hash, - batch_size, - 1, - interrupt_after, - ) - .await?; - let interrupted = first_attempt.interrupted; - let completed_before_resume = first_attempt.checkpoint_completed; - let mut attempts = Vec::new(); - - attempts.push(first_attempt); - - if interrupted { - attempts.push( - run_backfill_attempt( - service, - notes, - checkpoint_path, - &corpus_hash, - batch_size, - 2, - None, - ) - .await?, - ); - } - - let checkpoint = backfill_checkpoint::load_backfill_checkpoint(checkpoint_path, &corpus_hash)?; - let existing = load_existing_backfill_notes(service).await?; - let mut note_ids = Vec::with_capacity(notes.len()); - - for note in notes { - let Some(entry) = checkpoint.completed.get(¬e.source_doc) else { - return Err(eyre::eyre!( - "Backfill checkpoint missing completed source {}.", - note.source_doc - )); - }; - - if !backfill_checkpoint::checkpoint_entry_valid(note, entry, &existing) { - return Err(eyre::eyre!( - "Backfill checkpoint entry for {} does not match Postgres state.", - note.source_doc - )); - } - - note_ids.push(entry.note_id); - } - - let duplicate_source_notes = duplicate_source_notes(service).await?; - let attempted_writes = attempts.iter().map(|attempt| attempt.attempted_writes).sum(); - let skipped_completed = attempts.iter().map(|attempt| attempt.skipped_completed).sum(); - let completed_after_resume = checkpoint.completed.len(); - let report = BackfillReport { - checkpoint_path: checkpoint_path.display().to_string(), - corpus_hash, - source_count: notes.len(), - completed_count: note_ids.len(), - batch_size, - worker_concurrency: worker_concurrency(), - elapsed_seconds: started_at.elapsed().as_secs_f64(), - attempted_writes, - skipped_completed, - duplicate_source_notes, - resume: BackfillResumeReport { - enabled: interrupt_after.is_some(), - interrupted, - interrupt_after, - resume_attempts: attempts.len(), - completed_before_resume, - completed_after_resume, - }, - attempts, - }; - - Ok(BackfillOutcome { report, note_ids }) -} - -pub(super) async fn run_backfill_attempt( - service: &ElfService, - notes: &[CorpusNote], - checkpoint_path: &Path, - corpus_hash: &str, - batch_size: usize, - attempt: usize, - interrupt_after: Option, -) -> Result { - let mut checkpoint = - backfill_checkpoint::load_backfill_checkpoint(checkpoint_path, corpus_hash)?; - let existing = load_existing_backfill_notes(service).await?; - let notes_by_source = - notes.iter().map(|note| (note.source_doc.as_str(), note)).collect::>(); - let checkpoint_len_before_prune = checkpoint.completed.len(); - - checkpoint.completed.retain(|source_doc, entry| { - notes_by_source - .get(source_doc.as_str()) - .is_some_and(|note| backfill_checkpoint::checkpoint_entry_valid(note, entry, &existing)) - }); - - if checkpoint.completed.len() != checkpoint_len_before_prune { - backfill_checkpoint::write_backfill_checkpoint(checkpoint_path, &checkpoint)?; - } - - let mut pending = Vec::new(); - let mut skipped_completed = 0_usize; - - for note in notes { - if checkpoint.completed.contains_key(¬e.source_doc) { - skipped_completed += 1; - } else { - pending.push(note); - } - } - - let max_writes = interrupt_after.unwrap_or(usize::MAX); - let mut attempted_writes = 0_usize; - let mut completed_writes = 0_usize; - let mut cursor = 0_usize; - - while cursor < pending.len() && attempted_writes < max_writes { - let remaining_budget = max_writes.saturating_sub(attempted_writes); - let take = batch_size.min(remaining_budget).min(pending.len() - cursor); - let batch = &pending[cursor..cursor + take]; - let response = service - .add_note(AddNoteRequest { - tenant_id: TENANT_ID.to_string(), - project_id: PROJECT_ID.to_string(), - agent_id: AGENT_ID.to_string(), - scope: SCOPE.to_string(), - notes: batch.iter().map(|note| note_input(note)).collect(), - }) - .await?; - - if response.results.len() != batch.len() { - return Err(eyre::eyre!( - "Backfill add_note returned {} results for {} inputs.", - response.results.len(), - batch.len() - )); - } - - for (note, result) in batch.iter().zip(response.results) { - let op = note_op_string(result.op)?; - - if op == "REJECTED" { - return Err(eyre::eyre!( - "Backfill note {} was rejected: {:?}.", - note.source_doc, - result.reason_code - )); - } - - let note_id = result.note_id.ok_or_else(|| { - eyre::eyre!("Backfill note {} did not return a note_id.", note.source_doc) - })?; - - checkpoint.completed.insert( - note.source_doc.clone(), - BackfillCheckpointEntry { - note_id, - key: note.key.clone(), - source_hash: backfill_checkpoint::source_hash(note), - op, - }, - ); - - completed_writes += 1; - } - - attempted_writes += batch.len(); - cursor += batch.len(); - - backfill_checkpoint::write_backfill_checkpoint(checkpoint_path, &checkpoint)?; - } - - let interrupted = cursor < pending.len(); - - Ok(BackfillAttemptEvidence { - attempt, - resumed: skipped_completed > 0, - interrupt_after, - skipped_completed, - attempted_writes, - completed_writes, - checkpoint_completed: checkpoint.completed.len(), - interrupted, - }) -} diff --git a/apps/elf-eval/src/bin/live_baseline_elf/backfill_checkpoint.rs b/apps/elf-eval/src/bin/live_baseline_elf/backfill/backfill_checkpoint.rs similarity index 99% rename from apps/elf-eval/src/bin/live_baseline_elf/backfill_checkpoint.rs rename to apps/elf-eval/src/bin/live_baseline_elf/backfill/backfill_checkpoint.rs index e5cdd67f..653e0ec3 100644 --- a/apps/elf-eval/src/bin/live_baseline_elf/backfill_checkpoint.rs +++ b/apps/elf-eval/src/bin/live_baseline_elf/backfill/backfill_checkpoint.rs @@ -5,7 +5,7 @@ use crate::{ ExistingBackfillNote, Hasher, Path, PathBuf, fs, }; -pub(super) fn backfill_checkpoint_path(out: &Path) -> PathBuf { +pub(crate) fn backfill_checkpoint_path(out: &Path) -> PathBuf { crate::env_string(&["ELF_BASELINE_BACKFILL_CHECKPOINT"]) .map(PathBuf::from) .unwrap_or_else(|| out.with_file_name("elf-backfill-checkpoint.json")) diff --git a/apps/elf-eval/src/bin/live_baseline_elf/backfill/config.rs b/apps/elf-eval/src/bin/live_baseline_elf/backfill/config.rs new file mode 100644 index 00000000..ed102fca --- /dev/null +++ b/apps/elf-eval/src/bin/live_baseline_elf/backfill/config.rs @@ -0,0 +1,33 @@ +use crate::env; + +pub(crate) fn worker_concurrency() -> usize { + let default = match env::var("ELF_BASELINE_PROFILE").as_deref() { + Ok("backfill" | "large") => 4, + Ok("stress") => 4, + Ok("scale" | "full") => 2, + _ => 1, + }; + + crate::parse_env_usize("ELF_BASELINE_WORKER_CONCURRENCY").unwrap_or(default).clamp(1, 32) +} + +pub(super) fn backfill_batch_size() -> usize { + crate::parse_env_usize("ELF_BASELINE_BACKFILL_BATCH_SIZE").unwrap_or(32).max(1) +} + +pub(super) fn backfill_resume_probe_enabled() -> bool { + env::var("ELF_BASELINE_BACKFILL_RESUME_PROBE") + .map(|value| value != "0" && !value.eq_ignore_ascii_case("false")) + .unwrap_or(true) +} + +pub(super) fn backfill_interrupt_after(source_count: usize) -> Option { + if !backfill_resume_probe_enabled() || source_count <= 1 { + return None; + } + + let configured = crate::parse_env_usize("ELF_BASELINE_BACKFILL_INTERRUPT_AFTER"); + let default = (source_count / 2).max(1); + + Some(configured.unwrap_or(default).clamp(1, source_count.saturating_sub(1))) +} diff --git a/apps/elf-eval/src/bin/live_baseline_elf/backfill/notes.rs b/apps/elf-eval/src/bin/live_baseline_elf/backfill/notes.rs new file mode 100644 index 00000000..de4745e7 --- /dev/null +++ b/apps/elf-eval/src/bin/live_baseline_elf/backfill/notes.rs @@ -0,0 +1,102 @@ +use color_eyre::Result; + +use crate::{ + AGENT_ID, AddNoteInput, BTreeMap, CorpusNote, DuplicateSourceNote, ElfService, + ExistingBackfillNote, NoteOp, PROJECT_ID, SCOPE, TENANT_ID, Uuid, + backfill::backfill_checkpoint, eyre, +}; + +pub(super) fn note_input(note: &CorpusNote) -> AddNoteInput { + let hash = backfill_checkpoint::source_hash(note); + + AddNoteInput { + r#type: "fact".to_string(), + key: Some(note.key.clone()), + text: note.text.clone(), + structured: None, + importance: 0.9, + confidence: 0.95, + ttl_days: None, + source_ref: serde_json::json!({ + "source": "ELF live baseline corpus", + "title": note.title, + "document": note.source_doc, + "source_hash": hash, + }), + write_policy: None, + } +} + +pub(super) fn note_op_string(op: NoteOp) -> Result { + let value = serde_json::to_value(op)?; + + value + .as_str() + .map(ToString::to_string) + .ok_or_else(|| eyre::eyre!("Serialized note op was not a string.")) +} + +pub(super) async fn load_existing_backfill_notes( + service: &ElfService, +) -> Result> { + let rows = sqlx::query_as::<_, (Uuid, String, Option)>( + "\ +SELECT note_id, source_ref->>'document' AS source_doc, source_ref->>'source_hash' AS source_hash +FROM memory_notes +WHERE tenant_id = $1 + AND project_id = $2 + AND agent_id = $3 + AND scope = $4 + AND status = 'active' + AND source_ref->>'source' = 'ELF live baseline corpus' + AND source_ref->>'document' IS NOT NULL +ORDER BY updated_at DESC", + ) + .bind(TENANT_ID) + .bind(PROJECT_ID) + .bind(AGENT_ID) + .bind(SCOPE) + .fetch_all(&service.db.pool) + .await?; + let mut out = BTreeMap::new(); + + for (note_id, source_doc, hash) in rows { + out.entry(source_doc).or_insert(ExistingBackfillNote { note_id, source_hash: hash }); + } + + Ok(out) +} + +pub(super) async fn duplicate_source_notes( + service: &ElfService, +) -> Result> { + let rows = sqlx::query_as::<_, (String, i64, Vec)>( + "\ +SELECT + source_ref->>'document' AS source_doc, + COUNT(*)::bigint AS count, + array_agg(note_id ORDER BY note_id)::uuid[] AS note_ids +FROM memory_notes +WHERE tenant_id = $1 + AND project_id = $2 + AND agent_id = $3 + AND scope = $4 + AND status = 'active' + AND source_ref->>'source' = 'ELF live baseline corpus' + AND source_ref->>'document' IS NOT NULL +GROUP BY source_ref->>'document' +HAVING COUNT(*) > 1 +ORDER BY source_doc", + ) + .bind(TENANT_ID) + .bind(PROJECT_ID) + .bind(AGENT_ID) + .bind(SCOPE) + .fetch_all(&service.db.pool) + .await?; + + Ok(rows + .into_iter() + .map(|(source_doc, count, note_ids)| DuplicateSourceNote { source_doc, count, note_ids }) + .collect()) +} diff --git a/apps/elf-eval/src/bin/live_baseline_elf/backfill/run.rs b/apps/elf-eval/src/bin/live_baseline_elf/backfill/run.rs new file mode 100644 index 00000000..14f2a3c3 --- /dev/null +++ b/apps/elf-eval/src/bin/live_baseline_elf/backfill/run.rs @@ -0,0 +1,212 @@ +use color_eyre::Result; + +use crate::{ + AGENT_ID, AddNoteRequest, BTreeMap, BackfillAttemptEvidence, BackfillCheckpointEntry, + BackfillOutcome, BackfillReport, BackfillResumeReport, CorpusNote, ElfService, Instant, + PROJECT_ID, Path, SCOPE, TENANT_ID, + backfill::{backfill_checkpoint, config, notes}, + eyre, +}; + +pub(crate) async fn run_resumable_backfill( + service: &ElfService, + notes: &[CorpusNote], + checkpoint_path: &Path, +) -> Result { + let started_at = Instant::now(); + let corpus_hash = backfill_checkpoint::corpus_hash(notes); + let batch_size = config::backfill_batch_size(); + let interrupt_after = config::backfill_interrupt_after(notes.len()); + let first_attempt = run_backfill_attempt( + service, + notes, + checkpoint_path, + &corpus_hash, + batch_size, + 1, + interrupt_after, + ) + .await?; + let interrupted = first_attempt.interrupted; + let completed_before_resume = first_attempt.checkpoint_completed; + let mut attempts = Vec::new(); + + attempts.push(first_attempt); + + if interrupted { + attempts.push( + run_backfill_attempt( + service, + notes, + checkpoint_path, + &corpus_hash, + batch_size, + 2, + None, + ) + .await?, + ); + } + + let checkpoint = backfill_checkpoint::load_backfill_checkpoint(checkpoint_path, &corpus_hash)?; + let existing = notes::load_existing_backfill_notes(service).await?; + let mut note_ids = Vec::with_capacity(notes.len()); + + for note in notes { + let Some(entry) = checkpoint.completed.get(¬e.source_doc) else { + return Err(eyre::eyre!( + "Backfill checkpoint missing completed source {}.", + note.source_doc + )); + }; + + if !backfill_checkpoint::checkpoint_entry_valid(note, entry, &existing) { + return Err(eyre::eyre!( + "Backfill checkpoint entry for {} does not match Postgres state.", + note.source_doc + )); + } + + note_ids.push(entry.note_id); + } + + let duplicate_source_notes = notes::duplicate_source_notes(service).await?; + let attempted_writes = attempts.iter().map(|attempt| attempt.attempted_writes).sum(); + let skipped_completed = attempts.iter().map(|attempt| attempt.skipped_completed).sum(); + let completed_after_resume = checkpoint.completed.len(); + let report = BackfillReport { + checkpoint_path: checkpoint_path.display().to_string(), + corpus_hash, + source_count: notes.len(), + completed_count: note_ids.len(), + batch_size, + worker_concurrency: config::worker_concurrency(), + elapsed_seconds: started_at.elapsed().as_secs_f64(), + attempted_writes, + skipped_completed, + duplicate_source_notes, + resume: BackfillResumeReport { + enabled: interrupt_after.is_some(), + interrupted, + interrupt_after, + resume_attempts: attempts.len(), + completed_before_resume, + completed_after_resume, + }, + attempts, + }; + + Ok(BackfillOutcome { report, note_ids }) +} + +async fn run_backfill_attempt( + service: &ElfService, + notes: &[CorpusNote], + checkpoint_path: &Path, + corpus_hash: &str, + batch_size: usize, + attempt: usize, + interrupt_after: Option, +) -> Result { + let mut checkpoint = + backfill_checkpoint::load_backfill_checkpoint(checkpoint_path, corpus_hash)?; + let existing = notes::load_existing_backfill_notes(service).await?; + let notes_by_source = + notes.iter().map(|note| (note.source_doc.as_str(), note)).collect::>(); + let checkpoint_len_before_prune = checkpoint.completed.len(); + + checkpoint.completed.retain(|source_doc, entry| { + notes_by_source + .get(source_doc.as_str()) + .is_some_and(|note| backfill_checkpoint::checkpoint_entry_valid(note, entry, &existing)) + }); + + if checkpoint.completed.len() != checkpoint_len_before_prune { + backfill_checkpoint::write_backfill_checkpoint(checkpoint_path, &checkpoint)?; + } + + let mut pending = Vec::new(); + let mut skipped_completed = 0_usize; + + for note in notes { + if checkpoint.completed.contains_key(¬e.source_doc) { + skipped_completed += 1; + } else { + pending.push(note); + } + } + + let max_writes = interrupt_after.unwrap_or(usize::MAX); + let mut attempted_writes = 0_usize; + let mut completed_writes = 0_usize; + let mut cursor = 0_usize; + + while cursor < pending.len() && attempted_writes < max_writes { + let remaining_budget = max_writes.saturating_sub(attempted_writes); + let take = batch_size.min(remaining_budget).min(pending.len() - cursor); + let batch = &pending[cursor..cursor + take]; + let response = service + .add_note(AddNoteRequest { + tenant_id: TENANT_ID.to_string(), + project_id: PROJECT_ID.to_string(), + agent_id: AGENT_ID.to_string(), + scope: SCOPE.to_string(), + notes: batch.iter().map(|note| notes::note_input(note)).collect(), + }) + .await?; + + if response.results.len() != batch.len() { + return Err(eyre::eyre!( + "Backfill add_note returned {} results for {} inputs.", + response.results.len(), + batch.len() + )); + } + + for (note, result) in batch.iter().zip(response.results) { + let op = notes::note_op_string(result.op)?; + + if op == "REJECTED" { + return Err(eyre::eyre!( + "Backfill note {} was rejected: {:?}.", + note.source_doc, + result.reason_code + )); + } + + let note_id = result.note_id.ok_or_else(|| { + eyre::eyre!("Backfill note {} did not return a note_id.", note.source_doc) + })?; + + checkpoint.completed.insert( + note.source_doc.clone(), + BackfillCheckpointEntry { + note_id, + key: note.key.clone(), + source_hash: backfill_checkpoint::source_hash(note), + op, + }, + ); + + completed_writes += 1; + } + + attempted_writes += batch.len(); + cursor += batch.len(); + + backfill_checkpoint::write_backfill_checkpoint(checkpoint_path, &checkpoint)?; + } + + let interrupted = cursor < pending.len(); + + Ok(BackfillAttemptEvidence { + attempt, + resumed: skipped_completed > 0, + interrupt_after, + skipped_completed, + attempted_writes, + completed_writes, + checkpoint_completed: checkpoint.completed.len(), + interrupted, + }) +} diff --git a/apps/elf-eval/src/bin/live_baseline_elf/checks.rs b/apps/elf-eval/src/bin/live_baseline_elf/checks.rs index 7f427ead..d8237fc2 100644 --- a/apps/elf-eval/src/bin/live_baseline_elf/checks.rs +++ b/apps/elf-eval/src/bin/live_baseline_elf/checks.rs @@ -1,352 +1,38 @@ -#[path = "checks/lifecycle.rs"] mod lifecycle; -#[path = "checks/reporting.rs"] mod reporting; -#[path = "checks/resource.rs"] mod resource; -#[path = "checks/stress.rs"] mod stress; - -use color_eyre::Result; +mod core; +mod env_config; +mod lifecycle; +mod reporting; +mod resource; +mod stress; +mod workload; + +pub(super) use self::{ + core::{outbox_done, resumable_backfill_check, retrieval_check, worker_indexing_check}, + env_config::{parse_env_u64, parse_env_usize}, + lifecycle::run_lifecycle_checks_impl as run_lifecycle_checks, + reporting::{ + cost_proxy_report_impl as cost_proxy_report, incomplete_check_impl as incomplete_check, + latency_percentile_impl as latency_percentile, operational_cases_impl as operational_cases, + project_status_from_summary_impl as project_status_from_summary, + summarize_checks_impl as summarize_checks, + }, + resource::resource_envelope_check_impl as resource_envelope_check, + stress::{ + run_concurrent_write_check_impl as run_concurrent_write_check, + run_soak_stability_check_impl as run_soak_stability_check, + }, + workload::{ + concurrency_probe_indexes, concurrent_add_request, concurrent_note_count, + concurrent_query_case, soak_add_request, soak_config, soak_query_case, + }, +}; use crate::{ - AGENT_ID, AddNoteInput, AddNoteRequest, Arc, BTreeMap, BackfillReport, BaselineRuntime, - CheckResult, CheckSummary, CorpusNote, CostProxyReport, DeleteRequest, Duration, ElfService, - EmbeddingRuntimeReport, Instant, JoinSet, OperationalCase, PROJECT_ID, Path, QueryCase, - QueryResult, Report, ResourceEnvelopeEvidence, SCOPE, SoakConfig, TENANT_ID, UpdateRequest, - Uuid, WorkerRunEvidence, contains_case_insensitive, distinctive_terms, env, eyre, fs, + AGENT_ID, Arc, BTreeMap, BaselineRuntime, CheckResult, CheckSummary, CorpusNote, + CostProxyReport, DeleteRequest, Duration, ElfService, EmbeddingRuntimeReport, Instant, JoinSet, + OperationalCase, PROJECT_ID, Path, QueryCase, QueryResult, Report, ResourceEnvelopeEvidence, + TENANT_ID, UpdateRequest, Uuid, contains_case_insensitive, distinctive_terms, env, eyre, fs, run_single_query, runtime::{build_service, run_worker_until_indexed}, time, }; - -pub(super) fn outbox_done(counts: &BTreeMap, expected_note_count: usize) -> bool { - let done = counts.get("DONE").copied().unwrap_or_default(); - let expected = i64::try_from(expected_note_count).unwrap_or(i64::MAX); - let pending = counts.get("PENDING").copied().unwrap_or_default(); - let failed = counts.get("FAILED").copied().unwrap_or_default(); - let claimed = counts.get("CLAIMED").copied().unwrap_or_default(); - - done >= expected && pending == 0 && failed == 0 && claimed == 0 -} - -pub(super) fn retrieval_check(query_results: &[QueryResult]) -> CheckResult { - let pass_count = query_results.iter().filter(|result| result.matched).count(); - let fail_count = query_results.len().saturating_sub(pass_count); - let expected_evidence_ids = query_results - .iter() - .map(|result| { - serde_json::json!({ - "query_id": result.id, - "expected": result.expected_evidence_ids, - "allowed_alternates": result.allowed_alternate_evidence_ids, - }) - }) - .collect::>(); - - CheckResult { - name: "same_corpus_retrieval", - status: if fail_count == 0 { "pass" } else { "wrong_result" }, - reason: if fail_count == 0 { - "All same-corpus retrieval queries returned expected evidence.".to_string() - } else { - format!("{fail_count} same-corpus retrieval query case(s) missed expected evidence.") - }, - evidence: serde_json::json!({ - "total": query_results.len(), - "pass": pass_count, - "fail": fail_count, - "wrong_result_count": fail_count, - "expected_evidence_ids": expected_evidence_ids, - }), - } -} - -pub(super) fn worker_indexing_check(evidence: WorkerRunEvidence) -> CheckResult { - let pass = outbox_done(&evidence.after, evidence.expected_note_count) - && evidence.chunk_rows >= i64::try_from(evidence.expected_note_count).unwrap_or(i64::MAX) - && evidence.chunk_embedding_rows >= evidence.chunk_rows; - - CheckResult { - name: "async_worker_indexing_e2e", - status: if pass { "pass" } else { "lifecycle_fail" }, - reason: if pass { - "ELF worker processed corpus outbox jobs into persisted chunks and embeddings." - .to_string() - } else { - "ELF worker did not fully process corpus outbox jobs into searchable chunks." - .to_string() - }, - evidence: serde_json::json!(evidence), - } -} - -pub(super) fn resumable_backfill_check(report: &BackfillReport) -> CheckResult { - let resume_pass = !report.resume.enabled - || (report.resume.interrupted - && report.resume.resume_attempts >= 2 - && report.skipped_completed > 0); - let pass = report.completed_count == report.source_count - && report.duplicate_source_notes.is_empty() - && resume_pass; - - CheckResult { - name: "resumable_backfill_no_duplicates", - status: if pass { "pass" } else { "lifecycle_fail" }, - reason: if pass { - "Checkpointed backfill resumed from durable progress and did not duplicate source documents." - .to_string() - } else { - "Checkpointed backfill did not complete cleanly, did not prove resume, or duplicated source documents." - .to_string() - }, - evidence: serde_json::json!(report), - } -} - -pub(super) fn concurrent_note_count() -> usize { - if let Ok(value) = env::var("ELF_BASELINE_CONCURRENT_NOTES") - && let Ok(parsed) = value.parse::() - { - return parsed.max(1); - } - - match env::var("ELF_BASELINE_PROFILE").as_deref() { - Ok("backfill" | "large") => 32, - Ok("stress") => 32, - Ok("scale" | "full") => 16, - _ => 4, - } -} - -pub(super) fn concurrent_add_request(index: usize) -> AddNoteRequest { - let marker = concurrent_marker(index); - - AddNoteRequest { - tenant_id: TENANT_ID.to_string(), - project_id: PROJECT_ID.to_string(), - agent_id: AGENT_ID.to_string(), - scope: SCOPE.to_string(), - notes: vec![AddNoteInput { - r#type: "fact".to_string(), - key: Some(format!("concurrent_{index:03}")), - text: format!( - "Concurrent benchmark note {index:03} records marker `{marker}` for write race validation." - ), - structured: None, - importance: 0.91, - confidence: 0.96, - ttl_days: None, - source_ref: serde_json::json!({ - "source": "ELF live baseline concurrent write check", - "document": format!("concurrent-{index:03}.md"), - }), - write_policy: None, - }], - } -} - -pub(super) fn concurrent_query_case(index: usize) -> QueryCase { - let marker = concurrent_marker(index); - - QueryCase::generated( - format!("concurrent-{index:03}"), - format!("Find the concurrent benchmark note containing marker {marker}."), - format!("concurrent-{index:03}.md"), - vec![marker], - ) -} - -pub(super) fn concurrent_marker(index: usize) -> String { - format!("concurrency-{}-{index:03}", marker_word(index)) -} - -pub(super) fn soak_config() -> SoakConfig { - let profile = env::var("ELF_BASELINE_PROFILE").ok(); - let (default_seconds, default_rounds) = match profile.as_deref() { - Some("backfill" | "large") => (60, 6), - Some("stress") => (60, 6), - Some("scale" | "full") => (15, 3), - _ => (0, 0), - }; - - SoakConfig { - target_seconds: parse_env_u64("ELF_BASELINE_SOAK_SECONDS").unwrap_or(default_seconds), - write_rounds: parse_env_usize("ELF_BASELINE_SOAK_ROUNDS").unwrap_or(default_rounds), - probe_interval_millis: parse_env_u64("ELF_BASELINE_SOAK_PROBE_INTERVAL_MS") - .unwrap_or(1_000) - .max(100), - } -} - -pub(super) fn parse_env_u64(name: &str) -> Option { - env::var(name).ok()?.parse::().ok() -} - -pub(super) fn parse_env_usize(name: &str) -> Option { - env::var(name).ok()?.parse::().ok() -} - -pub(super) fn soak_add_request(index: usize) -> AddNoteRequest { - let marker = soak_marker(index); - let (topic, detail) = soak_topic(index); - - AddNoteRequest { - tenant_id: TENANT_ID.to_string(), - project_id: PROJECT_ID.to_string(), - agent_id: AGENT_ID.to_string(), - scope: SCOPE.to_string(), - notes: vec![AddNoteInput { - r#type: "fact".to_string(), - key: Some(format!("soak_{index:03}")), - text: format!( - "Soak benchmark note {index:03} covers {topic}. {detail} It records stability marker `{marker}` for repeated worker and search probes." - ), - structured: None, - importance: 0.92, - confidence: 0.97, - ttl_days: None, - source_ref: serde_json::json!({ - "source": "ELF live baseline soak stability check", - "document": format!("soak-{index:03}.md"), - }), - write_policy: None, - }], - } -} - -pub(super) fn soak_query_case(index: usize) -> QueryCase { - let marker = soak_marker(index); - let (topic, _) = soak_topic(index); - - QueryCase::generated( - format!("soak-{index:03}"), - format!("Find the soak benchmark note about {topic} containing marker {marker}."), - format!("soak-{index:03}.md"), - vec![marker], - ) -} - -pub(super) fn soak_marker(index: usize) -> String { - format!("soak-stability-{}-{index:03}", marker_word(index)) -} - -pub(super) fn marker_word(index: usize) -> &'static str { - const WORDS: &[&str] = &[ - "aurora", "banyan", "cobalt", "delta", "ember", "fennel", "granite", "harbor", "indigo", - "jasper", "keystone", "lantern", "meridian", "nebula", "onyx", "prairie", "quartz", - "raven", "solstice", "topaz", "umbra", "verdant", "willow", "xenon", "yarrow", "zephyr", - "atlas", "beacon", "citadel", "drift", "equinox", "forge", - ]; - - WORDS[index % WORDS.len()] -} - -pub(super) fn soak_topic(index: usize) -> (&'static str, &'static str) { - const TOPICS: &[(&str, &str)] = &[ - ( - "release rollback fencing", - "The rollback controller waits for a signed deploy fence before the next canary.", - ), - ( - "invoice export batching", - "The exporter groups invoice CSV rows by merchant ledger before upload.", - ), - ("search shard warming", "The search router warms tenant shard caches before rank probes."), - ( - "incident pager routing", - "The incident desk routes page ownership through the release captain.", - ), - ( - "backup restore rehearsal", - "The restore rehearsal checks WAL freshness before dry-run recovery.", - ), - ( - "feature flag expiry", - "The flag sweeper archives expired toggles before deleting rollout rules.", - ), - ( - "support queue triage", - "The support classifier separates billing tickets from access tickets.", - ), - ( - "analytics job watermark", - "The analytics worker stores a warehouse watermark after each import.", - ), - ]; - - TOPICS[index % TOPICS.len()] -} - -pub(super) fn concurrency_probe_indexes(note_count: usize) -> Vec { - let mut indexes = vec![0, note_count / 2, note_count.saturating_sub(1)]; - - indexes.sort_unstable(); - indexes.dedup(); - - indexes -} - -pub(super) fn cost_proxy_report( - notes: &[CorpusNote], - queries: &[QueryResult], - embedding: &EmbeddingRuntimeReport, -) -> CostProxyReport { - reporting::cost_proxy_report_impl(notes, queries, embedding) -} - -pub(super) fn latency_percentile(latencies: &[f64], percentile: f64) -> f64 { - reporting::latency_percentile_impl(latencies, percentile) -} - -pub(super) fn operational_cases() -> Vec { - reporting::operational_cases_impl() -} - -pub(super) fn incomplete_check(name: &'static str, reason: &str) -> CheckResult { - reporting::incomplete_check_impl(name, reason) -} - -pub(super) fn summarize_checks(checks: &[CheckResult]) -> CheckSummary { - reporting::summarize_checks_impl(checks) -} - -pub(super) fn project_status_from_summary(summary: &CheckSummary) -> &'static str { - reporting::project_status_from_summary_impl(summary) -} - -pub(super) async fn resource_envelope_check( - service: &ElfService, - corpus_dir: &Path, - report_path: &Path, - checkpoint_path: &Path, - elapsed_seconds: f64, -) -> CheckResult { - resource::resource_envelope_check_impl( - service, - corpus_dir, - report_path, - checkpoint_path, - elapsed_seconds, - ) - .await -} - -pub(super) async fn run_lifecycle_checks( - runtime: &BaselineRuntime, - service: &ElfService, - notes: &[CorpusNote], - note_ids: &[Uuid], -) -> Result> { - lifecycle::run_lifecycle_checks_impl(runtime, service, notes, note_ids).await -} - -pub(super) async fn run_concurrent_write_check( - runtime: &BaselineRuntime, - service: Arc, -) -> Result { - stress::run_concurrent_write_check_impl(runtime, service).await -} - -pub(super) async fn run_soak_stability_check( - runtime: &BaselineRuntime, - service: Arc, -) -> Result> { - stress::run_soak_stability_check_impl(runtime, service).await -} diff --git a/apps/elf-eval/src/bin/live_baseline_elf/checks/core.rs b/apps/elf-eval/src/bin/live_baseline_elf/checks/core.rs new file mode 100644 index 00000000..1e054f71 --- /dev/null +++ b/apps/elf-eval/src/bin/live_baseline_elf/checks/core.rs @@ -0,0 +1,85 @@ +use crate::{BTreeMap, BackfillReport, CheckResult, QueryResult, WorkerRunEvidence}; + +pub(crate) fn outbox_done(counts: &BTreeMap, expected_note_count: usize) -> bool { + let done = counts.get("DONE").copied().unwrap_or_default(); + let expected = i64::try_from(expected_note_count).unwrap_or(i64::MAX); + let pending = counts.get("PENDING").copied().unwrap_or_default(); + let failed = counts.get("FAILED").copied().unwrap_or_default(); + let claimed = counts.get("CLAIMED").copied().unwrap_or_default(); + + done >= expected && pending == 0 && failed == 0 && claimed == 0 +} + +pub(crate) fn retrieval_check(query_results: &[QueryResult]) -> CheckResult { + let pass_count = query_results.iter().filter(|result| result.matched).count(); + let fail_count = query_results.len().saturating_sub(pass_count); + let expected_evidence_ids = query_results + .iter() + .map(|result| { + serde_json::json!({ + "query_id": result.id, + "expected": result.expected_evidence_ids, + "allowed_alternates": result.allowed_alternate_evidence_ids, + }) + }) + .collect::>(); + + CheckResult { + name: "same_corpus_retrieval", + status: if fail_count == 0 { "pass" } else { "wrong_result" }, + reason: if fail_count == 0 { + "All same-corpus retrieval queries returned expected evidence.".to_string() + } else { + format!("{fail_count} same-corpus retrieval query case(s) missed expected evidence.") + }, + evidence: serde_json::json!({ + "total": query_results.len(), + "pass": pass_count, + "fail": fail_count, + "wrong_result_count": fail_count, + "expected_evidence_ids": expected_evidence_ids, + }), + } +} + +pub(crate) fn worker_indexing_check(evidence: WorkerRunEvidence) -> CheckResult { + let pass = outbox_done(&evidence.after, evidence.expected_note_count) + && evidence.chunk_rows >= i64::try_from(evidence.expected_note_count).unwrap_or(i64::MAX) + && evidence.chunk_embedding_rows >= evidence.chunk_rows; + + CheckResult { + name: "async_worker_indexing_e2e", + status: if pass { "pass" } else { "lifecycle_fail" }, + reason: if pass { + "ELF worker processed corpus outbox jobs into persisted chunks and embeddings." + .to_string() + } else { + "ELF worker did not fully process corpus outbox jobs into searchable chunks." + .to_string() + }, + evidence: serde_json::json!(evidence), + } +} + +pub(crate) fn resumable_backfill_check(report: &BackfillReport) -> CheckResult { + let resume_pass = !report.resume.enabled + || (report.resume.interrupted + && report.resume.resume_attempts >= 2 + && report.skipped_completed > 0); + let pass = report.completed_count == report.source_count + && report.duplicate_source_notes.is_empty() + && resume_pass; + + CheckResult { + name: "resumable_backfill_no_duplicates", + status: if pass { "pass" } else { "lifecycle_fail" }, + reason: if pass { + "Checkpointed backfill resumed from durable progress and did not duplicate source documents." + .to_string() + } else { + "Checkpointed backfill did not complete cleanly, did not prove resume, or duplicated source documents." + .to_string() + }, + evidence: serde_json::json!(report), + } +} diff --git a/apps/elf-eval/src/bin/live_baseline_elf/checks/env_config.rs b/apps/elf-eval/src/bin/live_baseline_elf/checks/env_config.rs new file mode 100644 index 00000000..902f056f --- /dev/null +++ b/apps/elf-eval/src/bin/live_baseline_elf/checks/env_config.rs @@ -0,0 +1,9 @@ +use crate::env; + +pub(crate) fn parse_env_u64(name: &str) -> Option { + env::var(name).ok()?.parse::().ok() +} + +pub(crate) fn parse_env_usize(name: &str) -> Option { + env::var(name).ok()?.parse::().ok() +} diff --git a/apps/elf-eval/src/bin/live_baseline_elf/checks/lifecycle.rs b/apps/elf-eval/src/bin/live_baseline_elf/checks/lifecycle.rs index 13a12ba2..d6553ae4 100644 --- a/apps/elf-eval/src/bin/live_baseline_elf/checks/lifecycle.rs +++ b/apps/elf-eval/src/bin/live_baseline_elf/checks/lifecycle.rs @@ -5,7 +5,7 @@ use crate::checks::{ PROJECT_ID, QueryCase, TENANT_ID, UpdateRequest, Uuid, }; -pub(super) async fn run_lifecycle_checks_impl( +pub(crate) async fn run_lifecycle_checks_impl( runtime: &BaselineRuntime, service: &ElfService, notes: &[CorpusNote], diff --git a/apps/elf-eval/src/bin/live_baseline_elf/checks/reporting.rs b/apps/elf-eval/src/bin/live_baseline_elf/checks/reporting.rs index 815bbb90..32ddac51 100644 --- a/apps/elf-eval/src/bin/live_baseline_elf/checks/reporting.rs +++ b/apps/elf-eval/src/bin/live_baseline_elf/checks/reporting.rs @@ -3,7 +3,7 @@ use crate::checks::{ OperationalCase, QueryResult, env, }; -pub(super) fn cost_proxy_report_impl( +pub(crate) fn cost_proxy_report_impl( notes: &[CorpusNote], queries: &[QueryResult], embedding: &EmbeddingRuntimeReport, @@ -32,7 +32,7 @@ pub(super) fn cost_proxy_report_impl( } } -pub(super) fn latency_percentile_impl(latencies: &[f64], percentile: f64) -> f64 { +pub(crate) fn latency_percentile_impl(latencies: &[f64], percentile: f64) -> f64 { if latencies.is_empty() { return 0.0; } @@ -46,7 +46,7 @@ pub(super) fn latency_percentile_impl(latencies: &[f64], percentile: f64) -> f64 sorted[rank.min(sorted.len().saturating_sub(1))] } -pub(super) fn operational_cases_impl() -> Vec { +pub(crate) fn operational_cases_impl() -> Vec { vec![ operational_case( "private_corpus_addendum", @@ -115,7 +115,7 @@ pub(super) fn operational_cases_impl() -> Vec { ] } -pub(super) fn incomplete_check_impl(name: &'static str, reason: &str) -> CheckResult { +pub(crate) fn incomplete_check_impl(name: &'static str, reason: &str) -> CheckResult { CheckResult { name, status: "incomplete", @@ -124,7 +124,7 @@ pub(super) fn incomplete_check_impl(name: &'static str, reason: &str) -> CheckRe } } -pub(super) fn summarize_checks_impl(checks: &[CheckResult]) -> CheckSummary { +pub(crate) fn summarize_checks_impl(checks: &[CheckResult]) -> CheckSummary { let wrong_result = checks.iter().filter(|check| check.status == "wrong_result").count(); let lifecycle_fail = checks.iter().filter(|check| check.status == "lifecycle_fail").count(); @@ -140,7 +140,7 @@ pub(super) fn summarize_checks_impl(checks: &[CheckResult]) -> CheckSummary { } } -pub(super) fn project_status_from_summary_impl(summary: &CheckSummary) -> &'static str { +pub(crate) fn project_status_from_summary_impl(summary: &CheckSummary) -> &'static str { if summary.wrong_result > 0 { "wrong_result" } else if summary.lifecycle_fail > 0 { diff --git a/apps/elf-eval/src/bin/live_baseline_elf/checks/resource.rs b/apps/elf-eval/src/bin/live_baseline_elf/checks/resource.rs index 3c012dca..0f50e31b 100644 --- a/apps/elf-eval/src/bin/live_baseline_elf/checks/resource.rs +++ b/apps/elf-eval/src/bin/live_baseline_elf/checks/resource.rs @@ -2,7 +2,7 @@ use color_eyre::Result; use crate::checks::{CheckResult, ElfService, Path, ResourceEnvelopeEvidence, env, fs}; -pub(super) async fn resource_envelope_check_impl( +pub(crate) async fn resource_envelope_check_impl( service: &ElfService, corpus_dir: &Path, report_path: &Path, diff --git a/apps/elf-eval/src/bin/live_baseline_elf/checks/stress.rs b/apps/elf-eval/src/bin/live_baseline_elf/checks/stress.rs index de919fd2..70a0081a 100644 --- a/apps/elf-eval/src/bin/live_baseline_elf/checks/stress.rs +++ b/apps/elf-eval/src/bin/live_baseline_elf/checks/stress.rs @@ -5,7 +5,7 @@ use crate::checks::{ eyre, time, }; -pub(super) async fn run_concurrent_write_check_impl( +pub(crate) async fn run_concurrent_write_check_impl( runtime: &BaselineRuntime, service: Arc, ) -> Result { @@ -71,7 +71,7 @@ pub(super) async fn run_concurrent_write_check_impl( }) } -pub(super) async fn run_soak_stability_check_impl( +pub(crate) async fn run_soak_stability_check_impl( runtime: &BaselineRuntime, service: Arc, ) -> Result> { diff --git a/apps/elf-eval/src/bin/live_baseline_elf/checks/workload.rs b/apps/elf-eval/src/bin/live_baseline_elf/checks/workload.rs new file mode 100644 index 00000000..d2981ce9 --- /dev/null +++ b/apps/elf-eval/src/bin/live_baseline_elf/checks/workload.rs @@ -0,0 +1,180 @@ +use crate::{ + AGENT_ID, AddNoteInput, AddNoteRequest, PROJECT_ID, QueryCase, SCOPE, SoakConfig, TENANT_ID, + checks, env, +}; + +pub(crate) fn concurrent_note_count() -> usize { + if let Ok(value) = env::var("ELF_BASELINE_CONCURRENT_NOTES") + && let Ok(parsed) = value.parse::() + { + return parsed.max(1); + } + + match env::var("ELF_BASELINE_PROFILE").as_deref() { + Ok("backfill" | "large") => 32, + Ok("stress") => 32, + Ok("scale" | "full") => 16, + _ => 4, + } +} + +pub(crate) fn concurrent_add_request(index: usize) -> AddNoteRequest { + let marker = concurrent_marker(index); + + AddNoteRequest { + tenant_id: TENANT_ID.to_string(), + project_id: PROJECT_ID.to_string(), + agent_id: AGENT_ID.to_string(), + scope: SCOPE.to_string(), + notes: vec![AddNoteInput { + r#type: "fact".to_string(), + key: Some(format!("concurrent_{index:03}")), + text: format!( + "Concurrent benchmark note {index:03} records marker `{marker}` for write race validation." + ), + structured: None, + importance: 0.91, + confidence: 0.96, + ttl_days: None, + source_ref: serde_json::json!({ + "source": "ELF live baseline concurrent write check", + "document": format!("concurrent-{index:03}.md"), + }), + write_policy: None, + }], + } +} + +pub(crate) fn concurrent_query_case(index: usize) -> QueryCase { + let marker = concurrent_marker(index); + + QueryCase::generated( + format!("concurrent-{index:03}"), + format!("Find the concurrent benchmark note containing marker {marker}."), + format!("concurrent-{index:03}.md"), + vec![marker], + ) +} + +pub(crate) fn soak_config() -> SoakConfig { + let profile = env::var("ELF_BASELINE_PROFILE").ok(); + let (default_seconds, default_rounds) = match profile.as_deref() { + Some("backfill" | "large") => (60, 6), + Some("stress") => (60, 6), + Some("scale" | "full") => (15, 3), + _ => (0, 0), + }; + + SoakConfig { + target_seconds: checks::parse_env_u64("ELF_BASELINE_SOAK_SECONDS") + .unwrap_or(default_seconds), + write_rounds: checks::parse_env_usize("ELF_BASELINE_SOAK_ROUNDS").unwrap_or(default_rounds), + probe_interval_millis: checks::parse_env_u64("ELF_BASELINE_SOAK_PROBE_INTERVAL_MS") + .unwrap_or(1_000) + .max(100), + } +} + +pub(crate) fn soak_add_request(index: usize) -> AddNoteRequest { + let marker = soak_marker(index); + let (topic, detail) = soak_topic(index); + + AddNoteRequest { + tenant_id: TENANT_ID.to_string(), + project_id: PROJECT_ID.to_string(), + agent_id: AGENT_ID.to_string(), + scope: SCOPE.to_string(), + notes: vec![AddNoteInput { + r#type: "fact".to_string(), + key: Some(format!("soak_{index:03}")), + text: format!( + "Soak benchmark note {index:03} covers {topic}. {detail} It records stability marker `{marker}` for repeated worker and search probes." + ), + structured: None, + importance: 0.92, + confidence: 0.97, + ttl_days: None, + source_ref: serde_json::json!({ + "source": "ELF live baseline soak stability check", + "document": format!("soak-{index:03}.md"), + }), + write_policy: None, + }], + } +} + +pub(crate) fn soak_query_case(index: usize) -> QueryCase { + let marker = soak_marker(index); + let (topic, _) = soak_topic(index); + + QueryCase::generated( + format!("soak-{index:03}"), + format!("Find the soak benchmark note about {topic} containing marker {marker}."), + format!("soak-{index:03}.md"), + vec![marker], + ) +} + +pub(crate) fn concurrency_probe_indexes(note_count: usize) -> Vec { + let mut indexes = vec![0, note_count / 2, note_count.saturating_sub(1)]; + + indexes.sort_unstable(); + indexes.dedup(); + + indexes +} + +fn concurrent_marker(index: usize) -> String { + format!("concurrency-{}-{index:03}", marker_word(index)) +} + +fn soak_marker(index: usize) -> String { + format!("soak-stability-{}-{index:03}", marker_word(index)) +} + +fn marker_word(index: usize) -> &'static str { + const WORDS: &[&str] = &[ + "aurora", "banyan", "cobalt", "delta", "ember", "fennel", "granite", "harbor", "indigo", + "jasper", "keystone", "lantern", "meridian", "nebula", "onyx", "prairie", "quartz", + "raven", "solstice", "topaz", "umbra", "verdant", "willow", "xenon", "yarrow", "zephyr", + "atlas", "beacon", "citadel", "drift", "equinox", "forge", + ]; + + WORDS[index % WORDS.len()] +} + +fn soak_topic(index: usize) -> (&'static str, &'static str) { + const TOPICS: &[(&str, &str)] = &[ + ( + "release rollback fencing", + "The rollback controller waits for a signed deploy fence before the next canary.", + ), + ( + "invoice export batching", + "The exporter groups invoice CSV rows by merchant ledger before upload.", + ), + ("search shard warming", "The search router warms tenant shard caches before rank probes."), + ( + "incident pager routing", + "The incident desk routes page ownership through the release captain.", + ), + ( + "backup restore rehearsal", + "The restore rehearsal checks WAL freshness before dry-run recovery.", + ), + ( + "feature flag expiry", + "The flag sweeper archives expired toggles before deleting rollout rules.", + ), + ( + "support queue triage", + "The support classifier separates billing tickets from access tickets.", + ), + ( + "analytics job watermark", + "The analytics worker stores a warehouse watermark after each import.", + ), + ]; + + TOPICS[index % TOPICS.len()] +} diff --git a/apps/elf-eval/src/bin/live_baseline_elf.rs b/apps/elf-eval/src/bin/live_baseline_elf/main.rs similarity index 94% rename from apps/elf-eval/src/bin/live_baseline_elf.rs rename to apps/elf-eval/src/bin/live_baseline_elf/main.rs index 35e59045..0e002b55 100644 --- a/apps/elf-eval/src/bin/live_baseline_elf.rs +++ b/apps/elf-eval/src/bin/live_baseline_elf/main.rs @@ -2,12 +2,12 @@ //! Docker live-baseline runner for ELF's own same-corpus retrieval path. -#[path = "live_baseline_elf/backfill.rs"] mod backfill; -#[path = "live_baseline_elf/checks.rs"] mod checks; -#[path = "live_baseline_elf/corpus.rs"] mod corpus; -#[path = "live_baseline_elf/providers.rs"] mod providers; -#[path = "live_baseline_elf/runtime.rs"] mod runtime; -#[path = "live_baseline_elf/types.rs"] mod types; +mod backfill; +mod checks; +mod corpus; +mod providers; +mod runtime; +mod types; use std::{ collections::{BTreeMap, HashSet}, @@ -41,7 +41,7 @@ use elf_service::{ }; use elf_storage::{db::Db, qdrant::QdrantStore}; use elf_testkit::TestDatabase; -use elf_worker::worker::{self, WorkerState}; +use elf_worker::worker::WorkerState; use providers::{ EmbeddingMode, deterministic_providers, embedding_mode, env_string, runtime_config, }; diff --git a/apps/elf-eval/src/bin/live_baseline_elf/providers.rs b/apps/elf-eval/src/bin/live_baseline_elf/providers.rs index e3200058..331c42f2 100644 --- a/apps/elf-eval/src/bin/live_baseline_elf/providers.rs +++ b/apps/elf-eval/src/bin/live_baseline_elf/providers.rs @@ -1,246 +1,9 @@ -use crate::{ - Arc, BaselineRuntime, BoxFuture, Config, EmbeddingProvider, EmbeddingProviderConfig, - EmbeddingRuntimeReport, ExtractorProvider, LlmProviderConfig, ProviderConfig, Providers, - RerankProvider, Serialize, Value, env, eyre, +mod config; +mod deterministic; +mod env_config; + +pub(super) use self::{ + config::{EmbeddingMode, embedding_mode, embedding_runtime_report, runtime_config}, + deterministic::deterministic_providers, + env_config::env_string, }; - -#[derive(Debug)] -pub(super) struct DeterministicEmbedding { - vector_dim: u32, -} -impl EmbeddingProvider for DeterministicEmbedding { - fn embed<'a>( - &'a self, - _cfg: &'a EmbeddingProviderConfig, - texts: &'a [String], - ) -> BoxFuture<'a, elf_service::Result>>> { - let dim = self.vector_dim; - let vectors = texts.iter().map(|text| crate::embed_text(text, dim)).collect(); - - Box::pin(async move { Ok(vectors) }) - } -} - -#[derive(Debug)] -pub(super) struct TokenOverlapRerank; -impl RerankProvider for TokenOverlapRerank { - fn rerank<'a>( - &'a self, - _cfg: &'a ProviderConfig, - query: &'a str, - docs: &'a [String], - ) -> BoxFuture<'a, elf_service::Result>> { - let query_terms = crate::terms(query); - let scores = docs - .iter() - .map(|doc| { - let doc_terms = crate::terms(doc); - let hits = query_terms.intersection(&doc_terms).count() as f32; - - hits / query_terms.len().max(1) as f32 - }) - .collect(); - - Box::pin(async move { Ok(scores) }) - } -} - -#[derive(Debug)] -pub(super) struct NoopExtractor; -impl ExtractorProvider for NoopExtractor { - fn extract<'a>( - &'a self, - _cfg: &'a LlmProviderConfig, - _messages: &'a [Value], - ) -> BoxFuture<'a, elf_service::Result> { - Box::pin(async move { Ok(serde_json::json!({ "notes": [] })) }) - } -} - -#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)] -#[serde(rename_all = "snake_case")] -pub(super) enum EmbeddingMode { - Local, - Provider, -} - -pub(super) fn runtime_config(runtime: &BaselineRuntime) -> color_eyre::Result { - let embedding_mode = embedding_mode()?; - let mut cfg = elf_config::load(&runtime.config_path)?; - - cfg.storage.postgres.dsn = runtime.dsn.clone(); - cfg.storage.postgres.pool_max_conns = 12; - cfg.storage.qdrant.url = runtime.qdrant_url.clone(); - cfg.storage.qdrant.collection = runtime.collection.clone(); - cfg.storage.qdrant.docs_collection = runtime.docs_collection.clone(); - - if embedding_mode == EmbeddingMode::Provider { - apply_provider_embedding_overrides(&mut cfg)?; - - cfg.storage.qdrant.vector_dim = cfg.providers.embedding.dimensions; - } else { - cfg.providers.embedding.provider_id = "local".to_string(); - cfg.providers.embedding.model = "local-hash".to_string(); - cfg.providers.embedding.dimensions = cfg.storage.qdrant.vector_dim; - } - - cfg.providers.rerank.provider_id = "local".to_string(); - cfg.providers.rerank.model = "local-token-overlap".to_string(); - cfg.providers.llm_extractor.provider_id = "disabled".to_string(); - cfg.providers.llm_extractor.model = "disabled".to_string(); - cfg.context = None; - - Ok(cfg) -} - -pub(super) fn deterministic_providers(vector_dim: u32) -> Providers { - Providers::new( - Arc::new(DeterministicEmbedding { vector_dim }), - Arc::new(TokenOverlapRerank), - Arc::new(NoopExtractor), - ) -} - -pub(super) fn embedding_mode() -> color_eyre::Result { - let raw = env::var("ELF_BASELINE_ELF_EMBEDDING_MODE") - .unwrap_or_else(|_| "local".to_string()) - .to_ascii_lowercase(); - - match raw.as_str() { - "local" | "deterministic" => Ok(EmbeddingMode::Local), - "provider" | "production" => Ok(EmbeddingMode::Provider), - _ => Err(eyre::eyre!( - "Unsupported ELF_BASELINE_ELF_EMBEDDING_MODE={raw:?}; use local or provider." - )), - } -} - -pub(super) fn apply_provider_embedding_overrides(cfg: &mut Config) -> color_eyre::Result<()> { - apply_env_string( - &mut cfg.providers.embedding.provider_id, - &[ - "ELF_BASELINE_ELF_EMBEDDING_PROVIDER_ID", - "QWEN_EMBEDDING_PROVIDER_ID", - "EMBEDDING_PROVIDER_ID", - ], - ); - apply_env_string( - &mut cfg.providers.embedding.api_base, - &[ - "ELF_BASELINE_ELF_EMBEDDING_API_BASE", - "QWEN_EMBEDDING_API_BASE", - "DASHSCOPE_API_BASE", - "EMBEDDING_API_BASE", - ], - ); - apply_env_string( - &mut cfg.providers.embedding.api_key, - &[ - "ELF_BASELINE_ELF_EMBEDDING_API_KEY", - "QWEN_API_KEY", - "DASHSCOPE_API_KEY", - "EMBEDDING_API_KEY", - ], - ); - apply_env_string( - &mut cfg.providers.embedding.path, - &["ELF_BASELINE_ELF_EMBEDDING_PATH", "QWEN_EMBEDDING_PATH", "EMBEDDING_PATH"], - ); - apply_env_string( - &mut cfg.providers.embedding.model, - &["ELF_BASELINE_ELF_EMBEDDING_MODEL", "QWEN_EMBEDDING_MODEL", "EMBEDDING_MODEL"], - ); - - if let Some(dimensions) = env_u32(&[ - "ELF_BASELINE_ELF_EMBEDDING_DIMENSIONS", - "QWEN_EMBEDDING_DIMENSIONS", - "DASHSCOPE_EMBEDDING_DIMENSIONS", - "EMBEDDING_DIMENSIONS", - ]) { - cfg.providers.embedding.dimensions = dimensions; - } - if let Some(timeout_ms) = env_u64(&[ - "ELF_BASELINE_ELF_EMBEDDING_TIMEOUT_MS", - "QWEN_EMBEDDING_TIMEOUT_MS", - "EMBEDDING_TIMEOUT_MS", - ]) { - cfg.providers.embedding.timeout_ms = timeout_ms; - } else { - cfg.providers.embedding.timeout_ms = cfg.providers.embedding.timeout_ms.max(30_000); - } - - if cfg.providers.embedding.provider_id == "local" { - if env_string(&["ELF_BASELINE_ELF_EMBEDDING_API_KEY", "QWEN_API_KEY"]).is_some() { - cfg.providers.embedding.provider_id = "qwen".to_string(); - } else if env_string(&["DASHSCOPE_API_KEY"]).is_some() { - cfg.providers.embedding.provider_id = "dashscope".to_string(); - } else if env_string(&["EMBEDDING_API_KEY"]).is_some() { - cfg.providers.embedding.provider_id = "provider".to_string(); - } - } - if cfg.providers.embedding.provider_id == "local" { - return Err(eyre::eyre!( - "Provider embedding mode requires a non-local provider id or QWEN_API_KEY/DASHSCOPE_API_KEY/EMBEDDING_API_KEY." - )); - } - if cfg.providers.embedding.api_base.trim().is_empty() - || cfg.providers.embedding.api_base == "http://127.0.0.1" - { - return Err(eyre::eyre!( - "Provider embedding mode requires ELF_BASELINE_ELF_EMBEDDING_API_BASE, QWEN_EMBEDDING_API_BASE, DASHSCOPE_API_BASE, or EMBEDDING_API_BASE." - )); - } - if cfg.providers.embedding.api_key.trim().is_empty() - || cfg.providers.embedding.api_key == "local-dev-placeholder" - { - return Err(eyre::eyre!( - "Provider embedding mode requires ELF_BASELINE_ELF_EMBEDDING_API_KEY, QWEN_API_KEY, DASHSCOPE_API_KEY, or EMBEDDING_API_KEY." - )); - } - if cfg.providers.embedding.model == "local-hash" - || cfg.providers.embedding.model.trim().is_empty() - { - return Err(eyre::eyre!( - "Provider embedding mode requires ELF_BASELINE_ELF_EMBEDDING_MODEL, QWEN_EMBEDDING_MODEL, or EMBEDDING_MODEL." - )); - } - if cfg.providers.embedding.dimensions == 0 { - return Err(eyre::eyre!( - "Provider embedding dimensions must be greater than zero; set ELF_BASELINE_ELF_EMBEDDING_DIMENSIONS, QWEN_EMBEDDING_DIMENSIONS, DASHSCOPE_EMBEDDING_DIMENSIONS, or EMBEDDING_DIMENSIONS." - )); - } - - Ok(()) -} - -pub(super) fn embedding_runtime_report(cfg: &Config) -> EmbeddingRuntimeReport { - EmbeddingRuntimeReport { - mode: embedding_mode().unwrap_or(EmbeddingMode::Local), - provider_id: cfg.providers.embedding.provider_id.clone(), - model: cfg.providers.embedding.model.clone(), - dimensions: cfg.providers.embedding.dimensions, - timeout_ms: cfg.providers.embedding.timeout_ms, - api_base: cfg.providers.embedding.api_base.clone(), - path: cfg.providers.embedding.path.clone(), - } -} - -pub(super) fn apply_env_string(target: &mut String, names: &[&str]) { - if let Some(value) = env_string(names) { - *target = value; - } -} - -pub(super) fn env_string(names: &[&str]) -> Option { - names.iter().find_map(|name| { - env::var(name).ok().map(|value| value.trim().to_string()).filter(|value| !value.is_empty()) - }) -} - -pub(super) fn env_u32(names: &[&str]) -> Option { - env_string(names).and_then(|value| value.parse::().ok()) -} - -pub(super) fn env_u64(names: &[&str]) -> Option { - env_string(names).and_then(|value| value.parse::().ok()) -} diff --git a/apps/elf-eval/src/bin/live_baseline_elf/providers/config.rs b/apps/elf-eval/src/bin/live_baseline_elf/providers/config.rs new file mode 100644 index 00000000..3d33ac02 --- /dev/null +++ b/apps/elf-eval/src/bin/live_baseline_elf/providers/config.rs @@ -0,0 +1,166 @@ +use color_eyre::Result; + +use crate::{ + BaselineRuntime, Config, EmbeddingRuntimeReport, Serialize, env, eyre, providers::env_config, +}; + +#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)] +#[serde(rename_all = "snake_case")] +pub(crate) enum EmbeddingMode { + Local, + Provider, +} + +pub(crate) fn runtime_config(runtime: &BaselineRuntime) -> Result { + let embedding_mode = embedding_mode()?; + let mut cfg = elf_config::load(&runtime.config_path)?; + + cfg.storage.postgres.dsn = runtime.dsn.clone(); + cfg.storage.postgres.pool_max_conns = 12; + cfg.storage.qdrant.url = runtime.qdrant_url.clone(); + cfg.storage.qdrant.collection = runtime.collection.clone(); + cfg.storage.qdrant.docs_collection = runtime.docs_collection.clone(); + + if embedding_mode == EmbeddingMode::Provider { + apply_provider_embedding_overrides(&mut cfg)?; + + cfg.storage.qdrant.vector_dim = cfg.providers.embedding.dimensions; + } else { + cfg.providers.embedding.provider_id = "local".to_string(); + cfg.providers.embedding.model = "local-hash".to_string(); + cfg.providers.embedding.dimensions = cfg.storage.qdrant.vector_dim; + } + + cfg.providers.rerank.provider_id = "local".to_string(); + cfg.providers.rerank.model = "local-token-overlap".to_string(); + cfg.providers.llm_extractor.provider_id = "disabled".to_string(); + cfg.providers.llm_extractor.model = "disabled".to_string(); + cfg.context = None; + + Ok(cfg) +} + +pub(crate) fn embedding_mode() -> Result { + let raw = env::var("ELF_BASELINE_ELF_EMBEDDING_MODE") + .unwrap_or_else(|_| "local".to_string()) + .to_ascii_lowercase(); + + match raw.as_str() { + "local" | "deterministic" => Ok(EmbeddingMode::Local), + "provider" | "production" => Ok(EmbeddingMode::Provider), + _ => Err(eyre::eyre!( + "Unsupported ELF_BASELINE_ELF_EMBEDDING_MODE={raw:?}; use local or provider." + )), + } +} + +pub(crate) fn embedding_runtime_report(cfg: &Config) -> EmbeddingRuntimeReport { + EmbeddingRuntimeReport { + mode: embedding_mode().unwrap_or(EmbeddingMode::Local), + provider_id: cfg.providers.embedding.provider_id.clone(), + model: cfg.providers.embedding.model.clone(), + dimensions: cfg.providers.embedding.dimensions, + timeout_ms: cfg.providers.embedding.timeout_ms, + api_base: cfg.providers.embedding.api_base.clone(), + path: cfg.providers.embedding.path.clone(), + } +} + +fn apply_provider_embedding_overrides(cfg: &mut Config) -> Result<()> { + env_config::apply_env_string( + &mut cfg.providers.embedding.provider_id, + &[ + "ELF_BASELINE_ELF_EMBEDDING_PROVIDER_ID", + "QWEN_EMBEDDING_PROVIDER_ID", + "EMBEDDING_PROVIDER_ID", + ], + ); + env_config::apply_env_string( + &mut cfg.providers.embedding.api_base, + &[ + "ELF_BASELINE_ELF_EMBEDDING_API_BASE", + "QWEN_EMBEDDING_API_BASE", + "DASHSCOPE_API_BASE", + "EMBEDDING_API_BASE", + ], + ); + env_config::apply_env_string( + &mut cfg.providers.embedding.api_key, + &[ + "ELF_BASELINE_ELF_EMBEDDING_API_KEY", + "QWEN_API_KEY", + "DASHSCOPE_API_KEY", + "EMBEDDING_API_KEY", + ], + ); + env_config::apply_env_string( + &mut cfg.providers.embedding.path, + &["ELF_BASELINE_ELF_EMBEDDING_PATH", "QWEN_EMBEDDING_PATH", "EMBEDDING_PATH"], + ); + env_config::apply_env_string( + &mut cfg.providers.embedding.model, + &["ELF_BASELINE_ELF_EMBEDDING_MODEL", "QWEN_EMBEDDING_MODEL", "EMBEDDING_MODEL"], + ); + + if let Some(dimensions) = env_config::env_u32(&[ + "ELF_BASELINE_ELF_EMBEDDING_DIMENSIONS", + "QWEN_EMBEDDING_DIMENSIONS", + "DASHSCOPE_EMBEDDING_DIMENSIONS", + "EMBEDDING_DIMENSIONS", + ]) { + cfg.providers.embedding.dimensions = dimensions; + } + if let Some(timeout_ms) = env_config::env_u64(&[ + "ELF_BASELINE_ELF_EMBEDDING_TIMEOUT_MS", + "QWEN_EMBEDDING_TIMEOUT_MS", + "EMBEDDING_TIMEOUT_MS", + ]) { + cfg.providers.embedding.timeout_ms = timeout_ms; + } else { + cfg.providers.embedding.timeout_ms = cfg.providers.embedding.timeout_ms.max(30_000); + } + + if cfg.providers.embedding.provider_id == "local" { + if env_config::env_string(&["ELF_BASELINE_ELF_EMBEDDING_API_KEY", "QWEN_API_KEY"]).is_some() + { + cfg.providers.embedding.provider_id = "qwen".to_string(); + } else if env_config::env_string(&["DASHSCOPE_API_KEY"]).is_some() { + cfg.providers.embedding.provider_id = "dashscope".to_string(); + } else if env_config::env_string(&["EMBEDDING_API_KEY"]).is_some() { + cfg.providers.embedding.provider_id = "provider".to_string(); + } + } + if cfg.providers.embedding.provider_id == "local" { + return Err(eyre::eyre!( + "Provider embedding mode requires a non-local provider id or QWEN_API_KEY/DASHSCOPE_API_KEY/EMBEDDING_API_KEY." + )); + } + if cfg.providers.embedding.api_base.trim().is_empty() + || cfg.providers.embedding.api_base == "http://127.0.0.1" + { + return Err(eyre::eyre!( + "Provider embedding mode requires ELF_BASELINE_ELF_EMBEDDING_API_BASE, QWEN_EMBEDDING_API_BASE, DASHSCOPE_API_BASE, or EMBEDDING_API_BASE." + )); + } + if cfg.providers.embedding.api_key.trim().is_empty() + || cfg.providers.embedding.api_key == "local-dev-placeholder" + { + return Err(eyre::eyre!( + "Provider embedding mode requires ELF_BASELINE_ELF_EMBEDDING_API_KEY, QWEN_API_KEY, DASHSCOPE_API_KEY, or EMBEDDING_API_KEY." + )); + } + if cfg.providers.embedding.model == "local-hash" + || cfg.providers.embedding.model.trim().is_empty() + { + return Err(eyre::eyre!( + "Provider embedding mode requires ELF_BASELINE_ELF_EMBEDDING_MODEL, QWEN_EMBEDDING_MODEL, or EMBEDDING_MODEL." + )); + } + if cfg.providers.embedding.dimensions == 0 { + return Err(eyre::eyre!( + "Provider embedding dimensions must be greater than zero; set ELF_BASELINE_ELF_EMBEDDING_DIMENSIONS, QWEN_EMBEDDING_DIMENSIONS, DASHSCOPE_EMBEDDING_DIMENSIONS, or EMBEDDING_DIMENSIONS." + )); + } + + Ok(()) +} diff --git a/apps/elf-eval/src/bin/live_baseline_elf/providers/deterministic.rs b/apps/elf-eval/src/bin/live_baseline_elf/providers/deterministic.rs new file mode 100644 index 00000000..083032ab --- /dev/null +++ b/apps/elf-eval/src/bin/live_baseline_elf/providers/deterministic.rs @@ -0,0 +1,66 @@ +use crate::{ + Arc, BoxFuture, EmbeddingProvider, EmbeddingProviderConfig, ExtractorProvider, + LlmProviderConfig, ProviderConfig, Providers, RerankProvider, Value, +}; +use elf_service::Result; + +#[derive(Debug)] +struct DeterministicEmbedding { + vector_dim: u32, +} +impl EmbeddingProvider for DeterministicEmbedding { + fn embed<'a>( + &'a self, + _cfg: &'a EmbeddingProviderConfig, + texts: &'a [String], + ) -> BoxFuture<'a, Result>>> { + let dim = self.vector_dim; + let vectors = texts.iter().map(|text| crate::embed_text(text, dim)).collect(); + + Box::pin(async move { Ok(vectors) }) + } +} + +#[derive(Debug)] +struct TokenOverlapRerank; +impl RerankProvider for TokenOverlapRerank { + fn rerank<'a>( + &'a self, + _cfg: &'a ProviderConfig, + query: &'a str, + docs: &'a [String], + ) -> BoxFuture<'a, Result>> { + let query_terms = crate::terms(query); + let scores = docs + .iter() + .map(|doc| { + let doc_terms = crate::terms(doc); + let hits = query_terms.intersection(&doc_terms).count() as f32; + + hits / query_terms.len().max(1) as f32 + }) + .collect(); + + Box::pin(async move { Ok(scores) }) + } +} + +#[derive(Debug)] +struct NoopExtractor; +impl ExtractorProvider for NoopExtractor { + fn extract<'a>( + &'a self, + _cfg: &'a LlmProviderConfig, + _messages: &'a [Value], + ) -> BoxFuture<'a, Result> { + Box::pin(async move { Ok(serde_json::json!({ "notes": [] })) }) + } +} + +pub(crate) fn deterministic_providers(vector_dim: u32) -> Providers { + Providers::new( + Arc::new(DeterministicEmbedding { vector_dim }), + Arc::new(TokenOverlapRerank), + Arc::new(NoopExtractor), + ) +} diff --git a/apps/elf-eval/src/bin/live_baseline_elf/providers/env_config.rs b/apps/elf-eval/src/bin/live_baseline_elf/providers/env_config.rs new file mode 100644 index 00000000..80018f4e --- /dev/null +++ b/apps/elf-eval/src/bin/live_baseline_elf/providers/env_config.rs @@ -0,0 +1,21 @@ +use crate::env; + +pub(crate) fn env_string(names: &[&str]) -> Option { + names.iter().find_map(|name| { + env::var(name).ok().map(|value| value.trim().to_string()).filter(|value| !value.is_empty()) + }) +} + +pub(super) fn apply_env_string(target: &mut String, names: &[&str]) { + if let Some(value) = env_string(names) { + *target = value; + } +} + +pub(super) fn env_u32(names: &[&str]) -> Option { + env_string(names).and_then(|value| value.parse::().ok()) +} + +pub(super) fn env_u64(names: &[&str]) -> Option { + env_string(names).and_then(|value| value.parse::().ok()) +} diff --git a/apps/elf-eval/src/bin/live_baseline_elf/runtime.rs b/apps/elf-eval/src/bin/live_baseline_elf/runtime.rs index 1f69397f..fd2c8882 100644 --- a/apps/elf-eval/src/bin/live_baseline_elf/runtime.rs +++ b/apps/elf-eval/src/bin/live_baseline_elf/runtime.rs @@ -1,309 +1,9 @@ -use color_eyre::Result; - -use crate::{ - AGENT_ID, Arc, BTreeMap, BaselineRuntime, ChunkingConfig, Db, ElfService, EmbeddingMode, - FailedOutboxJob, Instant, JoinSet, PROJECT_ID, PayloadLevel, QdrantStore, QueryCase, - QueryResult, SearchRequest, TENANT_ID, Uuid, Value, WorkerRunEvidence, WorkerState, env, eyre, - worker, +mod query; +mod service; +mod worker; + +pub(super) use self::{ + query::{run_queries, run_single_query}, + service::build_service, + worker::run_worker_until_indexed, }; - -pub(super) fn worker_max_iterations(note_count: usize) -> usize { - env::var("ELF_BASELINE_WORKER_MAX_ITERATIONS") - .ok() - .and_then(|value| value.parse::().ok()) - .unwrap_or_else(|| note_count.saturating_mul(3).saturating_add(32)) -} - -pub(super) async fn build_service(runtime: &BaselineRuntime) -> Result { - let cfg = crate::runtime_config(runtime)?; - let embedding_mode = crate::embedding_mode()?; - let vector_dim = cfg.storage.qdrant.vector_dim; - let db = Db::connect(&cfg.storage.postgres).await?; - - db.ensure_schema(cfg.storage.qdrant.vector_dim).await?; - - let qdrant = QdrantStore::new(&cfg.storage.qdrant)?; - - qdrant.ensure_collection().await?; - - if embedding_mode == EmbeddingMode::Provider { - Ok(ElfService::new(cfg, db, qdrant)) - } else { - Ok(ElfService::with_providers(cfg, db, qdrant, crate::deterministic_providers(vector_dim))) - } -} - -pub(super) async fn build_worker_state(runtime: &BaselineRuntime) -> Result { - let cfg = crate::runtime_config(runtime)?; - let db = Db::connect(&cfg.storage.postgres).await?; - - db.ensure_schema(cfg.storage.qdrant.vector_dim).await?; - - let qdrant = QdrantStore::new(&cfg.storage.qdrant)?; - - qdrant.ensure_collection().await?; - - let docs_qdrant = - QdrantStore::new_with_collection(&cfg.storage.qdrant, &cfg.storage.qdrant.docs_collection)?; - - docs_qdrant.ensure_collection().await?; - - let tokenizer = elf_chunking::load_tokenizer(&cfg.chunking.tokenizer_repo) - .map_err(|err| eyre::eyre!("Failed to load tokenizer for live baseline worker: {err}"))?; - let chunking = ChunkingConfig { - max_tokens: cfg.chunking.max_tokens, - overlap_tokens: cfg.chunking.overlap_tokens, - }; - - Ok(WorkerState { - db, - qdrant, - docs_qdrant, - embedding: cfg.providers.embedding, - chunking, - tokenizer, - }) -} - -pub(super) async fn run_worker_until_indexed( - runtime: &BaselineRuntime, - service: &ElfService, - note_ids: &[Uuid], - label: &str, -) -> Result { - let concurrency = crate::worker_concurrency(); - let mut states = Vec::with_capacity(concurrency); - - for _ in 0..concurrency { - states.push(Arc::new(build_worker_state(runtime).await?)); - } - - let before = outbox_status_counts(service, note_ids).await?; - let max_iterations = worker_max_iterations(note_ids.len()); - let mut iterations = 0_usize; - - while iterations < max_iterations { - let after = outbox_status_counts(service, note_ids).await?; - - if crate::outbox_done(&after, note_ids.len()) { - let (chunk_rows, chunk_embedding_rows) = chunk_counts(service, note_ids).await?; - let failed_jobs = failed_outbox_jobs(service, note_ids).await?; - - return Ok(WorkerRunEvidence { - label: label.to_string(), - expected_note_count: note_ids.len(), - concurrency, - iterations, - before, - after, - chunk_rows, - chunk_embedding_rows, - failed_jobs, - }); - } - - let mut set = JoinSet::new(); - - for state in &states { - let state = Arc::clone(state); - - set.spawn(async move { - worker::process_once(&state) - .await - .map_err(|err| eyre::eyre!("Worker process_once failed: {err}")) - }); - } - - while let Some(joined) = set.join_next().await { - joined??; - } - - iterations = iterations.saturating_add(concurrency); - } - - let after = outbox_status_counts(service, note_ids).await?; - let (chunk_rows, chunk_embedding_rows) = chunk_counts(service, note_ids).await?; - let failed_jobs = failed_outbox_jobs(service, note_ids).await?; - - Ok(WorkerRunEvidence { - label: label.to_string(), - expected_note_count: note_ids.len(), - concurrency, - iterations, - before, - after, - chunk_rows, - chunk_embedding_rows, - failed_jobs, - }) -} - -pub(super) async fn outbox_status_counts( - service: &ElfService, - note_ids: &[Uuid], -) -> Result> { - if note_ids.is_empty() { - return Ok(BTreeMap::new()); - } - - let rows = sqlx::query_as::<_, (String, i64)>( - "\ -SELECT status, COUNT(*)::bigint -FROM indexing_outbox -WHERE note_id = ANY($1) -GROUP BY status -ORDER BY status", - ) - .bind(note_ids) - .fetch_all(&service.db.pool) - .await?; - - Ok(rows.into_iter().collect()) -} - -pub(super) async fn chunk_counts(service: &ElfService, note_ids: &[Uuid]) -> Result<(i64, i64)> { - if note_ids.is_empty() { - return Ok((0, 0)); - } - - let chunk_rows = sqlx::query_scalar::<_, i64>( - "\ -SELECT COUNT(*)::bigint -FROM memory_note_chunks -WHERE note_id = ANY($1)", - ) - .bind(note_ids) - .fetch_one(&service.db.pool) - .await?; - let chunk_embedding_rows = sqlx::query_scalar::<_, i64>( - "\ -SELECT COUNT(*)::bigint -FROM memory_note_chunks c -JOIN note_chunk_embeddings e ON e.chunk_id = c.chunk_id -WHERE c.note_id = ANY($1)", - ) - .bind(note_ids) - .fetch_one(&service.db.pool) - .await?; - - Ok((chunk_rows, chunk_embedding_rows)) -} - -pub(super) async fn failed_outbox_jobs( - service: &ElfService, - note_ids: &[Uuid], -) -> Result> { - if note_ids.is_empty() { - return Ok(Vec::new()); - } - - let rows = sqlx::query_as::<_, (Uuid, Option, String, i32, Option)>( - "\ -SELECT o.note_id, n.key, o.op, o.attempts, o.last_error -FROM indexing_outbox o -LEFT JOIN memory_notes n ON n.note_id = o.note_id -WHERE o.note_id = ANY($1) - AND o.status = 'FAILED' -ORDER BY n.key NULLS LAST, o.note_id", - ) - .bind(note_ids) - .fetch_all(&service.db.pool) - .await?; - - Ok(rows - .into_iter() - .map(|(note_id, note_key, op, attempts, last_error)| FailedOutboxJob { - note_id, - note_key, - op, - attempts, - last_error, - }) - .collect()) -} - -pub(super) async fn run_queries( - service: &ElfService, - queries: Vec, -) -> Result> { - let mut out = Vec::with_capacity(queries.len()); - - for case in queries { - out.push(run_single_query(service, case).await?); - } - - Ok(out) -} - -pub(super) async fn run_single_query(service: &ElfService, case: QueryCase) -> Result { - let top_k = env::var("ELF_BASELINE_TOP_K") - .ok() - .and_then(|value| value.parse::().ok()) - .unwrap_or(10); - let started_at = Instant::now(); - let response = service - .search_raw(SearchRequest { - tenant_id: TENANT_ID.to_string(), - project_id: PROJECT_ID.to_string(), - agent_id: AGENT_ID.to_string(), - token_id: None, - payload_level: PayloadLevel::L2, - read_profile: "private_only".to_string(), - query: case.query.clone(), - top_k: Some(top_k), - candidate_k: Some(top_k.max(20).saturating_mul(4)), - filter: None, - record_hits: Some(false), - ranking: None, - }) - .await?; - let latency_ms = started_at.elapsed().as_secs_f64() * 1_000.0; - let top = response.items.first(); - let top_text = top.map(|item| item.snippet.clone()).unwrap_or_default(); - let matched_terms = case - .expected_terms - .iter() - .filter(|term| crate::contains_case_insensitive(&top_text, term)) - .cloned() - .collect::>(); - let top_key = top.and_then(|item| item.key.clone()); - let expected_docs = crate::expected_docs_for_case(&case); - let matched_doc = top_key - .as_deref() - .and_then(|key| expected_docs.iter().find(|doc| crate::key_for_doc(doc) == key)); - let top_evidence_id = top.and_then(|item| { - item.source_ref.get("document").and_then(Value::as_str).map(crate::evidence_id_for_doc) - }); - let matched_evidence_id = matched_doc.map(|doc| crate::evidence_id_for_doc(doc)); - let matched = matched_terms.len() == case.expected_terms.len() || matched_doc.is_some(); - let expected_evidence_ids = if case.expected_evidence_ids.is_empty() { - vec![crate::evidence_id_for_doc(&case.expected_doc)] - } else { - case.expected_evidence_ids.clone() - }; - let allowed_alternate_evidence_ids = if case.allowed_alternate_evidence_ids.is_empty() { - case.allowed_alternate_docs.iter().map(|doc| crate::evidence_id_for_doc(doc)).collect() - } else { - case.allowed_alternate_evidence_ids.clone() - }; - - Ok(QueryResult { - id: case.id, - task: case.task, - trace_id: response.trace_id, - query: case.query, - expected_doc: case.expected_doc, - allowed_alternate_docs: case.allowed_alternate_docs, - expected_terms: case.expected_terms, - expected_evidence_ids, - allowed_alternate_evidence_ids, - matched, - matched_terms, - top_evidence_id, - matched_evidence_id, - top_note_key: top_key, - top_snippet: top.map(|item| item.snippet.clone()), - latency_ms, - returned_count: response.items.len(), - }) -} diff --git a/apps/elf-eval/src/bin/live_baseline_elf/runtime/query.rs b/apps/elf-eval/src/bin/live_baseline_elf/runtime/query.rs new file mode 100644 index 00000000..0c3184c1 --- /dev/null +++ b/apps/elf-eval/src/bin/live_baseline_elf/runtime/query.rs @@ -0,0 +1,92 @@ +use color_eyre::Result; + +use crate::{ + AGENT_ID, ElfService, Instant, PROJECT_ID, PayloadLevel, QueryCase, QueryResult, SearchRequest, + TENANT_ID, Value, env, +}; + +pub(crate) async fn run_queries( + service: &ElfService, + queries: Vec, +) -> Result> { + let mut out = Vec::with_capacity(queries.len()); + + for case in queries { + out.push(run_single_query(service, case).await?); + } + + Ok(out) +} + +pub(crate) async fn run_single_query(service: &ElfService, case: QueryCase) -> Result { + let top_k = env::var("ELF_BASELINE_TOP_K") + .ok() + .and_then(|value| value.parse::().ok()) + .unwrap_or(10); + let started_at = Instant::now(); + let response = service + .search_raw(SearchRequest { + tenant_id: TENANT_ID.to_string(), + project_id: PROJECT_ID.to_string(), + agent_id: AGENT_ID.to_string(), + token_id: None, + payload_level: PayloadLevel::L2, + read_profile: "private_only".to_string(), + query: case.query.clone(), + top_k: Some(top_k), + candidate_k: Some(top_k.max(20).saturating_mul(4)), + filter: None, + record_hits: Some(false), + ranking: None, + }) + .await?; + let latency_ms = started_at.elapsed().as_secs_f64() * 1_000.0; + let top = response.items.first(); + let top_text = top.map(|item| item.snippet.clone()).unwrap_or_default(); + let matched_terms = case + .expected_terms + .iter() + .filter(|term| crate::contains_case_insensitive(&top_text, term)) + .cloned() + .collect::>(); + let top_key = top.and_then(|item| item.key.clone()); + let expected_docs = crate::expected_docs_for_case(&case); + let matched_doc = top_key + .as_deref() + .and_then(|key| expected_docs.iter().find(|doc| crate::key_for_doc(doc) == key)); + let top_evidence_id = top.and_then(|item| { + item.source_ref.get("document").and_then(Value::as_str).map(crate::evidence_id_for_doc) + }); + let matched_evidence_id = matched_doc.map(|doc| crate::evidence_id_for_doc(doc)); + let matched = matched_terms.len() == case.expected_terms.len() || matched_doc.is_some(); + let expected_evidence_ids = if case.expected_evidence_ids.is_empty() { + vec![crate::evidence_id_for_doc(&case.expected_doc)] + } else { + case.expected_evidence_ids.clone() + }; + let allowed_alternate_evidence_ids = if case.allowed_alternate_evidence_ids.is_empty() { + case.allowed_alternate_docs.iter().map(|doc| crate::evidence_id_for_doc(doc)).collect() + } else { + case.allowed_alternate_evidence_ids.clone() + }; + + Ok(QueryResult { + id: case.id, + task: case.task, + trace_id: response.trace_id, + query: case.query, + expected_doc: case.expected_doc, + allowed_alternate_docs: case.allowed_alternate_docs, + expected_terms: case.expected_terms, + expected_evidence_ids, + allowed_alternate_evidence_ids, + matched, + matched_terms, + top_evidence_id, + matched_evidence_id, + top_note_key: top_key, + top_snippet: top.map(|item| item.snippet.clone()), + latency_ms, + returned_count: response.items.len(), + }) +} diff --git a/apps/elf-eval/src/bin/live_baseline_elf/runtime/service.rs b/apps/elf-eval/src/bin/live_baseline_elf/runtime/service.rs new file mode 100644 index 00000000..b80f0565 --- /dev/null +++ b/apps/elf-eval/src/bin/live_baseline_elf/runtime/service.rs @@ -0,0 +1,56 @@ +use color_eyre::Result; + +use crate::{ + BaselineRuntime, ChunkingConfig, Db, ElfService, EmbeddingMode, QdrantStore, WorkerState, eyre, +}; + +pub(crate) async fn build_service(runtime: &BaselineRuntime) -> Result { + let cfg = crate::runtime_config(runtime)?; + let embedding_mode = crate::embedding_mode()?; + let vector_dim = cfg.storage.qdrant.vector_dim; + let db = Db::connect(&cfg.storage.postgres).await?; + + db.ensure_schema(cfg.storage.qdrant.vector_dim).await?; + + let qdrant = QdrantStore::new(&cfg.storage.qdrant)?; + + qdrant.ensure_collection().await?; + + if embedding_mode == EmbeddingMode::Provider { + Ok(ElfService::new(cfg, db, qdrant)) + } else { + Ok(ElfService::with_providers(cfg, db, qdrant, crate::deterministic_providers(vector_dim))) + } +} + +pub(super) async fn build_worker_state(runtime: &BaselineRuntime) -> Result { + let cfg = crate::runtime_config(runtime)?; + let db = Db::connect(&cfg.storage.postgres).await?; + + db.ensure_schema(cfg.storage.qdrant.vector_dim).await?; + + let qdrant = QdrantStore::new(&cfg.storage.qdrant)?; + + qdrant.ensure_collection().await?; + + let docs_qdrant = + QdrantStore::new_with_collection(&cfg.storage.qdrant, &cfg.storage.qdrant.docs_collection)?; + + docs_qdrant.ensure_collection().await?; + + let tokenizer = elf_chunking::load_tokenizer(&cfg.chunking.tokenizer_repo) + .map_err(|err| eyre::eyre!("Failed to load tokenizer for live baseline worker: {err}"))?; + let chunking = ChunkingConfig { + max_tokens: cfg.chunking.max_tokens, + overlap_tokens: cfg.chunking.overlap_tokens, + }; + + Ok(WorkerState { + db, + qdrant, + docs_qdrant, + embedding: cfg.providers.embedding, + chunking, + tokenizer, + }) +} diff --git a/apps/elf-eval/src/bin/live_baseline_elf/runtime/worker.rs b/apps/elf-eval/src/bin/live_baseline_elf/runtime/worker.rs new file mode 100644 index 00000000..dc7e76f6 --- /dev/null +++ b/apps/elf-eval/src/bin/live_baseline_elf/runtime/worker.rs @@ -0,0 +1,171 @@ +use color_eyre::Result; + +use crate::{ + Arc, BTreeMap, BaselineRuntime, ElfService, FailedOutboxJob, JoinSet, Uuid, WorkerRunEvidence, + env, eyre, runtime::service, +}; +use elf_worker::worker; + +pub(crate) async fn run_worker_until_indexed( + runtime: &BaselineRuntime, + service: &ElfService, + note_ids: &[Uuid], + label: &str, +) -> Result { + let concurrency = crate::worker_concurrency(); + let mut states = Vec::with_capacity(concurrency); + + for _ in 0..concurrency { + states.push(Arc::new(service::build_worker_state(runtime).await?)); + } + + let before = outbox_status_counts(service, note_ids).await?; + let max_iterations = worker_max_iterations(note_ids.len()); + let mut iterations = 0_usize; + + while iterations < max_iterations { + let after = outbox_status_counts(service, note_ids).await?; + + if crate::outbox_done(&after, note_ids.len()) { + let (chunk_rows, chunk_embedding_rows) = chunk_counts(service, note_ids).await?; + let failed_jobs = failed_outbox_jobs(service, note_ids).await?; + + return Ok(WorkerRunEvidence { + label: label.to_string(), + expected_note_count: note_ids.len(), + concurrency, + iterations, + before, + after, + chunk_rows, + chunk_embedding_rows, + failed_jobs, + }); + } + + let mut set = JoinSet::new(); + + for state in &states { + let state = Arc::clone(state); + + set.spawn(async move { + worker::process_once(&state) + .await + .map_err(|err| eyre::eyre!("Worker process_once failed: {err}")) + }); + } + + while let Some(joined) = set.join_next().await { + joined??; + } + + iterations = iterations.saturating_add(concurrency); + } + + let after = outbox_status_counts(service, note_ids).await?; + let (chunk_rows, chunk_embedding_rows) = chunk_counts(service, note_ids).await?; + let failed_jobs = failed_outbox_jobs(service, note_ids).await?; + + Ok(WorkerRunEvidence { + label: label.to_string(), + expected_note_count: note_ids.len(), + concurrency, + iterations, + before, + after, + chunk_rows, + chunk_embedding_rows, + failed_jobs, + }) +} + +fn worker_max_iterations(note_count: usize) -> usize { + env::var("ELF_BASELINE_WORKER_MAX_ITERATIONS") + .ok() + .and_then(|value| value.parse::().ok()) + .unwrap_or_else(|| note_count.saturating_mul(3).saturating_add(32)) +} + +async fn outbox_status_counts( + service: &ElfService, + note_ids: &[Uuid], +) -> Result> { + if note_ids.is_empty() { + return Ok(BTreeMap::new()); + } + + let rows = sqlx::query_as::<_, (String, i64)>( + "\ +SELECT status, COUNT(*)::bigint +FROM indexing_outbox +WHERE note_id = ANY($1) +GROUP BY status +ORDER BY status", + ) + .bind(note_ids) + .fetch_all(&service.db.pool) + .await?; + + Ok(rows.into_iter().collect()) +} + +async fn chunk_counts(service: &ElfService, note_ids: &[Uuid]) -> Result<(i64, i64)> { + if note_ids.is_empty() { + return Ok((0, 0)); + } + + let chunk_rows = sqlx::query_scalar::<_, i64>( + "\ +SELECT COUNT(*)::bigint +FROM memory_note_chunks +WHERE note_id = ANY($1)", + ) + .bind(note_ids) + .fetch_one(&service.db.pool) + .await?; + let chunk_embedding_rows = sqlx::query_scalar::<_, i64>( + "\ +SELECT COUNT(*)::bigint +FROM memory_note_chunks c +JOIN note_chunk_embeddings e ON e.chunk_id = c.chunk_id +WHERE c.note_id = ANY($1)", + ) + .bind(note_ids) + .fetch_one(&service.db.pool) + .await?; + + Ok((chunk_rows, chunk_embedding_rows)) +} + +async fn failed_outbox_jobs( + service: &ElfService, + note_ids: &[Uuid], +) -> Result> { + if note_ids.is_empty() { + return Ok(Vec::new()); + } + + let rows = sqlx::query_as::<_, (Uuid, Option, String, i32, Option)>( + "\ +SELECT o.note_id, n.key, o.op, o.attempts, o.last_error +FROM indexing_outbox o +LEFT JOIN memory_notes n ON n.note_id = o.note_id +WHERE o.note_id = ANY($1) + AND o.status = 'FAILED' +ORDER BY n.key NULLS LAST, o.note_id", + ) + .bind(note_ids) + .fetch_all(&service.db.pool) + .await?; + + Ok(rows + .into_iter() + .map(|(note_id, note_key, op, attempts, last_error)| FailedOutboxJob { + note_id, + note_key, + op, + attempts, + last_error, + }) + .collect()) +} diff --git a/apps/elf-eval/src/bin/live_baseline_elf/types.rs b/apps/elf-eval/src/bin/live_baseline_elf/types.rs index 734005d5..16db14d1 100644 --- a/apps/elf-eval/src/bin/live_baseline_elf/types.rs +++ b/apps/elf-eval/src/bin/live_baseline_elf/types.rs @@ -1,306 +1,21 @@ -use crate::{BTreeMap, Deserialize, EmbeddingMode, Parser, PathBuf, Serialize, Uuid, Value}; - -#[derive(Debug, Parser)] -#[command(version = elf_cli::VERSION, rename_all = "kebab", styles = elf_cli::styles())] -pub(super) struct Args { - /// Base ELF config to load before Docker runtime overrides are applied. - #[arg(long, short = 'c', value_name = "FILE")] - pub(super) config: PathBuf, - - /// Directory containing the generated benchmark corpus markdown files. - #[arg(long, value_name = "DIR")] - pub(super) corpus: PathBuf, - - /// Query manifest generated by the live-baseline harness. - #[arg(long, value_name = "FILE")] - pub(super) queries: PathBuf, - - /// Write ELF result JSON to this file. - #[arg(long, value_name = "FILE")] - pub(super) out: PathBuf, -} - -#[derive(Debug, Deserialize)] -pub(super) struct QueryManifest { - pub(super) queries: Vec, -} - -#[derive(Clone, Debug, Deserialize, Serialize)] -pub(super) struct QueryCase { - pub(super) id: String, - pub(super) task: Option, - pub(super) query: String, - pub(super) expected_doc: String, - pub(super) expected_terms: Vec, - #[serde(default)] - pub(super) allowed_alternate_docs: Vec, - #[serde(default)] - pub(super) expected_evidence_ids: Vec, - #[serde(default)] - pub(super) allowed_alternate_evidence_ids: Vec, -} -impl QueryCase { - pub(super) fn generated( - id: String, - query: String, - expected_doc: String, - expected_terms: Vec, - ) -> Self { - Self { - id, - task: None, - query, - expected_evidence_ids: vec![crate::evidence_id_for_doc(&expected_doc)], - allowed_alternate_docs: Vec::new(), - allowed_alternate_evidence_ids: Vec::new(), - expected_doc, - expected_terms, - } - } -} - -#[derive(Debug)] -pub(super) struct CorpusNote { - pub(super) key: String, - pub(super) title: String, - pub(super) text: String, - pub(super) source_doc: String, -} - -#[derive(Debug)] -pub(super) struct BackfillOutcome { - pub(super) report: BackfillReport, - pub(super) note_ids: Vec, -} - -#[derive(Debug)] -pub(super) struct ExistingBackfillNote { - pub(super) note_id: Uuid, - pub(super) source_hash: Option, -} - -#[derive(Clone, Debug, Deserialize, Serialize)] -pub(super) struct BackfillCheckpoint { - pub(super) schema: String, - pub(super) corpus_hash: String, - pub(super) completed: BTreeMap, -} - -#[derive(Clone, Debug, Deserialize, Serialize)] -pub(super) struct BackfillCheckpointEntry { - pub(super) note_id: Uuid, - pub(super) key: String, - pub(super) source_hash: String, - pub(super) op: String, -} - -#[derive(Debug, Serialize)] -pub(super) struct BackfillReport { - pub(super) checkpoint_path: String, - pub(super) corpus_hash: String, - pub(super) source_count: usize, - pub(super) completed_count: usize, - pub(super) batch_size: usize, - pub(super) worker_concurrency: usize, - pub(super) elapsed_seconds: f64, - pub(super) attempted_writes: usize, - pub(super) skipped_completed: usize, - pub(super) duplicate_source_notes: Vec, - pub(super) resume: BackfillResumeReport, - pub(super) attempts: Vec, -} - -#[derive(Debug, Serialize)] -pub(super) struct BackfillResumeReport { - pub(super) enabled: bool, - pub(super) interrupted: bool, - pub(super) interrupt_after: Option, - pub(super) resume_attempts: usize, - pub(super) completed_before_resume: usize, - pub(super) completed_after_resume: usize, -} - -#[derive(Debug, Serialize)] -pub(super) struct BackfillAttemptEvidence { - pub(super) attempt: usize, - pub(super) resumed: bool, - pub(super) interrupt_after: Option, - pub(super) skipped_completed: usize, - pub(super) attempted_writes: usize, - pub(super) completed_writes: usize, - pub(super) checkpoint_completed: usize, - pub(super) interrupted: bool, -} - -#[derive(Debug, Serialize)] -pub(super) struct DuplicateSourceNote { - pub(super) source_doc: String, - pub(super) count: i64, - pub(super) note_ids: Vec, -} - -#[derive(Debug)] -pub(super) struct BaselineRuntime { - pub(super) config_path: PathBuf, - pub(super) dsn: String, - pub(super) qdrant_url: String, - pub(super) collection: String, - pub(super) docs_collection: String, -} - -#[derive(Debug, Serialize)] -pub(super) struct WorkerRunEvidence { - pub(super) label: String, - pub(super) expected_note_count: usize, - pub(super) concurrency: usize, - pub(super) iterations: usize, - pub(super) before: BTreeMap, - pub(super) after: BTreeMap, - pub(super) chunk_rows: i64, - pub(super) chunk_embedding_rows: i64, - pub(super) failed_jobs: Vec, -} - -#[derive(Debug, Serialize)] -pub(super) struct FailedOutboxJob { - pub(super) note_id: Uuid, - pub(super) note_key: Option, - pub(super) op: String, - pub(super) attempts: i32, - pub(super) last_error: Option, -} - -#[derive(Debug, Serialize)] -pub(super) struct ResourceEnvelopeEvidence { - pub(super) elapsed_seconds: f64, - pub(super) max_elapsed_seconds: f64, - pub(super) rss_kb: Option, - pub(super) max_rss_kb: u64, - pub(super) postgres_database_bytes: Option, - pub(super) corpus_dir_bytes: u64, - pub(super) report_dir_bytes: Option, - pub(super) checkpoint_file_bytes: Option, -} - -#[derive(Debug, Serialize)] -pub(super) struct CostProxyReport { - pub(super) schema: &'static str, - pub(super) scope: &'static str, - pub(super) embedding_mode: EmbeddingMode, - pub(super) estimated_input_chars: usize, - pub(super) estimated_input_tokens: usize, - pub(super) token_estimation: &'static str, - pub(super) configured_usd_per_1k_tokens: Option, - pub(super) estimated_usd: Option, - pub(super) document_count: usize, - pub(super) query_count: usize, -} - -#[derive(Debug, Serialize)] -pub(super) struct EmbeddingRuntimeReport { - pub(super) mode: EmbeddingMode, - pub(super) provider_id: String, - pub(super) model: String, - pub(super) dimensions: u32, - pub(super) timeout_ms: u64, - pub(super) api_base: String, - pub(super) path: String, -} - -#[derive(Debug, Serialize)] -pub(super) struct SoakConfig { - pub(super) target_seconds: u64, - pub(super) write_rounds: usize, - pub(super) probe_interval_millis: u64, -} - -#[derive(Debug, Serialize)] -pub(super) struct ElfBaselineReport { - pub(super) schema: &'static str, - pub(super) status: &'static str, - pub(super) retrieval_status: &'static str, - pub(super) reason: String, - pub(super) head: String, - pub(super) embedding: EmbeddingRuntimeReport, - pub(super) cost_proxy: CostProxyReport, - pub(super) backfill: BackfillReport, - pub(super) indexing: IndexingReport, - pub(super) summary: QuerySummary, - pub(super) check_summary: CheckSummary, - pub(super) checks: Vec, - pub(super) queries: Vec, - pub(super) ops_cases: Vec, -} - -#[derive(Debug, Serialize)] -pub(super) struct IndexingReport { - pub(super) note_count: usize, - pub(super) rebuild_rebuilt_count: u64, - pub(super) rebuild_missing_vector_count: u64, - pub(super) rebuild_error_count: u64, -} - -#[derive(Debug, Serialize)] -pub(super) struct QuerySummary { - pub(super) total: usize, - pub(super) pass: usize, - pub(super) fail: usize, - pub(super) wrong_result_count: usize, - pub(super) latency_ms_total: f64, - pub(super) latency_ms_mean: f64, - pub(super) latency_ms_p50: f64, - pub(super) latency_ms_p95: f64, - pub(super) latency_ms_p99: f64, - pub(super) latency_ms_max: f64, -} - -#[derive(Debug, Serialize)] -pub(super) struct OperationalCase { - pub(super) name: &'static str, - pub(super) default_status: &'static str, - pub(super) operator_status: &'static str, - pub(super) command: &'static str, - pub(super) evidence: &'static str, - pub(super) safety: &'static str, -} - -#[derive(Debug, Serialize)] -pub(super) struct CheckSummary { - pub(super) total: usize, - pub(super) pass: usize, - pub(super) fail: usize, - pub(super) wrong_result: usize, - pub(super) lifecycle_fail: usize, - pub(super) incomplete: usize, - pub(super) blocked: usize, - pub(super) not_encoded: usize, -} - -#[derive(Debug, Serialize)] -pub(super) struct CheckResult { - pub(super) name: &'static str, - pub(super) status: &'static str, - pub(super) reason: String, - pub(super) evidence: Value, -} - -#[derive(Debug, Serialize)] -pub(super) struct QueryResult { - pub(super) id: String, - pub(super) task: Option, - pub(super) trace_id: Uuid, - pub(super) query: String, - pub(super) expected_doc: String, - pub(super) allowed_alternate_docs: Vec, - pub(super) expected_terms: Vec, - pub(super) expected_evidence_ids: Vec, - pub(super) allowed_alternate_evidence_ids: Vec, - pub(super) matched: bool, - pub(super) matched_terms: Vec, - pub(super) top_evidence_id: Option, - pub(super) matched_evidence_id: Option, - pub(super) top_note_key: Option, - pub(super) top_snippet: Option, - pub(super) latency_ms: f64, - pub(super) returned_count: usize, -} +mod backfill; +mod cli; +mod corpus; +mod query; +mod report; +mod runtime; + +pub(super) use self::{ + backfill::{ + BackfillAttemptEvidence, BackfillCheckpoint, BackfillCheckpointEntry, BackfillOutcome, + BackfillReport, BackfillResumeReport, DuplicateSourceNote, ExistingBackfillNote, + }, + cli::Args, + corpus::CorpusNote, + query::{QueryCase, QueryManifest, QueryResult, QuerySummary}, + report::{ + CheckResult, CheckSummary, CostProxyReport, ElfBaselineReport, IndexingReport, + OperationalCase, ResourceEnvelopeEvidence, SoakConfig, + }, + runtime::{BaselineRuntime, EmbeddingRuntimeReport, FailedOutboxJob, WorkerRunEvidence}, +}; diff --git a/apps/elf-eval/src/bin/live_baseline_elf/types/backfill.rs b/apps/elf-eval/src/bin/live_baseline_elf/types/backfill.rs new file mode 100644 index 00000000..bcd7266b --- /dev/null +++ b/apps/elf-eval/src/bin/live_baseline_elf/types/backfill.rs @@ -0,0 +1,73 @@ +use crate::{BTreeMap, Deserialize, Serialize, Uuid}; + +#[derive(Debug)] +pub(crate) struct BackfillOutcome { + pub(crate) report: BackfillReport, + pub(crate) note_ids: Vec, +} + +#[derive(Debug)] +pub(crate) struct ExistingBackfillNote { + pub(crate) note_id: Uuid, + pub(crate) source_hash: Option, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub(crate) struct BackfillCheckpoint { + pub(crate) schema: String, + pub(crate) corpus_hash: String, + pub(crate) completed: BTreeMap, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub(crate) struct BackfillCheckpointEntry { + pub(crate) note_id: Uuid, + pub(crate) key: String, + pub(crate) source_hash: String, + pub(crate) op: String, +} + +#[derive(Debug, Serialize)] +pub(crate) struct BackfillReport { + pub(crate) checkpoint_path: String, + pub(crate) corpus_hash: String, + pub(crate) source_count: usize, + pub(crate) completed_count: usize, + pub(crate) batch_size: usize, + pub(crate) worker_concurrency: usize, + pub(crate) elapsed_seconds: f64, + pub(crate) attempted_writes: usize, + pub(crate) skipped_completed: usize, + pub(crate) duplicate_source_notes: Vec, + pub(crate) resume: BackfillResumeReport, + pub(crate) attempts: Vec, +} + +#[derive(Debug, Serialize)] +pub(crate) struct BackfillResumeReport { + pub(crate) enabled: bool, + pub(crate) interrupted: bool, + pub(crate) interrupt_after: Option, + pub(crate) resume_attempts: usize, + pub(crate) completed_before_resume: usize, + pub(crate) completed_after_resume: usize, +} + +#[derive(Debug, Serialize)] +pub(crate) struct BackfillAttemptEvidence { + pub(crate) attempt: usize, + pub(crate) resumed: bool, + pub(crate) interrupt_after: Option, + pub(crate) skipped_completed: usize, + pub(crate) attempted_writes: usize, + pub(crate) completed_writes: usize, + pub(crate) checkpoint_completed: usize, + pub(crate) interrupted: bool, +} + +#[derive(Debug, Serialize)] +pub(crate) struct DuplicateSourceNote { + pub(crate) source_doc: String, + pub(crate) count: i64, + pub(crate) note_ids: Vec, +} diff --git a/apps/elf-eval/src/bin/live_baseline_elf/types/cli.rs b/apps/elf-eval/src/bin/live_baseline_elf/types/cli.rs new file mode 100644 index 00000000..14f63cb6 --- /dev/null +++ b/apps/elf-eval/src/bin/live_baseline_elf/types/cli.rs @@ -0,0 +1,21 @@ +use crate::{Parser, PathBuf}; + +#[derive(Debug, Parser)] +#[command(version = elf_cli::VERSION, rename_all = "kebab", styles = elf_cli::styles())] +pub(crate) struct Args { + /// Base ELF config to load before Docker runtime overrides are applied. + #[arg(long, short = 'c', value_name = "FILE")] + pub(crate) config: PathBuf, + + /// Directory containing the generated benchmark corpus markdown files. + #[arg(long, value_name = "DIR")] + pub(crate) corpus: PathBuf, + + /// Query manifest generated by the live-baseline harness. + #[arg(long, value_name = "FILE")] + pub(crate) queries: PathBuf, + + /// Write ELF result JSON to this file. + #[arg(long, value_name = "FILE")] + pub(crate) out: PathBuf, +} diff --git a/apps/elf-eval/src/bin/live_baseline_elf/types/corpus.rs b/apps/elf-eval/src/bin/live_baseline_elf/types/corpus.rs new file mode 100644 index 00000000..91d1e44c --- /dev/null +++ b/apps/elf-eval/src/bin/live_baseline_elf/types/corpus.rs @@ -0,0 +1,7 @@ +#[derive(Debug)] +pub(crate) struct CorpusNote { + pub(crate) key: String, + pub(crate) title: String, + pub(crate) text: String, + pub(crate) source_doc: String, +} diff --git a/apps/elf-eval/src/bin/live_baseline_elf/types/query.rs b/apps/elf-eval/src/bin/live_baseline_elf/types/query.rs new file mode 100644 index 00000000..01291aff --- /dev/null +++ b/apps/elf-eval/src/bin/live_baseline_elf/types/query.rs @@ -0,0 +1,75 @@ +use crate::{Deserialize, Serialize, Uuid}; + +#[derive(Debug, Deserialize)] +pub(crate) struct QueryManifest { + pub(crate) queries: Vec, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub(crate) struct QueryCase { + pub(crate) id: String, + pub(crate) task: Option, + pub(crate) query: String, + pub(crate) expected_doc: String, + pub(crate) expected_terms: Vec, + #[serde(default)] + pub(crate) allowed_alternate_docs: Vec, + #[serde(default)] + pub(crate) expected_evidence_ids: Vec, + #[serde(default)] + pub(crate) allowed_alternate_evidence_ids: Vec, +} +impl QueryCase { + pub(crate) fn generated( + id: String, + query: String, + expected_doc: String, + expected_terms: Vec, + ) -> Self { + Self { + id, + task: None, + query, + expected_evidence_ids: vec![crate::evidence_id_for_doc(&expected_doc)], + allowed_alternate_docs: Vec::new(), + allowed_alternate_evidence_ids: Vec::new(), + expected_doc, + expected_terms, + } + } +} + +#[derive(Debug, Serialize)] +pub(crate) struct QuerySummary { + pub(crate) total: usize, + pub(crate) pass: usize, + pub(crate) fail: usize, + pub(crate) wrong_result_count: usize, + pub(crate) latency_ms_total: f64, + pub(crate) latency_ms_mean: f64, + pub(crate) latency_ms_p50: f64, + pub(crate) latency_ms_p95: f64, + pub(crate) latency_ms_p99: f64, + pub(crate) latency_ms_max: f64, +} + +#[derive(Debug, Serialize)] +pub(crate) struct QueryResult { + pub(crate) id: String, + pub(crate) task: Option, + pub(crate) trace_id: Uuid, + pub(crate) query: String, + pub(crate) expected_doc: String, + pub(crate) allowed_alternate_docs: Vec, + pub(crate) expected_terms: Vec, + pub(crate) expected_evidence_ids: Vec, + pub(crate) allowed_alternate_evidence_ids: Vec, + pub(crate) matched: bool, + pub(crate) matched_terms: Vec, + pub(crate) top_evidence_id: Option, + pub(crate) matched_evidence_id: Option, + pub(crate) top_note_key: Option, + pub(crate) top_snippet: Option, + pub(crate) latency_ms: f64, + pub(crate) returned_count: usize, +} diff --git a/apps/elf-eval/src/bin/live_baseline_elf/types/report.rs b/apps/elf-eval/src/bin/live_baseline_elf/types/report.rs new file mode 100644 index 00000000..f36f17ed --- /dev/null +++ b/apps/elf-eval/src/bin/live_baseline_elf/types/report.rs @@ -0,0 +1,97 @@ +use crate::{ + EmbeddingMode, Serialize, Value, + types::{ + backfill::BackfillReport, + query::{QueryResult, QuerySummary}, + runtime::EmbeddingRuntimeReport, + }, +}; + +#[derive(Debug, Serialize)] +pub(crate) struct ResourceEnvelopeEvidence { + pub(crate) elapsed_seconds: f64, + pub(crate) max_elapsed_seconds: f64, + pub(crate) rss_kb: Option, + pub(crate) max_rss_kb: u64, + pub(crate) postgres_database_bytes: Option, + pub(crate) corpus_dir_bytes: u64, + pub(crate) report_dir_bytes: Option, + pub(crate) checkpoint_file_bytes: Option, +} + +#[derive(Debug, Serialize)] +pub(crate) struct CostProxyReport { + pub(crate) schema: &'static str, + pub(crate) scope: &'static str, + pub(crate) embedding_mode: EmbeddingMode, + pub(crate) estimated_input_chars: usize, + pub(crate) estimated_input_tokens: usize, + pub(crate) token_estimation: &'static str, + pub(crate) configured_usd_per_1k_tokens: Option, + pub(crate) estimated_usd: Option, + pub(crate) document_count: usize, + pub(crate) query_count: usize, +} + +#[derive(Debug, Serialize)] +pub(crate) struct SoakConfig { + pub(crate) target_seconds: u64, + pub(crate) write_rounds: usize, + pub(crate) probe_interval_millis: u64, +} + +#[derive(Debug, Serialize)] +pub(crate) struct ElfBaselineReport { + pub(crate) schema: &'static str, + pub(crate) status: &'static str, + pub(crate) retrieval_status: &'static str, + pub(crate) reason: String, + pub(crate) head: String, + pub(crate) embedding: EmbeddingRuntimeReport, + pub(crate) cost_proxy: CostProxyReport, + pub(crate) backfill: BackfillReport, + pub(crate) indexing: IndexingReport, + pub(crate) summary: QuerySummary, + pub(crate) check_summary: CheckSummary, + pub(crate) checks: Vec, + pub(crate) queries: Vec, + pub(crate) ops_cases: Vec, +} + +#[derive(Debug, Serialize)] +pub(crate) struct IndexingReport { + pub(crate) note_count: usize, + pub(crate) rebuild_rebuilt_count: u64, + pub(crate) rebuild_missing_vector_count: u64, + pub(crate) rebuild_error_count: u64, +} + +#[derive(Debug, Serialize)] +pub(crate) struct OperationalCase { + pub(crate) name: &'static str, + pub(crate) default_status: &'static str, + pub(crate) operator_status: &'static str, + pub(crate) command: &'static str, + pub(crate) evidence: &'static str, + pub(crate) safety: &'static str, +} + +#[derive(Debug, Serialize)] +pub(crate) struct CheckSummary { + pub(crate) total: usize, + pub(crate) pass: usize, + pub(crate) fail: usize, + pub(crate) wrong_result: usize, + pub(crate) lifecycle_fail: usize, + pub(crate) incomplete: usize, + pub(crate) blocked: usize, + pub(crate) not_encoded: usize, +} + +#[derive(Debug, Serialize)] +pub(crate) struct CheckResult { + pub(crate) name: &'static str, + pub(crate) status: &'static str, + pub(crate) reason: String, + pub(crate) evidence: Value, +} diff --git a/apps/elf-eval/src/bin/live_baseline_elf/types/runtime.rs b/apps/elf-eval/src/bin/live_baseline_elf/types/runtime.rs new file mode 100644 index 00000000..6ba4ab6a --- /dev/null +++ b/apps/elf-eval/src/bin/live_baseline_elf/types/runtime.rs @@ -0,0 +1,43 @@ +use crate::{BTreeMap, EmbeddingMode, PathBuf, Serialize, Uuid}; + +#[derive(Debug)] +pub(crate) struct BaselineRuntime { + pub(crate) config_path: PathBuf, + pub(crate) dsn: String, + pub(crate) qdrant_url: String, + pub(crate) collection: String, + pub(crate) docs_collection: String, +} + +#[derive(Debug, Serialize)] +pub(crate) struct WorkerRunEvidence { + pub(crate) label: String, + pub(crate) expected_note_count: usize, + pub(crate) concurrency: usize, + pub(crate) iterations: usize, + pub(crate) before: BTreeMap, + pub(crate) after: BTreeMap, + pub(crate) chunk_rows: i64, + pub(crate) chunk_embedding_rows: i64, + pub(crate) failed_jobs: Vec, +} + +#[derive(Debug, Serialize)] +pub(crate) struct FailedOutboxJob { + pub(crate) note_id: Uuid, + pub(crate) note_key: Option, + pub(crate) op: String, + pub(crate) attempts: i32, + pub(crate) last_error: Option, +} + +#[derive(Debug, Serialize)] +pub(crate) struct EmbeddingRuntimeReport { + pub(crate) mode: EmbeddingMode, + pub(crate) provider_id: String, + pub(crate) model: String, + pub(crate) dimensions: u32, + pub(crate) timeout_ms: u64, + pub(crate) api_base: String, + pub(crate) path: String, +}