Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 20 additions & 105 deletions apps/elf-eval/src/bin/live_baseline_elf/backfill.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
mod backfill_checkpoint;

use color_eyre::Result;

use crate::{
AGENT_ID, AddNoteInput, AddNoteRequest, BACKFILL_CHECKPOINT_SCHEMA, BTreeMap,
BackfillAttemptEvidence, BackfillCheckpoint, BackfillCheckpointEntry, BackfillOutcome,
BackfillReport, BackfillResumeReport, CorpusNote, DuplicateSourceNote, ElfService,
ExistingBackfillNote, Hasher, Instant, NoteOp, PROJECT_ID, Path, PathBuf, SCOPE, TENANT_ID,
Uuid, env, eyre, fs,
AGENT_ID, AddNoteInput, AddNoteRequest, BTreeMap, BackfillAttemptEvidence,
BackfillCheckpointEntry, BackfillOutcome, BackfillReport, BackfillResumeReport, CorpusNote,
DuplicateSourceNote, ElfService, ExistingBackfillNote, Instant, NoteOp, PROJECT_ID, Path,
PathBuf, SCOPE, TENANT_ID, Uuid, env, eyre,
};

/// Returns the backfill checkpoint path associated with the output path `out`.
///
/// Thin forwarder: the result is whatever the `backfill_checkpoint` submodule's function of the
/// same name returns for `out`.
pub(super) fn backfill_checkpoint_path(out: &Path) -> PathBuf {
backfill_checkpoint::backfill_checkpoint_path(out)
}

/// Returns the backfill batch size.
///
/// The value comes from `ELF_BASELINE_BACKFILL_BATCH_SIZE` (via `crate::parse_env_usize`), falls
/// back to 32 when that yields nothing, and is never smaller than 1.
pub(super) fn backfill_batch_size() -> usize {
    const DEFAULT_BATCH_SIZE: usize = 32;

    let requested =
        crate::parse_env_usize("ELF_BASELINE_BACKFILL_BATCH_SIZE").unwrap_or(DEFAULT_BATCH_SIZE);

    // A batch size of zero is bumped up to one.
    if requested == 0 { 1 } else { requested }
}
Expand Down Expand Up @@ -40,99 +45,8 @@ pub(super) fn backfill_interrupt_after(source_count: usize) -> Option<usize> {
Some(configured.unwrap_or(default).clamp(1, source_count.saturating_sub(1)))
}

/// Resolves where the backfill checkpoint file lives.
///
/// A value obtained through `crate::env_string` for `ELF_BASELINE_BACKFILL_CHECKPOINT` wins;
/// otherwise the checkpoint is `elf-backfill-checkpoint.json` placed next to `out` (same parent,
/// file name replaced).
pub(super) fn backfill_checkpoint_path(out: &Path) -> PathBuf {
    match crate::env_string(&["ELF_BASELINE_BACKFILL_CHECKPOINT"]) {
        Some(configured) => PathBuf::from(configured),
        None => out.with_file_name("elf-backfill-checkpoint.json"),
    }
}

/// Builds a checkpoint with no completed entries.
///
/// The checkpoint is stamped with the current `BACKFILL_CHECKPOINT_SCHEMA` and the supplied
/// `corpus_hash`.
pub(super) fn empty_backfill_checkpoint(corpus_hash: &str) -> BackfillCheckpoint {
    let schema = BACKFILL_CHECKPOINT_SCHEMA.to_string();
    let completed = BTreeMap::new();

    BackfillCheckpoint { schema, corpus_hash: corpus_hash.to_owned(), completed }
}

/// Loads the checkpoint stored at `path`, or starts a fresh one.
///
/// A fresh (empty) checkpoint for `corpus_hash` is returned when the file does not exist, or when
/// the stored checkpoint's schema or corpus hash differs from the expected values.
///
/// # Errors
///
/// Fails when the file exists but cannot be read, or when its contents do not deserialize into a
/// `BackfillCheckpoint`.
pub(super) fn load_backfill_checkpoint(
    path: &Path,
    corpus_hash: &str,
) -> Result<BackfillCheckpoint> {
    if path.exists() {
        let contents = fs::read_to_string(path)?;
        let stored: BackfillCheckpoint = serde_json::from_str(&contents)?;
        let schema_matches = stored.schema == BACKFILL_CHECKPOINT_SCHEMA;

        // Only reuse a checkpoint written with this schema for this exact corpus.
        if schema_matches && stored.corpus_hash == corpus_hash {
            return Ok(stored);
        }
    }

    Ok(empty_backfill_checkpoint(corpus_hash))
}

/// Persists `checkpoint` to `path` as pretty-printed JSON.
///
/// The parent directory of `path` is created if needed. The JSON is first written to a sibling
/// file whose extension is replaced with `json.tmp`, and that file is then renamed onto `path`.
///
/// # Errors
///
/// Fails when the directory cannot be created, the checkpoint cannot be serialized, or the
/// write or rename fails.
pub(super) fn write_backfill_checkpoint(
    path: &Path,
    checkpoint: &BackfillCheckpoint,
) -> Result<()> {
    if let Some(dir) = path.parent() {
        fs::create_dir_all(dir)?;
    }

    let serialized = serde_json::to_string_pretty(checkpoint)?;
    let staging_path = path.with_extension("json.tmp");

    // Write the staging file first, then move it over the destination.
    fs::write(&staging_path, serialized)?;

    Ok(fs::rename(staging_path, path)?)
}

/// Fingerprints a single corpus note as a hex digest string.
///
/// The digest covers the note's `source_doc`, `key`, and `text` bytes, in that order, with a
/// single NUL byte fed between consecutive fields.
pub(super) fn source_hash(note: &CorpusNote) -> String {
    const FIELD_SEPARATOR: &[u8] = b"\0";

    let fields: [&[u8]; 3] =
        [note.source_doc.as_bytes(), note.key.as_bytes(), note.text.as_bytes()];
    let mut digest = Hasher::new();

    for (position, bytes) in fields.iter().copied().enumerate() {
        // The separator goes between fields only, never before the first or after the last.
        if position > 0 {
            digest.update(FIELD_SEPARATOR);
        }

        digest.update(bytes);
    }

    digest.finalize().to_hex().to_string()
}

/// Fingerprints a whole corpus as a hex digest string.
///
/// Notes are folded in slice order. Each note contributes its `source_doc` bytes, a NUL byte,
/// the bytes of its `source_hash` hex string, and a trailing NUL byte.
pub(super) fn corpus_hash(notes: &[CorpusNote]) -> String {
    let mut digest = Hasher::new();

    notes.iter().for_each(|note| {
        let fingerprint = source_hash(note);

        digest.update(note.source_doc.as_bytes());
        digest.update(b"\0");
        digest.update(fingerprint.as_bytes());
        digest.update(b"\0");
    });

    digest.finalize().to_hex().to_string()
}

/// Reports whether a checkpoint `entry` can still be trusted for `note`.
///
/// The entry is valid only when all of the following hold:
/// - its recorded source hash equals the hash recomputed from `note`;
/// - `existing` has a record under the note's `source_doc`;
/// - that record carries the same note id as the entry and the same source hash.
pub(super) fn checkpoint_entry_valid(
    note: &CorpusNote,
    entry: &BackfillCheckpointEntry,
    existing: &BTreeMap<String, ExistingBackfillNote>,
) -> bool {
    let current_hash = source_hash(note);

    match existing.get(&note.source_doc) {
        Some(record) if entry.source_hash == current_hash => {
            let same_note = record.note_id == entry.note_id;
            let same_hash = record.source_hash.as_deref() == Some(current_hash.as_str());

            same_note && same_hash
        },
        _ => false,
    }
}

pub(super) fn note_input(note: &CorpusNote) -> AddNoteInput {
let hash = source_hash(note);
let hash = backfill_checkpoint::source_hash(note);

AddNoteInput {
r#type: "fact".to_string(),
Expand Down Expand Up @@ -232,7 +146,7 @@ pub(super) async fn run_resumable_backfill(
checkpoint_path: &Path,
) -> Result<BackfillOutcome> {
let started_at = Instant::now();
let corpus_hash = corpus_hash(notes);
let corpus_hash = backfill_checkpoint::corpus_hash(notes);
let batch_size = backfill_batch_size();
let interrupt_after = backfill_interrupt_after(notes.len());
let first_attempt = run_backfill_attempt(
Expand Down Expand Up @@ -266,7 +180,7 @@ pub(super) async fn run_resumable_backfill(
);
}

let checkpoint = load_backfill_checkpoint(checkpoint_path, &corpus_hash)?;
let checkpoint = backfill_checkpoint::load_backfill_checkpoint(checkpoint_path, &corpus_hash)?;
let existing = load_existing_backfill_notes(service).await?;
let mut note_ids = Vec::with_capacity(notes.len());

Expand All @@ -278,7 +192,7 @@ pub(super) async fn run_resumable_backfill(
));
};

if !checkpoint_entry_valid(note, entry, &existing) {
if !backfill_checkpoint::checkpoint_entry_valid(note, entry, &existing) {
return Err(eyre::eyre!(
"Backfill checkpoint entry for {} does not match Postgres state.",
note.source_doc
Expand Down Expand Up @@ -326,7 +240,8 @@ pub(super) async fn run_backfill_attempt(
attempt: usize,
interrupt_after: Option<usize>,
) -> Result<BackfillAttemptEvidence> {
let mut checkpoint = load_backfill_checkpoint(checkpoint_path, corpus_hash)?;
let mut checkpoint =
backfill_checkpoint::load_backfill_checkpoint(checkpoint_path, corpus_hash)?;
let existing = load_existing_backfill_notes(service).await?;
let notes_by_source =
notes.iter().map(|note| (note.source_doc.as_str(), note)).collect::<BTreeMap<_, _>>();
Expand All @@ -335,11 +250,11 @@ pub(super) async fn run_backfill_attempt(
checkpoint.completed.retain(|source_doc, entry| {
notes_by_source
.get(source_doc.as_str())
.is_some_and(|note| checkpoint_entry_valid(note, entry, &existing))
.is_some_and(|note| backfill_checkpoint::checkpoint_entry_valid(note, entry, &existing))
});

if checkpoint.completed.len() != checkpoint_len_before_prune {
write_backfill_checkpoint(checkpoint_path, &checkpoint)?;
backfill_checkpoint::write_backfill_checkpoint(checkpoint_path, &checkpoint)?;
}

let mut pending = Vec::new();
Expand Down Expand Up @@ -400,7 +315,7 @@ pub(super) async fn run_backfill_attempt(
BackfillCheckpointEntry {
note_id,
key: note.key.clone(),
source_hash: source_hash(note),
source_hash: backfill_checkpoint::source_hash(note),
op,
},
);
Expand All @@ -411,7 +326,7 @@ pub(super) async fn run_backfill_attempt(
attempted_writes += batch.len();
cursor += batch.len();

write_backfill_checkpoint(checkpoint_path, &checkpoint)?;
backfill_checkpoint::write_backfill_checkpoint(checkpoint_path, &checkpoint)?;
}

let interrupted = cursor < pending.len();
Expand Down
Loading