diff --git a/packages/elf-service/tests/acceptance/chunk_search.rs b/packages/elf-service/tests/acceptance/chunk_search.rs index 867ba014..ee454473 100644 --- a/packages/elf-service/tests/acceptance/chunk_search.rs +++ b/packages/elf-service/tests/acceptance/chunk_search.rs @@ -1,3 +1,7 @@ +mod filter_impact; +mod payload_levels; +mod relation_context; + use std::{ collections::HashMap, sync::{Arc, atomic::AtomicUsize}, @@ -9,15 +13,14 @@ use qdrant_client::{ }; use serde_json::Value; use sqlx::PgExecutor; -use time::{Duration, OffsetDateTime}; +use time::OffsetDateTime; use uuid::Uuid; use crate::acceptance::{self, SpyExtractor, StubEmbedding, StubRerank}; use elf_config::ProviderConfig; use elf_service::{ - BoxFuture, ElfService, NoteFetchResponse, PayloadLevel, Providers, RelationTemporalStatus, - RerankProvider, Result, SearchDetailsRequest, SearchRequest, SearchTimelineRequest, - TraceTrajectoryGetRequest, + BoxFuture, ElfService, Providers, RerankProvider, Result, SearchDetailsRequest, SearchRequest, + SearchTimelineRequest, }; use elf_storage::qdrant::{BM25_MODEL, BM25_VECTOR_NAME, DENSE_VECTOR_NAME}; use elf_testkit::TestDatabase; @@ -95,23 +98,6 @@ fn build_vectors(text: &str) -> HashMap { vectors } -fn build_payload_shape_search_request(payload_level: PayloadLevel) -> SearchRequest { - SearchRequest { - tenant_id: "t".to_string(), - project_id: "p".to_string(), - agent_id: "a".to_string(), - token_id: None, - read_profile: "private_only".to_string(), - payload_level, - query: "payload".to_string(), - top_k: Some(5), - candidate_k: Some(10), - filter: None, - record_hits: Some(false), - ranking: None, - } -} - async fn setup_context(test_name: &str, providers: Providers) -> Option { let Some(test_db) = acceptance::test_db().await else { eprintln!("Skipping {test_name}; set ELF_PG_DSN to run this test."); @@ -362,314 +348,6 @@ async fn upsert_point( .expect("Failed to upsert Qdrant point."); } -async fn fetch_raw_source_ref_for_level( - context: &TestContext, - note_id: Uuid, - payload_level: PayloadLevel, -) -> Value { - let response = context - .service - .search_raw(build_payload_shape_search_request(payload_level)) - .await - .expect("Search failed."); - let item = response.items.first().expect("Expected search result."); - - assert_eq!(item.note_id, note_id); - - item.source_ref.clone() -} - -async fn fetch_search_detail_note_for_level( - context: &TestContext, - search_session_id: Uuid, - note_id: Uuid, - payload_level: PayloadLevel, -) -> NoteFetchResponse { - let response = context - .service - .search_details(SearchDetailsRequest { - tenant_id: "t".to_string(), - project_id: "p".to_string(), - agent_id: "a".to_string(), - search_session_id, - payload_level, - note_ids: vec![note_id], - record_hits: Some(false), - }) - .await - .expect("Search details failed."); - - response - .results - .first() - .and_then(|item| item.note.as_ref()) - .expect("Expected note details.") - .clone() -} - -async fn insert_graph_entity<'e, E>( - executor: E, - entity_id: Uuid, - canonical: &str, - kind: Option<&str>, -) where - E: PgExecutor<'e>, -{ - sqlx::query( - "\ -INSERT INTO graph_entities ( - entity_id, - tenant_id, - project_id, - canonical, - canonical_norm, - kind -) -VALUES ($1, $2, $3, $4, $5, $6)", - ) - .bind(entity_id) - .bind("t") - .bind("p") - .bind(canonical) - .bind(canonical.to_lowercase()) - .bind(kind) - .execute(executor) - .await - .expect("Failed to insert graph entity."); -} - -async fn insert_graph_predicate<'e, E>(executor: E, predicate_id: Uuid, canonical: &str) -where - E: PgExecutor<'e>, -{ - sqlx::query( - "\ -INSERT INTO graph_predicates ( - predicate_id, - scope_key, - tenant_id, - project_id, - canonical, - canonical_norm, - cardinality, - status -) -VALUES ($1, $2, $3, $4, $5, $6, 'single', 'active')", - ) - .bind(predicate_id) - .bind("__project__:p") - .bind("t") - .bind("p") - .bind(canonical) - .bind(canonical.to_lowercase()) - .execute(executor) - .await - .expect("Failed to insert graph predicate."); -} - -#[allow(clippy::too_many_arguments)] -async fn insert_graph_fact<'e, E>( - executor: E, - fact_id: Uuid, - subject_entity_id: Uuid, - predicate: &str, - predicate_id: Uuid, - object_value: &str, - valid_from: OffsetDateTime, - valid_to: Option, -) where - E: PgExecutor<'e>, -{ - sqlx::query( - "\ -INSERT INTO graph_facts ( - fact_id, - tenant_id, - project_id, - agent_id, - scope, - subject_entity_id, - predicate, - predicate_id, - object_entity_id, - object_value, - valid_from, - valid_to -) -VALUES ($1, $2, $3, $4, $5, $6, $7, $8, NULL, $9, $10, $11)", - ) - .bind(fact_id) - .bind("t") - .bind("p") - .bind("a") - .bind("agent_private") - .bind(subject_entity_id) - .bind(predicate) - .bind(predicate_id) - .bind(object_value) - .bind(valid_from) - .bind(valid_to) - .execute(executor) - .await - .expect("Failed to insert graph fact."); -} - -async fn insert_graph_fact_evidence<'e, E>( - executor: E, - fact_id: Uuid, - note_id: Uuid, - created_at: OffsetDateTime, -) where - E: PgExecutor<'e>, -{ - sqlx::query( - "\ -INSERT INTO graph_fact_evidence (evidence_id, fact_id, note_id, created_at) -VALUES ($1, $2, $3, $4)", - ) - .bind(Uuid::new_v4()) - .bind(fact_id) - .bind(note_id) - .bind(created_at) - .execute(executor) - .await - .expect("Failed to insert graph fact evidence."); -} - -async fn setup_graph_context_test( - test_name: &str, - providers: Providers, - max_facts_per_item: u32, - max_evidence_notes_per_fact: u32, -) -> Option { - let Some(test_db) = acceptance::test_db().await else { - eprintln!("Skipping {test_name}; set ELF_PG_DSN to run this test."); - - return None; - }; - let Some(qdrant_url) = acceptance::test_qdrant_url() else { - eprintln!("Skipping {test_name}; set ELF_QDRANT_URL to run this test."); - - return None; - }; - let collection = test_db.collection_name("elf_acceptance"); - let docs_collection = test_db.collection_name("elf_acceptance_docs"); - let mut cfg = acceptance::test_config( - test_db.dsn().to_string(), - qdrant_url, - 4_096, - collection, - docs_collection, - ); - - cfg.search.graph_context.enabled = true; - cfg.search.graph_context.max_facts_per_item = max_facts_per_item; - cfg.search.graph_context.max_evidence_notes_per_fact = max_evidence_notes_per_fact; - - let service = - acceptance::build_service(cfg, providers).await.expect("Failed to build service."); - - acceptance::reset_db(&service.db.pool).await.expect("Failed to reset test database."); - - reset_collection(&service).await; - - let embedding_version = format!( - "{}:{}:{}", - service.cfg.providers.embedding.provider_id, - service.cfg.providers.embedding.model, - service.cfg.storage.qdrant.vector_dim - ); - - Some(TestContext { service, test_db, embedding_version }) -} - -async fn seed_relation_context_fixture( - service: &ElfService, - embedding_version: &str, -) -> (Uuid, Uuid, Uuid) { - let now = OffsetDateTime::now_utc(); - let note_id = Uuid::new_v4(); - let note_id_2 = Uuid::new_v4(); - let chunk_id = Uuid::new_v4(); - let chunk_text = "Alice mentors Bob about projects and priorities."; - let subject_id = Uuid::new_v4(); - let newer_fact_id = Uuid::new_v4(); - let predicate_id = Uuid::new_v4(); - let older_fact_id = Uuid::new_v4(); - let older_fact_valid_from = now - Duration::seconds(10); - let newer_fact_valid_from = now - Duration::seconds(5); - let note_1_evidence_created_at = now - Duration::seconds(30); - let note_2_evidence_created_at = now - Duration::seconds(10); - - insert_note(&service.db.pool, note_id, chunk_text, embedding_version).await; - insert_note( - &service.db.pool, - note_id_2, - "Second note for evidence ordering.", - embedding_version, - ) - .await; - insert_chunk( - &service.db.pool, - chunk_id, - note_id, - 0, - 0, - chunk_text.len() as i32, - chunk_text, - embedding_version, - ) - .await; - upsert_point(service, chunk_id, note_id, 0, 0, chunk_text.len() as i32, chunk_text).await; - insert_graph_entity(&service.db.pool, subject_id, "Alice", Some("person")).await; - insert_graph_predicate(&service.db.pool, predicate_id, "mentors").await; - insert_graph_fact( - &service.db.pool, - older_fact_id, - subject_id, - "mentors", - predicate_id, - "Bob", - older_fact_valid_from, - Some(newer_fact_valid_from), - ) - .await; - insert_graph_fact_evidence( - &service.db.pool, - older_fact_id, - note_id, - note_1_evidence_created_at, - ) - .await; - insert_graph_fact( - &service.db.pool, - newer_fact_id, - subject_id, - "mentors", - predicate_id, - "Carol", - newer_fact_valid_from, - None, - ) - .await; - insert_graph_fact_evidence( - &service.db.pool, - newer_fact_id, - note_id, - note_1_evidence_created_at, - ) - .await; - insert_graph_fact_evidence( - &service.db.pool, - newer_fact_id, - note_id_2, - note_2_evidence_created_at, - ) - .await; - - (note_id, newer_fact_id, older_fact_id) -} - #[tokio::test] #[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL to run."] async fn search_returns_chunk_items() { @@ -722,122 +400,6 @@ async fn search_returns_chunk_items() { context.test_db.cleanup().await.expect("Failed to cleanup test database."); } -#[tokio::test] -#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL to run."] -async fn search_raw_quick_includes_relation_context_and_respects_fact_bounds() { - let providers = build_providers(StubRerank); - let Some(context) = setup_graph_context_test( - "search_raw_quick_includes_relation_context_and_respects_fact_bounds", - providers, - 1, - 1, - ) - .await - else { - return; - }; - let fixture = seed_relation_context_fixture(&context.service, &context.embedding_version).await; - let note_id = fixture.0; - let newer_fact_id = fixture.1; - let response = context - .service - .search_raw_quick(SearchRequest { - tenant_id: "t".to_string(), - project_id: "p".to_string(), - agent_id: "a".to_string(), - token_id: None, - read_profile: "private_only".to_string(), - payload_level: Default::default(), - query: "Alice".to_string(), - top_k: Some(5), - candidate_k: Some(10), - filter: None, - record_hits: Some(false), - ranking: None, - }) - .await - .expect("Search failed."); - let item = response.items.first().expect("Expected search result."); - let relation_context = item - .explain - .relation_context - .as_ref() - .expect("Expected relation context in search explain."); - - assert_eq!(relation_context.len(), 1, "Expected relation context to be truncated to one fact."); - assert_eq!( - relation_context[0].fact_id, newer_fact_id, - "Expected the most recent fact after truncation." - ); - assert_eq!(relation_context[0].object.value.as_deref(), Some("Carol")); - assert_eq!(relation_context[0].temporal_status, RelationTemporalStatus::Current); - assert!(relation_context[0].valid_to.is_none()); - assert_eq!(relation_context[0].evidence_note_ids.len(), 1); - assert_eq!(relation_context[0].evidence_note_ids[0], note_id); - - context.test_db.cleanup().await.expect("Failed to cleanup test database."); -} - -#[tokio::test] -#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL to run."] -async fn search_raw_quick_marks_historical_relation_context() { - let providers = build_providers(StubRerank); - let Some(context) = setup_graph_context_test( - "search_raw_quick_marks_historical_relation_context", - providers, - 2, - 2, - ) - .await - else { - return; - }; - let fixture = seed_relation_context_fixture(&context.service, &context.embedding_version).await; - let older_fact_id = fixture.2; - let response = context - .service - .search_raw_quick(SearchRequest { - tenant_id: "t".to_string(), - project_id: "p".to_string(), - agent_id: "a".to_string(), - token_id: None, - read_profile: "private_only".to_string(), - payload_level: Default::default(), - query: "Alice".to_string(), - top_k: Some(5), - candidate_k: Some(10), - filter: None, - record_hits: Some(false), - ranking: None, - }) - .await - .expect("Search failed."); - let item = response.items.first().expect("Expected search result."); - let relation_context = item - .explain - .relation_context - .as_ref() - .expect("Expected relation context in search explain."); - - assert_eq!( - relation_context.len(), - 2, - "Expected current and historical relation facts in context.", - ); - assert_eq!(relation_context[0].temporal_status, RelationTemporalStatus::Current); - - let historical = relation_context - .iter() - .find(|context| context.fact_id == older_fact_id) - .expect("Expected historical fact in relation context."); - - assert_eq!(historical.object.value.as_deref(), Some("Bob")); - assert_eq!(historical.temporal_status, RelationTemporalStatus::Historical); - assert!(historical.valid_to.is_some()); - - context.test_db.cleanup().await.expect("Failed to cleanup test database."); -} - #[tokio::test] #[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL to run."] async fn search_stitches_adjacent_chunks() { @@ -1037,185 +599,6 @@ async fn progressive_search_returns_index_timeline_and_details() { context.test_db.cleanup().await.expect("Failed to cleanup test database."); } -#[tokio::test] -#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL to run."] -async fn search_raw_payload_level_shapes_source_ref() { - let providers = build_providers(StubRerank); - let Some(context) = - setup_context("search_raw_payload_level_shapes_source_ref", providers).await - else { - return; - }; - let note_id = Uuid::new_v4(); - let chunk_id = Uuid::new_v4(); - let note_text = "Payload shaping should control the raw item source_ref payload."; - let source_ref = serde_json::json!({ - "schema": "note_source_ref/v1", - "locator": { - "doc_id": Uuid::new_v4().to_string(), - "chunk_id": Uuid::new_v4().to_string() - }, - "metadata": { - "long_field": "A long metadata body to represent a heavy source reference shape." - } - }); - - insert_note_with_importance_and_source_ref( - &context.service.db.pool, - note_id, - note_text, - &context.embedding_version, - 0.9_f32, - 1.0, - "agent_private", - source_ref.clone(), - ) - .await; - insert_chunk( - &context.service.db.pool, - chunk_id, - note_id, - 0, - 0, - note_text.len() as i32, - note_text, - &context.embedding_version, - ) - .await; - upsert_point(&context.service, chunk_id, note_id, 0, 0, note_text.len() as i32, note_text) - .await; - - let l0 = fetch_raw_source_ref_for_level(&context, note_id, PayloadLevel::L0).await; - let l1 = fetch_raw_source_ref_for_level(&context, note_id, PayloadLevel::L1).await; - let l2 = fetch_raw_source_ref_for_level(&context, note_id, PayloadLevel::L2).await; - - assert_eq!(l0, serde_json::json!({})); - assert_eq!(l1, serde_json::json!({})); - assert_eq!(l2, source_ref); - - context.test_db.cleanup().await.expect("Failed to cleanup test database."); -} - -#[tokio::test] -#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL to run."] -async fn search_details_payload_level_shapes_text_and_fields() { - let providers = build_providers(StubRerank); - let Some(context) = - setup_context("search_details_payload_level_shapes_text_and_fields", providers).await - else { - return; - }; - let note_id = Uuid::new_v4(); - let chunk_id = Uuid::new_v4(); - let max_note_chars = context.service.cfg.memory.max_note_chars as usize; - let note_text_seed = - "This is the long note body used for detail shaping and payload truncation. "; - let note_text = note_text_seed.repeat((max_note_chars / note_text_seed.len()) + 2); - let source_ref = serde_json::json!({ - "schema": "note_source_ref/v1", - "locator": { - "document_id": Uuid::new_v4().to_string(), - "chunk_id": Uuid::new_v4().to_string(), - "extra": "field with rich details for l2 retention" - }, - }); - let structured_summary = "Structured summary about payload levels and compact text behavior."; - let field_id = Uuid::new_v4(); - - assert!(note_text.len() > max_note_chars); - - insert_note_with_importance_and_source_ref( - &context.service.db.pool, - note_id, - note_text.as_str(), - &context.embedding_version, - 0.8_f32, - 1.0, - "agent_private", - source_ref.clone(), - ) - .await; - insert_summary_field_row(&context.service.db.pool, field_id, note_id, structured_summary).await; - insert_chunk( - &context.service.db.pool, - chunk_id, - note_id, - 0, - 0, - note_text.len() as i32, - note_text.as_str(), - &context.embedding_version, - ) - .await; - upsert_point( - &context.service, - chunk_id, - note_id, - 0, - 0, - note_text.len() as i32, - note_text.as_str(), - ) - .await; - - let index = context - .service - .search(SearchRequest { - tenant_id: "t".to_string(), - project_id: "p".to_string(), - agent_id: "a".to_string(), - token_id: None, - read_profile: "private_only".to_string(), - payload_level: PayloadLevel::L2, - query: "payload".to_string(), - top_k: Some(5), - candidate_k: Some(10), - filter: None, - record_hits: Some(false), - ranking: None, - }) - .await - .expect("Search index failed."); - let l0 = fetch_search_detail_note_for_level( - &context, - index.search_session_id, - note_id, - PayloadLevel::L0, - ) - .await; - let l1 = fetch_search_detail_note_for_level( - &context, - index.search_session_id, - note_id, - PayloadLevel::L1, - ) - .await; - let l2 = fetch_search_detail_note_for_level( - &context, - index.search_session_id, - note_id, - PayloadLevel::L2, - ) - .await; - - assert!(l0.text.chars().count() <= max_note_chars + 3); - assert!(l1.text.chars().count() <= max_note_chars + 3); - assert!(l0.text.ends_with("...")); - assert_eq!(l2.text, note_text); - assert_ne!(l0.text, l1.text); - assert_ne!(l0.text, note_text); - assert_ne!(l1.text, note_text); - assert!(l1.text.contains("Structured summary")); - assert_eq!(l0.source_ref, serde_json::json!({})); - assert_eq!(l1.source_ref, serde_json::json!({})); - assert_eq!(l2.source_ref, source_ref); - assert!(l0.structured.is_none()); - assert!(l1.structured.is_some()); - assert!(l2.structured.is_some()); - - context.test_db.cleanup().await.expect("Failed to cleanup test database."); -} - #[tokio::test] #[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL to run."] async fn search_dedupes_note_results() { @@ -1289,182 +672,3 @@ async fn search_dedupes_note_results() { context.test_db.cleanup().await.expect("Failed to cleanup test database."); } - -#[tokio::test] -#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL to run this test."] -async fn search_filter_affects_candidate_set_and_records_filter_impact() { - let provider = build_providers(StubRerank); - let low_note_text = "alpha low confidence note"; - let high_note_text = "alpha high confidence note"; - let low_note_id = Uuid::new_v4(); - let high_note_id = Uuid::new_v4(); - let low_chunk_id = Uuid::new_v4(); - let high_chunk_id = Uuid::new_v4(); - let mut context = match setup_context( - "search_filter_affects_candidate_set_and_records_filter_impact", - provider, - ) - .await - { - Some(context) => context, - None => return, - }; - - context.service.cfg.search.explain.write_mode = "inline".to_string(); - - seed_filter_impact_notes( - &context, - low_note_id, - high_note_id, - low_chunk_id, - high_chunk_id, - low_note_text, - high_note_text, - ) - .await; - - let response = context - .service - .search_raw(SearchRequest { - tenant_id: "t".to_string(), - project_id: "p".to_string(), - agent_id: "a".to_string(), - token_id: None, - read_profile: "private_only".to_string(), - payload_level: Default::default(), - query: "alpha".to_string(), - top_k: Some(1), - candidate_k: Some(10), - filter: Some(serde_json::json!({ - "schema": "search_filter_expr/v1", - "expr": { "op": "gte", "field": "importance", "value": 0.5 }, - })), - record_hits: Some(false), - ranking: None, - }) - .await - .expect("Search failed."); - - assert_eq!(response.items.len(), 1); - assert_eq!(response.items[0].note_id, high_note_id); - - let filter_impact = load_filter_impact_from_trace(&context, response.trace_id).await; - let filter = filter_impact.get("filter").expect("Expected filter object in filter_impact."); - let requested_candidate_k = filter_impact - .get("requested_candidate_k") - .and_then(Value::as_u64) - .expect("Expected requested_candidate_k."); - let effective_candidate_k = filter_impact - .get("effective_candidate_k") - .and_then(Value::as_u64) - .expect("Expected effective_candidate_k."); - - assert_eq!( - filter_impact.get("schema"), - Some(&Value::String("search_filter_impact/v1".to_string())) - ); - assert_eq!(requested_candidate_k, 10); - assert_eq!(effective_candidate_k, 30); - assert_eq!(filter.get("schema"), Some(&Value::String("search_filter_expr/v1".to_string()))); - assert_eq!(filter_impact.get("candidate_count_pre"), Some(&Value::from(2_u64))); - assert_eq!(filter_impact.get("candidate_count_post"), Some(&Value::from(1_u64))); - assert_eq!(filter_impact.get("dropped_total"), Some(&Value::from(1_u64))); - - context.test_db.cleanup().await.expect("Failed to cleanup test database."); -} - -async fn seed_filter_impact_notes( - context: &TestContext, - low_note_id: Uuid, - high_note_id: Uuid, - low_chunk_id: Uuid, - high_chunk_id: Uuid, - low_note_text: &str, - high_note_text: &str, -) { - insert_note_with_importance( - &context.service.db.pool, - low_note_id, - low_note_text, - &context.embedding_version, - 0.2, - 0.2, - "agent_private", - ) - .await; - insert_note_with_importance( - &context.service.db.pool, - high_note_id, - high_note_text, - &context.embedding_version, - 0.9, - 0.9, - "agent_private", - ) - .await; - insert_chunk( - &context.service.db.pool, - low_chunk_id, - low_note_id, - 0, - 0, - low_note_text.len() as i32, - low_note_text, - &context.embedding_version, - ) - .await; - insert_chunk( - &context.service.db.pool, - high_chunk_id, - high_note_id, - 0, - 0, - high_note_text.len() as i32, - high_note_text, - &context.embedding_version, - ) - .await; - upsert_point( - &context.service, - low_chunk_id, - low_note_id, - 0, - 0, - low_note_text.len() as i32, - low_note_text, - ) - .await; - upsert_point( - &context.service, - high_chunk_id, - high_note_id, - 0, - 0, - high_note_text.len() as i32, - high_note_text, - ) - .await; -} - -async fn load_filter_impact_from_trace(context: &TestContext, trace_id: Uuid) -> Value { - let trajectory = context - .service - .trace_trajectory_get(TraceTrajectoryGetRequest { - tenant_id: "t".to_string(), - project_id: "p".to_string(), - agent_id: "a".to_string(), - trace_id, - }) - .await - .expect("Failed to fetch trace trajectory."); - - trajectory - .stages - .iter() - .find(|stage| stage.stage_name == "recall.candidates") - .expect("Expected recall.candidates stage.") - .stage_payload - .get("filter_impact") - .expect("Expected filter_impact in recall stage.") - .clone() -} diff --git a/packages/elf-service/tests/acceptance/chunk_search/filter_impact.rs b/packages/elf-service/tests/acceptance/chunk_search/filter_impact.rs new file mode 100644 index 00000000..5788e8d3 --- /dev/null +++ b/packages/elf-service/tests/acceptance/chunk_search/filter_impact.rs @@ -0,0 +1,187 @@ +use serde_json::Value; +use uuid::Uuid; + +use crate::acceptance::{ + StubRerank, + chunk_search::{self, TestContext}, +}; +use elf_service::{SearchRequest, TraceTrajectoryGetRequest}; + +async fn seed_filter_impact_notes( + context: &TestContext, + low_note_id: Uuid, + high_note_id: Uuid, + low_chunk_id: Uuid, + high_chunk_id: Uuid, + low_note_text: &str, + high_note_text: &str, +) { + chunk_search::insert_note_with_importance( + &context.service.db.pool, + low_note_id, + low_note_text, + &context.embedding_version, + 0.2, + 0.2, + "agent_private", + ) + .await; + chunk_search::insert_note_with_importance( + &context.service.db.pool, + high_note_id, + high_note_text, + &context.embedding_version, + 0.9, + 0.9, + "agent_private", + ) + .await; + chunk_search::insert_chunk( + &context.service.db.pool, + low_chunk_id, + low_note_id, + 0, + 0, + low_note_text.len() as i32, + low_note_text, + &context.embedding_version, + ) + .await; + chunk_search::insert_chunk( + &context.service.db.pool, + high_chunk_id, + high_note_id, + 0, + 0, + high_note_text.len() as i32, + high_note_text, + &context.embedding_version, + ) + .await; + chunk_search::upsert_point( + &context.service, + low_chunk_id, + low_note_id, + 0, + 0, + low_note_text.len() as i32, + low_note_text, + ) + .await; + chunk_search::upsert_point( + &context.service, + high_chunk_id, + high_note_id, + 0, + 0, + high_note_text.len() as i32, + high_note_text, + ) + .await; +} + +async fn load_filter_impact_from_trace(context: &TestContext, trace_id: Uuid) -> Value { + let trajectory = context + .service + .trace_trajectory_get(TraceTrajectoryGetRequest { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + agent_id: "a".to_string(), + trace_id, + }) + .await + .expect("Failed to fetch trace trajectory."); + + trajectory + .stages + .iter() + .find(|stage| stage.stage_name == "recall.candidates") + .expect("Expected recall.candidates stage.") + .stage_payload + .get("filter_impact") + .expect("Expected filter_impact in recall stage.") + .clone() +} + +#[tokio::test] +#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL to run this test."] +async fn search_filter_affects_candidate_set_and_records_filter_impact() { + let provider = chunk_search::build_providers(StubRerank); + let low_note_text = "alpha low confidence note"; + let high_note_text = "alpha high confidence note"; + let low_note_id = Uuid::new_v4(); + let high_note_id = Uuid::new_v4(); + let low_chunk_id = Uuid::new_v4(); + let high_chunk_id = Uuid::new_v4(); + let mut context = match chunk_search::setup_context( + "search_filter_affects_candidate_set_and_records_filter_impact", + provider, + ) + .await + { + Some(context) => context, + None => return, + }; + + context.service.cfg.search.explain.write_mode = "inline".to_string(); + + seed_filter_impact_notes( + &context, + low_note_id, + high_note_id, + low_chunk_id, + high_chunk_id, + low_note_text, + high_note_text, + ) + .await; + + let response = context + .service + .search_raw(SearchRequest { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + agent_id: "a".to_string(), + token_id: None, + read_profile: "private_only".to_string(), + payload_level: Default::default(), + query: "alpha".to_string(), + top_k: Some(1), + candidate_k: Some(10), + filter: Some(serde_json::json!({ + "schema": "search_filter_expr/v1", + "expr": { "op": "gte", "field": "importance", "value": 0.5 }, + })), + record_hits: Some(false), + ranking: None, + }) + .await + .expect("Search failed."); + + assert_eq!(response.items.len(), 1); + assert_eq!(response.items[0].note_id, high_note_id); + + let filter_impact = load_filter_impact_from_trace(&context, response.trace_id).await; + let filter = filter_impact.get("filter").expect("Expected filter object in filter_impact."); + let requested_candidate_k = filter_impact + .get("requested_candidate_k") + .and_then(Value::as_u64) + .expect("Expected requested_candidate_k."); + let effective_candidate_k = filter_impact + .get("effective_candidate_k") + .and_then(Value::as_u64) + .expect("Expected effective_candidate_k."); + + assert_eq!( + filter_impact.get("schema"), + Some(&Value::String("search_filter_impact/v1".to_string())) + ); + assert_eq!(requested_candidate_k, 10); + assert_eq!(effective_candidate_k, 30); + assert_eq!(filter.get("schema"), Some(&Value::String("search_filter_expr/v1".to_string()))); + assert_eq!(filter_impact.get("candidate_count_pre"), Some(&Value::from(2_u64))); + assert_eq!(filter_impact.get("candidate_count_post"), Some(&Value::from(1_u64))); + assert_eq!(filter_impact.get("dropped_total"), Some(&Value::from(1_u64))); + + context.test_db.cleanup().await.expect("Failed to cleanup test database."); +} diff --git a/packages/elf-service/tests/acceptance/chunk_search/payload_levels.rs b/packages/elf-service/tests/acceptance/chunk_search/payload_levels.rs new file mode 100644 index 00000000..be9fa341 --- /dev/null +++ b/packages/elf-service/tests/acceptance/chunk_search/payload_levels.rs @@ -0,0 +1,277 @@ +use serde_json::Value; +use uuid::Uuid; + +use crate::acceptance::{ + StubRerank, + chunk_search::{self, TestContext}, +}; +use elf_service::{NoteFetchResponse, PayloadLevel, SearchDetailsRequest, SearchRequest}; + +fn build_payload_shape_search_request(payload_level: PayloadLevel) -> SearchRequest { + SearchRequest { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + agent_id: "a".to_string(), + token_id: None, + read_profile: "private_only".to_string(), + payload_level, + query: "payload".to_string(), + top_k: Some(5), + candidate_k: Some(10), + filter: None, + record_hits: Some(false), + ranking: None, + } +} + +fn assert_search_detail_payload_levels( + max_note_chars: usize, + note_text: &str, + source_ref: &Value, + l0: &NoteFetchResponse, + l1: &NoteFetchResponse, + l2: &NoteFetchResponse, +) { + assert!(l0.text.chars().count() <= max_note_chars + 3); + assert!(l1.text.chars().count() <= max_note_chars + 3); + assert!(l0.text.ends_with("...")); + assert_eq!(l2.text, note_text); + assert_ne!(l0.text, l1.text); + assert_ne!(l0.text, note_text); + assert_ne!(l1.text, note_text); + assert!(l1.text.contains("Structured summary")); + assert_eq!(l0.source_ref, serde_json::json!({})); + assert_eq!(l1.source_ref, serde_json::json!({})); + assert_eq!(l2.source_ref, *source_ref); + assert!(l0.structured.is_none()); + assert!(l1.structured.is_some()); + assert!(l2.structured.is_some()); +} + +async fn fetch_raw_source_ref_for_level( + context: &TestContext, + note_id: Uuid, + payload_level: PayloadLevel, +) -> Value { + let response = context + .service + .search_raw(build_payload_shape_search_request(payload_level)) + .await + .expect("Search failed."); + let item = response.items.first().expect("Expected search result."); + + assert_eq!(item.note_id, note_id); + + item.source_ref.clone() +} + +async fn fetch_search_detail_note_for_level( + context: &TestContext, + search_session_id: Uuid, + note_id: Uuid, + payload_level: PayloadLevel, +) -> NoteFetchResponse { + let response = context + .service + .search_details(SearchDetailsRequest { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + agent_id: "a".to_string(), + search_session_id, + payload_level, + note_ids: vec![note_id], + record_hits: Some(false), + }) + .await + .expect("Search details failed."); + + response + .results + .first() + .and_then(|item| item.note.as_ref()) + .expect("Expected note details.") + .clone() +} + +#[tokio::test] +#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL to run."] +async fn search_raw_payload_level_shapes_source_ref() { + let providers = chunk_search::build_providers(StubRerank); + let Some(context) = + chunk_search::setup_context("search_raw_payload_level_shapes_source_ref", providers).await + else { + return; + }; + let note_id = Uuid::new_v4(); + let chunk_id = Uuid::new_v4(); + let note_text = "Payload shaping should control the raw item source_ref payload."; + let source_ref = serde_json::json!({ + "schema": "note_source_ref/v1", + "locator": { + "doc_id": Uuid::new_v4().to_string(), + "chunk_id": Uuid::new_v4().to_string() + }, + "metadata": { + "long_field": "A long metadata body to represent a heavy source reference shape." + } + }); + + chunk_search::insert_note_with_importance_and_source_ref( + &context.service.db.pool, + note_id, + note_text, + &context.embedding_version, + 0.9_f32, + 1.0, + "agent_private", + source_ref.clone(), + ) + .await; + chunk_search::insert_chunk( + &context.service.db.pool, + chunk_id, + note_id, + 0, + 0, + note_text.len() as i32, + note_text, + &context.embedding_version, + ) + .await; + chunk_search::upsert_point( + &context.service, + chunk_id, + note_id, + 0, + 0, + note_text.len() as i32, + note_text, + ) + .await; + + let l0 = fetch_raw_source_ref_for_level(&context, note_id, PayloadLevel::L0).await; + let l1 = fetch_raw_source_ref_for_level(&context, note_id, PayloadLevel::L1).await; + let l2 = fetch_raw_source_ref_for_level(&context, note_id, PayloadLevel::L2).await; + + assert_eq!(l0, serde_json::json!({})); + assert_eq!(l1, serde_json::json!({})); + assert_eq!(l2, source_ref); + + context.test_db.cleanup().await.expect("Failed to cleanup test database."); +} + +#[tokio::test] +#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL to run."] +async fn search_details_payload_level_shapes_text_and_fields() { + let providers = chunk_search::build_providers(StubRerank); + let Some(context) = chunk_search::setup_context( + "search_details_payload_level_shapes_text_and_fields", + providers, + ) + .await + else { + return; + }; + let note_id = Uuid::new_v4(); + let chunk_id = Uuid::new_v4(); + let max_note_chars = context.service.cfg.memory.max_note_chars as usize; + let note_text_seed = + "This is the long note body used for detail shaping and payload truncation. "; + let note_text = note_text_seed.repeat((max_note_chars / note_text_seed.len()) + 2); + let source_ref = serde_json::json!({ + "schema": "note_source_ref/v1", + "locator": { + "document_id": Uuid::new_v4().to_string(), + "chunk_id": Uuid::new_v4().to_string(), + "extra": "field with rich details for l2 retention" + }, + }); + let structured_summary = "Structured summary about payload levels and compact text behavior."; + let field_id = Uuid::new_v4(); + + assert!(note_text.len() > max_note_chars); + + chunk_search::insert_note_with_importance_and_source_ref( + &context.service.db.pool, + note_id, + note_text.as_str(), + &context.embedding_version, + 0.8_f32, + 1.0, + "agent_private", + source_ref.clone(), + ) + .await; + chunk_search::insert_summary_field_row( + &context.service.db.pool, + field_id, + note_id, + structured_summary, + ) + .await; + chunk_search::insert_chunk( + &context.service.db.pool, + chunk_id, + note_id, + 0, + 0, + note_text.len() as i32, + note_text.as_str(), + &context.embedding_version, + ) + .await; + chunk_search::upsert_point( + &context.service, + chunk_id, + note_id, + 0, + 0, + note_text.len() as i32, + note_text.as_str(), + ) + .await; + + let index = context + .service + .search(SearchRequest { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + agent_id: "a".to_string(), + token_id: None, + read_profile: "private_only".to_string(), + payload_level: PayloadLevel::L2, + query: "payload".to_string(), + top_k: Some(5), + candidate_k: Some(10), + filter: None, + record_hits: Some(false), + ranking: None, + }) + .await + .expect("Search index failed."); + let l0 = fetch_search_detail_note_for_level( + &context, + index.search_session_id, + note_id, + PayloadLevel::L0, + ) + .await; + let l1 = fetch_search_detail_note_for_level( + &context, + index.search_session_id, + note_id, + PayloadLevel::L1, + ) + .await; + let l2 = fetch_search_detail_note_for_level( + &context, + index.search_session_id, + note_id, + PayloadLevel::L2, + ) + .await; + + assert_search_detail_payload_levels(max_note_chars, ¬e_text, &source_ref, &l0, &l1, &l2); + + context.test_db.cleanup().await.expect("Failed to cleanup test database."); +} diff --git a/packages/elf-service/tests/acceptance/chunk_search/relation_context.rs b/packages/elf-service/tests/acceptance/chunk_search/relation_context.rs new file mode 100644 index 00000000..c22554c7 --- /dev/null +++ b/packages/elf-service/tests/acceptance/chunk_search/relation_context.rs @@ -0,0 +1,397 @@ +use sqlx::PgExecutor; +use time::{Duration, OffsetDateTime}; +use uuid::Uuid; + +use crate::acceptance::{ + self, StubRerank, + chunk_search::{self, TestContext}, +}; +use elf_service::{ElfService, Providers, RelationTemporalStatus, SearchRequest}; + +async fn insert_graph_entity<'e, E>( + executor: E, + entity_id: Uuid, + canonical: &str, + kind: Option<&str>, +) where + E: PgExecutor<'e>, +{ + sqlx::query( + "\ +INSERT INTO graph_entities ( + entity_id, + tenant_id, + project_id, + canonical, + canonical_norm, + kind +) +VALUES ($1, $2, $3, $4, $5, $6)", + ) + .bind(entity_id) + .bind("t") + .bind("p") + .bind(canonical) + .bind(canonical.to_lowercase()) + .bind(kind) + .execute(executor) + .await + .expect("Failed to insert graph entity."); +} + +async fn insert_graph_predicate<'e, E>(executor: E, predicate_id: Uuid, canonical: &str) +where + E: PgExecutor<'e>, +{ + sqlx::query( + "\ +INSERT INTO graph_predicates ( + predicate_id, + scope_key, + tenant_id, + project_id, + canonical, + canonical_norm, + cardinality, + status +) +VALUES ($1, $2, $3, $4, $5, $6, 'single', 'active')", + ) + .bind(predicate_id) + .bind("__project__:p") + .bind("t") + .bind("p") + .bind(canonical) + .bind(canonical.to_lowercase()) + .execute(executor) + .await + .expect("Failed to insert graph predicate."); +} + +#[allow(clippy::too_many_arguments)] +async fn insert_graph_fact<'e, E>( + executor: E, + fact_id: Uuid, + subject_entity_id: Uuid, + predicate: &str, + predicate_id: Uuid, + object_value: &str, + valid_from: OffsetDateTime, + valid_to: Option, +) where + E: PgExecutor<'e>, +{ + sqlx::query( + "\ +INSERT INTO graph_facts ( + fact_id, + tenant_id, + project_id, + agent_id, + scope, + subject_entity_id, + predicate, + predicate_id, + object_entity_id, + object_value, + valid_from, + valid_to +) +VALUES ($1, $2, $3, $4, $5, $6, $7, $8, NULL, $9, $10, $11)", + ) + .bind(fact_id) + .bind("t") + .bind("p") + .bind("a") + .bind("agent_private") + .bind(subject_entity_id) + .bind(predicate) + .bind(predicate_id) + .bind(object_value) + .bind(valid_from) + .bind(valid_to) + .execute(executor) + .await + .expect("Failed to insert graph fact."); +} + +async fn insert_graph_fact_evidence<'e, E>( + executor: E, + fact_id: Uuid, + note_id: Uuid, + created_at: OffsetDateTime, +) where + E: PgExecutor<'e>, +{ + sqlx::query( + "\ +INSERT INTO graph_fact_evidence (evidence_id, fact_id, note_id, created_at) +VALUES ($1, $2, $3, $4)", + ) + .bind(Uuid::new_v4()) + .bind(fact_id) + .bind(note_id) + .bind(created_at) + .execute(executor) + .await + .expect("Failed to insert graph fact evidence."); +} + +async fn setup_graph_context_test( + test_name: &str, + providers: Providers, + max_facts_per_item: u32, + max_evidence_notes_per_fact: u32, +) -> Option { + let Some(test_db) = acceptance::test_db().await else { + eprintln!("Skipping {test_name}; set ELF_PG_DSN to run this test."); + + return None; + }; + let Some(qdrant_url) = acceptance::test_qdrant_url() else { + eprintln!("Skipping {test_name}; set ELF_QDRANT_URL to run this test."); + + return None; + }; + let collection = test_db.collection_name("elf_acceptance"); + let docs_collection = test_db.collection_name("elf_acceptance_docs"); + let mut cfg = acceptance::test_config( + test_db.dsn().to_string(), + qdrant_url, + 4_096, + collection, + docs_collection, + ); + + cfg.search.graph_context.enabled = true; + cfg.search.graph_context.max_facts_per_item = max_facts_per_item; + cfg.search.graph_context.max_evidence_notes_per_fact = max_evidence_notes_per_fact; + + let service = + acceptance::build_service(cfg, providers).await.expect("Failed to build service."); + + acceptance::reset_db(&service.db.pool).await.expect("Failed to reset test database."); + chunk_search::reset_collection(&service).await; + + let embedding_version = format!( + "{}:{}:{}", + service.cfg.providers.embedding.provider_id, + service.cfg.providers.embedding.model, + service.cfg.storage.qdrant.vector_dim + ); + + Some(TestContext { service, test_db, embedding_version }) +} + +async fn seed_relation_context_fixture( + service: &ElfService, + embedding_version: &str, +) -> (Uuid, Uuid, Uuid) { + let now = OffsetDateTime::now_utc(); + let note_id = Uuid::new_v4(); + let note_id_2 = Uuid::new_v4(); + let chunk_id = Uuid::new_v4(); + let chunk_text = "Alice mentors Bob about projects and priorities."; + let subject_id = Uuid::new_v4(); + let newer_fact_id = Uuid::new_v4(); + let predicate_id = Uuid::new_v4(); + let older_fact_id = Uuid::new_v4(); + let older_fact_valid_from = now - Duration::seconds(10); + let newer_fact_valid_from = now - Duration::seconds(5); + let note_1_evidence_created_at = now - Duration::seconds(30); + let note_2_evidence_created_at = now - Duration::seconds(10); + + chunk_search::insert_note(&service.db.pool, note_id, chunk_text, embedding_version).await; + chunk_search::insert_note( + &service.db.pool, + note_id_2, + "Second note for evidence ordering.", + embedding_version, + ) + .await; + chunk_search::insert_chunk( + &service.db.pool, + chunk_id, + note_id, + 0, + 0, + chunk_text.len() as i32, + chunk_text, + embedding_version, + ) + .await; + chunk_search::upsert_point( + service, + chunk_id, + note_id, + 0, + 0, + chunk_text.len() as i32, + chunk_text, + ) + .await; + + insert_graph_entity(&service.db.pool, subject_id, "Alice", Some("person")).await; + insert_graph_predicate(&service.db.pool, predicate_id, "mentors").await; + insert_graph_fact( + &service.db.pool, + older_fact_id, + subject_id, + "mentors", + predicate_id, + "Bob", + older_fact_valid_from, + Some(newer_fact_valid_from), + ) + .await; + insert_graph_fact_evidence( + &service.db.pool, + older_fact_id, + note_id, + note_1_evidence_created_at, + ) + .await; + insert_graph_fact( + &service.db.pool, + newer_fact_id, + subject_id, + "mentors", + predicate_id, + "Carol", + newer_fact_valid_from, + None, + ) + .await; + insert_graph_fact_evidence( + &service.db.pool, + newer_fact_id, + note_id, + note_1_evidence_created_at, + ) + .await; + insert_graph_fact_evidence( + &service.db.pool, + newer_fact_id, + note_id_2, + note_2_evidence_created_at, + ) + .await; + + (note_id, newer_fact_id, older_fact_id) +} + +#[tokio::test] +#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL to run."] +async fn search_raw_quick_includes_relation_context_and_respects_fact_bounds() { + let providers = chunk_search::build_providers(StubRerank); + let Some(context) = setup_graph_context_test( + "search_raw_quick_includes_relation_context_and_respects_fact_bounds", + providers, + 1, + 1, + ) + .await + else { + return; + }; + let fixture = seed_relation_context_fixture(&context.service, &context.embedding_version).await; + let note_id = fixture.0; + let newer_fact_id = fixture.1; + let response = context + .service + .search_raw_quick(SearchRequest { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + agent_id: "a".to_string(), + token_id: None, + read_profile: "private_only".to_string(), + payload_level: Default::default(), + query: "Alice".to_string(), + top_k: Some(5), + candidate_k: Some(10), + filter: None, + record_hits: Some(false), + ranking: None, + }) + .await + .expect("Search failed."); + let item = response.items.first().expect("Expected search result."); + let relation_context = item + .explain + .relation_context + .as_ref() + .expect("Expected relation context in search explain."); + + assert_eq!(relation_context.len(), 1, "Expected relation context to be truncated to one fact."); + assert_eq!( + relation_context[0].fact_id, newer_fact_id, + "Expected the most recent fact after truncation." + ); + assert_eq!(relation_context[0].object.value.as_deref(), Some("Carol")); + assert_eq!(relation_context[0].temporal_status, RelationTemporalStatus::Current); + assert!(relation_context[0].valid_to.is_none()); + assert_eq!(relation_context[0].evidence_note_ids.len(), 1); + assert_eq!(relation_context[0].evidence_note_ids[0], note_id); + + context.test_db.cleanup().await.expect("Failed to cleanup test database."); +} + +#[tokio::test] +#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL to run."] +async fn search_raw_quick_marks_historical_relation_context() { + let providers = chunk_search::build_providers(StubRerank); + let Some(context) = setup_graph_context_test( + "search_raw_quick_marks_historical_relation_context", + providers, + 2, + 2, + ) + .await + else { + return; + }; + let fixture = seed_relation_context_fixture(&context.service, &context.embedding_version).await; + let older_fact_id = fixture.2; + let response = context + .service + .search_raw_quick(SearchRequest { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + agent_id: "a".to_string(), + token_id: None, + read_profile: "private_only".to_string(), + payload_level: Default::default(), + query: "Alice".to_string(), + top_k: Some(5), + candidate_k: Some(10), + filter: None, + record_hits: Some(false), + ranking: None, + }) + .await + .expect("Search failed."); + let item = response.items.first().expect("Expected search result."); + let relation_context = item + .explain + .relation_context + .as_ref() + .expect("Expected relation context in search explain."); + + assert_eq!( + relation_context.len(), + 2, + "Expected current and historical relation facts in context.", + ); + assert_eq!(relation_context[0].temporal_status, RelationTemporalStatus::Current); + + let historical = relation_context + .iter() + .find(|context| context.fact_id == older_fact_id) + .expect("Expected historical fact in relation context."); + + assert_eq!(historical.object.value.as_deref(), Some("Bob")); + assert_eq!(historical.temporal_status, RelationTemporalStatus::Historical); + assert!(historical.valid_to.is_some()); + + context.test_db.cleanup().await.expect("Failed to cleanup test database."); +} diff --git a/packages/elf-service/tests/acceptance/docs_extension_v1.rs b/packages/elf-service/tests/acceptance/docs_extension_v1.rs index b41f83d7..7a6f321c 100644 --- a/packages/elf-service/tests/acceptance/docs_extension_v1.rs +++ b/packages/elf-service/tests/acceptance/docs_extension_v1.rs @@ -1,6 +1,13 @@ mod lifecycle; - -use std::{collections::HashSet, future::IntoFuture, string::ToString, sync::Arc, time::Instant}; +mod search_filters; + +use std::{ + collections::HashSet, + future::IntoFuture, + string::ToString, + sync::Arc, + time::{Duration, Instant}, +}; use ahash::AHashMap; use axum::{Json, Router, extract::State, http::StatusCode, response::IntoResponse, routing}; @@ -10,22 +17,21 @@ use qdrant_client::qdrant::{ }; use serde_json::Map; use sqlx::{FromRow, PgPool}; -use time::{OffsetDateTime, format_description::well_known::Rfc3339}; use tokenizers::{Tokenizer, models::wordlevel::WordLevel}; use tokio::{ net::TcpListener, sync::{oneshot, oneshot::Sender}, task::JoinHandle, + time, }; use uuid::Uuid; use crate::acceptance::{self, SpyExtractor, StubEmbedding, StubRerank, chunking::ChunkingConfig}; use elf_config::EmbeddingProviderConfig; use elf_service::{ - AddNoteInput, AddNoteRequest, BoxFuture, DocsExcerptsGetRequest, DocsGetRequest, - DocsPutRequest, DocsPutResponse, DocsSearchL0Request, ElfService, EmbeddingProvider, Error, - PayloadLevel, Providers, Result, SearchRequest, TextQuoteSelector, - docs::DocRetrievalTrajectory, + AddNoteInput, AddNoteRequest, DocsExcerptsGetRequest, DocsGetRequest, DocsPutRequest, + DocsPutResponse, DocsSearchL0Request, ElfService, Error, PayloadLevel, Providers, + SearchRequest, TextQuoteSelector, docs::DocRetrievalTrajectory, }; use elf_storage::{db::Db, qdrant::QdrantStore}; use elf_testkit::TestDatabase; @@ -64,26 +70,6 @@ struct DocsContext { service: ElfService, } -struct NonZeroSearchEmbedding; -impl EmbeddingProvider for NonZeroSearchEmbedding { - fn embed<'a>( - &'a self, - cfg: &'a EmbeddingProviderConfig, - texts: &'a [String], - ) -> BoxFuture<'a, Result>>> { - let vector = vec![0.1_f32; cfg.dimensions as usize]; - - Box::pin(async move { Ok(vec![vector; texts.len()]) }) - } -} - -struct DocsFilterFixtureIds { - search_domain_doc_id: Uuid, - search_other_domain_doc_id: Uuid, - repo_doc_id: Uuid, - repo_other_doc_id: Uuid, -} - fn build_test_tokenizer() -> Tokenizer { let mut vocab = AHashMap::new(); @@ -112,17 +98,7 @@ fn trajectory_stage_stats<'a>( trajectory.stages.iter().find(|stage| stage.stage_name == stage_name).map(|stage| &stage.stats) } -fn configure_recency_bias_settings(service: &mut ElfService) { - service.providers.embedding = Arc::new(NonZeroSearchEmbedding); - service.cfg.ranking.tie_breaker_weight = 1_000.0; - service.cfg.ranking.recency_tau_days = 36_500.0; -} - -async fn wait_for_doc_outbox_done( - pool: &PgPool, - doc_id: Uuid, - timeout: std::time::Duration, -) -> bool { +async fn wait_for_doc_outbox_done(pool: &PgPool, doc_id: Uuid, timeout: Duration) -> bool { let deadline = Instant::now() + timeout; loop { @@ -157,15 +133,11 @@ WHERE doc_id = $1", return false; } - tokio::time::sleep(std::time::Duration::from_millis(200)).await; + time::sleep(Duration::from_millis(200)).await; } } -async fn wait_for_note_outbox_done( - pool: &PgPool, - note_id: Uuid, - timeout: std::time::Duration, -) -> bool { +async fn wait_for_note_outbox_done(pool: &PgPool, note_id: Uuid, timeout: Duration) -> bool { let deadline = Instant::now() + timeout; loop { @@ -200,7 +172,7 @@ WHERE note_id = $1", return false; } - tokio::time::sleep(std::time::Duration::from_millis(200)).await; + time::sleep(Duration::from_millis(200)).await; } } @@ -241,441 +213,6 @@ async fn embed_handler( (StatusCode::OK, Json(serde_json::json!({ "data": data }))).into_response() } -#[tokio::test] -#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL (or ELF_QDRANT_GRPC_URL) to run."] -async fn docs_search_l0_respects_doc_ts_filter() { - let Some(ctx) = setup_docs_context().await else { return }; - let ( - test_db, - service, - shared_knowledge_doc, - older_shared_knowledge_doc, - private_chat_doc, - handle, - shutdown, - ) = create_docs_search_filter_fixture(ctx).await; - let doc_ts_windowed_results = service - .docs_search_l0(DocsSearchL0Request { - tenant_id: "t".to_string(), - project_id: "p".to_string(), - caller_agent_id: "reader".to_string(), - scope: Some("project_shared".to_string()), - status: None, - doc_type: None, - sparse_mode: None, - domain: None, - repo: None, - agent_id: None, - thread_id: None, - updated_after: None, - updated_before: None, - ts_gte: Some("2026-01-01T00:00:00Z".to_string()), - ts_lte: Some("2026-12-31T23:59:59Z".to_string()), - read_profile: "all_scopes".to_string(), - query: "peregrine".to_string(), - top_k: Some(20), - candidate_k: Some(50), - explain: None, - }) - .await - .expect("Failed to search docs by doc_ts range."); - let doc_ts_windowed_ids = - doc_ts_windowed_results.items.into_iter().map(|item| item.doc_id).collect::>(); - - assert!(doc_ts_windowed_ids.contains(&shared_knowledge_doc)); - assert!(!doc_ts_windowed_ids.contains(&older_shared_knowledge_doc)); - assert!(!doc_ts_windowed_ids.contains(&private_chat_doc)); - - cleanup_docs_filter_fixture(test_db, handle, shutdown).await; -} - -#[tokio::test] -#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL (or ELF_QDRANT_GRPC_URL) to run this test."] -async fn docs_search_l0_sparse_mode_records_expected_vector_search_channels() { - let Some(ctx) = setup_docs_context().await else { return }; - let DocsContext { test_db, service } = ctx; - let doc = put_test_doc(&service).await; - let (handle, shutdown) = spawn_doc_worker(&service).await; - - assert!( - wait_for_doc_outbox_done(&service.db.pool, doc.doc_id, std::time::Duration::from_secs(15)) - .await, - "Expected doc outbox to reach DONE." - ); - - let cases = [ - ("off", vec!["dense"]), - ("on", vec!["dense", "sparse"]), - ("auto", vec!["dense", "sparse"]), - ]; - - for (sparse_mode, expected_channels) in cases { - let response = service - .docs_search_l0(DocsSearchL0Request { - tenant_id: "t".to_string(), - project_id: "p".to_string(), - caller_agent_id: "reader".to_string(), - scope: None, - status: None, - doc_type: None, - sparse_mode: Some(sparse_mode.to_string()), - domain: None, - repo: None, - agent_id: None, - thread_id: None, - updated_after: None, - updated_before: None, - ts_gte: None, - ts_lte: None, - read_profile: "private_plus_project".to_string(), - query: "https://elf.example/docs?query=peregrine".to_string(), - top_k: Some(20), - candidate_k: Some(50), - explain: Some(true), - }) - .await - .expect("Failed to search docs with sparse_mode set."); - let trajectory = response.trajectory.as_ref().expect("Expected explain trajectory."); - let vector_search_stats = trajectory_stage_stats(trajectory, "vector_search") - .expect("Expected vector_search stage in trajectory."); - let vector_search_channels = vector_search_stats - .get("channels") - .and_then(serde_json::Value::as_array) - .expect("Expected vector_search stats channels."); - let observed_channels = vector_search_channels - .iter() - .map(|channel| channel.as_str().expect("Expected channel string.").to_string()) - .collect::>(); - - assert_eq!(observed_channels, expected_channels); - } - - let _ = shutdown.send(()); - - handle.abort(); - - let _ = handle.await; - - drop(service); - - test_db.cleanup().await.expect("Failed to cleanup test database."); -} - -#[tokio::test] -#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL (or ELF_QDRANT_GRPC_URL) to run this test."] -async fn docs_search_l0_filters_include_and_exclude_by_doc_type_and_domain_or_repo() { - let Some(ctx) = setup_docs_context().await else { return }; - let docs = seed_docs_filter_fixtures(&ctx).await; - let DocsContext { test_db, service } = ctx; - let (handle, shutdown) = spawn_doc_worker(&service).await; - - for doc_id in [ - docs.search_domain_doc_id, - docs.search_other_domain_doc_id, - docs.repo_doc_id, - docs.repo_other_doc_id, - ] - .iter() - { - assert!( - wait_for_doc_outbox_done(&service.db.pool, *doc_id, std::time::Duration::from_secs(15)) - .await, - "Expected docs outbox to reach DONE." - ); - } - - let search_domain_results = service - .docs_search_l0(DocsSearchL0Request { - tenant_id: "t".to_string(), - project_id: "p".to_string(), - caller_agent_id: "reader".to_string(), - scope: Some("project_shared".to_string()), - status: None, - doc_type: Some("search".to_string()), - sparse_mode: None, - domain: Some("docs.example.com".to_string()), - repo: None, - agent_id: None, - thread_id: None, - updated_after: None, - updated_before: None, - ts_gte: None, - ts_lte: None, - read_profile: "all_scopes".to_string(), - query: "peregrine".to_string(), - top_k: Some(20), - candidate_k: Some(50), - explain: None, - }) - .await - .expect("Failed to search docs by domain."); - let search_domain_result_ids = - search_domain_results.items.into_iter().map(|item| item.doc_id).collect::>(); - - assert!(search_domain_result_ids.contains(&docs.search_domain_doc_id)); - assert!(!search_domain_result_ids.contains(&docs.search_other_domain_doc_id)); - assert!(!search_domain_result_ids.contains(&docs.repo_doc_id)); - assert!(!search_domain_result_ids.contains(&docs.repo_other_doc_id)); - - let repo_results = service - .docs_search_l0(DocsSearchL0Request { - tenant_id: "t".to_string(), - project_id: "p".to_string(), - caller_agent_id: "reader".to_string(), - scope: Some("project_shared".to_string()), - status: None, - doc_type: Some("dev".to_string()), - sparse_mode: None, - domain: None, - repo: Some("elf-org/docs".to_string()), - agent_id: None, - thread_id: None, - updated_after: None, - updated_before: None, - ts_gte: None, - ts_lte: None, - read_profile: "all_scopes".to_string(), - query: "peregrine".to_string(), - top_k: Some(20), - candidate_k: Some(50), - explain: None, - }) - .await - .expect("Failed to search docs by repo."); - let repo_result_ids = - repo_results.items.into_iter().map(|item| item.doc_id).collect::>(); - - assert!(repo_result_ids.contains(&docs.repo_doc_id)); - assert!(!repo_result_ids.contains(&docs.repo_other_doc_id)); - assert!(!repo_result_ids.contains(&docs.search_domain_doc_id)); - assert!(!repo_result_ids.contains(&docs.search_other_domain_doc_id)); - - let _ = shutdown.send(()); - - handle.abort(); - - let _ = handle.await; - - drop(service); - - test_db.cleanup().await.expect("Failed to cleanup test database."); -} - -async fn seed_docs_filter_fixtures(ctx: &DocsContext) -> DocsFilterFixtureIds { - let search_domain_doc = put_test_doc_with( - &ctx.service, - "owner", - "project_shared", - Some("search"), - "Docs domain include sample", - serde_json::json!({ - "schema": "doc_source_ref/v1", - "doc_type": "search", - "ts": "2026-02-25T12:00:00Z", - "query": "How to fetch docs", - "domain": "docs.example.com", - "url": "https://docs.example.com/guide", - }), - TEST_CONTENT, - ) - .await; - let search_other_domain_doc = put_test_doc_with( - &ctx.service, - "owner", - "project_shared", - Some("search"), - "Docs domain exclude sample", - serde_json::json!({ - "schema": "doc_source_ref/v1", - "doc_type": "search", - "ts": "2026-02-25T12:00:00Z", - "query": "How to build", - "domain": "api.example.org", - "url": "https://api.example.org/reference", - }), - TEST_CONTENT, - ) - .await; - let repo_doc = put_test_doc_with( - &ctx.service, - "owner", - "project_shared", - Some("dev"), - "Docs repo include sample", - serde_json::json!({ - "schema": "doc_source_ref/v1", - "doc_type": "dev", - "ts": "2026-02-25T12:00:00Z", - "repo": "elf-org/docs", - "commit_sha": "9f0a3f4c4eb58bfcf4a5f4f9d0c7be0e13c2f8d19", - }), - TEST_CONTENT, - ) - .await; - let repo_other_doc = put_test_doc_with( - &ctx.service, - "owner", - "project_shared", - Some("dev"), - "Docs repo exclude sample", - serde_json::json!({ - "schema": "doc_source_ref/v1", - "doc_type": "dev", - "ts": "2026-02-25T12:00:00Z", - "repo": "other-org/docs", - "commit_sha": "4e3d9ec4d2a59a2f6c7d7f3d4c6e8a5b1f7b9d3f", - }), - TEST_CONTENT, - ) - .await; - - DocsFilterFixtureIds { - search_domain_doc_id: search_domain_doc.doc_id, - search_other_domain_doc_id: search_other_domain_doc.doc_id, - repo_doc_id: repo_doc.doc_id, - repo_other_doc_id: repo_other_doc.doc_id, - } -} - -#[tokio::test] -#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL (or ELF_QDRANT_GRPC_URL) to run this test."] -async fn docs_search_l0_recency_bias_orders_newer_doc_first_and_records_projection_signals() { - let Some(ctx) = setup_docs_context().await else { return }; - let DocsContext { test_db, mut service } = ctx; - - configure_recency_bias_settings(&mut service); - - let (handle, shutdown) = seed_recency_bias_docs_for_search(&service).await; - - assert_docs_search_l0_recency_projection(&service).await; - - let _ = shutdown.send(()); - - handle.abort(); - - let _ = handle.await; - - drop(service); - - test_db.cleanup().await.expect("Failed to cleanup test database."); -} - -async fn seed_recency_bias_docs_for_search(service: &ElfService) -> (JoinHandle<()>, Sender<()>) { - let newer_doc = put_test_doc_with( - service, - "owner", - "project_shared", - Some("knowledge"), - "Recency newer doc", - serde_json::json!({ - "schema": "doc_source_ref/v1", - "doc_type": "knowledge", - "ts": "2026-02-27T12:00:00Z", - }), - TEST_CONTENT, - ) - .await; - let older_doc = put_test_doc_with( - service, - "owner", - "project_shared", - Some("knowledge"), - "Recency older doc", - serde_json::json!({ - "schema": "doc_source_ref/v1", - "doc_type": "knowledge", - "ts": "2026-02-20T12:00:00Z", - }), - TEST_CONTENT, - ) - .await; - let (handle, shutdown) = spawn_doc_worker(service).await; - - assert!( - wait_for_doc_outbox_done( - &service.db.pool, - newer_doc.doc_id, - std::time::Duration::from_secs(15), - ) - .await, - "Expected newer doc outbox to reach DONE." - ); - assert!( - wait_for_doc_outbox_done( - &service.db.pool, - older_doc.doc_id, - std::time::Duration::from_secs(15), - ) - .await, - "Expected older doc outbox to reach DONE." - ); - - let older_ts = OffsetDateTime::parse("2020-01-01T00:00:00Z", &Rfc3339) - .expect("Failed to parse older doc timestamp."); - - sqlx::query("UPDATE doc_documents SET updated_at = $1 WHERE doc_id = $2") - .bind(older_ts) - .bind(older_doc.doc_id) - .execute(&service.db.pool) - .await - .expect("Failed to set deterministic updated_at for older doc."); - - (handle, shutdown) -} - -async fn assert_docs_search_l0_recency_projection(service: &ElfService) { - let results = service - .docs_search_l0(DocsSearchL0Request { - tenant_id: "t".to_string(), - project_id: "p".to_string(), - caller_agent_id: "reader".to_string(), - scope: None, - status: None, - doc_type: None, - sparse_mode: None, - domain: None, - repo: None, - agent_id: None, - thread_id: None, - updated_after: None, - updated_before: None, - ts_gte: None, - ts_lte: None, - read_profile: "private_plus_project".to_string(), - query: "peregrine".to_string(), - top_k: Some(2), - candidate_k: Some(20), - explain: Some(true), - }) - .await - .expect("Failed to search docs for recency ordering."); - let ordered_ids = results.items.iter().map(|item| item.doc_id).collect::>(); - - assert!(ordered_ids.len() >= 2); - - let newest_id = results - .items - .iter() - .max_by_key(|item| item.updated_at.unix_timestamp()) - .expect("Expected returned item.") - .doc_id; - - assert_eq!(results.items[0].doc_id, newest_id); - assert!(results.items[0].updated_at > results.items[1].updated_at); - - let trajectory = results.trajectory.as_ref().expect("Expected explain trajectory."); - let result_projection = trajectory_stage_stats(trajectory, "result_projection") - .expect("Expected result_projection stage in trajectory."); - - assert!(result_projection.get("pre_authorization_candidates").is_some()); - assert!(result_projection.get("returned_items").is_some()); - assert!(result_projection.get("recency_tau_days").is_some()); - assert!(result_projection.get("tie_breaker_weight").is_some()); - assert_eq!( - result_projection.get("recency_boost_applied"), - Some(&serde_json::Value::Bool(true)) - ); -} async fn create_docs_search_filter_fixture( ctx: DocsContext, diff --git a/packages/elf-service/tests/acceptance/docs_extension_v1/search_filters.rs b/packages/elf-service/tests/acceptance/docs_extension_v1/search_filters.rs new file mode 100644 index 00000000..6df2e049 --- /dev/null +++ b/packages/elf-service/tests/acceptance/docs_extension_v1/search_filters.rs @@ -0,0 +1,479 @@ +use std::{collections::HashSet, sync::Arc, time::Duration}; + +use serde_json::Value; +use time::{OffsetDateTime, format_description::well_known::Rfc3339}; +use tokio::{sync::oneshot::Sender, task::JoinHandle}; +use uuid::Uuid; + +use crate::acceptance::docs_extension_v1::{self, DocsContext, TEST_CONTENT}; +use elf_config::EmbeddingProviderConfig; +use elf_service::{BoxFuture, DocsSearchL0Request, ElfService, EmbeddingProvider, Result}; + +struct NonZeroSearchEmbedding; +impl EmbeddingProvider for NonZeroSearchEmbedding { + fn embed<'a>( + &'a self, + cfg: &'a EmbeddingProviderConfig, + texts: &'a [String], + ) -> BoxFuture<'a, Result>>> { + let vector = vec![0.1_f32; cfg.dimensions as usize]; + + Box::pin(async move { Ok(vec![vector; texts.len()]) }) + } +} + +struct DocsFilterFixtureIds { + search_domain_doc_id: Uuid, + search_other_domain_doc_id: Uuid, + repo_doc_id: Uuid, + repo_other_doc_id: Uuid, +} + +fn configure_recency_bias_settings(service: &mut ElfService) { + service.providers.embedding = Arc::new(NonZeroSearchEmbedding); + service.cfg.ranking.tie_breaker_weight = 1_000.0; + service.cfg.ranking.recency_tau_days = 36_500.0; +} + +#[tokio::test] +#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL (or ELF_QDRANT_GRPC_URL) to run."] +async fn docs_search_l0_respects_doc_ts_filter() { + let Some(ctx) = docs_extension_v1::setup_docs_context().await else { return }; + let ( + test_db, + service, + shared_knowledge_doc, + older_shared_knowledge_doc, + private_chat_doc, + handle, + shutdown, + ) = docs_extension_v1::create_docs_search_filter_fixture(ctx).await; + let doc_ts_windowed_results = service + .docs_search_l0(DocsSearchL0Request { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + caller_agent_id: "reader".to_string(), + scope: Some("project_shared".to_string()), + status: None, + doc_type: None, + sparse_mode: None, + domain: None, + repo: None, + agent_id: None, + thread_id: None, + updated_after: None, + updated_before: None, + ts_gte: Some("2026-01-01T00:00:00Z".to_string()), + ts_lte: Some("2026-12-31T23:59:59Z".to_string()), + read_profile: "all_scopes".to_string(), + query: "peregrine".to_string(), + top_k: Some(20), + candidate_k: Some(50), + explain: None, + }) + .await + .expect("Failed to search docs by doc_ts range."); + let doc_ts_windowed_ids = + doc_ts_windowed_results.items.into_iter().map(|item| item.doc_id).collect::>(); + + assert!(doc_ts_windowed_ids.contains(&shared_knowledge_doc)); + assert!(!doc_ts_windowed_ids.contains(&older_shared_knowledge_doc)); + assert!(!doc_ts_windowed_ids.contains(&private_chat_doc)); + + docs_extension_v1::cleanup_docs_filter_fixture(test_db, handle, shutdown).await; +} + +#[tokio::test] +#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL (or ELF_QDRANT_GRPC_URL) to run this test."] +async fn docs_search_l0_sparse_mode_records_expected_vector_search_channels() { + let Some(ctx) = docs_extension_v1::setup_docs_context().await else { return }; + let DocsContext { test_db, service } = ctx; + let doc = docs_extension_v1::put_test_doc(&service).await; + let (handle, shutdown) = docs_extension_v1::spawn_doc_worker(&service).await; + + assert!( + docs_extension_v1::wait_for_doc_outbox_done( + &service.db.pool, + doc.doc_id, + Duration::from_secs(15), + ) + .await, + "Expected doc outbox to reach DONE." + ); + + let cases = [ + ("off", vec!["dense"]), + ("on", vec!["dense", "sparse"]), + ("auto", vec!["dense", "sparse"]), + ]; + + for (sparse_mode, expected_channels) in cases { + let response = service + .docs_search_l0(DocsSearchL0Request { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + caller_agent_id: "reader".to_string(), + scope: None, + status: None, + doc_type: None, + sparse_mode: Some(sparse_mode.to_string()), + domain: None, + repo: None, + agent_id: None, + thread_id: None, + updated_after: None, + updated_before: None, + ts_gte: None, + ts_lte: None, + read_profile: "private_plus_project".to_string(), + query: "https://elf.example/docs?query=peregrine".to_string(), + top_k: Some(20), + candidate_k: Some(50), + explain: Some(true), + }) + .await + .expect("Failed to search docs with sparse_mode set."); + let trajectory = response.trajectory.as_ref().expect("Expected explain trajectory."); + let vector_search_stats = + docs_extension_v1::trajectory_stage_stats(trajectory, "vector_search") + .expect("Expected vector_search stage in trajectory."); + let vector_search_channels = vector_search_stats + .get("channels") + .and_then(Value::as_array) + .expect("Expected vector_search stats channels."); + let observed_channels = vector_search_channels + .iter() + .map(|channel| channel.as_str().expect("Expected channel string.").to_string()) + .collect::>(); + + assert_eq!(observed_channels, expected_channels); + } + + let _ = shutdown.send(()); + + handle.abort(); + + let _ = handle.await; + + drop(service); + + test_db.cleanup().await.expect("Failed to cleanup test database."); +} + +#[tokio::test] +#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL (or ELF_QDRANT_GRPC_URL) to run this test."] +async fn docs_search_l0_filters_include_and_exclude_by_doc_type_and_domain_or_repo() { + let Some(ctx) = docs_extension_v1::setup_docs_context().await else { return }; + let docs = seed_docs_filter_fixtures(&ctx).await; + let DocsContext { test_db, service } = ctx; + let (handle, shutdown) = docs_extension_v1::spawn_doc_worker(&service).await; + + for doc_id in [ + docs.search_domain_doc_id, + docs.search_other_domain_doc_id, + docs.repo_doc_id, + docs.repo_other_doc_id, + ] + .iter() + { + assert!( + docs_extension_v1::wait_for_doc_outbox_done( + &service.db.pool, + *doc_id, + Duration::from_secs(15), + ) + .await, + "Expected docs outbox to reach DONE." + ); + } + + let search_domain_results = service + .docs_search_l0(DocsSearchL0Request { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + caller_agent_id: "reader".to_string(), + scope: Some("project_shared".to_string()), + status: None, + doc_type: Some("search".to_string()), + sparse_mode: None, + domain: Some("docs.example.com".to_string()), + repo: None, + agent_id: None, + thread_id: None, + updated_after: None, + updated_before: None, + ts_gte: None, + ts_lte: None, + read_profile: "all_scopes".to_string(), + query: "peregrine".to_string(), + top_k: Some(20), + candidate_k: Some(50), + explain: None, + }) + .await + .expect("Failed to search docs by domain."); + let search_domain_result_ids = + search_domain_results.items.into_iter().map(|item| item.doc_id).collect::>(); + + assert!(search_domain_result_ids.contains(&docs.search_domain_doc_id)); + assert!(!search_domain_result_ids.contains(&docs.search_other_domain_doc_id)); + assert!(!search_domain_result_ids.contains(&docs.repo_doc_id)); + assert!(!search_domain_result_ids.contains(&docs.repo_other_doc_id)); + + let repo_results = service + .docs_search_l0(DocsSearchL0Request { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + caller_agent_id: "reader".to_string(), + scope: Some("project_shared".to_string()), + status: None, + doc_type: Some("dev".to_string()), + sparse_mode: None, + domain: None, + repo: Some("elf-org/docs".to_string()), + agent_id: None, + thread_id: None, + updated_after: None, + updated_before: None, + ts_gte: None, + ts_lte: None, + read_profile: "all_scopes".to_string(), + query: "peregrine".to_string(), + top_k: Some(20), + candidate_k: Some(50), + explain: None, + }) + .await + .expect("Failed to search docs by repo."); + let repo_result_ids = + repo_results.items.into_iter().map(|item| item.doc_id).collect::>(); + + assert!(repo_result_ids.contains(&docs.repo_doc_id)); + assert!(!repo_result_ids.contains(&docs.repo_other_doc_id)); + assert!(!repo_result_ids.contains(&docs.search_domain_doc_id)); + assert!(!repo_result_ids.contains(&docs.search_other_domain_doc_id)); + + let _ = shutdown.send(()); + + handle.abort(); + + let _ = handle.await; + + drop(service); + + test_db.cleanup().await.expect("Failed to cleanup test database."); +} + +async fn seed_docs_filter_fixtures(ctx: &DocsContext) -> DocsFilterFixtureIds { + let search_domain_doc = docs_extension_v1::put_test_doc_with( + &ctx.service, + "owner", + "project_shared", + Some("search"), + "Docs domain include sample", + serde_json::json!({ + "schema": "doc_source_ref/v1", + "doc_type": "search", + "ts": "2026-02-25T12:00:00Z", + "query": "How to fetch docs", + "domain": "docs.example.com", + "url": "https://docs.example.com/guide", + }), + TEST_CONTENT, + ) + .await; + let search_other_domain_doc = docs_extension_v1::put_test_doc_with( + &ctx.service, + "owner", + "project_shared", + Some("search"), + "Docs domain exclude sample", + serde_json::json!({ + "schema": "doc_source_ref/v1", + "doc_type": "search", + "ts": "2026-02-25T12:00:00Z", + "query": "How to build", + "domain": "api.example.org", + "url": "https://api.example.org/reference", + }), + TEST_CONTENT, + ) + .await; + let repo_doc = docs_extension_v1::put_test_doc_with( + &ctx.service, + "owner", + "project_shared", + Some("dev"), + "Docs repo include sample", + serde_json::json!({ + "schema": "doc_source_ref/v1", + "doc_type": "dev", + "ts": "2026-02-25T12:00:00Z", + "repo": "elf-org/docs", + "commit_sha": "9f0a3f4c4eb58bfcf4a5f4f9d0c7be0e13c2f8d19", + }), + TEST_CONTENT, + ) + .await; + let repo_other_doc = docs_extension_v1::put_test_doc_with( + &ctx.service, + "owner", + "project_shared", + Some("dev"), + "Docs repo exclude sample", + serde_json::json!({ + "schema": "doc_source_ref/v1", + "doc_type": "dev", + "ts": "2026-02-25T12:00:00Z", + "repo": "other-org/docs", + "commit_sha": "4e3d9ec4d2a59a2f6c7d7f3d4c6e8a5b1f7b9d3f", + }), + TEST_CONTENT, + ) + .await; + + DocsFilterFixtureIds { + search_domain_doc_id: search_domain_doc.doc_id, + search_other_domain_doc_id: search_other_domain_doc.doc_id, + repo_doc_id: repo_doc.doc_id, + repo_other_doc_id: repo_other_doc.doc_id, + } +} + +#[tokio::test] +#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL (or ELF_QDRANT_GRPC_URL) to run this test."] +async fn docs_search_l0_recency_bias_orders_newer_doc_first_and_records_projection_signals() { + let Some(ctx) = docs_extension_v1::setup_docs_context().await else { return }; + let DocsContext { test_db, mut service } = ctx; + + configure_recency_bias_settings(&mut service); + + let (handle, shutdown) = seed_recency_bias_docs_for_search(&service).await; + + assert_docs_search_l0_recency_projection(&service).await; + + let _ = shutdown.send(()); + + handle.abort(); + + let _ = handle.await; + + drop(service); + + test_db.cleanup().await.expect("Failed to cleanup test database."); +} + +async fn seed_recency_bias_docs_for_search(service: &ElfService) -> (JoinHandle<()>, Sender<()>) { + let newer_doc = docs_extension_v1::put_test_doc_with( + service, + "owner", + "project_shared", + Some("knowledge"), + "Recency newer doc", + serde_json::json!({ + "schema": "doc_source_ref/v1", + "doc_type": "knowledge", + "ts": "2026-02-27T12:00:00Z", + }), + TEST_CONTENT, + ) + .await; + let older_doc = docs_extension_v1::put_test_doc_with( + service, + "owner", + "project_shared", + Some("knowledge"), + "Recency older doc", + serde_json::json!({ + "schema": "doc_source_ref/v1", + "doc_type": "knowledge", + "ts": "2026-02-20T12:00:00Z", + }), + TEST_CONTENT, + ) + .await; + let (handle, shutdown) = docs_extension_v1::spawn_doc_worker(service).await; + + assert!( + docs_extension_v1::wait_for_doc_outbox_done( + &service.db.pool, + newer_doc.doc_id, + Duration::from_secs(15), + ) + .await, + "Expected newer doc outbox to reach DONE." + ); + assert!( + docs_extension_v1::wait_for_doc_outbox_done( + &service.db.pool, + older_doc.doc_id, + Duration::from_secs(15), + ) + .await, + "Expected older doc outbox to reach DONE." + ); + + let older_ts = OffsetDateTime::parse("2020-01-01T00:00:00Z", &Rfc3339) + .expect("Failed to parse older doc timestamp."); + + sqlx::query("UPDATE doc_documents SET updated_at = $1 WHERE doc_id = $2") + .bind(older_ts) + .bind(older_doc.doc_id) + .execute(&service.db.pool) + .await + .expect("Failed to set deterministic updated_at for older doc."); + + (handle, shutdown) +} + +async fn assert_docs_search_l0_recency_projection(service: &ElfService) { + let results = service + .docs_search_l0(DocsSearchL0Request { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + caller_agent_id: "reader".to_string(), + scope: None, + status: None, + doc_type: None, + sparse_mode: None, + domain: None, + repo: None, + agent_id: None, + thread_id: None, + updated_after: None, + updated_before: None, + ts_gte: None, + ts_lte: None, + read_profile: "private_plus_project".to_string(), + query: "peregrine".to_string(), + top_k: Some(2), + candidate_k: Some(20), + explain: Some(true), + }) + .await + .expect("Failed to search docs for recency ordering."); + let ordered_ids = results.items.iter().map(|item| item.doc_id).collect::>(); + + assert!(ordered_ids.len() >= 2); + + let newest_id = results + .items + .iter() + .max_by_key(|item| item.updated_at.unix_timestamp()) + .expect("Expected returned item.") + .doc_id; + + assert_eq!(results.items[0].doc_id, newest_id); + assert!(results.items[0].updated_at > results.items[1].updated_at); + + let trajectory = results.trajectory.as_ref().expect("Expected explain trajectory."); + let result_projection = + docs_extension_v1::trajectory_stage_stats(trajectory, "result_projection") + .expect("Expected result_projection stage in trajectory."); + + assert!(result_projection.get("pre_authorization_candidates").is_some()); + assert!(result_projection.get("returned_items").is_some()); + assert!(result_projection.get("recency_tau_days").is_some()); + assert!(result_projection.get("tie_breaker_weight").is_some()); + assert_eq!(result_projection.get("recency_boost_applied"), Some(&Value::Bool(true))); +} diff --git a/packages/elf-service/tests/acceptance/graph_ingestion.rs b/packages/elf-service/tests/acceptance/graph_ingestion.rs index 700b8c53..ce48ba65 100644 --- a/packages/elf-service/tests/acceptance/graph_ingestion.rs +++ b/packages/elf-service/tests/acceptance/graph_ingestion.rs @@ -1,11 +1,12 @@ +mod single_predicate; + use std::{ collections::hash_map::DefaultHasher, hash::{Hash, Hasher}, sync::{Arc, atomic::AtomicUsize}, }; -use sqlx::{FromRow, PgPool}; -use time::OffsetDateTime; +use sqlx::PgPool; use uuid::Uuid; use crate::acceptance::{self, SpyExtractor, StubEmbedding, StubRerank}; @@ -14,8 +15,9 @@ use elf_domain::memory_policy::MemoryPolicyDecision; use elf_service::{ AddEventRequest, AddNoteInput, AddNoteRequest, BoxFuture, DeleteRequest, ElfService, EmbeddingProvider, EventMessage, GraphQueryEntityRef, GraphQueryPredicateRef, - GraphQueryRequest, NoteOp, Providers, RelationTemporalStatus, Result, StructuredFields, + GraphQueryRequest, NoteOp, Providers, Result, StructuredFields, }; +use elf_testkit::TestDatabase; const TEST_TENANT: &str = "t"; const TEST_PROJECT: &str = "p"; @@ -24,15 +26,6 @@ const GRAPH_REL_SUBJECT: &str = "alice"; const GRAPH_REL_PREDICATE: &str = "mentors"; const GRAPH_REL_OBJECT: &str = "Bob"; -#[derive(Debug, FromRow)] -struct GraphFactRow { - fact_id: Uuid, - predicate_id: Option, - object_value: Option, - valid_from: OffsetDateTime, - valid_to: Option, -} - struct HashEmbedding { vector_dim: u32, } @@ -154,21 +147,6 @@ fn duplicate_fact_attaches_multiple_evidence_request() -> AddNoteRequest { } } -fn works_at_graph_query_request(as_of: OffsetDateTime) -> GraphQueryRequest { - GraphQueryRequest { - tenant_id: TEST_TENANT.to_string(), - project_id: TEST_PROJECT.to_string(), - agent_id: "a".to_string(), - read_profile: "private_only".to_string(), - subject: GraphQueryEntityRef::Surface { surface: "Alice".to_string() }, - predicate: Some(GraphQueryPredicateRef::Surface { surface: "works at".to_string() }), - scopes: Some(vec![TEST_SCOPE.to_string()]), - as_of: Some(as_of), - limit: Some(10), - explain: Some(true), - } -} - async fn graph_fact_id(pool: &PgPool) -> Uuid { sqlx::query_scalar( "\ @@ -236,35 +214,6 @@ async fn graph_fact_evidence_count_for_note(pool: &PgPool, fact_id: Uuid, note_i .expect("Failed to load note evidence.") } -async fn graph_fact_row(pool: &PgPool, predicate: &str, object_value: &str) -> GraphFactRow { - sqlx::query_as::<_, GraphFactRow>( - "\ -SELECT - gf.fact_id, - gf.predicate_id, - gf.object_value, - gf.valid_from, - gf.valid_to -FROM graph_facts gf -JOIN graph_entities ge ON ge.entity_id = gf.subject_entity_id -WHERE ge.canonical_norm = $1 - AND gf.predicate = $2 - AND gf.object_value = $3 - AND gf.tenant_id = $4 - AND gf.project_id = $5 - AND gf.scope = $6", - ) - .bind(GRAPH_REL_SUBJECT) - .bind(predicate) - .bind(object_value) - .bind(TEST_TENANT) - .bind(TEST_PROJECT) - .bind(TEST_SCOPE) - .fetch_one(pool) - .await - .expect("Failed to load fact row.") -} - async fn add_fact_note( service: &ElfService, key: &str, @@ -291,94 +240,46 @@ async fn add_fact_note( response.results[0].note_id.expect("Expected note_id.") } -async fn activate_single_predicate(pool: &PgPool, predicate_id: Uuid) { - sqlx::query( - "\ -UPDATE graph_predicates -SET status = 'active', cardinality = 'single', updated_at = now() -WHERE predicate_id = $1", - ) - .bind(predicate_id) - .execute(pool) - .await - .expect("Failed to activate predicate."); -} +async fn build_test_db(test_name: &str) -> Option { + let Some(test_db) = acceptance::test_db().await else { + eprintln!("Skipping {test_name}; set ELF_PG_DSN to run."); -async fn active_object_value_at( - pool: &PgPool, - predicate_id: Uuid, - at: OffsetDateTime, -) -> Option { - sqlx::query_scalar( - "\ -SELECT gf.object_value -FROM graph_facts gf -JOIN graph_entities ge ON ge.entity_id = gf.subject_entity_id -WHERE ge.canonical_norm = $1 - AND gf.tenant_id = $2 - AND gf.project_id = $3 - AND gf.scope = $4 - AND gf.predicate_id = $5 - AND gf.valid_from <= $6 - AND (gf.valid_to IS NULL OR gf.valid_to > $6) -LIMIT 1", - ) - .bind(GRAPH_REL_SUBJECT) - .bind(TEST_TENANT) - .bind(TEST_PROJECT) - .bind(TEST_SCOPE) - .bind(predicate_id) - .bind(at) - .fetch_one(pool) - .await - .expect("Failed to load active fact object_value.") + return None; + }; + let Some(_) = acceptance::test_qdrant_url() else { + eprintln!("Skipping {test_name}; set ELF_QDRANT_URL to run."); + + return None; + }; + + Some(test_db) } -async fn active_fact_count_at(pool: &PgPool, predicate_id: Uuid, at: OffsetDateTime) -> i64 { - sqlx::query_scalar( - "\ -SELECT COUNT(*) -FROM graph_facts gf -JOIN graph_entities ge ON ge.entity_id = gf.subject_entity_id -WHERE ge.canonical_norm = $1 - AND gf.tenant_id = $2 - AND gf.project_id = $3 - AND gf.scope = $4 - AND gf.predicate_id = $5 - AND gf.valid_from <= $6 - AND (gf.valid_to IS NULL OR gf.valid_to > $6)", - ) - .bind(GRAPH_REL_SUBJECT) - .bind(TEST_TENANT) - .bind(TEST_PROJECT) - .bind(TEST_SCOPE) - .bind(predicate_id) - .bind(at) - .fetch_one(pool) - .await - .expect("Failed to count active facts.") +async fn build_stub_service(test_db: &TestDatabase) -> ElfService { + let qdrant_url = acceptance::test_qdrant_url().expect("Expected Qdrant test URL."); + let providers = Providers::new( + Arc::new(StubEmbedding { vector_dim: 4_096 }), + Arc::new(StubRerank), + Arc::new(SpyExtractor { + calls: Arc::new(AtomicUsize::new(0)), + payload: serde_json::json!({ "notes": [] }), + }), + ); + let collection = test_db.collection_name("elf_acceptance"); + let docs_collection = test_db.collection_name("elf_acceptance_docs"); + let cfg = acceptance::test_config( + test_db.dsn().to_string(), + qdrant_url, + 4_096, + collection, + docs_collection, + ); + + acceptance::build_service(cfg, providers).await.expect("Failed to build service.") } -async fn supersession_count( - pool: &PgPool, - from_fact_id: Uuid, - to_fact_id: Uuid, - note_id: Uuid, -) -> i64 { - sqlx::query_scalar( - "\ -SELECT COUNT(*) -FROM graph_fact_supersessions -WHERE from_fact_id = $1 - AND to_fact_id = $2 - AND note_id = $3", - ) - .bind(from_fact_id) - .bind(to_fact_id) - .bind(note_id) - .fetch_one(pool) - .await - .expect("Failed to count supersessions.") +async fn reset_service_db(service: &ElfService) { + acceptance::reset_db(&service.db.pool).await.expect("Failed to reset test database."); } #[tokio::test] @@ -455,112 +356,6 @@ async fn add_note_duplicate_fact_attaches_multiple_evidence() { test_db.cleanup().await.expect("Failed to cleanup test database."); } -#[tokio::test] -#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL to run."] -async fn add_note_single_predicate_supersedes_conflicting_fact() { - let Some(test_db) = acceptance::test_db().await else { - eprintln!( - "Skipping add_note_single_predicate_supersedes_conflicting_fact; set ELF_PG_DSN to run.", - ); - - return; - }; - let Some(qdrant_url) = acceptance::test_qdrant_url() else { - eprintln!( - "Skipping add_note_single_predicate_supersedes_conflicting_fact; set ELF_QDRANT_URL to run.", - ); - - return; - }; - let providers = Providers::new( - Arc::new(StubEmbedding { vector_dim: 4_096 }), - Arc::new(StubRerank), - Arc::new(SpyExtractor { - calls: Arc::new(AtomicUsize::new(0)), - payload: serde_json::json!({ "notes": [] }), - }), - ); - let collection = test_db.collection_name("elf_acceptance"); - let docs_collection = test_db.collection_name("elf_acceptance_docs"); - let cfg = acceptance::test_config( - test_db.dsn().to_string(), - qdrant_url, - 4_096, - collection, - docs_collection, - ); - let service = - acceptance::build_service(cfg, providers).await.expect("Failed to build service."); - - acceptance::reset_db(&service.db.pool).await.expect("Failed to reset test database."); - - let old_note_id = - add_fact_note(&service, "employment-a", "Alice works at Initech.", "works at", "Initech") - .await; - let fact_a = graph_fact_row(&service.db.pool, "works at", "Initech").await; - let predicate_id = fact_a.predicate_id.expect("Expected predicate_id."); - - activate_single_predicate(&service.db.pool, predicate_id).await; - - tokio::time::sleep(std::time::Duration::from_millis(1)).await; - - let note_id = - add_fact_note(&service, "employment-b", "Alice works at Globex.", "works at", "Globex") - .await; - let fact_a = graph_fact_row(&service.db.pool, "works at", "Initech").await; - let fact_b = graph_fact_row(&service.db.pool, "works at", "Globex").await; - - assert_eq!(fact_a.predicate_id, Some(predicate_id)); - assert_eq!(fact_b.predicate_id, Some(predicate_id)); - assert_eq!(fact_a.object_value.as_deref(), Some("Initech")); - assert_eq!(fact_b.object_value.as_deref(), Some("Globex")); - assert_eq!(fact_a.valid_to, Some(fact_b.valid_from)); - assert!(fact_b.valid_to.is_none()); - - let t_before = fact_b.valid_from - time::Duration::microseconds(1); - let active_before = active_object_value_at(&service.db.pool, predicate_id, t_before).await; - - assert_eq!(active_before.as_deref(), Some("Initech")); - - let t_after = fact_b.valid_from + time::Duration::microseconds(1); - let active_after = active_object_value_at(&service.db.pool, predicate_id, t_after).await; - - assert_eq!(active_after.as_deref(), Some("Globex")); - - let historical_replay = service - .graph_query(works_at_graph_query_request(t_before)) - .await - .expect("historical graph query failed."); - - assert_eq!(historical_replay.facts.len(), 1); - assert_eq!(historical_replay.facts[0].object.value.as_deref(), Some("Initech")); - assert_eq!(historical_replay.facts[0].valid_to, Some(fact_b.valid_from)); - assert_eq!(historical_replay.facts[0].temporal_status, RelationTemporalStatus::Historical); - assert_eq!(historical_replay.facts[0].evidence_note_ids, vec![old_note_id]); - - let current_readback = service - .graph_query(works_at_graph_query_request(t_after)) - .await - .expect("current graph query failed."); - - assert_eq!(current_readback.facts.len(), 1); - assert_eq!(current_readback.facts[0].object.value.as_deref(), Some("Globex")); - assert_eq!(current_readback.facts[0].temporal_status, RelationTemporalStatus::Current); - assert_eq!(current_readback.facts[0].evidence_note_ids, vec![note_id]); - - let supersession_count = - supersession_count(&service.db.pool, fact_a.fact_id, fact_b.fact_id, note_id).await; - - assert_eq!(supersession_count, 1); - - let now = OffsetDateTime::now_utc(); - let active_count = active_fact_count_at(&service.db.pool, predicate_id, now).await; - - assert_eq!(active_count, 1); - - test_db.cleanup().await.expect("Failed to cleanup test database."); -} - #[tokio::test] #[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL to run."] async fn add_note_invalid_relation_rejected_has_field_path() { diff --git a/packages/elf-service/tests/acceptance/graph_ingestion/single_predicate.rs b/packages/elf-service/tests/acceptance/graph_ingestion/single_predicate.rs new file mode 100644 index 00000000..f798e7f9 --- /dev/null +++ b/packages/elf-service/tests/acceptance/graph_ingestion/single_predicate.rs @@ -0,0 +1,243 @@ +use sqlx::{FromRow, PgPool}; +use time::OffsetDateTime; +use uuid::Uuid; + +use crate::acceptance::graph_ingestion::{ + self, GRAPH_REL_SUBJECT, TEST_PROJECT, TEST_SCOPE, TEST_TENANT, +}; +use elf_service::{ + GraphQueryEntityRef, GraphQueryPredicateRef, GraphQueryRequest, RelationTemporalStatus, +}; + +#[derive(Debug, FromRow)] +struct GraphFactRow { + fact_id: Uuid, + predicate_id: Option, + object_value: Option, + valid_from: OffsetDateTime, + valid_to: Option, +} + +fn works_at_graph_query_request(as_of: OffsetDateTime) -> GraphQueryRequest { + GraphQueryRequest { + tenant_id: TEST_TENANT.to_string(), + project_id: TEST_PROJECT.to_string(), + agent_id: "a".to_string(), + read_profile: "private_only".to_string(), + subject: GraphQueryEntityRef::Surface { surface: "Alice".to_string() }, + predicate: Some(GraphQueryPredicateRef::Surface { surface: "works at".to_string() }), + scopes: Some(vec![TEST_SCOPE.to_string()]), + as_of: Some(as_of), + limit: Some(10), + explain: Some(true), + } +} + +async fn graph_fact_row(pool: &PgPool, predicate: &str, object_value: &str) -> GraphFactRow { + sqlx::query_as::<_, GraphFactRow>( + "\ +SELECT + gf.fact_id, + gf.predicate_id, + gf.object_value, + gf.valid_from, + gf.valid_to +FROM graph_facts gf +JOIN graph_entities ge ON ge.entity_id = gf.subject_entity_id +WHERE ge.canonical_norm = $1 + AND gf.predicate = $2 + AND gf.object_value = $3 + AND gf.tenant_id = $4 + AND gf.project_id = $5 + AND gf.scope = $6", + ) + .bind(GRAPH_REL_SUBJECT) + .bind(predicate) + .bind(object_value) + .bind(TEST_TENANT) + .bind(TEST_PROJECT) + .bind(TEST_SCOPE) + .fetch_one(pool) + .await + .expect("Failed to load fact row.") +} + +async fn activate_single_predicate(pool: &PgPool, predicate_id: Uuid) { + sqlx::query( + "\ +UPDATE graph_predicates +SET status = 'active', cardinality = 'single', updated_at = now() +WHERE predicate_id = $1", + ) + .bind(predicate_id) + .execute(pool) + .await + .expect("Failed to activate predicate."); +} + +async fn active_object_value_at( + pool: &PgPool, + predicate_id: Uuid, + at: OffsetDateTime, +) -> Option { + sqlx::query_scalar( + "\ +SELECT gf.object_value +FROM graph_facts gf +JOIN graph_entities ge ON ge.entity_id = gf.subject_entity_id +WHERE ge.canonical_norm = $1 + AND gf.tenant_id = $2 + AND gf.project_id = $3 + AND gf.scope = $4 + AND gf.predicate_id = $5 + AND gf.valid_from <= $6 + AND (gf.valid_to IS NULL OR gf.valid_to > $6) +LIMIT 1", + ) + .bind(GRAPH_REL_SUBJECT) + .bind(TEST_TENANT) + .bind(TEST_PROJECT) + .bind(TEST_SCOPE) + .bind(predicate_id) + .bind(at) + .fetch_one(pool) + .await + .expect("Failed to load active fact object_value.") +} + +async fn active_fact_count_at(pool: &PgPool, predicate_id: Uuid, at: OffsetDateTime) -> i64 { + sqlx::query_scalar( + "\ +SELECT COUNT(*) +FROM graph_facts gf +JOIN graph_entities ge ON ge.entity_id = gf.subject_entity_id +WHERE ge.canonical_norm = $1 + AND gf.tenant_id = $2 + AND gf.project_id = $3 + AND gf.scope = $4 + AND gf.predicate_id = $5 + AND gf.valid_from <= $6 + AND (gf.valid_to IS NULL OR gf.valid_to > $6)", + ) + .bind(GRAPH_REL_SUBJECT) + .bind(TEST_TENANT) + .bind(TEST_PROJECT) + .bind(TEST_SCOPE) + .bind(predicate_id) + .bind(at) + .fetch_one(pool) + .await + .expect("Failed to count active facts.") +} + +async fn supersession_count( + pool: &PgPool, + from_fact_id: Uuid, + to_fact_id: Uuid, + note_id: Uuid, +) -> i64 { + sqlx::query_scalar( + "\ +SELECT COUNT(*) +FROM graph_fact_supersessions +WHERE from_fact_id = $1 + AND to_fact_id = $2 + AND note_id = $3", + ) + .bind(from_fact_id) + .bind(to_fact_id) + .bind(note_id) + .fetch_one(pool) + .await + .expect("Failed to count supersessions.") +} + +#[tokio::test] +#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL to run."] +async fn add_note_single_predicate_supersedes_conflicting_fact() { + let Some(test_db) = + graph_ingestion::build_test_db("add_note_single_predicate_supersedes_conflicting_fact") + .await + else { + return; + }; + let service = graph_ingestion::build_stub_service(&test_db).await; + + graph_ingestion::reset_service_db(&service).await; + + let old_note_id = graph_ingestion::add_fact_note( + &service, + "employment-a", + "Alice works at Initech.", + "works at", + "Initech", + ) + .await; + let fact_a = graph_fact_row(&service.db.pool, "works at", "Initech").await; + let predicate_id = fact_a.predicate_id.expect("Expected predicate_id."); + + activate_single_predicate(&service.db.pool, predicate_id).await; + + tokio::time::sleep(std::time::Duration::from_millis(1)).await; + + let note_id = graph_ingestion::add_fact_note( + &service, + "employment-b", + "Alice works at Globex.", + "works at", + "Globex", + ) + .await; + let fact_a = graph_fact_row(&service.db.pool, "works at", "Initech").await; + let fact_b = graph_fact_row(&service.db.pool, "works at", "Globex").await; + + assert_eq!(fact_a.predicate_id, Some(predicate_id)); + assert_eq!(fact_b.predicate_id, Some(predicate_id)); + assert_eq!(fact_a.object_value.as_deref(), Some("Initech")); + assert_eq!(fact_b.object_value.as_deref(), Some("Globex")); + assert_eq!(fact_a.valid_to, Some(fact_b.valid_from)); + assert!(fact_b.valid_to.is_none()); + + let t_before = fact_b.valid_from - time::Duration::microseconds(1); + let active_before = active_object_value_at(&service.db.pool, predicate_id, t_before).await; + + assert_eq!(active_before.as_deref(), Some("Initech")); + + let t_after = fact_b.valid_from + time::Duration::microseconds(1); + let active_after = active_object_value_at(&service.db.pool, predicate_id, t_after).await; + + assert_eq!(active_after.as_deref(), Some("Globex")); + + let historical_replay = service + .graph_query(works_at_graph_query_request(t_before)) + .await + .expect("historical graph query failed."); + + assert_eq!(historical_replay.facts.len(), 1); + assert_eq!(historical_replay.facts[0].object.value.as_deref(), Some("Initech")); + assert_eq!(historical_replay.facts[0].valid_to, Some(fact_b.valid_from)); + assert_eq!(historical_replay.facts[0].temporal_status, RelationTemporalStatus::Historical); + assert_eq!(historical_replay.facts[0].evidence_note_ids, vec![old_note_id]); + + let current_readback = service + .graph_query(works_at_graph_query_request(t_after)) + .await + .expect("current graph query failed."); + + assert_eq!(current_readback.facts.len(), 1); + assert_eq!(current_readback.facts[0].object.value.as_deref(), Some("Globex")); + assert_eq!(current_readback.facts[0].temporal_status, RelationTemporalStatus::Current); + assert_eq!(current_readback.facts[0].evidence_note_ids, vec![note_id]); + + let supersession_count = + supersession_count(&service.db.pool, fact_a.fact_id, fact_b.fact_id, note_id).await; + + assert_eq!(supersession_count, 1); + + let now = OffsetDateTime::now_utc(); + let active_count = active_fact_count_at(&service.db.pool, predicate_id, now).await; + + assert_eq!(active_count, 1); + + test_db.cleanup().await.expect("Failed to cleanup test database."); +}