diff --git a/apps/elf-eval/src/bin/live_baseline_elf/backfill.rs b/apps/elf-eval/src/bin/live_baseline_elf/backfill.rs index d85632a9..0769b218 100644 --- a/apps/elf-eval/src/bin/live_baseline_elf/backfill.rs +++ b/apps/elf-eval/src/bin/live_baseline_elf/backfill.rs @@ -1,13 +1,18 @@ +mod backfill_checkpoint; + use color_eyre::Result; use crate::{ - AGENT_ID, AddNoteInput, AddNoteRequest, BACKFILL_CHECKPOINT_SCHEMA, BTreeMap, - BackfillAttemptEvidence, BackfillCheckpoint, BackfillCheckpointEntry, BackfillOutcome, - BackfillReport, BackfillResumeReport, CorpusNote, DuplicateSourceNote, ElfService, - ExistingBackfillNote, Hasher, Instant, NoteOp, PROJECT_ID, Path, PathBuf, SCOPE, TENANT_ID, - Uuid, env, eyre, fs, + AGENT_ID, AddNoteInput, AddNoteRequest, BTreeMap, BackfillAttemptEvidence, + BackfillCheckpointEntry, BackfillOutcome, BackfillReport, BackfillResumeReport, CorpusNote, + DuplicateSourceNote, ElfService, ExistingBackfillNote, Instant, NoteOp, PROJECT_ID, Path, + PathBuf, SCOPE, TENANT_ID, Uuid, env, eyre, }; +pub(super) fn backfill_checkpoint_path(out: &Path) -> PathBuf { + backfill_checkpoint::backfill_checkpoint_path(out) +} + pub(super) fn backfill_batch_size() -> usize { crate::parse_env_usize("ELF_BASELINE_BACKFILL_BATCH_SIZE").unwrap_or(32).max(1) } @@ -40,99 +45,8 @@ pub(super) fn backfill_interrupt_after(source_count: usize) -> Option { Some(configured.unwrap_or(default).clamp(1, source_count.saturating_sub(1))) } -pub(super) fn backfill_checkpoint_path(out: &Path) -> PathBuf { - crate::env_string(&["ELF_BASELINE_BACKFILL_CHECKPOINT"]) - .map(PathBuf::from) - .unwrap_or_else(|| out.with_file_name("elf-backfill-checkpoint.json")) -} - -pub(super) fn empty_backfill_checkpoint(corpus_hash: &str) -> BackfillCheckpoint { - BackfillCheckpoint { - schema: BACKFILL_CHECKPOINT_SCHEMA.to_string(), - corpus_hash: corpus_hash.to_string(), - completed: BTreeMap::new(), - } -} - -pub(super) fn load_backfill_checkpoint( - path: &Path, - corpus_hash: &str, -) -> Result { - if !path.exists() { - return Ok(empty_backfill_checkpoint(corpus_hash)); - } - - let raw = fs::read_to_string(path)?; - let checkpoint = serde_json::from_str::(&raw)?; - - if checkpoint.schema == BACKFILL_CHECKPOINT_SCHEMA && checkpoint.corpus_hash == corpus_hash { - Ok(checkpoint) - } else { - Ok(empty_backfill_checkpoint(corpus_hash)) - } -} - -pub(super) fn write_backfill_checkpoint( - path: &Path, - checkpoint: &BackfillCheckpoint, -) -> Result<()> { - if let Some(parent) = path.parent() { - fs::create_dir_all(parent)?; - } - - let raw = serde_json::to_string_pretty(checkpoint)?; - let tmp_path = path.with_extension("json.tmp"); - - fs::write(&tmp_path, raw)?; - fs::rename(tmp_path, path)?; - - Ok(()) -} - -pub(super) fn source_hash(note: &CorpusNote) -> String { - let mut hasher = Hasher::new(); - - hasher.update(note.source_doc.as_bytes()); - hasher.update(b"\0"); - hasher.update(note.key.as_bytes()); - hasher.update(b"\0"); - hasher.update(note.text.as_bytes()); - - hasher.finalize().to_hex().to_string() -} - -pub(super) fn corpus_hash(notes: &[CorpusNote]) -> String { - let mut hasher = Hasher::new(); - - for note in notes { - hasher.update(note.source_doc.as_bytes()); - hasher.update(b"\0"); - hasher.update(source_hash(note).as_bytes()); - hasher.update(b"\0"); - } - - hasher.finalize().to_hex().to_string() -} - -pub(super) fn checkpoint_entry_valid( - note: &CorpusNote, - entry: &BackfillCheckpointEntry, - existing: &BTreeMap, -) -> bool { - let expected_hash = source_hash(note); - - if entry.source_hash != expected_hash { - return false; - } - - existing.get(¬e.source_doc).is_some_and(|stored| { - stored.note_id == entry.note_id - && stored.source_hash.as_deref() == Some(expected_hash.as_str()) - }) -} - pub(super) fn note_input(note: &CorpusNote) -> AddNoteInput { - let hash = source_hash(note); + let hash = backfill_checkpoint::source_hash(note); AddNoteInput { r#type: "fact".to_string(), @@ -232,7 +146,7 @@ pub(super) async fn run_resumable_backfill( checkpoint_path: &Path, ) -> Result { let started_at = Instant::now(); - let corpus_hash = corpus_hash(notes); + let corpus_hash = backfill_checkpoint::corpus_hash(notes); let batch_size = backfill_batch_size(); let interrupt_after = backfill_interrupt_after(notes.len()); let first_attempt = run_backfill_attempt( @@ -266,7 +180,7 @@ pub(super) async fn run_resumable_backfill( ); } - let checkpoint = load_backfill_checkpoint(checkpoint_path, &corpus_hash)?; + let checkpoint = backfill_checkpoint::load_backfill_checkpoint(checkpoint_path, &corpus_hash)?; let existing = load_existing_backfill_notes(service).await?; let mut note_ids = Vec::with_capacity(notes.len()); @@ -278,7 +192,7 @@ pub(super) async fn run_resumable_backfill( )); }; - if !checkpoint_entry_valid(note, entry, &existing) { + if !backfill_checkpoint::checkpoint_entry_valid(note, entry, &existing) { return Err(eyre::eyre!( "Backfill checkpoint entry for {} does not match Postgres state.", note.source_doc @@ -326,7 +240,8 @@ pub(super) async fn run_backfill_attempt( attempt: usize, interrupt_after: Option, ) -> Result { - let mut checkpoint = load_backfill_checkpoint(checkpoint_path, corpus_hash)?; + let mut checkpoint = + backfill_checkpoint::load_backfill_checkpoint(checkpoint_path, corpus_hash)?; let existing = load_existing_backfill_notes(service).await?; let notes_by_source = notes.iter().map(|note| (note.source_doc.as_str(), note)).collect::>(); @@ -335,11 +250,11 @@ pub(super) async fn run_backfill_attempt( checkpoint.completed.retain(|source_doc, entry| { notes_by_source .get(source_doc.as_str()) - .is_some_and(|note| checkpoint_entry_valid(note, entry, &existing)) + .is_some_and(|note| backfill_checkpoint::checkpoint_entry_valid(note, entry, &existing)) }); if checkpoint.completed.len() != checkpoint_len_before_prune { - write_backfill_checkpoint(checkpoint_path, &checkpoint)?; + backfill_checkpoint::write_backfill_checkpoint(checkpoint_path, &checkpoint)?; } let mut pending = Vec::new(); @@ -400,7 +315,7 @@ pub(super) async fn run_backfill_attempt( BackfillCheckpointEntry { note_id, key: note.key.clone(), - source_hash: source_hash(note), + source_hash: backfill_checkpoint::source_hash(note), op, }, ); @@ -411,7 +326,7 @@ pub(super) async fn run_backfill_attempt( attempted_writes += batch.len(); cursor += batch.len(); - write_backfill_checkpoint(checkpoint_path, &checkpoint)?; + backfill_checkpoint::write_backfill_checkpoint(checkpoint_path, &checkpoint)?; } let interrupted = cursor < pending.len(); diff --git a/apps/elf-eval/src/bin/live_baseline_elf/backfill_checkpoint.rs b/apps/elf-eval/src/bin/live_baseline_elf/backfill_checkpoint.rs new file mode 100644 index 00000000..e5cdd67f --- /dev/null +++ b/apps/elf-eval/src/bin/live_baseline_elf/backfill_checkpoint.rs @@ -0,0 +1,291 @@ +use color_eyre::Result; + +use crate::{ + BACKFILL_CHECKPOINT_SCHEMA, BTreeMap, BackfillCheckpoint, BackfillCheckpointEntry, CorpusNote, + ExistingBackfillNote, Hasher, Path, PathBuf, fs, +}; + +pub(super) fn backfill_checkpoint_path(out: &Path) -> PathBuf { + crate::env_string(&["ELF_BASELINE_BACKFILL_CHECKPOINT"]) + .map(PathBuf::from) + .unwrap_or_else(|| out.with_file_name("elf-backfill-checkpoint.json")) +} + +pub(super) fn load_backfill_checkpoint( + path: &Path, + corpus_hash: &str, +) -> Result { + if !path.exists() { + return Ok(empty_backfill_checkpoint(corpus_hash)); + } + + let raw = fs::read_to_string(path)?; + let checkpoint = serde_json::from_str::(&raw)?; + + if checkpoint.schema == BACKFILL_CHECKPOINT_SCHEMA && checkpoint.corpus_hash == corpus_hash { + Ok(checkpoint) + } else { + Ok(empty_backfill_checkpoint(corpus_hash)) + } +} + +pub(super) fn write_backfill_checkpoint( + path: &Path, + checkpoint: &BackfillCheckpoint, +) -> Result<()> { + if let Some(parent) = path.parent() { + fs::create_dir_all(parent)?; + } + + let raw = serde_json::to_string_pretty(checkpoint)?; + let tmp_path = path.with_extension("json.tmp"); + + fs::write(&tmp_path, raw)?; + fs::rename(tmp_path, path)?; + + Ok(()) +} + +pub(super) fn source_hash(note: &CorpusNote) -> String { + let mut hasher = Hasher::new(); + + hasher.update(note.source_doc.as_bytes()); + hasher.update(b"\0"); + hasher.update(note.key.as_bytes()); + hasher.update(b"\0"); + hasher.update(note.text.as_bytes()); + + hasher.finalize().to_hex().to_string() +} + +pub(super) fn corpus_hash(notes: &[CorpusNote]) -> String { + let mut hasher = Hasher::new(); + + for note in notes { + hasher.update(note.source_doc.as_bytes()); + hasher.update(b"\0"); + hasher.update(source_hash(note).as_bytes()); + hasher.update(b"\0"); + } + + hasher.finalize().to_hex().to_string() +} + +pub(super) fn checkpoint_entry_valid( + note: &CorpusNote, + entry: &BackfillCheckpointEntry, + existing: &BTreeMap, +) -> bool { + let expected_hash = source_hash(note); + + if entry.source_hash != expected_hash { + return false; + } + + existing.get(¬e.source_doc).is_some_and(|stored| { + stored.note_id == entry.note_id + && stored.source_hash.as_deref() == Some(expected_hash.as_str()) + }) +} + +fn empty_backfill_checkpoint(corpus_hash: &str) -> BackfillCheckpoint { + BackfillCheckpoint { + schema: BACKFILL_CHECKPOINT_SCHEMA.to_string(), + corpus_hash: corpus_hash.to_string(), + completed: BTreeMap::new(), + } +} + +#[cfg(test)] +mod tests { + use std::env; + + use uuid::Uuid; + + use crate::{ + BACKFILL_CHECKPOINT_SCHEMA, BTreeMap, BackfillCheckpoint, BackfillCheckpointEntry, + CorpusNote, ExistingBackfillNote, backfill::backfill_checkpoint, fs, + }; + + fn note(source_doc: &str, key: &str, text: &str) -> CorpusNote { + CorpusNote { + key: key.to_string(), + title: format!("Title {source_doc}"), + text: text.to_string(), + source_doc: source_doc.to_string(), + } + } + + #[test] + fn source_and_corpus_hashes_are_deterministic_and_content_sensitive() { + let baseline = note("doc-a.md", "alpha", "Current project decision."); + let same = note("doc-a.md", "alpha", "Current project decision."); + let changed_doc = note("doc-b.md", "alpha", "Current project decision."); + let changed_key = note("doc-a.md", "beta", "Current project decision."); + let changed_text = note("doc-a.md", "alpha", "Updated project decision."); + + assert_eq!( + backfill_checkpoint::source_hash(&baseline), + backfill_checkpoint::source_hash(&same) + ); + assert_ne!( + backfill_checkpoint::source_hash(&baseline), + backfill_checkpoint::source_hash(&changed_doc) + ); + assert_ne!( + backfill_checkpoint::source_hash(&baseline), + backfill_checkpoint::source_hash(&changed_key) + ); + assert_ne!( + backfill_checkpoint::source_hash(&baseline), + backfill_checkpoint::source_hash(&changed_text) + ); + assert_eq!( + backfill_checkpoint::corpus_hash(&[baseline, changed_doc]), + backfill_checkpoint::corpus_hash(&[ + same, + note("doc-b.md", "alpha", "Current project decision.") + ]) + ); + assert_ne!( + backfill_checkpoint::corpus_hash(&[ + note("doc-a.md", "alpha", "Current project decision."), + changed_key + ]), + backfill_checkpoint::corpus_hash(&[ + note("doc-a.md", "alpha", "Current project decision."), + changed_text + ]) + ); + } + + #[test] + fn checkpoint_load_resets_missing_schema_or_corpus_mismatches() { + let root = env::temp_dir().join(format!("elf-backfill-checkpoint-test-{}", Uuid::new_v4())); + let path = root.join("checkpoint.json"); + let missing = backfill_checkpoint::load_backfill_checkpoint(&path, "corpus-a") + .expect("Missing checkpoint loads."); + + assert_eq!(missing.schema, BACKFILL_CHECKPOINT_SCHEMA); + assert_eq!(missing.corpus_hash, "corpus-a"); + assert!(missing.completed.is_empty()); + + fs::create_dir_all(&root).expect("Temp dir is created."); + fs::write( + &path, + r#"{"schema":"old","corpus_hash":"corpus-a","completed":{"doc.md":{"note_id":"00000000-0000-0000-0000-000000000001","key":"k","source_hash":"h","op":"CREATED"}}}"#, + ) + .expect("Mismatched checkpoint is written."); + + let schema_reset = backfill_checkpoint::load_backfill_checkpoint(&path, "corpus-a") + .expect("Checkpoint loads."); + + assert!(schema_reset.completed.is_empty()); + + fs::write( + &path, + r#"{"schema":"elf.live_baseline.backfill_checkpoint/v1","corpus_hash":"corpus-b","completed":{"doc.md":{"note_id":"00000000-0000-0000-0000-000000000001","key":"k","source_hash":"h","op":"CREATED"}}}"#, + ) + .expect("Mismatched checkpoint is written."); + + let corpus_reset = backfill_checkpoint::load_backfill_checkpoint(&path, "corpus-a") + .expect("Checkpoint loads."); + + assert!(corpus_reset.completed.is_empty()); + + fs::remove_dir_all(root).expect("Temp dir is removed."); + } + + #[test] + fn checkpoint_write_round_trips_through_parent_directories() { + let root = env::temp_dir().join(format!("elf-backfill-checkpoint-test-{}", Uuid::new_v4())); + let path = root.join("nested").join("checkpoint.json"); + let note_id = Uuid::new_v4(); + let mut checkpoint = BackfillCheckpoint { + schema: BACKFILL_CHECKPOINT_SCHEMA.to_string(), + corpus_hash: "corpus-a".to_string(), + completed: BTreeMap::new(), + }; + + checkpoint.completed.insert( + "doc.md".to_string(), + BackfillCheckpointEntry { + note_id, + key: "k".to_string(), + source_hash: "h".to_string(), + op: "CREATED".to_string(), + }, + ); + + backfill_checkpoint::write_backfill_checkpoint(&path, &checkpoint) + .expect("Checkpoint writes."); + + let loaded = backfill_checkpoint::load_backfill_checkpoint(&path, "corpus-a") + .expect("Checkpoint reloads."); + + assert_eq!(loaded.completed["doc.md"].note_id, note_id); + assert!(fs::read_to_string(&path).expect("Checkpoint is readable.").contains('\n')); + + fs::remove_dir_all(root).expect("Temp dir is removed."); + } + + #[test] + fn checkpoint_entry_validation_requires_matching_hash_note_id_and_existing_source() { + let note = note("doc.md", "k", "text"); + let note_id = Uuid::new_v4(); + let source_hash = backfill_checkpoint::source_hash(¬e); + let entry = BackfillCheckpointEntry { + note_id, + key: "k".to_string(), + source_hash: source_hash.clone(), + op: "CREATED".to_string(), + }; + let mut existing = BTreeMap::new(); + + existing.insert( + "doc.md".to_string(), + ExistingBackfillNote { note_id, source_hash: Some(source_hash) }, + ); + + assert!(backfill_checkpoint::checkpoint_entry_valid(¬e, &entry, &existing)); + + let mut stale_entry = entry.clone(); + + stale_entry.source_hash = "stale".to_string(); + + assert!(!backfill_checkpoint::checkpoint_entry_valid(¬e, &stale_entry, &existing)); + + let mut wrong_note_id = existing; + + wrong_note_id.insert( + "doc.md".to_string(), + ExistingBackfillNote { + note_id: Uuid::new_v4(), + source_hash: Some(entry.source_hash.clone()), + }, + ); + + assert!(!backfill_checkpoint::checkpoint_entry_valid(¬e, &entry, &wrong_note_id)); + assert!(!backfill_checkpoint::checkpoint_entry_valid(¬e, &entry, &BTreeMap::new())); + + let mut missing_stored_hash = BTreeMap::new(); + + missing_stored_hash + .insert("doc.md".to_string(), ExistingBackfillNote { note_id, source_hash: None }); + + assert!(!backfill_checkpoint::checkpoint_entry_valid(¬e, &entry, &missing_stored_hash)); + + let mut mismatched_stored_hash = BTreeMap::new(); + + mismatched_stored_hash.insert( + "doc.md".to_string(), + ExistingBackfillNote { note_id, source_hash: Some("stale".to_string()) }, + ); + + assert!(!backfill_checkpoint::checkpoint_entry_valid( + ¬e, + &entry, + &mismatched_stored_hash + )); + } +}