From 98c077fbaa43c459d9e488f4c6372b71b871c464 Mon Sep 17 00:00:00 2001 From: Yvette Carlisle Date: Tue, 30 Jun 2026 22:52:43 -0400 Subject: [PATCH] {"schema":"decodex/commit/1","summary":"Split docs extension acceptance modules","authority":"manual"} --- .../acceptance/docs_extension_v1/helpers.rs | 684 +----------------- .../docs_extension_v1/helpers/assertions.rs | 111 +++ .../docs_extension_v1/helpers/context.rs | 110 +++ .../docs_extension_v1/helpers/filters.rs | 152 ++++ .../docs_extension_v1/helpers/outbox.rs | 105 +++ .../docs_extension_v1/helpers/qdrant.rs | 106 +++ .../docs_extension_v1/helpers/worker.rs | 97 +++ .../acceptance/docs_extension_v1/lifecycle.rs | 531 +------------- .../docs_extension_v1/lifecycle/delete.rs | 113 +++ .../docs_extension_v1/lifecycle/end_to_end.rs | 38 + .../docs_extension_v1/lifecycle/filters.rs | 209 ++++++ .../lifecycle/source_library.rs | 79 ++ .../lifecycle/write_policy.rs | 97 +++ .../docs_extension_v1/search_filters.rs | 483 +------------ .../search_filters/domain_repo.rs | 193 +++++ .../search_filters/recency.rs | 166 +++++ .../search_filters/sparse.rs | 83 +++ .../search_filters/timestamp.rs | 52 ++ 18 files changed, 1741 insertions(+), 1668 deletions(-) create mode 100644 packages/elf-service/tests/acceptance/docs_extension_v1/helpers/assertions.rs create mode 100644 packages/elf-service/tests/acceptance/docs_extension_v1/helpers/context.rs create mode 100644 packages/elf-service/tests/acceptance/docs_extension_v1/helpers/filters.rs create mode 100644 packages/elf-service/tests/acceptance/docs_extension_v1/helpers/outbox.rs create mode 100644 packages/elf-service/tests/acceptance/docs_extension_v1/helpers/qdrant.rs create mode 100644 packages/elf-service/tests/acceptance/docs_extension_v1/helpers/worker.rs create mode 100644 packages/elf-service/tests/acceptance/docs_extension_v1/lifecycle/delete.rs create mode 100644 packages/elf-service/tests/acceptance/docs_extension_v1/lifecycle/end_to_end.rs create mode 100644 packages/elf-service/tests/acceptance/docs_extension_v1/lifecycle/filters.rs create mode 100644 packages/elf-service/tests/acceptance/docs_extension_v1/lifecycle/source_library.rs create mode 100644 packages/elf-service/tests/acceptance/docs_extension_v1/lifecycle/write_policy.rs create mode 100644 packages/elf-service/tests/acceptance/docs_extension_v1/search_filters/domain_repo.rs create mode 100644 packages/elf-service/tests/acceptance/docs_extension_v1/search_filters/recency.rs create mode 100644 packages/elf-service/tests/acceptance/docs_extension_v1/search_filters/sparse.rs create mode 100644 packages/elf-service/tests/acceptance/docs_extension_v1/search_filters/timestamp.rs diff --git a/packages/elf-service/tests/acceptance/docs_extension_v1/helpers.rs b/packages/elf-service/tests/acceptance/docs_extension_v1/helpers.rs index 7d7c52fe..555bd2ad 100644 --- a/packages/elf-service/tests/acceptance/docs_extension_v1/helpers.rs +++ b/packages/elf-service/tests/acceptance/docs_extension_v1/helpers.rs @@ -1,664 +1,22 @@ -use std::{ - collections::HashSet, - future::IntoFuture, - string::ToString, - sync::Arc, - time::{Duration, Instant}, +mod assertions; +mod context; +mod filters; +mod outbox; +mod qdrant; +mod worker; + +pub(crate) use self::{ + assertions::{ + assert_doc_excerpt, assert_doc_get, assert_docs_search_l0, payload_string, + trajectory_stage_stats, + }, + context::{DocsContext, TEST_CONTENT, put_test_doc, put_test_doc_with, setup_docs_context}, + filters::{ + cleanup_docs_filter_fixture, create_docs_search_filter_fixture, search_doc_ids_with_filters, + }, + outbox::{wait_for_doc_outbox_done, wait_for_note_outbox_done}, + qdrant::{ + fetch_first_doc_chunk_id, fetch_first_doc_chunk_point, verify_docs_qdrant_filter_indexes, + }, + worker::spawn_doc_worker, }; - -use ahash::AHashMap; -use axum::{Json, Router, extract::State, http::StatusCode, response::IntoResponse, routing}; -use qdrant_client::qdrant::{ - CreateFieldIndexCollection, FieldType, GetPointsBuilder, PayloadSchemaType, RetrievedPoint, - value, -}; -use serde_json::Map; -use sqlx::{FromRow, PgPool}; -use tokenizers::{Tokenizer, models::wordlevel::WordLevel}; -use tokio::{ - net::TcpListener, - sync::{oneshot, oneshot::Sender}, - task::JoinHandle, - time, -}; -use uuid::Uuid; - -use crate::acceptance::{self, SpyExtractor, StubEmbedding, StubRerank, chunking::ChunkingConfig}; -use elf_config::EmbeddingProviderConfig; -use elf_service::{ - DocsExcerptsGetRequest, DocsGetRequest, DocsPutRequest, DocsPutResponse, DocsSearchL0Request, - ElfService, Providers, TextQuoteSelector, docs::DocRetrievalTrajectory, -}; -use elf_storage::{db::Db, qdrant::QdrantStore}; -use elf_testkit::TestDatabase; -use elf_worker::worker::{self, WorkerState}; - -pub(crate) const TEST_CONTENT: &str = - "ELF docs extension v1 stores evidence. Keyword: peregrine.\nSecond sentence for chunking."; -pub(crate) const DOCS_SEARCH_FILTER_INDEXES: [(&str, PayloadSchemaType, FieldType); 9] = [ - ("scope", PayloadSchemaType::Keyword, FieldType::Keyword), - ("status", PayloadSchemaType::Keyword, FieldType::Keyword), - ("doc_type", PayloadSchemaType::Keyword, FieldType::Keyword), - ("agent_id", PayloadSchemaType::Keyword, FieldType::Keyword), - ("updated_at", PayloadSchemaType::Datetime, FieldType::Datetime), - ("doc_ts", PayloadSchemaType::Datetime, FieldType::Datetime), - ("thread_id", PayloadSchemaType::Keyword, FieldType::Keyword), - ("domain", PayloadSchemaType::Keyword, FieldType::Keyword), - ("repo", PayloadSchemaType::Keyword, FieldType::Keyword), -]; - -#[derive(FromRow)] -pub(crate) struct DocOutboxCounts { - total: i64, - done: i64, - failed: i64, -} - -#[derive(FromRow)] -pub(crate) struct NoteOutboxCounts { - total: i64, - done: i64, - failed: i64, -} - -pub(crate) struct DocsContext { - pub(crate) test_db: TestDatabase, - pub(crate) service: ElfService, -} - -pub(crate) fn build_test_tokenizer() -> Tokenizer { - let mut vocab = AHashMap::new(); - - vocab.insert("".to_string(), 0_u32); - - let model = WordLevel::builder() - .vocab(vocab) - .unk_token("".to_string()) - .build() - .expect("Failed to build test tokenizer."); - - Tokenizer::new(model) -} - -pub(crate) fn payload_string(payload_value: &qdrant_client::qdrant::Value) -> Option<&str> { - match payload_value.kind.as_ref() { - Some(value::Kind::StringValue(value)) => Some(value.as_str()), - _ => None, - } -} - -pub(crate) fn trajectory_stage_stats<'a>( - trajectory: &'a DocRetrievalTrajectory, - stage_name: &str, -) -> Option<&'a serde_json::Value> { - trajectory.stages.iter().find(|stage| stage.stage_name == stage_name).map(|stage| &stage.stats) -} - -pub(crate) async fn wait_for_doc_outbox_done( - pool: &PgPool, - doc_id: Uuid, - timeout: Duration, -) -> bool { - let deadline = Instant::now() + timeout; - - loop { - let row: Option = sqlx::query_as::<_, DocOutboxCounts>( - "\ -SELECT - COUNT(*) AS total, - COUNT(*) FILTER (WHERE status = 'DONE') AS done, - COUNT(*) FILTER (WHERE status = 'FAILED') AS failed -FROM doc_indexing_outbox -WHERE doc_id = $1", - ) - .bind(doc_id) - .fetch_optional(pool) - .await - .ok() - .flatten(); - - if let Some(row) = row.as_ref() - && row.total > 0 - && row.done == row.total - { - return true; - } - if let Some(row) = row.as_ref() - && row.failed > 0 - { - return false; - } - - if Instant::now() >= deadline { - return false; - } - - time::sleep(Duration::from_millis(200)).await; - } -} - -pub(crate) async fn wait_for_note_outbox_done( - pool: &PgPool, - note_id: Uuid, - timeout: Duration, -) -> bool { - let deadline = Instant::now() + timeout; - - loop { - let row: Option = sqlx::query_as::<_, NoteOutboxCounts>( - "\ -SELECT - COUNT(*) AS total, - COUNT(*) FILTER (WHERE status = 'DONE') AS done, - COUNT(*) FILTER (WHERE status = 'FAILED') AS failed -FROM indexing_outbox -WHERE note_id = $1", - ) - .bind(note_id) - .fetch_optional(pool) - .await - .ok() - .flatten(); - - if let Some(row) = row.as_ref() - && row.total > 0 - && row.done == row.total - { - return true; - } - if let Some(row) = row.as_ref() - && row.failed > 0 - { - return false; - } - - if Instant::now() >= deadline { - return false; - } - - time::sleep(Duration::from_millis(200)).await; - } -} - -pub(crate) async fn start_embed_server() -> (String, Sender<()>) { - let app = Router::new().route("/embeddings", routing::post(embed_handler)).with_state(()); - let listener = TcpListener::bind("127.0.0.1:0").await.expect("Failed to bind embed server."); - let addr = listener.local_addr().expect("Failed to read embed server address."); - let (tx, rx) = oneshot::channel(); - let server = axum::serve(listener, app).with_graceful_shutdown(async move { - let _ = rx.await; - }); - - tokio::spawn(async move { - let _ = server.into_future().await; - }); - - (format!("http://{addr}"), tx) -} - -pub(crate) async fn embed_handler( - State(()): State<()>, - Json(payload): Json, -) -> impl IntoResponse { - let inputs = - payload.get("input").and_then(|value| value.as_array()).cloned().unwrap_or_default(); - let data: Vec<_> = inputs - .iter() - .enumerate() - .map(|(index, _)| { - let embedding: Vec = vec![0.1_f32; 4_096]; - - serde_json::json!({ - "index": index, - "embedding": embedding, - }) - }) - .collect(); - - (StatusCode::OK, Json(serde_json::json!({ "data": data }))).into_response() -} - -pub(crate) async fn create_docs_search_filter_fixture( - ctx: DocsContext, -) -> (TestDatabase, ElfService, Uuid, Uuid, Uuid, JoinHandle<()>, Sender<()>) { - let DocsContext { test_db, service } = ctx; - let shared_knowledge_doc = put_test_doc_with( - &service, - "owner", - "project_shared", - None, - "Docs filter sample", - serde_json::json!({ - "schema": "doc_source_ref/v1", - "doc_type": "knowledge", - "ts": "2026-02-25T12:00:00Z", - }), - TEST_CONTENT, - ) - .await; - let older_shared_knowledge_doc = put_test_doc_with( - &service, - "owner", - "project_shared", - None, - "Docs old filter sample", - serde_json::json!({ - "schema": "doc_source_ref/v1", - "doc_type": "knowledge", - "ts": "2025-01-01T10:00:00Z", - }), - TEST_CONTENT, - ) - .await; - let private_chat_doc = put_test_doc_with( - &service, - "assistant", - "agent_private", - Some("chat"), - "Docs chat sample", - serde_json::json!({ - "schema": "doc_source_ref/v1", - "doc_type": "chat", - "ts": "2026-02-25T12:00:00Z", - "thread_id": "shared-chat-thread", - "role": "assistant" - }), - TEST_CONTENT, - ) - .await; - let (handle, shutdown) = spawn_doc_worker(&service).await; - - assert!( - wait_for_doc_outbox_done( - &service.db.pool, - shared_knowledge_doc.doc_id, - std::time::Duration::from_secs(15) - ) - .await, - "Expected shared docs outbox to reach DONE." - ); - assert!( - wait_for_doc_outbox_done( - &service.db.pool, - older_shared_knowledge_doc.doc_id, - std::time::Duration::from_secs(15) - ) - .await, - "Expected older shared docs outbox to reach DONE." - ); - assert!( - wait_for_doc_outbox_done( - &service.db.pool, - private_chat_doc.doc_id, - std::time::Duration::from_secs(15) - ) - .await, - "Expected private docs outbox to reach DONE." - ); - - ( - test_db, - service, - shared_knowledge_doc.doc_id, - older_shared_knowledge_doc.doc_id, - private_chat_doc.doc_id, - handle, - shutdown, - ) -} - -pub(crate) async fn cleanup_docs_filter_fixture( - test_db: TestDatabase, - _handle: JoinHandle<()>, - shutdown: Sender<()>, -) { - let _ = shutdown.send(()); - - _handle.abort(); - - let _ = _handle.await; - - test_db.cleanup().await.expect("Failed to cleanup test database."); -} - -pub(crate) async fn setup_docs_context() -> Option { - let Some(test_db) = acceptance::test_db().await else { - eprintln!("Skipping docs_extension_v1; set ELF_PG_DSN to run this test."); - - return None; - }; - let Some(qdrant_url) = acceptance::test_qdrant_url() else { - eprintln!( - "Skipping docs_extension_v1; set ELF_QDRANT_URL (or ELF_QDRANT_GRPC_URL) to run this test." - ); - - return None; - }; - let collection = test_db.collection_name("elf_acceptance"); - let docs_collection = test_db.collection_name("elf_acceptance_docs"); - let cfg = acceptance::test_config( - test_db.dsn().to_string(), - qdrant_url, - 4_096, - collection, - docs_collection, - ); - let providers = Providers::new( - Arc::new(StubEmbedding { vector_dim: 4_096 }), - Arc::new(StubRerank), - Arc::new(SpyExtractor { - calls: Arc::new(Default::default()), - payload: serde_json::json!({ "notes": [] }), - }), - ); - let service = - acceptance::build_service(cfg, providers).await.expect("Failed to build service."); - - acceptance::reset_db(&service.db.pool).await.expect("Failed to reset test database."); - acceptance::reset_qdrant_collection( - &service.qdrant.client, - &service.qdrant.collection, - service.qdrant.vector_dim, - ) - .await - .expect("Failed to reset Qdrant memory collection."); - acceptance::reset_qdrant_collection( - &service.qdrant.client, - &service.cfg.storage.qdrant.docs_collection, - service.qdrant.vector_dim, - ) - .await - .expect("Failed to reset Qdrant docs collection."); - - Some(DocsContext { test_db, service }) -} - -pub(crate) async fn fetch_first_doc_chunk_id(db: &ElfService, doc_id: Uuid) -> Option { - sqlx::query_scalar::<_, Uuid>( - "SELECT chunk_id FROM doc_chunks WHERE doc_id = $1 ORDER BY chunk_index LIMIT 1", - ) - .bind(doc_id) - .fetch_optional(&db.db.pool) - .await - .expect("Failed to fetch doc chunk id.") -} - -pub(crate) async fn fetch_first_doc_chunk_point( - service: &ElfService, - doc_id: Uuid, -) -> Option { - let chunk_id = fetch_first_doc_chunk_id(service, doc_id).await?; - let response = service - .qdrant - .client - .get_points( - GetPointsBuilder::new( - service.cfg.storage.qdrant.docs_collection.clone(), - vec![chunk_id.to_string().into()], - ) - .with_payload(true), - ) - .await - .expect("Failed to fetch doc chunk point from Qdrant."); - - response.result.into_iter().next() -} - -pub(crate) async fn put_test_doc(service: &ElfService) -> DocsPutResponse { - put_test_doc_with( - service, - "owner", - "project_shared", - None, - "Docs v1", - serde_json::json!({ - "schema": "doc_source_ref/v1", - "doc_type": "knowledge", - "ts": "2026-02-25T12:00:00Z", - "uri": "acceptance://knowledge/v1" - }), - TEST_CONTENT, - ) - .await -} - -pub(crate) async fn put_test_doc_with( - service: &ElfService, - agent_id: &str, - scope: &str, - doc_type: Option<&str>, - title: &str, - source_ref: serde_json::Value, - content: &str, -) -> DocsPutResponse { - service - .docs_put(DocsPutRequest { - tenant_id: "t".to_string(), - project_id: "p".to_string(), - agent_id: agent_id.to_string(), - scope: scope.to_string(), - doc_type: doc_type.map(ToString::to_string), - title: Some(title.to_string()), - write_policy: None, - source_ref, - content: content.to_string(), - }) - .await - .expect("Failed to put doc.") -} - -pub(crate) async fn search_doc_ids_with_filters( - service: &ElfService, - scope: Option<&str>, - doc_type: Option<&str>, - agent_id: Option<&str>, - updated_after: Option<&str>, - updated_before: Option<&str>, - caller_agent_id: &str, -) -> HashSet { - let results = service - .docs_search_l0(DocsSearchL0Request { - tenant_id: "t".to_string(), - project_id: "p".to_string(), - caller_agent_id: caller_agent_id.to_string(), - scope: scope.map(str::to_string), - status: None, - doc_type: doc_type.map(str::to_string), - sparse_mode: None, - domain: None, - repo: None, - agent_id: agent_id.map(str::to_string), - thread_id: None, - updated_after: updated_after.map(str::to_string), - updated_before: updated_before.map(str::to_string), - ts_gte: None, - ts_lte: None, - read_profile: "all_scopes".to_string(), - query: "peregrine".to_string(), - top_k: Some(20), - candidate_k: Some(50), - explain: None, - }) - .await - .expect("Failed to search docs."); - - results.items.into_iter().map(|item| item.doc_id).collect() -} - -pub(crate) async fn verify_docs_qdrant_filter_indexes(service: &ElfService) { - let mut payload_schema = service - .qdrant - .client - .collection_info(&service.cfg.storage.qdrant.docs_collection) - .await - .expect("Failed to fetch Qdrant docs collection info.") - .result - .expect("Qdrant collection info is missing.") - .payload_schema; - - for (field_name, payload_type, index_type) in DOCS_SEARCH_FILTER_INDEXES { - let missing_or_wrong = match payload_schema.get(field_name) { - Some(schema) => schema.data_type != payload_type as i32, - None => true, - }; - - if missing_or_wrong { - let request = CreateFieldIndexCollection { - collection_name: service.cfg.storage.qdrant.docs_collection.clone(), - wait: Some(true), - field_name: field_name.to_string(), - field_type: Some(index_type as i32), - field_index_params: None, - ordering: None, - timeout: None, - }; - - service - .qdrant - .client - .create_field_index(request) - .await - .expect("Failed to create required Qdrant payload index."); - } - } - - payload_schema = service - .qdrant - .client - .collection_info(&service.cfg.storage.qdrant.docs_collection) - .await - .expect("Failed to fetch Qdrant docs collection info.") - .result - .expect("Qdrant collection info is missing.") - .payload_schema; - - for (field_name, payload_type, _) in DOCS_SEARCH_FILTER_INDEXES { - let schema = payload_schema.get(field_name).expect("Expected required payload field."); - - assert_eq!( - schema.data_type, payload_type as i32, - "Unexpected payload type for {field_name}." - ); - } -} - -pub(crate) async fn assert_doc_get(service: &ElfService, doc_id: Uuid) { - let get_as_owner = service - .docs_get(DocsGetRequest { - tenant_id: "t".to_string(), - project_id: "p".to_string(), - agent_id: "owner".to_string(), - read_profile: "private_plus_project".to_string(), - doc_id, - }) - .await - .expect("Failed to get doc as owner."); - - assert_eq!(get_as_owner.scope, "project_shared"); - assert_eq!(get_as_owner.doc_type, "knowledge"); - assert_eq!(get_as_owner.agent_id, "owner"); - assert_eq!(get_as_owner.title.as_deref(), Some("Docs v1")); - - let get_as_reader = service - .docs_get(DocsGetRequest { - tenant_id: "t".to_string(), - project_id: "p".to_string(), - agent_id: "reader".to_string(), - read_profile: "private_plus_project".to_string(), - doc_id, - }) - .await - .expect("Failed to get doc as reader (expected project grant)."); - - assert_eq!(get_as_reader.doc_id, doc_id); -} - -pub(crate) async fn assert_doc_excerpt(service: &ElfService, doc_id: Uuid, content_hash: &str) { - let excerpts = service - .docs_excerpts_get(DocsExcerptsGetRequest { - tenant_id: "t".to_string(), - project_id: "p".to_string(), - agent_id: "reader".to_string(), - read_profile: "private_plus_project".to_string(), - doc_id, - level: "L1".to_string(), - chunk_id: None, - quote: Some(TextQuoteSelector { - exact: "Keyword: peregrine.".to_string(), - prefix: Some("evidence. ".to_string()), - suffix: Some("\nSecond".to_string()), - }), - position: None, - explain: None, - }) - .await - .expect("Failed to get excerpt."); - - assert!(excerpts.verification.verified); - assert!(excerpts.excerpt.contains("Keyword: peregrine.")); - assert_eq!(excerpts.verification.content_hash, content_hash); -} - -pub(crate) async fn spawn_doc_worker(service: &ElfService) -> (JoinHandle<()>, Sender<()>) { - let (api_base, shutdown) = start_embed_server().await; - let worker_state = WorkerState { - db: Db::connect(&service.cfg.storage.postgres).await.expect("Failed to connect worker DB."), - qdrant: QdrantStore::new(&service.cfg.storage.qdrant) - .expect("Failed to build Qdrant store."), - docs_qdrant: QdrantStore::new_with_collection( - &service.cfg.storage.qdrant, - &service.cfg.storage.qdrant.docs_collection, - ) - .expect("Failed to build docs Qdrant store."), - embedding: EmbeddingProviderConfig { - provider_id: "test".to_string(), - api_base, - api_key: "test-key".to_string(), - path: "/embeddings".to_string(), - model: "test".to_string(), - dimensions: 4_096, - timeout_ms: 1_000, - default_headers: Map::new(), - }, - chunking: ChunkingConfig { max_tokens: 64, overlap_tokens: 8 }, - tokenizer: build_test_tokenizer(), - }; - let handle = tokio::spawn(async move { - let _ = worker::run_worker(worker_state).await; - }); - - (handle, shutdown) -} - -pub(crate) async fn assert_docs_search_l0(service: &ElfService, doc_id: Uuid) { - let results = service - .docs_search_l0(DocsSearchL0Request { - tenant_id: "t".to_string(), - project_id: "p".to_string(), - caller_agent_id: "reader".to_string(), - scope: None, - status: None, - doc_type: None, - sparse_mode: None, - domain: None, - repo: None, - agent_id: None, - thread_id: None, - updated_after: None, - updated_before: None, - ts_gte: None, - ts_lte: None, - read_profile: "private_plus_project".to_string(), - query: "peregrine".to_string(), - top_k: Some(5), - candidate_k: Some(20), - explain: None, - }) - .await - .expect("Failed to search docs."); - - assert!(!results.items.is_empty()); - assert_eq!(results.items[0].doc_id, doc_id); - assert_eq!(results.items[0].doc_type, "knowledge"); - assert!(results.items[0].snippet.contains("peregrine")); -} diff --git a/packages/elf-service/tests/acceptance/docs_extension_v1/helpers/assertions.rs b/packages/elf-service/tests/acceptance/docs_extension_v1/helpers/assertions.rs new file mode 100644 index 00000000..2e2e3b90 --- /dev/null +++ b/packages/elf-service/tests/acceptance/docs_extension_v1/helpers/assertions.rs @@ -0,0 +1,111 @@ +use qdrant_client::qdrant::value; +use uuid::Uuid; + +use elf_service::{ + DocsExcerptsGetRequest, DocsGetRequest, DocsSearchL0Request, ElfService, TextQuoteSelector, + docs::DocRetrievalTrajectory, +}; + +pub(crate) fn payload_string(payload_value: &qdrant_client::qdrant::Value) -> Option<&str> { + match payload_value.kind.as_ref() { + Some(value::Kind::StringValue(value)) => Some(value.as_str()), + _ => None, + } +} + +pub(crate) fn trajectory_stage_stats<'a>( + trajectory: &'a DocRetrievalTrajectory, + stage_name: &str, +) -> Option<&'a serde_json::Value> { + trajectory.stages.iter().find(|stage| stage.stage_name == stage_name).map(|stage| &stage.stats) +} + +pub(crate) async fn assert_doc_get(service: &ElfService, doc_id: Uuid) { + let get_as_owner = service + .docs_get(DocsGetRequest { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + agent_id: "owner".to_string(), + read_profile: "private_plus_project".to_string(), + doc_id, + }) + .await + .expect("Failed to get doc as owner."); + + assert_eq!(get_as_owner.scope, "project_shared"); + assert_eq!(get_as_owner.doc_type, "knowledge"); + assert_eq!(get_as_owner.agent_id, "owner"); + assert_eq!(get_as_owner.title.as_deref(), Some("Docs v1")); + + let get_as_reader = service + .docs_get(DocsGetRequest { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + agent_id: "reader".to_string(), + read_profile: "private_plus_project".to_string(), + doc_id, + }) + .await + .expect("Failed to get doc as reader (expected project grant)."); + + assert_eq!(get_as_reader.doc_id, doc_id); +} + +pub(crate) async fn assert_doc_excerpt(service: &ElfService, doc_id: Uuid, content_hash: &str) { + let excerpts = service + .docs_excerpts_get(DocsExcerptsGetRequest { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + agent_id: "reader".to_string(), + read_profile: "private_plus_project".to_string(), + doc_id, + level: "L1".to_string(), + chunk_id: None, + quote: Some(TextQuoteSelector { + exact: "Keyword: peregrine.".to_string(), + prefix: Some("evidence. ".to_string()), + suffix: Some("\nSecond".to_string()), + }), + position: None, + explain: None, + }) + .await + .expect("Failed to get excerpt."); + + assert!(excerpts.verification.verified); + assert!(excerpts.excerpt.contains("Keyword: peregrine.")); + assert_eq!(excerpts.verification.content_hash, content_hash); +} + +pub(crate) async fn assert_docs_search_l0(service: &ElfService, doc_id: Uuid) { + let results = service + .docs_search_l0(DocsSearchL0Request { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + caller_agent_id: "reader".to_string(), + scope: None, + status: None, + doc_type: None, + sparse_mode: None, + domain: None, + repo: None, + agent_id: None, + thread_id: None, + updated_after: None, + updated_before: None, + ts_gte: None, + ts_lte: None, + read_profile: "private_plus_project".to_string(), + query: "peregrine".to_string(), + top_k: Some(5), + candidate_k: Some(20), + explain: None, + }) + .await + .expect("Failed to search docs."); + + assert!(!results.items.is_empty()); + assert_eq!(results.items[0].doc_id, doc_id); + assert_eq!(results.items[0].doc_type, "knowledge"); + assert!(results.items[0].snippet.contains("peregrine")); +} diff --git a/packages/elf-service/tests/acceptance/docs_extension_v1/helpers/context.rs b/packages/elf-service/tests/acceptance/docs_extension_v1/helpers/context.rs new file mode 100644 index 00000000..d0c3da50 --- /dev/null +++ b/packages/elf-service/tests/acceptance/docs_extension_v1/helpers/context.rs @@ -0,0 +1,110 @@ +use std::{string::ToString, sync::Arc}; + +use serde_json::Value; + +use crate::acceptance::{self, SpyExtractor, StubEmbedding, StubRerank}; +use elf_service::{DocsPutRequest, DocsPutResponse, ElfService, Providers}; +use elf_testkit::TestDatabase; + +pub(crate) const TEST_CONTENT: &str = + "ELF docs extension v1 stores evidence. Keyword: peregrine.\nSecond sentence for chunking."; + +pub(crate) struct DocsContext { + pub(crate) test_db: TestDatabase, + pub(crate) service: ElfService, +} + +pub(crate) async fn setup_docs_context() -> Option { + let Some(test_db) = acceptance::test_db().await else { + eprintln!("Skipping docs_extension_v1; set ELF_PG_DSN to run this test."); + + return None; + }; + let Some(qdrant_url) = acceptance::test_qdrant_url() else { + eprintln!( + "Skipping docs_extension_v1; set ELF_QDRANT_URL (or ELF_QDRANT_GRPC_URL) to run this test." + ); + + return None; + }; + let collection = test_db.collection_name("elf_acceptance"); + let docs_collection = test_db.collection_name("elf_acceptance_docs"); + let cfg = acceptance::test_config( + test_db.dsn().to_string(), + qdrant_url, + 4_096, + collection, + docs_collection, + ); + let providers = Providers::new( + Arc::new(StubEmbedding { vector_dim: 4_096 }), + Arc::new(StubRerank), + Arc::new(SpyExtractor { + calls: Arc::new(Default::default()), + payload: serde_json::json!({ "notes": [] }), + }), + ); + let service = + acceptance::build_service(cfg, providers).await.expect("Failed to build service."); + + acceptance::reset_db(&service.db.pool).await.expect("Failed to reset test database."); + acceptance::reset_qdrant_collection( + &service.qdrant.client, + &service.qdrant.collection, + service.qdrant.vector_dim, + ) + .await + .expect("Failed to reset Qdrant memory collection."); + acceptance::reset_qdrant_collection( + &service.qdrant.client, + &service.cfg.storage.qdrant.docs_collection, + service.qdrant.vector_dim, + ) + .await + .expect("Failed to reset Qdrant docs collection."); + + Some(DocsContext { test_db, service }) +} + +pub(crate) async fn put_test_doc(service: &ElfService) -> DocsPutResponse { + put_test_doc_with( + service, + "owner", + "project_shared", + None, + "Docs v1", + serde_json::json!({ + "schema": "doc_source_ref/v1", + "doc_type": "knowledge", + "ts": "2026-02-25T12:00:00Z", + "uri": "acceptance://knowledge/v1" + }), + TEST_CONTENT, + ) + .await +} + +pub(crate) async fn put_test_doc_with( + service: &ElfService, + agent_id: &str, + scope: &str, + doc_type: Option<&str>, + title: &str, + source_ref: Value, + content: &str, +) -> DocsPutResponse { + service + .docs_put(DocsPutRequest { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + agent_id: agent_id.to_string(), + scope: scope.to_string(), + doc_type: doc_type.map(ToString::to_string), + title: Some(title.to_string()), + write_policy: None, + source_ref, + content: content.to_string(), + }) + .await + .expect("Failed to put doc.") +} diff --git a/packages/elf-service/tests/acceptance/docs_extension_v1/helpers/filters.rs b/packages/elf-service/tests/acceptance/docs_extension_v1/helpers/filters.rs new file mode 100644 index 00000000..321aad80 --- /dev/null +++ b/packages/elf-service/tests/acceptance/docs_extension_v1/helpers/filters.rs @@ -0,0 +1,152 @@ +use std::{collections::HashSet, time::Duration}; + +use tokio::{sync::oneshot::Sender, task::JoinHandle}; +use uuid::Uuid; + +use crate::acceptance::docs_extension_v1::helpers::{ + context::{self, DocsContext, TEST_CONTENT}, + outbox, worker, +}; +use elf_service::{DocsSearchL0Request, ElfService}; +use elf_testkit::TestDatabase; + +pub(crate) async fn create_docs_search_filter_fixture( + ctx: DocsContext, +) -> (TestDatabase, ElfService, Uuid, Uuid, Uuid, JoinHandle<()>, Sender<()>) { + let DocsContext { test_db, service } = ctx; + let shared_knowledge_doc = context::put_test_doc_with( + &service, + "owner", + "project_shared", + None, + "Docs filter sample", + serde_json::json!({ + "schema": "doc_source_ref/v1", + "doc_type": "knowledge", + "ts": "2026-02-25T12:00:00Z", + }), + TEST_CONTENT, + ) + .await; + let older_shared_knowledge_doc = context::put_test_doc_with( + &service, + "owner", + "project_shared", + None, + "Docs old filter sample", + serde_json::json!({ + "schema": "doc_source_ref/v1", + "doc_type": "knowledge", + "ts": "2025-01-01T10:00:00Z", + }), + TEST_CONTENT, + ) + .await; + let private_chat_doc = context::put_test_doc_with( + &service, + "assistant", + "agent_private", + Some("chat"), + "Docs chat sample", + serde_json::json!({ + "schema": "doc_source_ref/v1", + "doc_type": "chat", + "ts": "2026-02-25T12:00:00Z", + "thread_id": "shared-chat-thread", + "role": "assistant" + }), + TEST_CONTENT, + ) + .await; + let (handle, shutdown) = worker::spawn_doc_worker(&service).await; + + assert!( + outbox::wait_for_doc_outbox_done( + &service.db.pool, + shared_knowledge_doc.doc_id, + Duration::from_secs(15) + ) + .await, + "Expected shared docs outbox to reach DONE." + ); + assert!( + outbox::wait_for_doc_outbox_done( + &service.db.pool, + older_shared_knowledge_doc.doc_id, + Duration::from_secs(15) + ) + .await, + "Expected older shared docs outbox to reach DONE." + ); + assert!( + outbox::wait_for_doc_outbox_done( + &service.db.pool, + private_chat_doc.doc_id, + Duration::from_secs(15) + ) + .await, + "Expected private docs outbox to reach DONE." + ); + + ( + test_db, + service, + shared_knowledge_doc.doc_id, + older_shared_knowledge_doc.doc_id, + private_chat_doc.doc_id, + handle, + shutdown, + ) +} + +pub(crate) async fn cleanup_docs_filter_fixture( + test_db: TestDatabase, + _handle: JoinHandle<()>, + shutdown: Sender<()>, +) { + let _ = shutdown.send(()); + + _handle.abort(); + + let _ = _handle.await; + + test_db.cleanup().await.expect("Failed to cleanup test database."); +} + +pub(crate) async fn search_doc_ids_with_filters( + service: &ElfService, + scope: Option<&str>, + doc_type: Option<&str>, + agent_id: Option<&str>, + updated_after: Option<&str>, + updated_before: Option<&str>, + caller_agent_id: &str, +) -> HashSet { + let results = service + .docs_search_l0(DocsSearchL0Request { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + caller_agent_id: caller_agent_id.to_string(), + scope: scope.map(str::to_string), + status: None, + doc_type: doc_type.map(str::to_string), + sparse_mode: None, + domain: None, + repo: None, + agent_id: agent_id.map(str::to_string), + thread_id: None, + updated_after: updated_after.map(str::to_string), + updated_before: updated_before.map(str::to_string), + ts_gte: None, + ts_lte: None, + read_profile: "all_scopes".to_string(), + query: "peregrine".to_string(), + top_k: Some(20), + candidate_k: Some(50), + explain: None, + }) + .await + .expect("Failed to search docs."); + + results.items.into_iter().map(|item| item.doc_id).collect() +} diff --git a/packages/elf-service/tests/acceptance/docs_extension_v1/helpers/outbox.rs b/packages/elf-service/tests/acceptance/docs_extension_v1/helpers/outbox.rs new file mode 100644 index 00000000..90a315e4 --- /dev/null +++ b/packages/elf-service/tests/acceptance/docs_extension_v1/helpers/outbox.rs @@ -0,0 +1,105 @@ +use std::time::{Duration, Instant}; + +use sqlx::{FromRow, PgPool}; +use tokio::time; +use uuid::Uuid; + +#[derive(FromRow)] +struct DocOutboxCounts { + total: i64, + done: i64, + failed: i64, +} + +#[derive(FromRow)] +struct NoteOutboxCounts { + total: i64, + done: i64, + failed: i64, +} + +pub(crate) async fn wait_for_doc_outbox_done( + pool: &PgPool, + doc_id: Uuid, + timeout: Duration, +) -> bool { + let deadline = Instant::now() + timeout; + + loop { + let row: Option = sqlx::query_as::<_, DocOutboxCounts>( + "\ +SELECT + COUNT(*) AS total, + COUNT(*) FILTER (WHERE status = 'DONE') AS done, + COUNT(*) FILTER (WHERE status = 'FAILED') AS failed +FROM doc_indexing_outbox +WHERE doc_id = $1", + ) + .bind(doc_id) + .fetch_optional(pool) + .await + .ok() + .flatten(); + + if let Some(row) = row.as_ref() + && row.total > 0 + && row.done == row.total + { + return true; + } + if let Some(row) = row.as_ref() + && row.failed > 0 + { + return false; + } + + if Instant::now() >= deadline { + return false; + } + + time::sleep(Duration::from_millis(200)).await; + } +} + +pub(crate) async fn wait_for_note_outbox_done( + pool: &PgPool, + note_id: Uuid, + timeout: Duration, +) -> bool { + let deadline = Instant::now() + timeout; + + loop { + let row: Option = sqlx::query_as::<_, NoteOutboxCounts>( + "\ +SELECT + COUNT(*) AS total, + COUNT(*) FILTER (WHERE status = 'DONE') AS done, + COUNT(*) FILTER (WHERE status = 'FAILED') AS failed +FROM indexing_outbox +WHERE note_id = $1", + ) + .bind(note_id) + .fetch_optional(pool) + .await + .ok() + .flatten(); + + if let Some(row) = row.as_ref() + && row.total > 0 + && row.done == row.total + { + return true; + } + if let Some(row) = row.as_ref() + && row.failed > 0 + { + return false; + } + + if Instant::now() >= deadline { + return false; + } + + time::sleep(Duration::from_millis(200)).await; + } +} diff --git a/packages/elf-service/tests/acceptance/docs_extension_v1/helpers/qdrant.rs b/packages/elf-service/tests/acceptance/docs_extension_v1/helpers/qdrant.rs new file mode 100644 index 00000000..76f8282c --- /dev/null +++ b/packages/elf-service/tests/acceptance/docs_extension_v1/helpers/qdrant.rs @@ -0,0 +1,106 @@ +use qdrant_client::qdrant::{ + CreateFieldIndexCollection, FieldType, GetPointsBuilder, PayloadSchemaType, RetrievedPoint, +}; +use uuid::Uuid; + +use elf_service::ElfService; + +const DOCS_SEARCH_FILTER_INDEXES: [(&str, PayloadSchemaType, FieldType); 9] = [ + ("scope", PayloadSchemaType::Keyword, FieldType::Keyword), + ("status", PayloadSchemaType::Keyword, FieldType::Keyword), + ("doc_type", PayloadSchemaType::Keyword, FieldType::Keyword), + ("agent_id", PayloadSchemaType::Keyword, FieldType::Keyword), + ("updated_at", PayloadSchemaType::Datetime, FieldType::Datetime), + ("doc_ts", PayloadSchemaType::Datetime, FieldType::Datetime), + ("thread_id", PayloadSchemaType::Keyword, FieldType::Keyword), + ("domain", PayloadSchemaType::Keyword, FieldType::Keyword), + ("repo", PayloadSchemaType::Keyword, FieldType::Keyword), +]; + +pub(crate) async fn fetch_first_doc_chunk_id(db: &ElfService, doc_id: Uuid) -> Option { + sqlx::query_scalar::<_, Uuid>( + "SELECT chunk_id FROM doc_chunks WHERE doc_id = $1 ORDER BY chunk_index LIMIT 1", + ) + .bind(doc_id) + .fetch_optional(&db.db.pool) + .await + .expect("Failed to fetch doc chunk id.") +} + +pub(crate) async fn fetch_first_doc_chunk_point( + service: &ElfService, + doc_id: Uuid, +) -> Option { + let chunk_id = fetch_first_doc_chunk_id(service, doc_id).await?; + let response = service + .qdrant + .client + .get_points( + GetPointsBuilder::new( + service.cfg.storage.qdrant.docs_collection.clone(), + vec![chunk_id.to_string().into()], + ) + .with_payload(true), + ) + .await + .expect("Failed to fetch doc chunk point from Qdrant."); + + response.result.into_iter().next() +} + +pub(crate) async fn verify_docs_qdrant_filter_indexes(service: &ElfService) { + let mut payload_schema = service + .qdrant + .client + .collection_info(&service.cfg.storage.qdrant.docs_collection) + .await + .expect("Failed to fetch Qdrant docs collection info.") + .result + .expect("Qdrant collection info is missing.") + .payload_schema; + + for (field_name, payload_type, index_type) in DOCS_SEARCH_FILTER_INDEXES { + let missing_or_wrong = match payload_schema.get(field_name) { + Some(schema) => schema.data_type != payload_type as i32, + None => true, + }; + + if missing_or_wrong { + let request = CreateFieldIndexCollection { + collection_name: service.cfg.storage.qdrant.docs_collection.clone(), + wait: Some(true), + field_name: field_name.to_string(), + field_type: Some(index_type as i32), + field_index_params: None, + ordering: None, + timeout: None, + }; + + service + .qdrant + .client + .create_field_index(request) + .await + .expect("Failed to create required Qdrant payload index."); + } + } + + payload_schema = service + .qdrant + .client + .collection_info(&service.cfg.storage.qdrant.docs_collection) + .await + .expect("Failed to fetch Qdrant docs collection info.") + .result + .expect("Qdrant collection info is missing.") + .payload_schema; + + for (field_name, payload_type, _) in DOCS_SEARCH_FILTER_INDEXES { + let schema = payload_schema.get(field_name).expect("Expected required payload field."); + + assert_eq!( + schema.data_type, payload_type as i32, + "Unexpected payload type for {field_name}." + ); + } +} diff --git a/packages/elf-service/tests/acceptance/docs_extension_v1/helpers/worker.rs b/packages/elf-service/tests/acceptance/docs_extension_v1/helpers/worker.rs new file mode 100644 index 00000000..446bc314 --- /dev/null +++ b/packages/elf-service/tests/acceptance/docs_extension_v1/helpers/worker.rs @@ -0,0 +1,97 @@ +use std::future::IntoFuture; + +use ahash::AHashMap; +use axum::{Json, Router, extract::State, http::StatusCode, response::IntoResponse, routing}; +use serde_json::{Map, Value}; +use tokenizers::{Tokenizer, models::wordlevel::WordLevel}; +use tokio::{ + net::TcpListener, + sync::{oneshot, oneshot::Sender}, + task::JoinHandle, +}; + +use crate::acceptance::chunking::ChunkingConfig; +use elf_config::EmbeddingProviderConfig; +use elf_service::ElfService; +use elf_storage::{db::Db, qdrant::QdrantStore}; +use elf_worker::worker::{self, WorkerState}; + +pub(crate) async fn spawn_doc_worker(service: &ElfService) -> (JoinHandle<()>, Sender<()>) { + let (api_base, shutdown) = start_embed_server().await; + let worker_state = WorkerState { + db: Db::connect(&service.cfg.storage.postgres).await.expect("Failed to connect worker DB."), + qdrant: QdrantStore::new(&service.cfg.storage.qdrant) + .expect("Failed to build Qdrant store."), + docs_qdrant: QdrantStore::new_with_collection( + &service.cfg.storage.qdrant, + &service.cfg.storage.qdrant.docs_collection, + ) + .expect("Failed to build docs Qdrant store."), + embedding: EmbeddingProviderConfig { + provider_id: "test".to_string(), + api_base, + api_key: "test-key".to_string(), + path: "/embeddings".to_string(), + model: "test".to_string(), + dimensions: 4_096, + timeout_ms: 1_000, + default_headers: Map::new(), + }, + chunking: ChunkingConfig { max_tokens: 64, overlap_tokens: 8 }, + tokenizer: build_test_tokenizer(), + }; + let handle = tokio::spawn(async move { + let _ = worker::run_worker(worker_state).await; + }); + + (handle, shutdown) +} + +fn build_test_tokenizer() -> Tokenizer { + let mut vocab = AHashMap::new(); + + vocab.insert("".to_string(), 0_u32); + + let model = WordLevel::builder() + .vocab(vocab) + .unk_token("".to_string()) + .build() + .expect("Failed to build test tokenizer."); + + Tokenizer::new(model) +} + +async fn start_embed_server() -> (String, Sender<()>) { + let app = Router::new().route("/embeddings", routing::post(embed_handler)).with_state(()); + let listener = TcpListener::bind("127.0.0.1:0").await.expect("Failed to bind embed server."); + let addr = listener.local_addr().expect("Failed to read embed server address."); + let (tx, rx) = oneshot::channel(); + let server = axum::serve(listener, app).with_graceful_shutdown(async move { + let _ = rx.await; + }); + + tokio::spawn(async move { + let _ = server.into_future().await; + }); + + (format!("http://{addr}"), tx) +} + +async fn embed_handler(State(()): State<()>, Json(payload): Json) -> impl IntoResponse { + let inputs = + payload.get("input").and_then(|value| value.as_array()).cloned().unwrap_or_default(); + let data: Vec<_> = inputs + .iter() + .enumerate() + .map(|(index, _)| { + let embedding: Vec = vec![0.1_f32; 4_096]; + + serde_json::json!({ + "index": index, + "embedding": embedding, + }) + }) + .collect(); + + (StatusCode::OK, Json(serde_json::json!({ "data": data }))).into_response() +} diff --git a/packages/elf-service/tests/acceptance/docs_extension_v1/lifecycle.rs b/packages/elf-service/tests/acceptance/docs_extension_v1/lifecycle.rs index 70f48b28..6eaeaf70 100644 --- a/packages/elf-service/tests/acceptance/docs_extension_v1/lifecycle.rs +++ b/packages/elf-service/tests/acceptance/docs_extension_v1/lifecycle.rs @@ -1,526 +1,5 @@ -use std::collections::HashSet; - -use serde_json::Value; -use time::{Duration, OffsetDateTime, format_description::well_known::Rfc3339}; - -use crate::acceptance::docs_extension_v1::{self, DocsContext}; -use elf_service::{ - DocsDeleteRequest, DocsExcerptsGetRequest, DocsGetRequest, DocsPutRequest, DocsSearchL0Request, - Error, NoteOp, -}; - -#[tokio::test] -#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL (or ELF_QDRANT_GRPC_URL) to run."] -async fn docs_put_get_excerpts_and_search_l0_work_end_to_end() { - let Some(ctx) = docs_extension_v1::setup_docs_context().await else { return }; - let DocsContext { test_db, service } = ctx; - let put = docs_extension_v1::put_test_doc(&service).await; - - docs_extension_v1::assert_doc_get(&service, put.doc_id).await; - docs_extension_v1::assert_doc_excerpt(&service, put.doc_id, put.content_hash.as_str()).await; - - let (handle, shutdown) = docs_extension_v1::spawn_doc_worker(&service).await; - - assert!( - docs_extension_v1::wait_for_doc_outbox_done( - &service.db.pool, - put.doc_id, - std::time::Duration::from_secs(15) - ) - .await, - "Expected doc outbox to reach DONE." - ); - - docs_extension_v1::assert_docs_search_l0(&service, put.doc_id).await; - - let _ = shutdown.send(()); - - handle.abort(); - - let _ = handle.await; - - drop(service); - - test_db.cleanup().await.expect("Failed to cleanup test database."); -} - -#[tokio::test] -#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL (or ELF_QDRANT_GRPC_URL) to run."] -async fn docs_delete_marks_doc_deleted_and_removes_doc_vectors() { - let Some(ctx) = docs_extension_v1::setup_docs_context().await else { return }; - let DocsContext { test_db, service } = ctx; - let put = docs_extension_v1::put_test_doc(&service).await; - let (handle, shutdown) = docs_extension_v1::spawn_doc_worker(&service).await; - - assert!( - docs_extension_v1::wait_for_doc_outbox_done( - &service.db.pool, - put.doc_id, - std::time::Duration::from_secs(15) - ) - .await, - "Expected doc UPSERT outbox to reach DONE." - ); - assert!( - docs_extension_v1::fetch_first_doc_chunk_point(&service, put.doc_id).await.is_some(), - "Expected indexed doc chunk before delete." - ); - - let deleted = service - .docs_delete(DocsDeleteRequest { - tenant_id: "t".to_string(), - project_id: "p".to_string(), - agent_id: "owner".to_string(), - doc_id: put.doc_id, - }) - .await - .expect("Failed to delete Source Library doc."); - - assert_eq!(deleted.doc_id, put.doc_id); - assert_eq!(deleted.op, NoteOp::Delete); - assert!(deleted.chunk_delete_count > 0); - assert!( - docs_extension_v1::wait_for_doc_outbox_done( - &service.db.pool, - put.doc_id, - std::time::Duration::from_secs(15) - ) - .await, - "Expected doc DELETE outbox to reach DONE." - ); - - let get_after_delete = service - .docs_get(DocsGetRequest { - tenant_id: "t".to_string(), - project_id: "p".to_string(), - agent_id: "owner".to_string(), - read_profile: "private_plus_project".to_string(), - doc_id: put.doc_id, - }) - .await; - let search_after_delete = service - .docs_search_l0(DocsSearchL0Request { - tenant_id: "t".to_string(), - project_id: "p".to_string(), - caller_agent_id: "reader".to_string(), - scope: None, - status: None, - doc_type: None, - sparse_mode: None, - domain: None, - repo: None, - agent_id: None, - thread_id: None, - updated_after: None, - updated_before: None, - ts_gte: None, - ts_lte: None, - read_profile: "private_plus_project".to_string(), - query: "peregrine".to_string(), - top_k: Some(5), - candidate_k: Some(20), - explain: None, - }) - .await - .expect("Failed to search docs after delete."); - let second_delete = service - .docs_delete(DocsDeleteRequest { - tenant_id: "t".to_string(), - project_id: "p".to_string(), - agent_id: "owner".to_string(), - doc_id: put.doc_id, - }) - .await - .expect("Second Source Library delete should be idempotent."); - - assert!(matches!(get_after_delete, Err(Error::NotFound { .. }))); - assert!(search_after_delete.items.iter().all(|item| item.doc_id != put.doc_id)); - assert!( - docs_extension_v1::fetch_first_doc_chunk_point(&service, put.doc_id).await.is_none(), - "Deleted Source Library doc chunk must be removed from Qdrant docs index." - ); - assert_eq!(second_delete.op, NoteOp::None); - assert_eq!(second_delete.chunk_delete_count, 0); - - let _ = shutdown.send(()); - - handle.abort(); - - let _ = handle.await; - - drop(service); - - test_db.cleanup().await.expect("Failed to cleanup test database."); -} - -#[tokio::test] -#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL (or ELF_QDRANT_GRPC_URL) to run."] -async fn docs_put_source_library_records_do_not_create_memory_notes() { - let Some(ctx) = docs_extension_v1::setup_docs_context().await else { return }; - let DocsContext { test_db, service } = ctx; - let before: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM memory_notes") - .fetch_one(&service.db.pool) - .await - .expect("Failed to count notes before docs_put."); - let put = docs_extension_v1::put_test_doc_with( - &service, - "owner", - "project_shared", - Some("chat"), - "Captured thread", - serde_json::json!({ - "schema": "doc_source_ref/v1", - "doc_type": "chat", - "ts": "2026-02-25T12:00:00Z", - "thread_id": "thread-source-library-1", - "role": "user", - "source_kind": "social_thread", - "canonical_uri": "https://example.com/thread/source-library-1", - "captured_at": "2026-02-25T12:10:00Z", - "source_created_at": "2026-02-25T11:55:00Z", - "trust_label": "public_web", - "author": "Example Researcher", - "handle": "example-researcher", - "excerpt_locator": { - "quote": { - "exact": "Source libraries should preserve thread context." - } - } - }), - "Source libraries should preserve thread context. Agents can later promote only selected facts.", - ) - .await; - let after: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM memory_notes") - .fetch_one(&service.db.pool) - .await - .expect("Failed to count notes after docs_put."); - let doc_exists: bool = - sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM doc_documents WHERE doc_id = $1)") - .bind(put.doc_id) - .fetch_one(&service.db.pool) - .await - .expect("Failed to verify doc row."); - let stored_source_ref: Value = - sqlx::query_scalar("SELECT source_ref FROM doc_documents WHERE doc_id = $1") - .bind(put.doc_id) - .fetch_one(&service.db.pool) - .await - .expect("Failed to fetch normalized source_ref."); - - assert!(doc_exists); - assert_eq!(after, before, "docs_put must not create durable Memory Notes."); - assert_eq!(put.source_capture.schema, "doc_source_capture/v1"); - assert_eq!(put.source_capture.source_record_id, put.doc_id); - assert_eq!(put.source_capture.origin, "https://example.com/thread/source-library-1"); - assert_eq!(put.source_capture.source_type, "social_thread"); - assert_eq!(put.source_capture.visibility_scope, "project_shared"); - assert!(!put.source_capture.source_spans.is_empty()); - assert_eq!(put.source_capture.source_spans[0].schema, "doc_source_span/v1"); - assert_eq!(put.source_capture.source_spans[0].status, "captured"); - assert_eq!(put.source_capture.source_spans[0].reason_code, None); - assert_eq!(stored_source_ref["source_record_id"], put.doc_id.to_string()); - assert_eq!(stored_source_ref["origin"], "https://example.com/thread/source-library-1"); - assert_eq!(stored_source_ref["source_type"], "social_thread"); - assert_eq!(stored_source_ref["content_hash"], put.content_hash); - assert!(stored_source_ref["source_spans"].as_array().is_some_and(|spans| !spans.is_empty())); - - drop(service); - - test_db.cleanup().await.expect("Failed to cleanup test database."); -} - -#[tokio::test] -#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL (or ELF_QDRANT_GRPC_URL) to run."] -async fn docs_search_l0_respects_scope_doc_type_agent_id_and_updated_after_filters() { - let Some(ctx) = docs_extension_v1::setup_docs_context().await else { return }; - let ( - test_db, - service, - shared_knowledge_doc, - _older_shared_knowledge_doc, - private_chat_doc, - handle, - shutdown, - ) = docs_extension_v1::create_docs_search_filter_fixture(ctx).await; - let shared_scope_results = docs_extension_v1::search_doc_ids_with_filters( - &service, - Some("project_shared"), - None, - None, - None, - None, - "reader", - ) - .await; - - assert!(shared_scope_results.contains(&shared_knowledge_doc)); - assert!(!shared_scope_results.contains(&private_chat_doc)); - - let chat_results = docs_extension_v1::search_doc_ids_with_filters( - &service, - None, - Some("chat"), - None, - None, - None, - "reader", - ) - .await; - - assert!(!chat_results.contains(&private_chat_doc)); - assert!(!chat_results.contains(&shared_knowledge_doc)); - - let assistant_chat_results = docs_extension_v1::search_doc_ids_with_filters( - &service, - None, - Some("chat"), - None, - None, - None, - "assistant", - ) - .await; - - assert!(assistant_chat_results.contains(&private_chat_doc)); - assert!(!assistant_chat_results.contains(&shared_knowledge_doc)); - - let assistant_results = docs_extension_v1::search_doc_ids_with_filters( - &service, - None, - None, - Some("assistant"), - None, - None, - "reader", - ) - .await; - - assert!(!assistant_results.contains(&private_chat_doc)); - assert!(!assistant_results.contains(&shared_knowledge_doc)); - - let past = (OffsetDateTime::now_utc() - Duration::seconds(60)) - .format(&Rfc3339) - .expect("Failed to format past RFC3339 timestamp."); - let future = (OffsetDateTime::now_utc() + Duration::seconds(60)) - .format(&Rfc3339) - .expect("Failed to format future RFC3339 timestamp."); - let updated_after_past_results = docs_extension_v1::search_doc_ids_with_filters( - &service, - None, - None, - None, - Some(&past), - None, - "reader", - ) - .await; - - assert!(updated_after_past_results.contains(&shared_knowledge_doc)); - assert!(!updated_after_past_results.contains(&private_chat_doc)); - - let updated_after_future_results = docs_extension_v1::search_doc_ids_with_filters( - &service, - None, - None, - None, - Some(&future), - None, - "reader", - ) - .await; - - assert!(updated_after_future_results.is_empty()); - - docs_extension_v1::cleanup_docs_filter_fixture(test_db, handle, shutdown).await; -} - -#[tokio::test] -#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL (or ELF_QDRANT_GRPC_URL) to run."] -async fn docs_search_l0_respects_thread_id_filter_for_chat_docs() { - let Some(ctx) = docs_extension_v1::setup_docs_context().await else { return }; - let ( - test_db, - service, - shared_knowledge_doc, - older_shared_knowledge_doc, - private_chat_doc, - handle, - shutdown, - ) = docs_extension_v1::create_docs_search_filter_fixture(ctx).await; - let thread_filter_results = service - .docs_search_l0(DocsSearchL0Request { - tenant_id: "t".to_string(), - project_id: "p".to_string(), - caller_agent_id: "assistant".to_string(), - scope: None, - status: None, - doc_type: Some("chat".to_string()), - sparse_mode: None, - domain: None, - repo: None, - agent_id: None, - thread_id: Some("shared-chat-thread".to_string()), - updated_after: None, - updated_before: None, - ts_gte: None, - ts_lte: None, - read_profile: "private_plus_project".to_string(), - query: "peregrine".to_string(), - top_k: Some(20), - candidate_k: Some(50), - explain: None, - }) - .await - .expect("Failed to search docs with thread_id filter."); - let thread_filtered_docs = - thread_filter_results.items.into_iter().map(|item| item.doc_id).collect::>(); - - assert!(thread_filtered_docs.contains(&private_chat_doc)); - assert!(!thread_filtered_docs.contains(&shared_knowledge_doc)); - assert!(!thread_filtered_docs.contains(&older_shared_knowledge_doc)); - - docs_extension_v1::cleanup_docs_filter_fixture(test_db, handle, shutdown).await; -} - -#[tokio::test] -#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL (or ELF_QDRANT_GRPC_URL) to run this test."] -async fn docs_search_l0_requires_chat_doc_type_for_thread_id() { - let Some(ctx) = docs_extension_v1::setup_docs_context().await else { return }; - let ( - test_db, - service, - _shared_knowledge_doc, - _older_shared_knowledge_doc, - _private_chat_doc, - handle, - shutdown, - ) = docs_extension_v1::create_docs_search_filter_fixture(ctx).await; - let result = service - .docs_search_l0(DocsSearchL0Request { - tenant_id: "t".to_string(), - project_id: "p".to_string(), - caller_agent_id: "assistant".to_string(), - scope: None, - status: None, - doc_type: None, - sparse_mode: None, - domain: None, - repo: None, - agent_id: None, - thread_id: Some("shared-chat-thread".to_string()), - updated_after: None, - updated_before: None, - ts_gte: None, - ts_lte: None, - read_profile: "private_plus_project".to_string(), - query: "peregrine".to_string(), - top_k: Some(20), - candidate_k: Some(50), - explain: None, - }) - .await; - - match result { - Err(Error::InvalidRequest { message }) => { - assert!(message.contains("thread_id requires")); - }, - other => { - panic!("Expected InvalidRequest for thread_id without chat doc_type, got {other:?}") - }, - } - - docs_extension_v1::cleanup_docs_filter_fixture(test_db, handle, shutdown).await; -} - -#[tokio::test] -#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL (or ELF_QDRANT_GRPC_URL) to run this test."] -async fn docs_put_applies_write_policy_and_excerpt_by_chunk_id_is_verified() { - let Some(ctx) = docs_extension_v1::setup_docs_context().await else { return }; - let DocsContext { test_db, service } = ctx; - let content = "Alpha normal text then secret sk-abcdef and trailing content."; - let secret = "sk-abcdef"; - let start = content.find(secret).expect("Expected secret in content."); - let end = start + secret.len(); - let write_policy = serde_json::from_value(serde_json::json!({ - "exclusions": [{"start": start, "end": end}], - })) - .expect("Failed to build write_policy."); - let put = service - .docs_put(DocsPutRequest { - tenant_id: "t".to_string(), - project_id: "p".to_string(), - agent_id: "owner".to_string(), - scope: "project_shared".to_string(), - doc_type: None, - title: Some("Docs write_policy sample".to_string()), - write_policy: Some(write_policy), - source_ref: serde_json::json!({ - "schema": "doc_source_ref/v1", - "doc_type": "knowledge", - "ts": "2026-02-25T12:00:00Z", - }), - content: content.to_string(), - }) - .await - .expect("Failed to put doc with write policy."); - let (handle, shutdown) = docs_extension_v1::spawn_doc_worker(&service).await; - - assert!( - docs_extension_v1::wait_for_doc_outbox_done( - &service.db.pool, - put.doc_id, - std::time::Duration::from_secs(15) - ) - .await, - "Expected doc outbox to reach DONE." - ); - - let chunk_id = docs_extension_v1::fetch_first_doc_chunk_id(&service, put.doc_id) - .await - .expect("Expected chunk id from transformed doc."); - let excerpt = service - .docs_excerpts_get(DocsExcerptsGetRequest { - tenant_id: "t".to_string(), - project_id: "p".to_string(), - agent_id: "reader".to_string(), - read_profile: "private_plus_project".to_string(), - doc_id: put.doc_id, - level: "L1".to_string(), - chunk_id: Some(chunk_id), - quote: None, - position: None, - explain: None, - }) - .await - .expect("Failed to hydrate excerpt by chunk_id."); - - assert!(excerpt.verification.verified); - assert!(!excerpt.excerpt.is_empty()); - assert!(!excerpt.excerpt.contains(secret)); - assert!(!excerpt.locator.span_id.is_nil()); - - let captured_chunk_span = put - .source_capture - .source_spans - .iter() - .find(|span| span.chunk_id == Some(chunk_id)) - .expect("Expected captured source span for hydrated chunk."); - - assert_eq!(excerpt.locator.span_id, captured_chunk_span.span_id); - assert_eq!(excerpt.verification.content_hash, put.content_hash); - assert!(put.write_policy_audit.is_some()); - assert_eq!(put.source_capture.policy_spans.len(), 1); - assert_eq!(put.source_capture.policy_spans[0].status, "excluded"); - assert_eq!( - put.source_capture.policy_spans[0].reason_code.as_deref(), - Some("WRITE_POLICY_EXCLUSION") - ); - - let _ = shutdown.send(()); - - handle.abort(); - - let _ = handle.await; - - test_db.cleanup().await.expect("Failed to cleanup test database."); -} +mod delete; +mod end_to_end; +mod filters; +mod source_library; +mod write_policy; diff --git a/packages/elf-service/tests/acceptance/docs_extension_v1/lifecycle/delete.rs b/packages/elf-service/tests/acceptance/docs_extension_v1/lifecycle/delete.rs new file mode 100644 index 00000000..3f4ea021 --- /dev/null +++ b/packages/elf-service/tests/acceptance/docs_extension_v1/lifecycle/delete.rs @@ -0,0 +1,113 @@ +use std::time::Duration; + +use crate::acceptance::docs_extension_v1::{self, DocsContext}; +use elf_service::{DocsDeleteRequest, DocsGetRequest, DocsSearchL0Request, Error, NoteOp}; + +#[tokio::test] +#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL (or ELF_QDRANT_GRPC_URL) to run."] +async fn docs_delete_marks_doc_deleted_and_removes_doc_vectors() { + let Some(ctx) = docs_extension_v1::setup_docs_context().await else { return }; + let DocsContext { test_db, service } = ctx; + let put = docs_extension_v1::put_test_doc(&service).await; + let (handle, shutdown) = docs_extension_v1::spawn_doc_worker(&service).await; + + assert!( + docs_extension_v1::wait_for_doc_outbox_done( + &service.db.pool, + put.doc_id, + Duration::from_secs(15) + ) + .await, + "Expected doc UPSERT outbox to reach DONE." + ); + assert!( + docs_extension_v1::fetch_first_doc_chunk_point(&service, put.doc_id).await.is_some(), + "Expected indexed doc chunk before delete." + ); + + let deleted = service + .docs_delete(DocsDeleteRequest { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + agent_id: "owner".to_string(), + doc_id: put.doc_id, + }) + .await + .expect("Failed to delete Source Library doc."); + + assert_eq!(deleted.doc_id, put.doc_id); + assert_eq!(deleted.op, NoteOp::Delete); + assert!(deleted.chunk_delete_count > 0); + assert!( + docs_extension_v1::wait_for_doc_outbox_done( + &service.db.pool, + put.doc_id, + Duration::from_secs(15) + ) + .await, + "Expected doc DELETE outbox to reach DONE." + ); + + let get_after_delete = service + .docs_get(DocsGetRequest { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + agent_id: "owner".to_string(), + read_profile: "private_plus_project".to_string(), + doc_id: put.doc_id, + }) + .await; + let search_after_delete = service + .docs_search_l0(DocsSearchL0Request { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + caller_agent_id: "reader".to_string(), + scope: None, + status: None, + doc_type: None, + sparse_mode: None, + domain: None, + repo: None, + agent_id: None, + thread_id: None, + updated_after: None, + updated_before: None, + ts_gte: None, + ts_lte: None, + read_profile: "private_plus_project".to_string(), + query: "peregrine".to_string(), + top_k: Some(5), + candidate_k: Some(20), + explain: None, + }) + .await + .expect("Failed to search docs after delete."); + let second_delete = service + .docs_delete(DocsDeleteRequest { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + agent_id: "owner".to_string(), + doc_id: put.doc_id, + }) + .await + .expect("Second Source Library delete should be idempotent."); + + assert!(matches!(get_after_delete, Err(Error::NotFound { .. }))); + assert!(search_after_delete.items.iter().all(|item| item.doc_id != put.doc_id)); + assert!( + docs_extension_v1::fetch_first_doc_chunk_point(&service, put.doc_id).await.is_none(), + "Deleted Source Library doc chunk must be removed from Qdrant docs index." + ); + assert_eq!(second_delete.op, NoteOp::None); + assert_eq!(second_delete.chunk_delete_count, 0); + + let _ = shutdown.send(()); + + handle.abort(); + + let _ = handle.await; + + drop(service); + + test_db.cleanup().await.expect("Failed to cleanup test database."); +} diff --git a/packages/elf-service/tests/acceptance/docs_extension_v1/lifecycle/end_to_end.rs b/packages/elf-service/tests/acceptance/docs_extension_v1/lifecycle/end_to_end.rs new file mode 100644 index 00000000..3019c8c5 --- /dev/null +++ b/packages/elf-service/tests/acceptance/docs_extension_v1/lifecycle/end_to_end.rs @@ -0,0 +1,38 @@ +use std::time::Duration; + +use crate::acceptance::docs_extension_v1::{self, DocsContext}; + +#[tokio::test] +#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL (or ELF_QDRANT_GRPC_URL) to run."] +async fn docs_put_get_excerpts_and_search_l0_work_end_to_end() { + let Some(ctx) = docs_extension_v1::setup_docs_context().await else { return }; + let DocsContext { test_db, service } = ctx; + let put = docs_extension_v1::put_test_doc(&service).await; + + docs_extension_v1::assert_doc_get(&service, put.doc_id).await; + docs_extension_v1::assert_doc_excerpt(&service, put.doc_id, put.content_hash.as_str()).await; + + let (handle, shutdown) = docs_extension_v1::spawn_doc_worker(&service).await; + + assert!( + docs_extension_v1::wait_for_doc_outbox_done( + &service.db.pool, + put.doc_id, + Duration::from_secs(15) + ) + .await, + "Expected doc outbox to reach DONE." + ); + + docs_extension_v1::assert_docs_search_l0(&service, put.doc_id).await; + + let _ = shutdown.send(()); + + handle.abort(); + + let _ = handle.await; + + drop(service); + + test_db.cleanup().await.expect("Failed to cleanup test database."); +} diff --git a/packages/elf-service/tests/acceptance/docs_extension_v1/lifecycle/filters.rs b/packages/elf-service/tests/acceptance/docs_extension_v1/lifecycle/filters.rs new file mode 100644 index 00000000..b2be317b --- /dev/null +++ b/packages/elf-service/tests/acceptance/docs_extension_v1/lifecycle/filters.rs @@ -0,0 +1,209 @@ +use std::collections::HashSet; + +use time::{Duration, OffsetDateTime, format_description::well_known::Rfc3339}; + +use crate::acceptance::docs_extension_v1; +use elf_service::{DocsSearchL0Request, Error}; + +#[tokio::test] +#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL (or ELF_QDRANT_GRPC_URL) to run."] +async fn docs_search_l0_respects_scope_doc_type_agent_id_and_updated_after_filters() { + let Some(ctx) = docs_extension_v1::setup_docs_context().await else { return }; + let ( + test_db, + service, + shared_knowledge_doc, + _older_shared_knowledge_doc, + private_chat_doc, + handle, + shutdown, + ) = docs_extension_v1::create_docs_search_filter_fixture(ctx).await; + let shared_scope_results = docs_extension_v1::search_doc_ids_with_filters( + &service, + Some("project_shared"), + None, + None, + None, + None, + "reader", + ) + .await; + + assert!(shared_scope_results.contains(&shared_knowledge_doc)); + assert!(!shared_scope_results.contains(&private_chat_doc)); + + let chat_results = docs_extension_v1::search_doc_ids_with_filters( + &service, + None, + Some("chat"), + None, + None, + None, + "reader", + ) + .await; + + assert!(!chat_results.contains(&private_chat_doc)); + assert!(!chat_results.contains(&shared_knowledge_doc)); + + let assistant_chat_results = docs_extension_v1::search_doc_ids_with_filters( + &service, + None, + Some("chat"), + None, + None, + None, + "assistant", + ) + .await; + + assert!(assistant_chat_results.contains(&private_chat_doc)); + assert!(!assistant_chat_results.contains(&shared_knowledge_doc)); + + let assistant_results = docs_extension_v1::search_doc_ids_with_filters( + &service, + None, + None, + Some("assistant"), + None, + None, + "reader", + ) + .await; + + assert!(!assistant_results.contains(&private_chat_doc)); + assert!(!assistant_results.contains(&shared_knowledge_doc)); + + let past = (OffsetDateTime::now_utc() - Duration::seconds(60)) + .format(&Rfc3339) + .expect("Failed to format past RFC3339 timestamp."); + let future = (OffsetDateTime::now_utc() + Duration::seconds(60)) + .format(&Rfc3339) + .expect("Failed to format future RFC3339 timestamp."); + let updated_after_past_results = docs_extension_v1::search_doc_ids_with_filters( + &service, + None, + None, + None, + Some(&past), + None, + "reader", + ) + .await; + + assert!(updated_after_past_results.contains(&shared_knowledge_doc)); + assert!(!updated_after_past_results.contains(&private_chat_doc)); + + let updated_after_future_results = docs_extension_v1::search_doc_ids_with_filters( + &service, + None, + None, + None, + Some(&future), + None, + "reader", + ) + .await; + + assert!(updated_after_future_results.is_empty()); + + docs_extension_v1::cleanup_docs_filter_fixture(test_db, handle, shutdown).await; +} + +#[tokio::test] +#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL (or ELF_QDRANT_GRPC_URL) to run."] +async fn docs_search_l0_respects_thread_id_filter_for_chat_docs() { + let Some(ctx) = docs_extension_v1::setup_docs_context().await else { return }; + let ( + test_db, + service, + shared_knowledge_doc, + older_shared_knowledge_doc, + private_chat_doc, + handle, + shutdown, + ) = docs_extension_v1::create_docs_search_filter_fixture(ctx).await; + let thread_filter_results = service + .docs_search_l0(DocsSearchL0Request { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + caller_agent_id: "assistant".to_string(), + scope: None, + status: None, + doc_type: Some("chat".to_string()), + sparse_mode: None, + domain: None, + repo: None, + agent_id: None, + thread_id: Some("shared-chat-thread".to_string()), + updated_after: None, + updated_before: None, + ts_gte: None, + ts_lte: None, + read_profile: "private_plus_project".to_string(), + query: "peregrine".to_string(), + top_k: Some(20), + candidate_k: Some(50), + explain: None, + }) + .await + .expect("Failed to search docs with thread_id filter."); + let thread_filtered_docs = + thread_filter_results.items.into_iter().map(|item| item.doc_id).collect::>(); + + assert!(thread_filtered_docs.contains(&private_chat_doc)); + assert!(!thread_filtered_docs.contains(&shared_knowledge_doc)); + assert!(!thread_filtered_docs.contains(&older_shared_knowledge_doc)); + + docs_extension_v1::cleanup_docs_filter_fixture(test_db, handle, shutdown).await; +} + +#[tokio::test] +#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL (or ELF_QDRANT_GRPC_URL) to run this test."] +async fn docs_search_l0_requires_chat_doc_type_for_thread_id() { + let Some(ctx) = docs_extension_v1::setup_docs_context().await else { return }; + let ( + test_db, + service, + _shared_knowledge_doc, + _older_shared_knowledge_doc, + _private_chat_doc, + handle, + shutdown, + ) = docs_extension_v1::create_docs_search_filter_fixture(ctx).await; + let result = service + .docs_search_l0(DocsSearchL0Request { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + caller_agent_id: "assistant".to_string(), + scope: None, + status: None, + doc_type: None, + sparse_mode: None, + domain: None, + repo: None, + agent_id: None, + thread_id: Some("shared-chat-thread".to_string()), + updated_after: None, + updated_before: None, + ts_gte: None, + ts_lte: None, + read_profile: "private_plus_project".to_string(), + query: "peregrine".to_string(), + top_k: Some(20), + candidate_k: Some(50), + explain: None, + }) + .await; + + match result { + Err(Error::InvalidRequest { message }) => { + assert!(message.contains("thread_id requires")); + }, + other => { + panic!("Expected InvalidRequest for thread_id without chat doc_type, got {other:?}") + }, + } + + docs_extension_v1::cleanup_docs_filter_fixture(test_db, handle, shutdown).await; +} diff --git a/packages/elf-service/tests/acceptance/docs_extension_v1/lifecycle/source_library.rs b/packages/elf-service/tests/acceptance/docs_extension_v1/lifecycle/source_library.rs new file mode 100644 index 00000000..b0513c9f --- /dev/null +++ b/packages/elf-service/tests/acceptance/docs_extension_v1/lifecycle/source_library.rs @@ -0,0 +1,79 @@ +use serde_json::Value; + +use crate::acceptance::docs_extension_v1::{self, DocsContext}; + +#[tokio::test] +#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL (or ELF_QDRANT_GRPC_URL) to run."] +async fn docs_put_source_library_records_do_not_create_memory_notes() { + let Some(ctx) = docs_extension_v1::setup_docs_context().await else { return }; + let DocsContext { test_db, service } = ctx; + let before: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM memory_notes") + .fetch_one(&service.db.pool) + .await + .expect("Failed to count notes before docs_put."); + let put = docs_extension_v1::put_test_doc_with( + &service, + "owner", + "project_shared", + Some("chat"), + "Captured thread", + serde_json::json!({ + "schema": "doc_source_ref/v1", + "doc_type": "chat", + "ts": "2026-02-25T12:00:00Z", + "thread_id": "thread-source-library-1", + "role": "user", + "source_kind": "social_thread", + "canonical_uri": "https://example.com/thread/source-library-1", + "captured_at": "2026-02-25T12:10:00Z", + "source_created_at": "2026-02-25T11:55:00Z", + "trust_label": "public_web", + "author": "Example Researcher", + "handle": "example-researcher", + "excerpt_locator": { + "quote": { + "exact": "Source libraries should preserve thread context." + } + } + }), + "Source libraries should preserve thread context. Agents can later promote only selected facts.", + ) + .await; + let after: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM memory_notes") + .fetch_one(&service.db.pool) + .await + .expect("Failed to count notes after docs_put."); + let doc_exists: bool = + sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM doc_documents WHERE doc_id = $1)") + .bind(put.doc_id) + .fetch_one(&service.db.pool) + .await + .expect("Failed to verify doc row."); + let stored_source_ref: Value = + sqlx::query_scalar("SELECT source_ref FROM doc_documents WHERE doc_id = $1") + .bind(put.doc_id) + .fetch_one(&service.db.pool) + .await + .expect("Failed to fetch normalized source_ref."); + + assert!(doc_exists); + assert_eq!(after, before, "docs_put must not create durable Memory Notes."); + assert_eq!(put.source_capture.schema, "doc_source_capture/v1"); + assert_eq!(put.source_capture.source_record_id, put.doc_id); + assert_eq!(put.source_capture.origin, "https://example.com/thread/source-library-1"); + assert_eq!(put.source_capture.source_type, "social_thread"); + assert_eq!(put.source_capture.visibility_scope, "project_shared"); + assert!(!put.source_capture.source_spans.is_empty()); + assert_eq!(put.source_capture.source_spans[0].schema, "doc_source_span/v1"); + assert_eq!(put.source_capture.source_spans[0].status, "captured"); + assert_eq!(put.source_capture.source_spans[0].reason_code, None); + assert_eq!(stored_source_ref["source_record_id"], put.doc_id.to_string()); + assert_eq!(stored_source_ref["origin"], "https://example.com/thread/source-library-1"); + assert_eq!(stored_source_ref["source_type"], "social_thread"); + assert_eq!(stored_source_ref["content_hash"], put.content_hash); + assert!(stored_source_ref["source_spans"].as_array().is_some_and(|spans| !spans.is_empty())); + + drop(service); + + test_db.cleanup().await.expect("Failed to cleanup test database."); +} diff --git a/packages/elf-service/tests/acceptance/docs_extension_v1/lifecycle/write_policy.rs b/packages/elf-service/tests/acceptance/docs_extension_v1/lifecycle/write_policy.rs new file mode 100644 index 00000000..427e1d13 --- /dev/null +++ b/packages/elf-service/tests/acceptance/docs_extension_v1/lifecycle/write_policy.rs @@ -0,0 +1,97 @@ +use std::time::Duration; + +use crate::acceptance::docs_extension_v1::{self, DocsContext}; +use elf_service::{DocsExcerptsGetRequest, DocsPutRequest}; + +#[tokio::test] +#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL (or ELF_QDRANT_GRPC_URL) to run this test."] +async fn docs_put_applies_write_policy_and_excerpt_by_chunk_id_is_verified() { + let Some(ctx) = docs_extension_v1::setup_docs_context().await else { return }; + let DocsContext { test_db, service } = ctx; + let content = "Alpha normal text then secret sk-abcdef and trailing content."; + let secret = "sk-abcdef"; + let start = content.find(secret).expect("Expected secret in content."); + let end = start + secret.len(); + let write_policy = serde_json::from_value(serde_json::json!({ + "exclusions": [{"start": start, "end": end}], + })) + .expect("Failed to build write_policy."); + let put = service + .docs_put(DocsPutRequest { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + agent_id: "owner".to_string(), + scope: "project_shared".to_string(), + doc_type: None, + title: Some("Docs write_policy sample".to_string()), + write_policy: Some(write_policy), + source_ref: serde_json::json!({ + "schema": "doc_source_ref/v1", + "doc_type": "knowledge", + "ts": "2026-02-25T12:00:00Z", + }), + content: content.to_string(), + }) + .await + .expect("Failed to put doc with write policy."); + let (handle, shutdown) = docs_extension_v1::spawn_doc_worker(&service).await; + + assert!( + docs_extension_v1::wait_for_doc_outbox_done( + &service.db.pool, + put.doc_id, + Duration::from_secs(15) + ) + .await, + "Expected doc outbox to reach DONE." + ); + + let chunk_id = docs_extension_v1::fetch_first_doc_chunk_id(&service, put.doc_id) + .await + .expect("Expected chunk id from transformed doc."); + let excerpt = service + .docs_excerpts_get(DocsExcerptsGetRequest { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + agent_id: "reader".to_string(), + read_profile: "private_plus_project".to_string(), + doc_id: put.doc_id, + level: "L1".to_string(), + chunk_id: Some(chunk_id), + quote: None, + position: None, + explain: None, + }) + .await + .expect("Failed to hydrate excerpt by chunk_id."); + + assert!(excerpt.verification.verified); + assert!(!excerpt.excerpt.is_empty()); + assert!(!excerpt.excerpt.contains(secret)); + assert!(!excerpt.locator.span_id.is_nil()); + + let captured_chunk_span = put + .source_capture + .source_spans + .iter() + .find(|span| span.chunk_id == Some(chunk_id)) + .expect("Expected captured source span for hydrated chunk."); + + assert_eq!(excerpt.locator.span_id, captured_chunk_span.span_id); + assert_eq!(excerpt.verification.content_hash, put.content_hash); + assert!(put.write_policy_audit.is_some()); + assert_eq!(put.source_capture.policy_spans.len(), 1); + assert_eq!(put.source_capture.policy_spans[0].status, "excluded"); + assert_eq!( + put.source_capture.policy_spans[0].reason_code.as_deref(), + Some("WRITE_POLICY_EXCLUSION") + ); + + let _ = shutdown.send(()); + + handle.abort(); + + let _ = handle.await; + + test_db.cleanup().await.expect("Failed to cleanup test database."); +} diff --git a/packages/elf-service/tests/acceptance/docs_extension_v1/search_filters.rs b/packages/elf-service/tests/acceptance/docs_extension_v1/search_filters.rs index 6df2e049..07d08e70 100644 --- a/packages/elf-service/tests/acceptance/docs_extension_v1/search_filters.rs +++ b/packages/elf-service/tests/acceptance/docs_extension_v1/search_filters.rs @@ -1,479 +1,4 @@ -use std::{collections::HashSet, sync::Arc, time::Duration}; - -use serde_json::Value; -use time::{OffsetDateTime, format_description::well_known::Rfc3339}; -use tokio::{sync::oneshot::Sender, task::JoinHandle}; -use uuid::Uuid; - -use crate::acceptance::docs_extension_v1::{self, DocsContext, TEST_CONTENT}; -use elf_config::EmbeddingProviderConfig; -use elf_service::{BoxFuture, DocsSearchL0Request, ElfService, EmbeddingProvider, Result}; - -struct NonZeroSearchEmbedding; -impl EmbeddingProvider for NonZeroSearchEmbedding { - fn embed<'a>( - &'a self, - cfg: &'a EmbeddingProviderConfig, - texts: &'a [String], - ) -> BoxFuture<'a, Result>>> { - let vector = vec![0.1_f32; cfg.dimensions as usize]; - - Box::pin(async move { Ok(vec![vector; texts.len()]) }) - } -} - -struct DocsFilterFixtureIds { - search_domain_doc_id: Uuid, - search_other_domain_doc_id: Uuid, - repo_doc_id: Uuid, - repo_other_doc_id: Uuid, -} - -fn configure_recency_bias_settings(service: &mut ElfService) { - service.providers.embedding = Arc::new(NonZeroSearchEmbedding); - service.cfg.ranking.tie_breaker_weight = 1_000.0; - service.cfg.ranking.recency_tau_days = 36_500.0; -} - -#[tokio::test] -#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL (or ELF_QDRANT_GRPC_URL) to run."] -async fn docs_search_l0_respects_doc_ts_filter() { - let Some(ctx) = docs_extension_v1::setup_docs_context().await else { return }; - let ( - test_db, - service, - shared_knowledge_doc, - older_shared_knowledge_doc, - private_chat_doc, - handle, - shutdown, - ) = docs_extension_v1::create_docs_search_filter_fixture(ctx).await; - let doc_ts_windowed_results = service - .docs_search_l0(DocsSearchL0Request { - tenant_id: "t".to_string(), - project_id: "p".to_string(), - caller_agent_id: "reader".to_string(), - scope: Some("project_shared".to_string()), - status: None, - doc_type: None, - sparse_mode: None, - domain: None, - repo: None, - agent_id: None, - thread_id: None, - updated_after: None, - updated_before: None, - ts_gte: Some("2026-01-01T00:00:00Z".to_string()), - ts_lte: Some("2026-12-31T23:59:59Z".to_string()), - read_profile: "all_scopes".to_string(), - query: "peregrine".to_string(), - top_k: Some(20), - candidate_k: Some(50), - explain: None, - }) - .await - .expect("Failed to search docs by doc_ts range."); - let doc_ts_windowed_ids = - doc_ts_windowed_results.items.into_iter().map(|item| item.doc_id).collect::>(); - - assert!(doc_ts_windowed_ids.contains(&shared_knowledge_doc)); - assert!(!doc_ts_windowed_ids.contains(&older_shared_knowledge_doc)); - assert!(!doc_ts_windowed_ids.contains(&private_chat_doc)); - - docs_extension_v1::cleanup_docs_filter_fixture(test_db, handle, shutdown).await; -} - -#[tokio::test] -#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL (or ELF_QDRANT_GRPC_URL) to run this test."] -async fn docs_search_l0_sparse_mode_records_expected_vector_search_channels() { - let Some(ctx) = docs_extension_v1::setup_docs_context().await else { return }; - let DocsContext { test_db, service } = ctx; - let doc = docs_extension_v1::put_test_doc(&service).await; - let (handle, shutdown) = docs_extension_v1::spawn_doc_worker(&service).await; - - assert!( - docs_extension_v1::wait_for_doc_outbox_done( - &service.db.pool, - doc.doc_id, - Duration::from_secs(15), - ) - .await, - "Expected doc outbox to reach DONE." - ); - - let cases = [ - ("off", vec!["dense"]), - ("on", vec!["dense", "sparse"]), - ("auto", vec!["dense", "sparse"]), - ]; - - for (sparse_mode, expected_channels) in cases { - let response = service - .docs_search_l0(DocsSearchL0Request { - tenant_id: "t".to_string(), - project_id: "p".to_string(), - caller_agent_id: "reader".to_string(), - scope: None, - status: None, - doc_type: None, - sparse_mode: Some(sparse_mode.to_string()), - domain: None, - repo: None, - agent_id: None, - thread_id: None, - updated_after: None, - updated_before: None, - ts_gte: None, - ts_lte: None, - read_profile: "private_plus_project".to_string(), - query: "https://elf.example/docs?query=peregrine".to_string(), - top_k: Some(20), - candidate_k: Some(50), - explain: Some(true), - }) - .await - .expect("Failed to search docs with sparse_mode set."); - let trajectory = response.trajectory.as_ref().expect("Expected explain trajectory."); - let vector_search_stats = - docs_extension_v1::trajectory_stage_stats(trajectory, "vector_search") - .expect("Expected vector_search stage in trajectory."); - let vector_search_channels = vector_search_stats - .get("channels") - .and_then(Value::as_array) - .expect("Expected vector_search stats channels."); - let observed_channels = vector_search_channels - .iter() - .map(|channel| channel.as_str().expect("Expected channel string.").to_string()) - .collect::>(); - - assert_eq!(observed_channels, expected_channels); - } - - let _ = shutdown.send(()); - - handle.abort(); - - let _ = handle.await; - - drop(service); - - test_db.cleanup().await.expect("Failed to cleanup test database."); -} - -#[tokio::test] -#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL (or ELF_QDRANT_GRPC_URL) to run this test."] -async fn docs_search_l0_filters_include_and_exclude_by_doc_type_and_domain_or_repo() { - let Some(ctx) = docs_extension_v1::setup_docs_context().await else { return }; - let docs = seed_docs_filter_fixtures(&ctx).await; - let DocsContext { test_db, service } = ctx; - let (handle, shutdown) = docs_extension_v1::spawn_doc_worker(&service).await; - - for doc_id in [ - docs.search_domain_doc_id, - docs.search_other_domain_doc_id, - docs.repo_doc_id, - docs.repo_other_doc_id, - ] - .iter() - { - assert!( - docs_extension_v1::wait_for_doc_outbox_done( - &service.db.pool, - *doc_id, - Duration::from_secs(15), - ) - .await, - "Expected docs outbox to reach DONE." - ); - } - - let search_domain_results = service - .docs_search_l0(DocsSearchL0Request { - tenant_id: "t".to_string(), - project_id: "p".to_string(), - caller_agent_id: "reader".to_string(), - scope: Some("project_shared".to_string()), - status: None, - doc_type: Some("search".to_string()), - sparse_mode: None, - domain: Some("docs.example.com".to_string()), - repo: None, - agent_id: None, - thread_id: None, - updated_after: None, - updated_before: None, - ts_gte: None, - ts_lte: None, - read_profile: "all_scopes".to_string(), - query: "peregrine".to_string(), - top_k: Some(20), - candidate_k: Some(50), - explain: None, - }) - .await - .expect("Failed to search docs by domain."); - let search_domain_result_ids = - search_domain_results.items.into_iter().map(|item| item.doc_id).collect::>(); - - assert!(search_domain_result_ids.contains(&docs.search_domain_doc_id)); - assert!(!search_domain_result_ids.contains(&docs.search_other_domain_doc_id)); - assert!(!search_domain_result_ids.contains(&docs.repo_doc_id)); - assert!(!search_domain_result_ids.contains(&docs.repo_other_doc_id)); - - let repo_results = service - .docs_search_l0(DocsSearchL0Request { - tenant_id: "t".to_string(), - project_id: "p".to_string(), - caller_agent_id: "reader".to_string(), - scope: Some("project_shared".to_string()), - status: None, - doc_type: Some("dev".to_string()), - sparse_mode: None, - domain: None, - repo: Some("elf-org/docs".to_string()), - agent_id: None, - thread_id: None, - updated_after: None, - updated_before: None, - ts_gte: None, - ts_lte: None, - read_profile: "all_scopes".to_string(), - query: "peregrine".to_string(), - top_k: Some(20), - candidate_k: Some(50), - explain: None, - }) - .await - .expect("Failed to search docs by repo."); - let repo_result_ids = - repo_results.items.into_iter().map(|item| item.doc_id).collect::>(); - - assert!(repo_result_ids.contains(&docs.repo_doc_id)); - assert!(!repo_result_ids.contains(&docs.repo_other_doc_id)); - assert!(!repo_result_ids.contains(&docs.search_domain_doc_id)); - assert!(!repo_result_ids.contains(&docs.search_other_domain_doc_id)); - - let _ = shutdown.send(()); - - handle.abort(); - - let _ = handle.await; - - drop(service); - - test_db.cleanup().await.expect("Failed to cleanup test database."); -} - -async fn seed_docs_filter_fixtures(ctx: &DocsContext) -> DocsFilterFixtureIds { - let search_domain_doc = docs_extension_v1::put_test_doc_with( - &ctx.service, - "owner", - "project_shared", - Some("search"), - "Docs domain include sample", - serde_json::json!({ - "schema": "doc_source_ref/v1", - "doc_type": "search", - "ts": "2026-02-25T12:00:00Z", - "query": "How to fetch docs", - "domain": "docs.example.com", - "url": "https://docs.example.com/guide", - }), - TEST_CONTENT, - ) - .await; - let search_other_domain_doc = docs_extension_v1::put_test_doc_with( - &ctx.service, - "owner", - "project_shared", - Some("search"), - "Docs domain exclude sample", - serde_json::json!({ - "schema": "doc_source_ref/v1", - "doc_type": "search", - "ts": "2026-02-25T12:00:00Z", - "query": "How to build", - "domain": "api.example.org", - "url": "https://api.example.org/reference", - }), - TEST_CONTENT, - ) - .await; - let repo_doc = docs_extension_v1::put_test_doc_with( - &ctx.service, - "owner", - "project_shared", - Some("dev"), - "Docs repo include sample", - serde_json::json!({ - "schema": "doc_source_ref/v1", - "doc_type": "dev", - "ts": "2026-02-25T12:00:00Z", - "repo": "elf-org/docs", - "commit_sha": "9f0a3f4c4eb58bfcf4a5f4f9d0c7be0e13c2f8d19", - }), - TEST_CONTENT, - ) - .await; - let repo_other_doc = docs_extension_v1::put_test_doc_with( - &ctx.service, - "owner", - "project_shared", - Some("dev"), - "Docs repo exclude sample", - serde_json::json!({ - "schema": "doc_source_ref/v1", - "doc_type": "dev", - "ts": "2026-02-25T12:00:00Z", - "repo": "other-org/docs", - "commit_sha": "4e3d9ec4d2a59a2f6c7d7f3d4c6e8a5b1f7b9d3f", - }), - TEST_CONTENT, - ) - .await; - - DocsFilterFixtureIds { - search_domain_doc_id: search_domain_doc.doc_id, - search_other_domain_doc_id: search_other_domain_doc.doc_id, - repo_doc_id: repo_doc.doc_id, - repo_other_doc_id: repo_other_doc.doc_id, - } -} - -#[tokio::test] -#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL (or ELF_QDRANT_GRPC_URL) to run this test."] -async fn docs_search_l0_recency_bias_orders_newer_doc_first_and_records_projection_signals() { - let Some(ctx) = docs_extension_v1::setup_docs_context().await else { return }; - let DocsContext { test_db, mut service } = ctx; - - configure_recency_bias_settings(&mut service); - - let (handle, shutdown) = seed_recency_bias_docs_for_search(&service).await; - - assert_docs_search_l0_recency_projection(&service).await; - - let _ = shutdown.send(()); - - handle.abort(); - - let _ = handle.await; - - drop(service); - - test_db.cleanup().await.expect("Failed to cleanup test database."); -} - -async fn seed_recency_bias_docs_for_search(service: &ElfService) -> (JoinHandle<()>, Sender<()>) { - let newer_doc = docs_extension_v1::put_test_doc_with( - service, - "owner", - "project_shared", - Some("knowledge"), - "Recency newer doc", - serde_json::json!({ - "schema": "doc_source_ref/v1", - "doc_type": "knowledge", - "ts": "2026-02-27T12:00:00Z", - }), - TEST_CONTENT, - ) - .await; - let older_doc = docs_extension_v1::put_test_doc_with( - service, - "owner", - "project_shared", - Some("knowledge"), - "Recency older doc", - serde_json::json!({ - "schema": "doc_source_ref/v1", - "doc_type": "knowledge", - "ts": "2026-02-20T12:00:00Z", - }), - TEST_CONTENT, - ) - .await; - let (handle, shutdown) = docs_extension_v1::spawn_doc_worker(service).await; - - assert!( - docs_extension_v1::wait_for_doc_outbox_done( - &service.db.pool, - newer_doc.doc_id, - Duration::from_secs(15), - ) - .await, - "Expected newer doc outbox to reach DONE." - ); - assert!( - docs_extension_v1::wait_for_doc_outbox_done( - &service.db.pool, - older_doc.doc_id, - Duration::from_secs(15), - ) - .await, - "Expected older doc outbox to reach DONE." - ); - - let older_ts = OffsetDateTime::parse("2020-01-01T00:00:00Z", &Rfc3339) - .expect("Failed to parse older doc timestamp."); - - sqlx::query("UPDATE doc_documents SET updated_at = $1 WHERE doc_id = $2") - .bind(older_ts) - .bind(older_doc.doc_id) - .execute(&service.db.pool) - .await - .expect("Failed to set deterministic updated_at for older doc."); - - (handle, shutdown) -} - -async fn assert_docs_search_l0_recency_projection(service: &ElfService) { - let results = service - .docs_search_l0(DocsSearchL0Request { - tenant_id: "t".to_string(), - project_id: "p".to_string(), - caller_agent_id: "reader".to_string(), - scope: None, - status: None, - doc_type: None, - sparse_mode: None, - domain: None, - repo: None, - agent_id: None, - thread_id: None, - updated_after: None, - updated_before: None, - ts_gte: None, - ts_lte: None, - read_profile: "private_plus_project".to_string(), - query: "peregrine".to_string(), - top_k: Some(2), - candidate_k: Some(20), - explain: Some(true), - }) - .await - .expect("Failed to search docs for recency ordering."); - let ordered_ids = results.items.iter().map(|item| item.doc_id).collect::>(); - - assert!(ordered_ids.len() >= 2); - - let newest_id = results - .items - .iter() - .max_by_key(|item| item.updated_at.unix_timestamp()) - .expect("Expected returned item.") - .doc_id; - - assert_eq!(results.items[0].doc_id, newest_id); - assert!(results.items[0].updated_at > results.items[1].updated_at); - - let trajectory = results.trajectory.as_ref().expect("Expected explain trajectory."); - let result_projection = - docs_extension_v1::trajectory_stage_stats(trajectory, "result_projection") - .expect("Expected result_projection stage in trajectory."); - - assert!(result_projection.get("pre_authorization_candidates").is_some()); - assert!(result_projection.get("returned_items").is_some()); - assert!(result_projection.get("recency_tau_days").is_some()); - assert!(result_projection.get("tie_breaker_weight").is_some()); - assert_eq!(result_projection.get("recency_boost_applied"), Some(&Value::Bool(true))); -} +mod domain_repo; +mod recency; +mod sparse; +mod timestamp; diff --git a/packages/elf-service/tests/acceptance/docs_extension_v1/search_filters/domain_repo.rs b/packages/elf-service/tests/acceptance/docs_extension_v1/search_filters/domain_repo.rs new file mode 100644 index 00000000..4180f191 --- /dev/null +++ b/packages/elf-service/tests/acceptance/docs_extension_v1/search_filters/domain_repo.rs @@ -0,0 +1,193 @@ +use std::{collections::HashSet, time::Duration}; + +use uuid::Uuid; + +use crate::acceptance::docs_extension_v1::{self, DocsContext, TEST_CONTENT}; +use elf_service::DocsSearchL0Request; + +struct DocsFilterFixtureIds { + search_domain_doc_id: Uuid, + search_other_domain_doc_id: Uuid, + repo_doc_id: Uuid, + repo_other_doc_id: Uuid, +} + +#[tokio::test] +#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL (or ELF_QDRANT_GRPC_URL) to run this test."] +async fn docs_search_l0_filters_include_and_exclude_by_doc_type_and_domain_or_repo() { + let Some(ctx) = docs_extension_v1::setup_docs_context().await else { return }; + let docs = seed_docs_filter_fixtures(&ctx).await; + let DocsContext { test_db, service } = ctx; + let (handle, shutdown) = docs_extension_v1::spawn_doc_worker(&service).await; + + for doc_id in [ + docs.search_domain_doc_id, + docs.search_other_domain_doc_id, + docs.repo_doc_id, + docs.repo_other_doc_id, + ] + .iter() + { + assert!( + docs_extension_v1::wait_for_doc_outbox_done( + &service.db.pool, + *doc_id, + Duration::from_secs(15), + ) + .await, + "Expected docs outbox to reach DONE." + ); + } + + let search_domain_results = service + .docs_search_l0(DocsSearchL0Request { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + caller_agent_id: "reader".to_string(), + scope: Some("project_shared".to_string()), + status: None, + doc_type: Some("search".to_string()), + sparse_mode: None, + domain: Some("docs.example.com".to_string()), + repo: None, + agent_id: None, + thread_id: None, + updated_after: None, + updated_before: None, + ts_gte: None, + ts_lte: None, + read_profile: "all_scopes".to_string(), + query: "peregrine".to_string(), + top_k: Some(20), + candidate_k: Some(50), + explain: None, + }) + .await + .expect("Failed to search docs by domain."); + let search_domain_result_ids = + search_domain_results.items.into_iter().map(|item| item.doc_id).collect::>(); + + assert!(search_domain_result_ids.contains(&docs.search_domain_doc_id)); + assert!(!search_domain_result_ids.contains(&docs.search_other_domain_doc_id)); + assert!(!search_domain_result_ids.contains(&docs.repo_doc_id)); + assert!(!search_domain_result_ids.contains(&docs.repo_other_doc_id)); + + let repo_results = service + .docs_search_l0(DocsSearchL0Request { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + caller_agent_id: "reader".to_string(), + scope: Some("project_shared".to_string()), + status: None, + doc_type: Some("dev".to_string()), + sparse_mode: None, + domain: None, + repo: Some("elf-org/docs".to_string()), + agent_id: None, + thread_id: None, + updated_after: None, + updated_before: None, + ts_gte: None, + ts_lte: None, + read_profile: "all_scopes".to_string(), + query: "peregrine".to_string(), + top_k: Some(20), + candidate_k: Some(50), + explain: None, + }) + .await + .expect("Failed to search docs by repo."); + let repo_result_ids = + repo_results.items.into_iter().map(|item| item.doc_id).collect::>(); + + assert!(repo_result_ids.contains(&docs.repo_doc_id)); + assert!(!repo_result_ids.contains(&docs.repo_other_doc_id)); + assert!(!repo_result_ids.contains(&docs.search_domain_doc_id)); + assert!(!repo_result_ids.contains(&docs.search_other_domain_doc_id)); + + let _ = shutdown.send(()); + + handle.abort(); + + let _ = handle.await; + + drop(service); + + test_db.cleanup().await.expect("Failed to cleanup test database."); +} + +async fn seed_docs_filter_fixtures(ctx: &DocsContext) -> DocsFilterFixtureIds { + let search_domain_doc = docs_extension_v1::put_test_doc_with( + &ctx.service, + "owner", + "project_shared", + Some("search"), + "Docs domain include sample", + serde_json::json!({ + "schema": "doc_source_ref/v1", + "doc_type": "search", + "ts": "2026-02-25T12:00:00Z", + "query": "How to fetch docs", + "domain": "docs.example.com", + "url": "https://docs.example.com/guide", + }), + TEST_CONTENT, + ) + .await; + let search_other_domain_doc = docs_extension_v1::put_test_doc_with( + &ctx.service, + "owner", + "project_shared", + Some("search"), + "Docs domain exclude sample", + serde_json::json!({ + "schema": "doc_source_ref/v1", + "doc_type": "search", + "ts": "2026-02-25T12:00:00Z", + "query": "How to build", + "domain": "api.example.org", + "url": "https://api.example.org/reference", + }), + TEST_CONTENT, + ) + .await; + let repo_doc = docs_extension_v1::put_test_doc_with( + &ctx.service, + "owner", + "project_shared", + Some("dev"), + "Docs repo include sample", + serde_json::json!({ + "schema": "doc_source_ref/v1", + "doc_type": "dev", + "ts": "2026-02-25T12:00:00Z", + "repo": "elf-org/docs", + "commit_sha": "9f0a3f4c4eb58bfcf4a5f4f9d0c7be0e13c2f8d19", + }), + TEST_CONTENT, + ) + .await; + let repo_other_doc = docs_extension_v1::put_test_doc_with( + &ctx.service, + "owner", + "project_shared", + Some("dev"), + "Docs repo exclude sample", + serde_json::json!({ + "schema": "doc_source_ref/v1", + "doc_type": "dev", + "ts": "2026-02-25T12:00:00Z", + "repo": "other-org/docs", + "commit_sha": "4e3d9ec4d2a59a2f6c7d7f3d4c6e8a5b1f7b9d3f", + }), + TEST_CONTENT, + ) + .await; + + DocsFilterFixtureIds { + search_domain_doc_id: search_domain_doc.doc_id, + search_other_domain_doc_id: search_other_domain_doc.doc_id, + repo_doc_id: repo_doc.doc_id, + repo_other_doc_id: repo_other_doc.doc_id, + } +} diff --git a/packages/elf-service/tests/acceptance/docs_extension_v1/search_filters/recency.rs b/packages/elf-service/tests/acceptance/docs_extension_v1/search_filters/recency.rs new file mode 100644 index 00000000..951dd64e --- /dev/null +++ b/packages/elf-service/tests/acceptance/docs_extension_v1/search_filters/recency.rs @@ -0,0 +1,166 @@ +use std::{sync::Arc, time::Duration}; + +use serde_json::Value; +use time::{OffsetDateTime, format_description::well_known::Rfc3339}; +use tokio::{sync::oneshot::Sender, task::JoinHandle}; + +use crate::acceptance::docs_extension_v1::{self, DocsContext, TEST_CONTENT}; +use elf_config::EmbeddingProviderConfig; +use elf_service::{BoxFuture, DocsSearchL0Request, ElfService, EmbeddingProvider, Result}; + +struct NonZeroSearchEmbedding; +impl EmbeddingProvider for NonZeroSearchEmbedding { + fn embed<'a>( + &'a self, + cfg: &'a EmbeddingProviderConfig, + texts: &'a [String], + ) -> BoxFuture<'a, Result>>> { + let vector = vec![0.1_f32; cfg.dimensions as usize]; + + Box::pin(async move { Ok(vec![vector; texts.len()]) }) + } +} + +fn configure_recency_bias_settings(service: &mut ElfService) { + service.providers.embedding = Arc::new(NonZeroSearchEmbedding); + service.cfg.ranking.tie_breaker_weight = 1_000.0; + service.cfg.ranking.recency_tau_days = 36_500.0; +} + +#[tokio::test] +#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL (or ELF_QDRANT_GRPC_URL) to run this test."] +async fn docs_search_l0_recency_bias_orders_newer_doc_first_and_records_projection_signals() { + let Some(ctx) = docs_extension_v1::setup_docs_context().await else { return }; + let DocsContext { test_db, mut service } = ctx; + + configure_recency_bias_settings(&mut service); + + let (handle, shutdown) = seed_recency_bias_docs_for_search(&service).await; + + assert_docs_search_l0_recency_projection(&service).await; + + let _ = shutdown.send(()); + + handle.abort(); + + let _ = handle.await; + + drop(service); + + test_db.cleanup().await.expect("Failed to cleanup test database."); +} + +async fn seed_recency_bias_docs_for_search(service: &ElfService) -> (JoinHandle<()>, Sender<()>) { + let newer_doc = docs_extension_v1::put_test_doc_with( + service, + "owner", + "project_shared", + Some("knowledge"), + "Recency newer doc", + serde_json::json!({ + "schema": "doc_source_ref/v1", + "doc_type": "knowledge", + "ts": "2026-02-27T12:00:00Z", + }), + TEST_CONTENT, + ) + .await; + let older_doc = docs_extension_v1::put_test_doc_with( + service, + "owner", + "project_shared", + Some("knowledge"), + "Recency older doc", + serde_json::json!({ + "schema": "doc_source_ref/v1", + "doc_type": "knowledge", + "ts": "2026-02-20T12:00:00Z", + }), + TEST_CONTENT, + ) + .await; + let (handle, shutdown) = docs_extension_v1::spawn_doc_worker(service).await; + + assert!( + docs_extension_v1::wait_for_doc_outbox_done( + &service.db.pool, + newer_doc.doc_id, + Duration::from_secs(15), + ) + .await, + "Expected newer doc outbox to reach DONE." + ); + assert!( + docs_extension_v1::wait_for_doc_outbox_done( + &service.db.pool, + older_doc.doc_id, + Duration::from_secs(15), + ) + .await, + "Expected older doc outbox to reach DONE." + ); + + let older_ts = OffsetDateTime::parse("2020-01-01T00:00:00Z", &Rfc3339) + .expect("Failed to parse older doc timestamp."); + + sqlx::query("UPDATE doc_documents SET updated_at = $1 WHERE doc_id = $2") + .bind(older_ts) + .bind(older_doc.doc_id) + .execute(&service.db.pool) + .await + .expect("Failed to set deterministic updated_at for older doc."); + + (handle, shutdown) +} + +async fn assert_docs_search_l0_recency_projection(service: &ElfService) { + let results = service + .docs_search_l0(DocsSearchL0Request { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + caller_agent_id: "reader".to_string(), + scope: None, + status: None, + doc_type: None, + sparse_mode: None, + domain: None, + repo: None, + agent_id: None, + thread_id: None, + updated_after: None, + updated_before: None, + ts_gte: None, + ts_lte: None, + read_profile: "private_plus_project".to_string(), + query: "peregrine".to_string(), + top_k: Some(2), + candidate_k: Some(20), + explain: Some(true), + }) + .await + .expect("Failed to search docs for recency ordering."); + let ordered_ids = results.items.iter().map(|item| item.doc_id).collect::>(); + + assert!(ordered_ids.len() >= 2); + + let newest_id = results + .items + .iter() + .max_by_key(|item| item.updated_at.unix_timestamp()) + .expect("Expected returned item.") + .doc_id; + + assert_eq!(results.items[0].doc_id, newest_id); + assert!(results.items[0].updated_at > results.items[1].updated_at); + + let trajectory = results.trajectory.as_ref().expect("Expected explain trajectory."); + let result_projection = + docs_extension_v1::trajectory_stage_stats(trajectory, "result_projection") + .expect("Expected result_projection stage in trajectory."); + + assert!(result_projection.get("pre_authorization_candidates").is_some()); + assert!(result_projection.get("returned_items").is_some()); + assert!(result_projection.get("recency_tau_days").is_some()); + assert!(result_projection.get("tie_breaker_weight").is_some()); + assert_eq!(result_projection.get("recency_boost_applied"), Some(&Value::Bool(true))); +} diff --git a/packages/elf-service/tests/acceptance/docs_extension_v1/search_filters/sparse.rs b/packages/elf-service/tests/acceptance/docs_extension_v1/search_filters/sparse.rs new file mode 100644 index 00000000..4c2586a2 --- /dev/null +++ b/packages/elf-service/tests/acceptance/docs_extension_v1/search_filters/sparse.rs @@ -0,0 +1,83 @@ +use std::time::Duration; + +use serde_json::Value; + +use crate::acceptance::docs_extension_v1::{self, DocsContext}; +use elf_service::DocsSearchL0Request; + +#[tokio::test] +#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL (or ELF_QDRANT_GRPC_URL) to run this test."] +async fn docs_search_l0_sparse_mode_records_expected_vector_search_channels() { + let Some(ctx) = docs_extension_v1::setup_docs_context().await else { return }; + let DocsContext { test_db, service } = ctx; + let doc = docs_extension_v1::put_test_doc(&service).await; + let (handle, shutdown) = docs_extension_v1::spawn_doc_worker(&service).await; + + assert!( + docs_extension_v1::wait_for_doc_outbox_done( + &service.db.pool, + doc.doc_id, + Duration::from_secs(15), + ) + .await, + "Expected doc outbox to reach DONE." + ); + + let cases = [ + ("off", vec!["dense"]), + ("on", vec!["dense", "sparse"]), + ("auto", vec!["dense", "sparse"]), + ]; + + for (sparse_mode, expected_channels) in cases { + let response = service + .docs_search_l0(DocsSearchL0Request { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + caller_agent_id: "reader".to_string(), + scope: None, + status: None, + doc_type: None, + sparse_mode: Some(sparse_mode.to_string()), + domain: None, + repo: None, + agent_id: None, + thread_id: None, + updated_after: None, + updated_before: None, + ts_gte: None, + ts_lte: None, + read_profile: "private_plus_project".to_string(), + query: "https://elf.example/docs?query=peregrine".to_string(), + top_k: Some(20), + candidate_k: Some(50), + explain: Some(true), + }) + .await + .expect("Failed to search docs with sparse_mode set."); + let trajectory = response.trajectory.as_ref().expect("Expected explain trajectory."); + let vector_search_stats = + docs_extension_v1::trajectory_stage_stats(trajectory, "vector_search") + .expect("Expected vector_search stage in trajectory."); + let vector_search_channels = vector_search_stats + .get("channels") + .and_then(Value::as_array) + .expect("Expected vector_search stats channels."); + let observed_channels = vector_search_channels + .iter() + .map(|channel| channel.as_str().expect("Expected channel string.").to_string()) + .collect::>(); + + assert_eq!(observed_channels, expected_channels); + } + + let _ = shutdown.send(()); + + handle.abort(); + + let _ = handle.await; + + drop(service); + + test_db.cleanup().await.expect("Failed to cleanup test database."); +} diff --git a/packages/elf-service/tests/acceptance/docs_extension_v1/search_filters/timestamp.rs b/packages/elf-service/tests/acceptance/docs_extension_v1/search_filters/timestamp.rs new file mode 100644 index 00000000..33ce8433 --- /dev/null +++ b/packages/elf-service/tests/acceptance/docs_extension_v1/search_filters/timestamp.rs @@ -0,0 +1,52 @@ +use std::collections::HashSet; + +use crate::acceptance::docs_extension_v1; +use elf_service::DocsSearchL0Request; + +#[tokio::test] +#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL (or ELF_QDRANT_GRPC_URL) to run."] +async fn docs_search_l0_respects_doc_ts_filter() { + let Some(ctx) = docs_extension_v1::setup_docs_context().await else { return }; + let ( + test_db, + service, + shared_knowledge_doc, + older_shared_knowledge_doc, + private_chat_doc, + handle, + shutdown, + ) = docs_extension_v1::create_docs_search_filter_fixture(ctx).await; + let doc_ts_windowed_results = service + .docs_search_l0(DocsSearchL0Request { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + caller_agent_id: "reader".to_string(), + scope: Some("project_shared".to_string()), + status: None, + doc_type: None, + sparse_mode: None, + domain: None, + repo: None, + agent_id: None, + thread_id: None, + updated_after: None, + updated_before: None, + ts_gte: Some("2026-01-01T00:00:00Z".to_string()), + ts_lte: Some("2026-12-31T23:59:59Z".to_string()), + read_profile: "all_scopes".to_string(), + query: "peregrine".to_string(), + top_k: Some(20), + candidate_k: Some(50), + explain: None, + }) + .await + .expect("Failed to search docs by doc_ts range."); + let doc_ts_windowed_ids = + doc_ts_windowed_results.items.into_iter().map(|item| item.doc_id).collect::>(); + + assert!(doc_ts_windowed_ids.contains(&shared_knowledge_doc)); + assert!(!doc_ts_windowed_ids.contains(&older_shared_knowledge_doc)); + assert!(!doc_ts_windowed_ids.contains(&private_chat_doc)); + + docs_extension_v1::cleanup_docs_filter_fixture(test_db, handle, shutdown).await; +}