diff --git a/packages/elf-service/tests/acceptance/consolidation/tests_helpers.rs b/packages/elf-service/tests/acceptance/consolidation/tests_helpers.rs index 83ef23e5..2bb7d383 100644 --- a/packages/elf-service/tests/acceptance/consolidation/tests_helpers.rs +++ b/packages/elf-service/tests/acceptance/consolidation/tests_helpers.rs @@ -1,26 +1,27 @@ -use std::sync::{Arc, atomic::AtomicUsize}; - -use time::OffsetDateTime; -use uuid::Uuid; - -use crate::acceptance::{self, SpyExtractor, StubEmbedding, StubRerank}; -use elf_chunking::ChunkingConfig; -use elf_domain::consolidation::{ - ConsolidationApplyIntent, ConsolidationInputRef, ConsolidationLineage, ConsolidationMarker, - ConsolidationMarkerSeverity, ConsolidationMarkers, ConsolidationProposalDiff, - ConsolidationReviewAction, ConsolidationSourceKind, ConsolidationSourceSnapshot, - ConsolidationUnsupportedClaimFlag, -}; -use elf_service::{ - AddNoteInput, AddNoteRequest, ConsolidationProposalInput, ConsolidationProposalReviewRequest, - ConsolidationProposalsListRequest, ConsolidationProposalsListResponse, - ConsolidationRunCreateRequest, ConsolidationRunCreateResponse, ElfService, ListRequest, - MemoryCorrectionAction, MemoryCorrectionRequest, MemoryCorrectionResponse, - MemoryHistoryGetRequest, Providers, +mod corrections; +mod notes; +mod proposals; +mod refs; +mod service_setup; +mod worker_processing; + +pub(super) use self::{ + corrections::{ + active_list_contains, apply_memory_correction, memory_history_event_types, + promote_reviewed_memory, + }, + notes::insert_source_note, + proposals::{ + create_run_with_proposals, materialized_proposals, proposal_id_by_kind, proposal_input, + proposal_input_with_payload, + }, + refs::source_ref, + service_setup::setup_service, + worker_processing::process_consolidation_worker, }; -use elf_storage::{db::Db, qdrant::QdrantStore}; + +use elf_service::ElfService; use elf_testkit::TestDatabase; -use elf_worker::worker::{self, WorkerState}; pub(super) const TENANT_ID: &str = "tenant_consolidation"; pub(super) const PROJECT_ID: &str = "project_consolidation"; @@ -31,311 +32,3 @@ pub(super) struct ConsolidationFixture { pub(super) service: ElfService, _test_db: TestDatabase, } - -pub(super) fn source_ref(note_id: Uuid) -> ConsolidationInputRef { - ConsolidationInputRef { - kind: ConsolidationSourceKind::Note, - id: note_id, - snapshot: ConsolidationSourceSnapshot { - status: Some("active".to_string()), - updated_at: Some(OffsetDateTime::UNIX_EPOCH), - content_hash: Some("blake3:acceptance-source".to_string()), - embedding_version: Some("test:test:4096".to_string()), - trace_version: None, - source_ref: serde_json::json!({ "schema": "acceptance/v1" }), - metadata: serde_json::json!({ "fixture": "consolidation" }), - }, - } -} - -pub(super) fn lineage(source: &ConsolidationInputRef) -> ConsolidationLineage { - ConsolidationLineage { - source_refs: vec![source.clone()], - parent_run_id: None, - parent_proposal_ids: Vec::new(), - } -} - -pub(super) fn proposal_input( - source: &ConsolidationInputRef, - kind: &str, -) -> ConsolidationProposalInput { - proposal_input_with_payload( - source, - kind, - serde_json::json!({ - "type": "fact", - "text": "Fact: Consolidation proposals are derived and reviewable." - }), - ) -} - -pub(super) fn proposal_input_with_payload( - source: &ConsolidationInputRef, - kind: &str, - proposed_payload: serde_json::Value, -) -> ConsolidationProposalInput { - ConsolidationProposalInput { - proposal_kind: kind.to_string(), - apply_intent: ConsolidationApplyIntent::CreateDerivedNote, - source_refs: vec![source.clone()], - source_snapshot: serde_json::json!({ "source_count": 1 }), - lineage: lineage(source), - confidence: 0.82, - unsupported_claim_flags: vec![ConsolidationUnsupportedClaimFlag { - claim_id: Some("unsupported-claim".to_string()), - message: "The source does not prove that source notes may be rewritten.".to_string(), - source: Some(source.clone()), - }], - markers: ConsolidationMarkers { - contradictions: vec![ConsolidationMarker { - severity: ConsolidationMarkerSeverity::High, - message: "Stale rewrite evidence conflicts with the proposal-only rule." - .to_string(), - source: Some(source.clone()), - }], - staleness: Vec::new(), - }, - diff: ConsolidationProposalDiff { - summary: "Create a reviewed derived note without changing source evidence.".to_string(), - before: serde_json::json!({}), - after: serde_json::json!({ - "target": "derived_note", - "text": "Fact: Consolidation proposals are derived and reviewable." - }), - }, - target_ref: serde_json::json!({}), - proposed_payload, - } -} - -pub(super) fn proposal_id_by_kind( - response: &ConsolidationProposalsListResponse, - proposal_kind: &str, -) -> Uuid { - response - .proposals - .iter() - .find(|proposal| proposal.proposal_kind == proposal_kind) - .map(|proposal| proposal.proposal_id) - .expect("proposal kind should be present") -} - -pub(super) async fn setup_service(test_name: &str) -> Option { - let Some(test_db) = acceptance::test_db().await else { - eprintln!("Skipping {test_name}; set ELF_PG_DSN to run this test."); - - return None; - }; - let Some(qdrant_url) = acceptance::test_qdrant_url() else { - eprintln!("Skipping {test_name}; set ELF_QDRANT_URL to run this test."); - - return None; - }; - let collection = test_db.collection_name("elf_acceptance"); - let docs_collection = test_db.collection_name("elf_acceptance_docs"); - let cfg = acceptance::test_config( - test_db.dsn().to_string(), - qdrant_url, - 4_096, - collection, - docs_collection, - ); - let extractor = SpyExtractor { - calls: Arc::new(AtomicUsize::new(0)), - payload: serde_json::json!({ "notes": [] }), - }; - let providers = Providers::new( - Arc::new(StubEmbedding { vector_dim: 4_096 }), - Arc::new(StubRerank), - Arc::new(extractor), - ); - let service = - acceptance::build_service(cfg, providers).await.expect("Failed to build service."); - - acceptance::reset_db(&service.db.pool).await.expect("Failed to reset test database."); - - Some(ConsolidationFixture { service, _test_db: test_db }) -} - -pub(super) async fn insert_source_note(service: &ElfService, key: &str, text: &str) -> Uuid { - let response = service - .add_note(AddNoteRequest { - tenant_id: TENANT_ID.to_string(), - project_id: PROJECT_ID.to_string(), - agent_id: AGENT_ID.to_string(), - scope: "agent_private".to_string(), - notes: vec![AddNoteInput { - r#type: "fact".to_string(), - key: Some(key.to_string()), - text: text.to_string(), - structured: None, - importance: 0.7, - confidence: 0.9, - ttl_days: None, - source_ref: serde_json::json!({ "schema": "acceptance/v1", "key": key }), - write_policy: None, - }], - }) - .await - .expect("add_note should persist source note"); - - response.results[0].note_id.expect("source note id should be present") -} - -pub(super) async fn create_run_with_proposals( - service: &ElfService, - source: &ConsolidationInputRef, - proposals: Vec, -) -> ConsolidationRunCreateResponse { - service - .consolidation_run_create(ConsolidationRunCreateRequest { - tenant_id: TENANT_ID.to_string(), - project_id: PROJECT_ID.to_string(), - agent_id: AGENT_ID.to_string(), - job_kind: "manual".to_string(), - input_refs: vec![source.clone()], - source_snapshot: serde_json::json!({ "source_count": 1 }), - lineage: lineage(source), - proposals, - }) - .await - .expect("consolidation run should be created") -} - -pub(super) async fn process_consolidation_worker(service: &ElfService) { - let tokenizer = elf_chunking::load_tokenizer(&service.cfg.chunking.tokenizer_repo) - .expect("worker tokenizer should load"); - let mut embedding = acceptance::dummy_embedding_provider(); - - embedding.dimensions = service.cfg.storage.qdrant.vector_dim; - - let worker_state = WorkerState { - db: Db::connect(&service.cfg.storage.postgres).await.expect("Failed to connect worker DB."), - qdrant: QdrantStore::new(&service.cfg.storage.qdrant) - .expect("Failed to build Qdrant store."), - docs_qdrant: QdrantStore::new_with_collection( - &service.cfg.storage.qdrant, - &service.cfg.storage.qdrant.docs_collection, - ) - .expect("Failed to build docs Qdrant store."), - embedding, - chunking: ChunkingConfig { - max_tokens: service.cfg.chunking.max_tokens, - overlap_tokens: service.cfg.chunking.overlap_tokens, - }, - tokenizer, - }; - - worker::process_once(&worker_state).await.expect("consolidation worker should process once"); -} - -pub(super) async fn materialized_proposals( - service: &ElfService, - run_id: Uuid, -) -> ConsolidationProposalsListResponse { - service - .consolidation_proposals_list(ConsolidationProposalsListRequest { - tenant_id: TENANT_ID.to_string(), - project_id: PROJECT_ID.to_string(), - run_id: Some(run_id), - review_state: None, - limit: None, - }) - .await - .expect("consolidation proposals should be listed") -} - -pub(super) async fn promote_reviewed_memory(service: &ElfService) -> Uuid { - let note_id = insert_source_note( - service, - "memory_authority_source", - "Fact: Reviewed memories require source-linked approval.", - ) - .await; - let source = source_ref(note_id); - let created = - create_run_with_proposals(service, &source, vec![proposal_input(&source, "derived_note")]) - .await; - - process_consolidation_worker(service).await; - - let materialized = materialized_proposals(service, created.run.run_id).await; - let proposal_id = materialized.proposals[0].proposal_id; - let reviewed = service - .consolidation_proposal_review(ConsolidationProposalReviewRequest { - tenant_id: TENANT_ID.to_string(), - project_id: PROJECT_ID.to_string(), - reviewer_agent_id: AGENT_ID.to_string(), - proposal_id, - review_action: ConsolidationReviewAction::Apply, - review_comment: Some("Approve memory authority candidate.".to_string()), - }) - .await - .expect("review action should promote memory"); - - reviewed - .target_ref - .get("id") - .and_then(serde_json::Value::as_str) - .and_then(|value| Uuid::parse_str(value).ok()) - .expect("applied proposal should point at promoted note") -} - -pub(super) async fn active_list_contains(service: &ElfService, note_id: Uuid) -> bool { - service - .list(ListRequest { - tenant_id: TENANT_ID.to_string(), - project_id: PROJECT_ID.to_string(), - agent_id: Some(AGENT_ID.to_string()), - scope: Some("agent_private".to_string()), - status: None, - r#type: None, - }) - .await - .expect("active notes should list") - .items - .iter() - .any(|item| item.note_id == note_id) -} - -pub(super) async fn apply_memory_correction( - service: &ElfService, - note_id: Uuid, - action: MemoryCorrectionAction, - reason: &str, - source: &str, - restore_version_id: Option, -) -> MemoryCorrectionResponse { - service - .memory_correction_apply(MemoryCorrectionRequest { - tenant_id: TENANT_ID.to_string(), - project_id: PROJECT_ID.to_string(), - actor_agent_id: AGENT_ID.to_string(), - note_id, - action, - reason: reason.to_string(), - source_ref: serde_json::json!({ - "schema": "acceptance/review", - "source": source - }), - restore_version_id, - }) - .await - .expect("memory correction should persist") -} - -pub(super) async fn memory_history_event_types(service: &ElfService, note_id: Uuid) -> Vec { - service - .memory_history_get(MemoryHistoryGetRequest { - tenant_id: TENANT_ID.to_string(), - project_id: PROJECT_ID.to_string(), - note_id, - }) - .await - .expect("promoted memory history should be readable") - .events - .into_iter() - .map(|event| event.event_type) - .collect() -} diff --git a/packages/elf-service/tests/acceptance/consolidation/tests_helpers/corrections.rs b/packages/elf-service/tests/acceptance/consolidation/tests_helpers/corrections.rs new file mode 100644 index 00000000..02e6f2e2 --- /dev/null +++ b/packages/elf-service/tests/acceptance/consolidation/tests_helpers/corrections.rs @@ -0,0 +1,116 @@ +use serde_json::Value; +use uuid::Uuid; + +use crate::acceptance::consolidation::tests_helpers::{ + AGENT_ID, PROJECT_ID, TENANT_ID, notes, proposals, refs, worker_processing, +}; +use elf_domain::consolidation::ConsolidationReviewAction; +use elf_service::{ + ConsolidationProposalReviewRequest, ElfService, ListRequest, MemoryCorrectionAction, + MemoryCorrectionRequest, MemoryCorrectionResponse, MemoryHistoryGetRequest, +}; + +pub(in crate::acceptance::consolidation) async fn promote_reviewed_memory( + service: &ElfService, +) -> Uuid { + let note_id = notes::insert_source_note( + service, + "memory_authority_source", + "Fact: Reviewed memories require source-linked approval.", + ) + .await; + let source = refs::source_ref(note_id); + let created = proposals::create_run_with_proposals( + service, + &source, + vec![proposals::proposal_input(&source, "derived_note")], + ) + .await; + + worker_processing::process_consolidation_worker(service).await; + + let materialized = proposals::materialized_proposals(service, created.run.run_id).await; + let proposal_id = materialized.proposals[0].proposal_id; + let reviewed = service + .consolidation_proposal_review(ConsolidationProposalReviewRequest { + tenant_id: TENANT_ID.to_string(), + project_id: PROJECT_ID.to_string(), + reviewer_agent_id: AGENT_ID.to_string(), + proposal_id, + review_action: ConsolidationReviewAction::Apply, + review_comment: Some("Approve memory authority candidate.".to_string()), + }) + .await + .expect("review action should promote memory"); + + reviewed + .target_ref + .get("id") + .and_then(Value::as_str) + .and_then(|value| Uuid::parse_str(value).ok()) + .expect("applied proposal should point at promoted note") +} + +pub(in crate::acceptance::consolidation) async fn active_list_contains( + service: &ElfService, + note_id: Uuid, +) -> bool { + service + .list(ListRequest { + tenant_id: TENANT_ID.to_string(), + project_id: PROJECT_ID.to_string(), + agent_id: Some(AGENT_ID.to_string()), + scope: Some("agent_private".to_string()), + status: None, + r#type: None, + }) + .await + .expect("active notes should list") + .items + .iter() + .any(|item| item.note_id == note_id) +} + +pub(in crate::acceptance::consolidation) async fn apply_memory_correction( + service: &ElfService, + note_id: Uuid, + action: MemoryCorrectionAction, + reason: &str, + source: &str, + restore_version_id: Option, +) -> MemoryCorrectionResponse { + service + .memory_correction_apply(MemoryCorrectionRequest { + tenant_id: TENANT_ID.to_string(), + project_id: PROJECT_ID.to_string(), + actor_agent_id: AGENT_ID.to_string(), + note_id, + action, + reason: reason.to_string(), + source_ref: serde_json::json!({ + "schema": "acceptance/review", + "source": source + }), + restore_version_id, + }) + .await + .expect("memory correction should persist") +} + +pub(in crate::acceptance::consolidation) async fn memory_history_event_types( + service: &ElfService, + note_id: Uuid, +) -> Vec { + service + .memory_history_get(MemoryHistoryGetRequest { + tenant_id: TENANT_ID.to_string(), + project_id: PROJECT_ID.to_string(), + note_id, + }) + .await + .expect("promoted memory history should be readable") + .events + .into_iter() + .map(|event| event.event_type) + .collect() +} diff --git a/packages/elf-service/tests/acceptance/consolidation/tests_helpers/notes.rs b/packages/elf-service/tests/acceptance/consolidation/tests_helpers/notes.rs new file mode 100644 index 00000000..145c93ee --- /dev/null +++ b/packages/elf-service/tests/acceptance/consolidation/tests_helpers/notes.rs @@ -0,0 +1,33 @@ +use uuid::Uuid; + +use crate::acceptance::consolidation::tests_helpers::{AGENT_ID, PROJECT_ID, TENANT_ID}; +use elf_service::{AddNoteInput, AddNoteRequest, ElfService}; + +pub(in crate::acceptance::consolidation) async fn insert_source_note( + service: &ElfService, + key: &str, + text: &str, +) -> Uuid { + let response = service + .add_note(AddNoteRequest { + tenant_id: TENANT_ID.to_string(), + project_id: PROJECT_ID.to_string(), + agent_id: AGENT_ID.to_string(), + scope: "agent_private".to_string(), + notes: vec![AddNoteInput { + r#type: "fact".to_string(), + key: Some(key.to_string()), + text: text.to_string(), + structured: None, + importance: 0.7, + confidence: 0.9, + ttl_days: None, + source_ref: serde_json::json!({ "schema": "acceptance/v1", "key": key }), + write_policy: None, + }], + }) + .await + .expect("add_note should persist source note"); + + response.results[0].note_id.expect("source note id should be present") +} diff --git a/packages/elf-service/tests/acceptance/consolidation/tests_helpers/proposals.rs b/packages/elf-service/tests/acceptance/consolidation/tests_helpers/proposals.rs new file mode 100644 index 00000000..7a6eded3 --- /dev/null +++ b/packages/elf-service/tests/acceptance/consolidation/tests_helpers/proposals.rs @@ -0,0 +1,115 @@ +use serde_json::Value; +use uuid::Uuid; + +use crate::acceptance::consolidation::tests_helpers::{AGENT_ID, PROJECT_ID, TENANT_ID, refs}; +use elf_domain::consolidation::{ + ConsolidationApplyIntent, ConsolidationInputRef, ConsolidationMarker, + ConsolidationMarkerSeverity, ConsolidationMarkers, ConsolidationProposalDiff, + ConsolidationUnsupportedClaimFlag, +}; +use elf_service::{ + ConsolidationProposalInput, ConsolidationProposalsListRequest, + ConsolidationProposalsListResponse, ConsolidationRunCreateRequest, + ConsolidationRunCreateResponse, ElfService, +}; + +pub(in crate::acceptance::consolidation) fn proposal_input( + source: &ConsolidationInputRef, + kind: &str, +) -> ConsolidationProposalInput { + proposal_input_with_payload( + source, + kind, + serde_json::json!({ + "type": "fact", + "text": "Fact: Consolidation proposals are derived and reviewable." + }), + ) +} + +pub(in crate::acceptance::consolidation) fn proposal_input_with_payload( + source: &ConsolidationInputRef, + kind: &str, + proposed_payload: Value, +) -> ConsolidationProposalInput { + ConsolidationProposalInput { + proposal_kind: kind.to_string(), + apply_intent: ConsolidationApplyIntent::CreateDerivedNote, + source_refs: vec![source.clone()], + source_snapshot: serde_json::json!({ "source_count": 1 }), + lineage: refs::lineage(source), + confidence: 0.82, + unsupported_claim_flags: vec![ConsolidationUnsupportedClaimFlag { + claim_id: Some("unsupported-claim".to_string()), + message: "The source does not prove that source notes may be rewritten.".to_string(), + source: Some(source.clone()), + }], + markers: ConsolidationMarkers { + contradictions: vec![ConsolidationMarker { + severity: ConsolidationMarkerSeverity::High, + message: "Stale rewrite evidence conflicts with the proposal-only rule." + .to_string(), + source: Some(source.clone()), + }], + staleness: Vec::new(), + }, + diff: ConsolidationProposalDiff { + summary: "Create a reviewed derived note without changing source evidence.".to_string(), + before: serde_json::json!({}), + after: serde_json::json!({ + "target": "derived_note", + "text": "Fact: Consolidation proposals are derived and reviewable." + }), + }, + target_ref: serde_json::json!({}), + proposed_payload, + } +} + +pub(in crate::acceptance::consolidation) fn proposal_id_by_kind( + response: &ConsolidationProposalsListResponse, + proposal_kind: &str, +) -> Uuid { + response + .proposals + .iter() + .find(|proposal| proposal.proposal_kind == proposal_kind) + .map(|proposal| proposal.proposal_id) + .expect("proposal kind should be present") +} + +pub(in crate::acceptance::consolidation) async fn create_run_with_proposals( + service: &ElfService, + source: &ConsolidationInputRef, + proposals: Vec, +) -> ConsolidationRunCreateResponse { + service + .consolidation_run_create(ConsolidationRunCreateRequest { + tenant_id: TENANT_ID.to_string(), + project_id: PROJECT_ID.to_string(), + agent_id: AGENT_ID.to_string(), + job_kind: "manual".to_string(), + input_refs: vec![source.clone()], + source_snapshot: serde_json::json!({ "source_count": 1 }), + lineage: refs::lineage(source), + proposals, + }) + .await + .expect("consolidation run should be created") +} + +pub(in crate::acceptance::consolidation) async fn materialized_proposals( + service: &ElfService, + run_id: Uuid, +) -> ConsolidationProposalsListResponse { + service + .consolidation_proposals_list(ConsolidationProposalsListRequest { + tenant_id: TENANT_ID.to_string(), + project_id: PROJECT_ID.to_string(), + run_id: Some(run_id), + review_state: None, + limit: None, + }) + .await + .expect("consolidation proposals should be listed") +} diff --git a/packages/elf-service/tests/acceptance/consolidation/tests_helpers/refs.rs b/packages/elf-service/tests/acceptance/consolidation/tests_helpers/refs.rs new file mode 100644 index 00000000..c15a715f --- /dev/null +++ b/packages/elf-service/tests/acceptance/consolidation/tests_helpers/refs.rs @@ -0,0 +1,33 @@ +use time::OffsetDateTime; +use uuid::Uuid; + +use elf_domain::consolidation::{ + ConsolidationInputRef, ConsolidationLineage, ConsolidationSourceKind, + ConsolidationSourceSnapshot, +}; + +pub(in crate::acceptance::consolidation) fn source_ref(note_id: Uuid) -> ConsolidationInputRef { + ConsolidationInputRef { + kind: ConsolidationSourceKind::Note, + id: note_id, + snapshot: ConsolidationSourceSnapshot { + status: Some("active".to_string()), + updated_at: Some(OffsetDateTime::UNIX_EPOCH), + content_hash: Some("blake3:acceptance-source".to_string()), + embedding_version: Some("test:test:4096".to_string()), + trace_version: None, + source_ref: serde_json::json!({ "schema": "acceptance/v1" }), + metadata: serde_json::json!({ "fixture": "consolidation" }), + }, + } +} + +pub(in crate::acceptance::consolidation) fn lineage( + source: &ConsolidationInputRef, +) -> ConsolidationLineage { + ConsolidationLineage { + source_refs: vec![source.clone()], + parent_run_id: None, + parent_proposal_ids: Vec::new(), + } +} diff --git a/packages/elf-service/tests/acceptance/consolidation/tests_helpers/service_setup.rs b/packages/elf-service/tests/acceptance/consolidation/tests_helpers/service_setup.rs new file mode 100644 index 00000000..d715362d --- /dev/null +++ b/packages/elf-service/tests/acceptance/consolidation/tests_helpers/service_setup.rs @@ -0,0 +1,46 @@ +use std::sync::{Arc, atomic::AtomicUsize}; + +use crate::acceptance::{ + self, SpyExtractor, StubEmbedding, StubRerank, + consolidation::tests_helpers::ConsolidationFixture, +}; +use elf_service::Providers; + +pub(in crate::acceptance::consolidation) async fn setup_service( + test_name: &str, +) -> Option { + let Some(test_db) = acceptance::test_db().await else { + eprintln!("Skipping {test_name}; set ELF_PG_DSN to run this test."); + + return None; + }; + let Some(qdrant_url) = acceptance::test_qdrant_url() else { + eprintln!("Skipping {test_name}; set ELF_QDRANT_URL to run this test."); + + return None; + }; + let collection = test_db.collection_name("elf_acceptance"); + let docs_collection = test_db.collection_name("elf_acceptance_docs"); + let cfg = acceptance::test_config( + test_db.dsn().to_string(), + qdrant_url, + 4_096, + collection, + docs_collection, + ); + let extractor = SpyExtractor { + calls: Arc::new(AtomicUsize::new(0)), + payload: serde_json::json!({ "notes": [] }), + }; + let providers = Providers::new( + Arc::new(StubEmbedding { vector_dim: 4_096 }), + Arc::new(StubRerank), + Arc::new(extractor), + ); + let service = + acceptance::build_service(cfg, providers).await.expect("Failed to build service."); + + acceptance::reset_db(&service.db.pool).await.expect("Failed to reset test database."); + + Some(ConsolidationFixture { service, _test_db: test_db }) +} diff --git a/packages/elf-service/tests/acceptance/consolidation/tests_helpers/worker_processing.rs b/packages/elf-service/tests/acceptance/consolidation/tests_helpers/worker_processing.rs new file mode 100644 index 00000000..01c7193e --- /dev/null +++ b/packages/elf-service/tests/acceptance/consolidation/tests_helpers/worker_processing.rs @@ -0,0 +1,34 @@ +use crate::acceptance; +use elf_chunking::ChunkingConfig; +use elf_service::ElfService; +use elf_storage::{db::Db, qdrant::QdrantStore}; +use elf_worker::worker::{self, WorkerState}; + +pub(in crate::acceptance::consolidation) async fn process_consolidation_worker( + service: &ElfService, +) { + let tokenizer = elf_chunking::load_tokenizer(&service.cfg.chunking.tokenizer_repo) + .expect("worker tokenizer should load"); + let mut embedding = acceptance::dummy_embedding_provider(); + + embedding.dimensions = service.cfg.storage.qdrant.vector_dim; + + let worker_state = WorkerState { + db: Db::connect(&service.cfg.storage.postgres).await.expect("Failed to connect worker DB."), + qdrant: QdrantStore::new(&service.cfg.storage.qdrant) + .expect("Failed to build Qdrant store."), + docs_qdrant: QdrantStore::new_with_collection( + &service.cfg.storage.qdrant, + &service.cfg.storage.qdrant.docs_collection, + ) + .expect("Failed to build docs Qdrant store."), + embedding, + chunking: ChunkingConfig { + max_tokens: service.cfg.chunking.max_tokens, + overlap_tokens: service.cfg.chunking.overlap_tokens, + }, + tokenizer, + }; + + worker::process_once(&worker_state).await.expect("consolidation worker should process once"); +} diff --git a/packages/elf-service/tests/acceptance/english_only_boundary.rs b/packages/elf-service/tests/acceptance/english_only_boundary.rs index 09fba084..b5e0eb69 100644 --- a/packages/elf-service/tests/acceptance/english_only_boundary.rs +++ b/packages/elf-service/tests/acceptance/english_only_boundary.rs @@ -1,321 +1,4 @@ -use std::sync::{Arc, atomic::AtomicUsize}; - -use crate::acceptance::{self, SpyExtractor, StubEmbedding, StubRerank}; -use elf_service::{ - AddEventRequest, AddNoteInput, AddNoteRequest, ElfService, Error, EventMessage, Providers, - SearchRequest, -}; - -async fn build_test_service( - dsn: String, - qdrant_url: String, - collection: String, - docs_collection: String, -) -> Option { - let extractor = SpyExtractor { - calls: Arc::new(AtomicUsize::new(0)), - payload: serde_json::json!({ "notes": [] }), - }; - let providers = Providers::new( - Arc::new(StubEmbedding { vector_dim: 4_096 }), - Arc::new(StubRerank), - Arc::new(extractor), - ); - let cfg = acceptance::test_config(dsn, qdrant_url, 4_096, collection, docs_collection); - let service = - acceptance::build_service(cfg, providers).await.expect("Failed to build service."); - - acceptance::reset_db(&service.db.pool).await.expect("Failed to reset test database."); - - Some(service) -} - -#[tokio::test] -#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL to run."] -async fn rejects_non_english_in_add_note() { - let Some(test_db) = acceptance::test_db().await else { - eprintln!("Skipping english_only_boundary; set ELF_PG_DSN to run this test."); - - return; - }; - let Some(qdrant_url) = acceptance::test_qdrant_url() else { - eprintln!("Skipping english_only_boundary; set ELF_QDRANT_URL to run this test."); - - return; - }; - let collection = test_db.collection_name("elf_acceptance"); - let docs_collection = test_db.collection_name("elf_acceptance_docs"); - let Some(service) = - build_test_service(test_db.dsn().to_string(), qdrant_url, collection, docs_collection) - .await - else { - return; - }; - let request = AddNoteRequest { - tenant_id: "t".to_string(), - project_id: "p".to_string(), - agent_id: "a".to_string(), - scope: "agent_private".to_string(), - notes: vec![AddNoteInput { - r#type: "fact".to_string(), - key: None, - text: "你好".to_string(), - structured: None, - importance: 0.4, - confidence: 0.9, - ttl_days: None, - source_ref: serde_json::json!({}), - write_policy: None, - }], - }; - let result = service.add_note(request).await; - - match result { - Err(Error::NonEnglishInput { field }) => { - assert_eq!(field, "$.notes[0].text"); - }, - other => panic!("Expected NonEnglishInput, got {other:?}"), - } - - test_db.cleanup().await.expect("Failed to cleanup test database."); -} - -#[tokio::test] -#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL to run."] -async fn rejects_cyrillic_in_add_note() { - let Some(test_db) = acceptance::test_db().await else { - eprintln!("Skipping english_only_boundary; set ELF_PG_DSN to run this test."); - - return; - }; - let Some(qdrant_url) = acceptance::test_qdrant_url() else { - eprintln!("Skipping english_only_boundary; set ELF_QDRANT_URL to run this test."); - - return; - }; - let collection = test_db.collection_name("elf_acceptance"); - let docs_collection = test_db.collection_name("elf_acceptance_docs"); - let Some(service) = - build_test_service(test_db.dsn().to_string(), qdrant_url, collection, docs_collection) - .await - else { - return; - }; - let request = AddNoteRequest { - tenant_id: "t".to_string(), - project_id: "p".to_string(), - agent_id: "a".to_string(), - scope: "agent_private".to_string(), - notes: vec![AddNoteInput { - r#type: "fact".to_string(), - key: None, - text: "Привет мир".to_string(), - structured: None, - importance: 0.4, - confidence: 0.9, - ttl_days: None, - source_ref: serde_json::json!({}), - write_policy: None, - }], - }; - let result = service.add_note(request).await; - - match result { - Err(Error::NonEnglishInput { field }) => { - assert_eq!(field, "$.notes[0].text"); - }, - other => panic!("Expected NonEnglishInput, got {other:?}"), - } - - test_db.cleanup().await.expect("Failed to cleanup test database."); -} - -#[tokio::test] -#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL to run."] -async fn rejects_non_english_in_add_event() { - let Some(test_db) = acceptance::test_db().await else { - eprintln!("Skipping english_only_boundary; set ELF_PG_DSN to run this test."); - - return; - }; - let Some(qdrant_url) = acceptance::test_qdrant_url() else { - eprintln!("Skipping english_only_boundary; set ELF_QDRANT_URL to run this test."); - - return; - }; - let collection = test_db.collection_name("elf_acceptance"); - let docs_collection = test_db.collection_name("elf_acceptance_docs"); - let Some(service) = - build_test_service(test_db.dsn().to_string(), qdrant_url, collection, docs_collection) - .await - else { - return; - }; - let request = AddEventRequest { - tenant_id: "t".to_string(), - project_id: "p".to_string(), - agent_id: "a".to_string(), - scope: Some("agent_private".to_string()), - dry_run: Some(true), - ingestion_profile: None, - messages: vec![EventMessage { - role: "user".to_string(), - content: "こんにちは".to_string(), - ts: None, - msg_id: None, - write_policy: None, - }], - }; - let result = service.add_event(request).await; - - match result { - Err(Error::NonEnglishInput { field }) => { - assert_eq!(field, "$.messages[0].content"); - }, - other => panic!("Expected NonEnglishInput, got {other:?}"), - } - - test_db.cleanup().await.expect("Failed to cleanup test database."); -} - -#[tokio::test] -#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL to run."] -async fn rejects_cyrillic_in_add_event() { - let Some(test_db) = acceptance::test_db().await else { - eprintln!("Skipping english_only_boundary; set ELF_PG_DSN to run this test."); - - return; - }; - let Some(qdrant_url) = acceptance::test_qdrant_url() else { - eprintln!("Skipping english_only_boundary; set ELF_QDRANT_URL to run this test."); - - return; - }; - let collection = test_db.collection_name("elf_acceptance"); - let docs_collection = test_db.collection_name("elf_acceptance_docs"); - let Some(service) = - build_test_service(test_db.dsn().to_string(), qdrant_url, collection, docs_collection) - .await - else { - return; - }; - let request = AddEventRequest { - tenant_id: "t".to_string(), - project_id: "p".to_string(), - agent_id: "a".to_string(), - scope: Some("agent_private".to_string()), - dry_run: Some(true), - ingestion_profile: None, - messages: vec![EventMessage { - role: "user".to_string(), - content: "Это не английский текст.".to_string(), - ts: None, - msg_id: None, - write_policy: None, - }], - }; - let result = service.add_event(request).await; - - match result { - Err(Error::NonEnglishInput { field }) => { - assert_eq!(field, "$.messages[0].content"); - }, - other => panic!("Expected NonEnglishInput, got {other:?}"), - } - - test_db.cleanup().await.expect("Failed to cleanup test database."); -} - -#[tokio::test] -#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL to run."] -async fn rejects_non_english_in_search() { - let Some(test_db) = acceptance::test_db().await else { - eprintln!("Skipping english_only_boundary; set ELF_PG_DSN to run this test."); - - return; - }; - let Some(qdrant_url) = acceptance::test_qdrant_url() else { - eprintln!("Skipping english_only_boundary; set ELF_QDRANT_URL to run this test."); - - return; - }; - let collection = test_db.collection_name("elf_acceptance"); - let docs_collection = test_db.collection_name("elf_acceptance_docs"); - let Some(service) = - build_test_service(test_db.dsn().to_string(), qdrant_url, collection, docs_collection) - .await - else { - return; - }; - let request = SearchRequest { - tenant_id: "t".to_string(), - project_id: "p".to_string(), - agent_id: "a".to_string(), - token_id: None, - read_profile: "private_only".to_string(), - payload_level: Default::default(), - query: "안녕하세요".to_string(), - top_k: Some(5), - candidate_k: Some(10), - filter: None, - record_hits: Some(false), - ranking: None, - }; - let result = service.search(request).await; - - match result { - Err(Error::NonEnglishInput { field }) => { - assert_eq!(field, "$.query"); - }, - other => panic!("Expected NonEnglishInput, got {other:?}"), - } - - test_db.cleanup().await.expect("Failed to cleanup test database."); -} - -#[tokio::test] -#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL to run."] -async fn rejects_cyrillic_in_search() { - let Some(test_db) = acceptance::test_db().await else { - eprintln!("Skipping english_only_boundary; set ELF_PG_DSN to run this test."); - - return; - }; - let Some(qdrant_url) = acceptance::test_qdrant_url() else { - eprintln!("Skipping english_only_boundary; set ELF_QDRANT_URL to run this test."); - - return; - }; - let collection = test_db.collection_name("elf_acceptance"); - let docs_collection = test_db.collection_name("elf_acceptance_docs"); - let Some(service) = - build_test_service(test_db.dsn().to_string(), qdrant_url, collection, docs_collection) - .await - else { - return; - }; - let request = SearchRequest { - tenant_id: "t".to_string(), - project_id: "p".to_string(), - agent_id: "a".to_string(), - token_id: None, - read_profile: "private_only".to_string(), - payload_level: Default::default(), - query: "Привет".to_string(), - top_k: Some(5), - candidate_k: Some(10), - filter: None, - record_hits: Some(false), - ranking: None, - }; - let result = service.search(request).await; - - match result { - Err(Error::NonEnglishInput { field }) => { - assert_eq!(field, "$.query"); - }, - other => panic!("Expected NonEnglishInput, got {other:?}"), - } - - test_db.cleanup().await.expect("Failed to cleanup test database."); -} +mod add_event; +mod add_note; +mod search; +mod setup; diff --git a/packages/elf-service/tests/acceptance/english_only_boundary/add_event.rs b/packages/elf-service/tests/acceptance/english_only_boundary/add_event.rs new file mode 100644 index 00000000..c094fc90 --- /dev/null +++ b/packages/elf-service/tests/acceptance/english_only_boundary/add_event.rs @@ -0,0 +1,68 @@ +use crate::acceptance::english_only_boundary::setup; +use elf_service::{AddEventRequest, Error, EventMessage}; + +#[tokio::test] +#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL to run."] +async fn rejects_non_english_in_add_event() { + let Some(fixture) = setup::setup_service("english_only_boundary").await else { + return; + }; + let request = AddEventRequest { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + agent_id: "a".to_string(), + scope: Some("agent_private".to_string()), + dry_run: Some(true), + ingestion_profile: None, + messages: vec![EventMessage { + role: "user".to_string(), + content: "こんにちは".to_string(), + ts: None, + msg_id: None, + write_policy: None, + }], + }; + let result = fixture.service.add_event(request).await; + + match result { + Err(Error::NonEnglishInput { field }) => { + assert_eq!(field, "$.messages[0].content"); + }, + other => panic!("Expected NonEnglishInput, got {other:?}"), + } + + fixture.test_db.cleanup().await.expect("Failed to cleanup test database."); +} + +#[tokio::test] +#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL to run."] +async fn rejects_cyrillic_in_add_event() { + let Some(fixture) = setup::setup_service("english_only_boundary").await else { + return; + }; + let request = AddEventRequest { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + agent_id: "a".to_string(), + scope: Some("agent_private".to_string()), + dry_run: Some(true), + ingestion_profile: None, + messages: vec![EventMessage { + role: "user".to_string(), + content: "Это не английский текст.".to_string(), + ts: None, + msg_id: None, + write_policy: None, + }], + }; + let result = fixture.service.add_event(request).await; + + match result { + Err(Error::NonEnglishInput { field }) => { + assert_eq!(field, "$.messages[0].content"); + }, + other => panic!("Expected NonEnglishInput, got {other:?}"), + } + + fixture.test_db.cleanup().await.expect("Failed to cleanup test database."); +} diff --git a/packages/elf-service/tests/acceptance/english_only_boundary/add_note.rs b/packages/elf-service/tests/acceptance/english_only_boundary/add_note.rs new file mode 100644 index 00000000..d315a281 --- /dev/null +++ b/packages/elf-service/tests/acceptance/english_only_boundary/add_note.rs @@ -0,0 +1,72 @@ +use crate::acceptance::english_only_boundary::setup; +use elf_service::{AddNoteInput, AddNoteRequest, Error}; + +#[tokio::test] +#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL to run."] +async fn rejects_non_english_in_add_note() { + let Some(fixture) = setup::setup_service("english_only_boundary").await else { + return; + }; + let request = AddNoteRequest { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + agent_id: "a".to_string(), + scope: "agent_private".to_string(), + notes: vec![AddNoteInput { + r#type: "fact".to_string(), + key: None, + text: "你好".to_string(), + structured: None, + importance: 0.4, + confidence: 0.9, + ttl_days: None, + source_ref: serde_json::json!({}), + write_policy: None, + }], + }; + let result = fixture.service.add_note(request).await; + + match result { + Err(Error::NonEnglishInput { field }) => { + assert_eq!(field, "$.notes[0].text"); + }, + other => panic!("Expected NonEnglishInput, got {other:?}"), + } + + fixture.test_db.cleanup().await.expect("Failed to cleanup test database."); +} + +#[tokio::test] +#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL to run."] +async fn rejects_cyrillic_in_add_note() { + let Some(fixture) = setup::setup_service("english_only_boundary").await else { + return; + }; + let request = AddNoteRequest { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + agent_id: "a".to_string(), + scope: "agent_private".to_string(), + notes: vec![AddNoteInput { + r#type: "fact".to_string(), + key: None, + text: "Привет мир".to_string(), + structured: None, + importance: 0.4, + confidence: 0.9, + ttl_days: None, + source_ref: serde_json::json!({}), + write_policy: None, + }], + }; + let result = fixture.service.add_note(request).await; + + match result { + Err(Error::NonEnglishInput { field }) => { + assert_eq!(field, "$.notes[0].text"); + }, + other => panic!("Expected NonEnglishInput, got {other:?}"), + } + + fixture.test_db.cleanup().await.expect("Failed to cleanup test database."); +} diff --git a/packages/elf-service/tests/acceptance/english_only_boundary/search.rs b/packages/elf-service/tests/acceptance/english_only_boundary/search.rs new file mode 100644 index 00000000..8a86ce6a --- /dev/null +++ b/packages/elf-service/tests/acceptance/english_only_boundary/search.rs @@ -0,0 +1,66 @@ +use crate::acceptance::english_only_boundary::setup; +use elf_service::{Error, SearchRequest}; + +#[tokio::test] +#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL to run."] +async fn rejects_non_english_in_search() { + let Some(fixture) = setup::setup_service("english_only_boundary").await else { + return; + }; + let request = SearchRequest { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + agent_id: "a".to_string(), + token_id: None, + read_profile: "private_only".to_string(), + payload_level: Default::default(), + query: "안녕하세요".to_string(), + top_k: Some(5), + candidate_k: Some(10), + filter: None, + record_hits: Some(false), + ranking: None, + }; + let result = fixture.service.search(request).await; + + match result { + Err(Error::NonEnglishInput { field }) => { + assert_eq!(field, "$.query"); + }, + other => panic!("Expected NonEnglishInput, got {other:?}"), + } + + fixture.test_db.cleanup().await.expect("Failed to cleanup test database."); +} + +#[tokio::test] +#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL to run."] +async fn rejects_cyrillic_in_search() { + let Some(fixture) = setup::setup_service("english_only_boundary").await else { + return; + }; + let request = SearchRequest { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + agent_id: "a".to_string(), + token_id: None, + read_profile: "private_only".to_string(), + payload_level: Default::default(), + query: "Привет".to_string(), + top_k: Some(5), + candidate_k: Some(10), + filter: None, + record_hits: Some(false), + ranking: None, + }; + let result = fixture.service.search(request).await; + + match result { + Err(Error::NonEnglishInput { field }) => { + assert_eq!(field, "$.query"); + }, + other => panic!("Expected NonEnglishInput, got {other:?}"), + } + + fixture.test_db.cleanup().await.expect("Failed to cleanup test database."); +} diff --git a/packages/elf-service/tests/acceptance/english_only_boundary/setup.rs b/packages/elf-service/tests/acceptance/english_only_boundary/setup.rs new file mode 100644 index 00000000..c777a28a --- /dev/null +++ b/packages/elf-service/tests/acceptance/english_only_boundary/setup.rs @@ -0,0 +1,49 @@ +use std::sync::{Arc, atomic::AtomicUsize}; + +use crate::acceptance::{self, SpyExtractor, StubEmbedding, StubRerank}; +use elf_service::{ElfService, Providers}; +use elf_testkit::TestDatabase; + +pub(in crate::acceptance::english_only_boundary) struct BoundaryFixture { + pub(in crate::acceptance::english_only_boundary) service: ElfService, + pub(in crate::acceptance::english_only_boundary) test_db: TestDatabase, +} + +pub(in crate::acceptance::english_only_boundary) async fn setup_service( + test_name: &str, +) -> Option { + let Some(test_db) = acceptance::test_db().await else { + eprintln!("Skipping {test_name}; set ELF_PG_DSN to run this test."); + + return None; + }; + let Some(qdrant_url) = acceptance::test_qdrant_url() else { + eprintln!("Skipping {test_name}; set ELF_QDRANT_URL to run this test."); + + return None; + }; + let collection = test_db.collection_name("elf_acceptance"); + let docs_collection = test_db.collection_name("elf_acceptance_docs"); + let extractor = SpyExtractor { + calls: Arc::new(AtomicUsize::new(0)), + payload: serde_json::json!({ "notes": [] }), + }; + let providers = Providers::new( + Arc::new(StubEmbedding { vector_dim: 4_096 }), + Arc::new(StubRerank), + Arc::new(extractor), + ); + let cfg = acceptance::test_config( + test_db.dsn().to_string(), + qdrant_url, + 4_096, + collection, + docs_collection, + ); + let service = + acceptance::build_service(cfg, providers).await.expect("Failed to build service."); + + acceptance::reset_db(&service.db.pool).await.expect("Failed to reset test database."); + + Some(BoundaryFixture { service, test_db }) +} diff --git a/packages/elf-service/tests/acceptance/graph_ingestion/tests_helpers.rs b/packages/elf-service/tests/acceptance/graph_ingestion/tests_helpers.rs index 15129abf..09be8d3d 100644 --- a/packages/elf-service/tests/acceptance/graph_ingestion/tests_helpers.rs +++ b/packages/elf-service/tests/acceptance/graph_ingestion/tests_helpers.rs @@ -1,317 +1,25 @@ -use std::{ - collections::hash_map::DefaultHasher, - hash::{Hash, Hasher}, - sync::{Arc, atomic::AtomicUsize}, +mod embedding; +mod graph_queries; +mod notes; +mod policy; +mod service_setup; + +pub(super) use self::{ + graph_queries::{ + graph_fact_count, graph_fact_evidence_count, graph_fact_evidence_count_for_note, + graph_fact_id, + }, + notes::{add_fact_note, duplicate_fact_attaches_multiple_evidence_request}, + policy::assert_graph_policy_from_op, + service_setup::{ + build_hash_service, build_service_with_extractor_payload, build_stub_service, + build_test_db, reset_service_db, + }, }; -use serde_json::Value; -use sqlx::PgPool; -use uuid::Uuid; - -use crate::acceptance::{self, SpyExtractor, StubEmbedding, StubRerank}; -use elf_config::EmbeddingProviderConfig; -use elf_domain::memory_policy::MemoryPolicyDecision; -use elf_service::{ - AddNoteInput, AddNoteRequest, BoxFuture, ElfService, EmbeddingProvider, NoteOp, Providers, - Result, StructuredFields, -}; -use elf_testkit::TestDatabase; - pub(super) const TEST_TENANT: &str = "t"; pub(super) const TEST_PROJECT: &str = "p"; pub(super) const TEST_SCOPE: &str = "agent_private"; pub(super) const GRAPH_REL_SUBJECT: &str = "alice"; pub(super) const GRAPH_REL_PREDICATE: &str = "mentors"; pub(super) const GRAPH_REL_OBJECT: &str = "Bob"; - -pub(super) struct HashEmbedding { - vector_dim: u32, -} -impl EmbeddingProvider for HashEmbedding { - fn embed<'a>( - &'a self, - _: &'a EmbeddingProviderConfig, - texts: &'a [String], - ) -> BoxFuture<'a, Result>>> { - let vector_dim = self.vector_dim as usize; - let vectors = texts - .iter() - .map(|text| { - let mut values = Vec::with_capacity(vector_dim); - - for idx in 0..vector_dim { - let mut hasher = DefaultHasher::new(); - - text.hash(&mut hasher); - idx.hash(&mut hasher); - - let raw = hasher.finish(); - let normalized = ((raw % 2_000_000) as f32 / 1_000_000.0) - 1.0; - - values.push(normalized); - } - - values - }) - .collect(); - - Box::pin(async move { Ok(vectors) }) - } -} - -pub(super) fn fact_note( - key: &str, - text: &str, - predicate: &str, - object_value: &str, -) -> AddNoteInput { - let structured = serde_json::from_value::(serde_json::json!({ - "relations": [{ - "subject": { "canonical": "Alice" }, - "predicate": predicate, - "object": { "value": object_value } - }] - })) - .expect("Failed to build structured fields."); - - AddNoteInput { - r#type: "fact".to_string(), - key: Some(key.to_string()), - text: text.to_string(), - structured: Some(structured), - importance: 0.8, - confidence: 0.9, - ttl_days: None, - source_ref: serde_json::json!({}), - write_policy: None, - } -} - -pub(super) fn assert_graph_policy_from_op(op: NoteOp, policy_decision: MemoryPolicyDecision) { - match op { - NoteOp::Add => assert_eq!(policy_decision, MemoryPolicyDecision::Remember), - NoteOp::Update => assert_eq!(policy_decision, MemoryPolicyDecision::Update), - _ => {}, - } -} - -pub(super) fn duplicate_fact_attaches_multiple_evidence_request() -> AddNoteRequest { - AddNoteRequest { - tenant_id: "t".to_string(), - project_id: "p".to_string(), - agent_id: "a".to_string(), - scope: "agent_private".to_string(), - notes: vec![ - AddNoteInput { - r#type: "fact".to_string(), - key: Some("mentorship-a".to_string()), - text: "Alice mentors Bob in 2026.".to_string(), - structured: Some( - serde_json::from_value::( - serde_json::json!({ - "relations": [{ - "subject": { "canonical": "Alice" }, - "predicate": "mentors", - "object": { "value": "Bob" } - }] - }), - ) - .expect("Failed to build structured fields."), - ), - importance: 0.8, - confidence: 0.9, - ttl_days: None, - source_ref: serde_json::json!({}), - write_policy: None, - }, - AddNoteInput { - r#type: "fact".to_string(), - key: Some("mentorship-b".to_string()), - text: "Alice also mentors Bob often.".to_string(), - structured: Some( - serde_json::from_value::( - serde_json::json!({ - "relations": [{ - "subject": { "canonical": "Alice" }, - "predicate": "mentors", - "object": { "value": "Bob" } - }] - }), - ) - .expect("Failed to build structured fields."), - ), - importance: 0.7, - confidence: 0.8, - ttl_days: None, - source_ref: serde_json::json!({}), - write_policy: None, - }, - ], - } -} - -pub(super) async fn graph_fact_id(pool: &PgPool) -> Uuid { - sqlx::query_scalar( - "\ -SELECT gf.fact_id -FROM graph_facts gf -JOIN graph_entities ge ON ge.entity_id = gf.subject_entity_id -WHERE ge.canonical_norm = $1 - AND gf.predicate = $2 - AND gf.object_value = $3 - AND gf.tenant_id = $4 - AND gf.project_id = $5 - AND gf.scope = $6", - ) - .bind(GRAPH_REL_SUBJECT) - .bind(GRAPH_REL_PREDICATE) - .bind(GRAPH_REL_OBJECT) - .bind(TEST_TENANT) - .bind(TEST_PROJECT) - .bind(TEST_SCOPE) - .fetch_one(pool) - .await - .expect("Failed to load fact.") -} - -pub(super) async fn graph_fact_count(pool: &PgPool) -> i64 { - sqlx::query_scalar( - "\ -SELECT COUNT(*) -FROM graph_facts gf -JOIN graph_entities ge ON ge.entity_id = gf.subject_entity_id -WHERE ge.canonical_norm = $1 - AND gf.predicate = $2 - AND gf.object_value = $3 - AND gf.tenant_id = $4 - AND gf.project_id = $5 - AND gf.scope = $6", - ) - .bind(GRAPH_REL_SUBJECT) - .bind(GRAPH_REL_PREDICATE) - .bind(GRAPH_REL_OBJECT) - .bind(TEST_TENANT) - .bind(TEST_PROJECT) - .bind(TEST_SCOPE) - .fetch_one(pool) - .await - .expect("Failed to count fact rows.") -} - -pub(super) async fn graph_fact_evidence_count(pool: &PgPool, fact_id: Uuid) -> i64 { - sqlx::query_scalar("SELECT COUNT(*) FROM graph_fact_evidence WHERE fact_id = $1") - .bind(fact_id) - .fetch_one(pool) - .await - .expect("Failed to load fact evidence.") -} - -pub(super) async fn graph_fact_evidence_count_for_note( - pool: &PgPool, - fact_id: Uuid, - note_id: Uuid, -) -> i64 { - sqlx::query_scalar( - "SELECT COUNT(*) FROM graph_fact_evidence WHERE fact_id = $1 AND note_id = $2", - ) - .bind(fact_id) - .bind(note_id) - .fetch_one(pool) - .await - .expect("Failed to load note evidence.") -} - -pub(super) async fn add_fact_note( - service: &ElfService, - key: &str, - text: &str, - predicate: &str, - object_value: &str, -) -> Uuid { - let response = service - .add_note(AddNoteRequest { - tenant_id: TEST_TENANT.to_string(), - project_id: TEST_PROJECT.to_string(), - agent_id: "a".to_string(), - scope: TEST_SCOPE.to_string(), - notes: vec![fact_note(key, text, predicate, object_value)], - }) - .await - .expect("add_note failed."); - - assert_eq!(response.results.len(), 1); - assert_eq!(response.results[0].op, NoteOp::Add); - - assert_graph_policy_from_op(response.results[0].op, response.results[0].policy_decision); - - response.results[0].note_id.expect("Expected note_id.") -} - -pub(super) async fn build_test_db(test_name: &str) -> Option { - let Some(test_db) = acceptance::test_db().await else { - eprintln!("Skipping {test_name}; set ELF_PG_DSN to run."); - - return None; - }; - let Some(_) = acceptance::test_qdrant_url() else { - eprintln!("Skipping {test_name}; set ELF_QDRANT_URL to run."); - - return None; - }; - - Some(test_db) -} - -pub(super) async fn build_hash_service(test_db: &TestDatabase) -> ElfService { - let qdrant_url = acceptance::test_qdrant_url().expect("Expected Qdrant test URL."); - let providers = Providers::new( - Arc::new(HashEmbedding { vector_dim: 4_096 }), - Arc::new(StubRerank), - Arc::new(SpyExtractor { - calls: Arc::new(AtomicUsize::new(0)), - payload: serde_json::json!({ "notes": [] }), - }), - ); - let collection = test_db.collection_name("elf_acceptance"); - let docs_collection = test_db.collection_name("elf_acceptance_docs"); - let cfg = acceptance::test_config( - test_db.dsn().to_string(), - qdrant_url, - 4_096, - collection, - docs_collection, - ); - - acceptance::build_service(cfg, providers).await.expect("Failed to build service.") -} - -pub(super) async fn build_stub_service(test_db: &TestDatabase) -> ElfService { - build_service_with_extractor_payload(test_db, serde_json::json!({ "notes": [] })).await -} - -pub(super) async fn build_service_with_extractor_payload( - test_db: &TestDatabase, - extractor_payload: Value, -) -> ElfService { - let qdrant_url = acceptance::test_qdrant_url().expect("Expected Qdrant test URL."); - let providers = Providers::new( - Arc::new(StubEmbedding { vector_dim: 4_096 }), - Arc::new(StubRerank), - Arc::new(SpyExtractor { calls: Arc::new(AtomicUsize::new(0)), payload: extractor_payload }), - ); - let collection = test_db.collection_name("elf_acceptance"); - let docs_collection = test_db.collection_name("elf_acceptance_docs"); - let cfg = acceptance::test_config( - test_db.dsn().to_string(), - qdrant_url, - 4_096, - collection, - docs_collection, - ); - - acceptance::build_service(cfg, providers).await.expect("Failed to build service.") -} - -pub(super) async fn reset_service_db(service: &ElfService) { - acceptance::reset_db(&service.db.pool).await.expect("Failed to reset test database."); -} diff --git a/packages/elf-service/tests/acceptance/graph_ingestion/tests_helpers/embedding.rs b/packages/elf-service/tests/acceptance/graph_ingestion/tests_helpers/embedding.rs new file mode 100644 index 00000000..f735c13d --- /dev/null +++ b/packages/elf-service/tests/acceptance/graph_ingestion/tests_helpers/embedding.rs @@ -0,0 +1,42 @@ +use std::{ + collections::hash_map::DefaultHasher, + hash::{Hash, Hasher}, +}; + +use elf_config::EmbeddingProviderConfig; +use elf_service::{BoxFuture, EmbeddingProvider, Result}; + +pub(in crate::acceptance::graph_ingestion::tests_helpers) struct HashEmbedding { + pub(in crate::acceptance::graph_ingestion::tests_helpers) vector_dim: u32, +} +impl EmbeddingProvider for HashEmbedding { + fn embed<'a>( + &'a self, + _: &'a EmbeddingProviderConfig, + texts: &'a [String], + ) -> BoxFuture<'a, Result>>> { + let vector_dim = self.vector_dim as usize; + let vectors = texts + .iter() + .map(|text| { + let mut values = Vec::with_capacity(vector_dim); + + for idx in 0..vector_dim { + let mut hasher = DefaultHasher::new(); + + text.hash(&mut hasher); + idx.hash(&mut hasher); + + let raw = hasher.finish(); + let normalized = ((raw % 2_000_000) as f32 / 1_000_000.0) - 1.0; + + values.push(normalized); + } + + values + }) + .collect(); + + Box::pin(async move { Ok(vectors) }) + } +} diff --git a/packages/elf-service/tests/acceptance/graph_ingestion/tests_helpers/graph_queries.rs b/packages/elf-service/tests/acceptance/graph_ingestion/tests_helpers/graph_queries.rs new file mode 100644 index 00000000..3b08a716 --- /dev/null +++ b/packages/elf-service/tests/acceptance/graph_ingestion/tests_helpers/graph_queries.rs @@ -0,0 +1,80 @@ +use sqlx::PgPool; +use uuid::Uuid; + +use crate::acceptance::graph_ingestion::tests_helpers::{ + GRAPH_REL_OBJECT, GRAPH_REL_PREDICATE, GRAPH_REL_SUBJECT, TEST_PROJECT, TEST_SCOPE, TEST_TENANT, +}; + +pub(in crate::acceptance::graph_ingestion) async fn graph_fact_id(pool: &PgPool) -> Uuid { + sqlx::query_scalar( + "\ +SELECT gf.fact_id +FROM graph_facts gf +JOIN graph_entities ge ON ge.entity_id = gf.subject_entity_id +WHERE ge.canonical_norm = $1 + AND gf.predicate = $2 + AND gf.object_value = $3 + AND gf.tenant_id = $4 + AND gf.project_id = $5 + AND gf.scope = $6", + ) + .bind(GRAPH_REL_SUBJECT) + .bind(GRAPH_REL_PREDICATE) + .bind(GRAPH_REL_OBJECT) + .bind(TEST_TENANT) + .bind(TEST_PROJECT) + .bind(TEST_SCOPE) + .fetch_one(pool) + .await + .expect("Failed to load fact.") +} + +pub(in crate::acceptance::graph_ingestion) async fn graph_fact_count(pool: &PgPool) -> i64 { + sqlx::query_scalar( + "\ +SELECT COUNT(*) +FROM graph_facts gf +JOIN graph_entities ge ON ge.entity_id = gf.subject_entity_id +WHERE ge.canonical_norm = $1 + AND gf.predicate = $2 + AND gf.object_value = $3 + AND gf.tenant_id = $4 + AND gf.project_id = $5 + AND gf.scope = $6", + ) + .bind(GRAPH_REL_SUBJECT) + .bind(GRAPH_REL_PREDICATE) + .bind(GRAPH_REL_OBJECT) + .bind(TEST_TENANT) + .bind(TEST_PROJECT) + .bind(TEST_SCOPE) + .fetch_one(pool) + .await + .expect("Failed to count fact rows.") +} + +pub(in crate::acceptance::graph_ingestion) async fn graph_fact_evidence_count( + pool: &PgPool, + fact_id: Uuid, +) -> i64 { + sqlx::query_scalar("SELECT COUNT(*) FROM graph_fact_evidence WHERE fact_id = $1") + .bind(fact_id) + .fetch_one(pool) + .await + .expect("Failed to load fact evidence.") +} + +pub(in crate::acceptance::graph_ingestion) async fn graph_fact_evidence_count_for_note( + pool: &PgPool, + fact_id: Uuid, + note_id: Uuid, +) -> i64 { + sqlx::query_scalar( + "SELECT COUNT(*) FROM graph_fact_evidence WHERE fact_id = $1 AND note_id = $2", + ) + .bind(fact_id) + .bind(note_id) + .fetch_one(pool) + .await + .expect("Failed to load note evidence.") +} diff --git a/packages/elf-service/tests/acceptance/graph_ingestion/tests_helpers/notes.rs b/packages/elf-service/tests/acceptance/graph_ingestion/tests_helpers/notes.rs new file mode 100644 index 00000000..7dd18c20 --- /dev/null +++ b/packages/elf-service/tests/acceptance/graph_ingestion/tests_helpers/notes.rs @@ -0,0 +1,119 @@ +use uuid::Uuid; + +use crate::acceptance::graph_ingestion::tests_helpers::{ + TEST_PROJECT, TEST_SCOPE, TEST_TENANT, policy, +}; +use elf_service::{AddNoteInput, AddNoteRequest, ElfService, NoteOp, StructuredFields}; + +pub(in crate::acceptance::graph_ingestion) fn fact_note( + key: &str, + text: &str, + predicate: &str, + object_value: &str, +) -> AddNoteInput { + let structured = serde_json::from_value::(serde_json::json!({ + "relations": [{ + "subject": { "canonical": "Alice" }, + "predicate": predicate, + "object": { "value": object_value } + }] + })) + .expect("Failed to build structured fields."); + + AddNoteInput { + r#type: "fact".to_string(), + key: Some(key.to_string()), + text: text.to_string(), + structured: Some(structured), + importance: 0.8, + confidence: 0.9, + ttl_days: None, + source_ref: serde_json::json!({}), + write_policy: None, + } +} + +pub(in crate::acceptance::graph_ingestion) fn duplicate_fact_attaches_multiple_evidence_request() +-> AddNoteRequest { + AddNoteRequest { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + agent_id: "a".to_string(), + scope: "agent_private".to_string(), + notes: vec![ + AddNoteInput { + r#type: "fact".to_string(), + key: Some("mentorship-a".to_string()), + text: "Alice mentors Bob in 2026.".to_string(), + structured: Some( + serde_json::from_value::( + serde_json::json!({ + "relations": [{ + "subject": { "canonical": "Alice" }, + "predicate": "mentors", + "object": { "value": "Bob" } + }] + }), + ) + .expect("Failed to build structured fields."), + ), + importance: 0.8, + confidence: 0.9, + ttl_days: None, + source_ref: serde_json::json!({}), + write_policy: None, + }, + AddNoteInput { + r#type: "fact".to_string(), + key: Some("mentorship-b".to_string()), + text: "Alice also mentors Bob often.".to_string(), + structured: Some( + serde_json::from_value::( + serde_json::json!({ + "relations": [{ + "subject": { "canonical": "Alice" }, + "predicate": "mentors", + "object": { "value": "Bob" } + }] + }), + ) + .expect("Failed to build structured fields."), + ), + importance: 0.7, + confidence: 0.8, + ttl_days: None, + source_ref: serde_json::json!({}), + write_policy: None, + }, + ], + } +} + +pub(in crate::acceptance::graph_ingestion) async fn add_fact_note( + service: &ElfService, + key: &str, + text: &str, + predicate: &str, + object_value: &str, +) -> Uuid { + let response = service + .add_note(AddNoteRequest { + tenant_id: TEST_TENANT.to_string(), + project_id: TEST_PROJECT.to_string(), + agent_id: "a".to_string(), + scope: TEST_SCOPE.to_string(), + notes: vec![fact_note(key, text, predicate, object_value)], + }) + .await + .expect("add_note failed."); + + assert_eq!(response.results.len(), 1); + assert_eq!(response.results[0].op, NoteOp::Add); + + policy::assert_graph_policy_from_op( + response.results[0].op, + response.results[0].policy_decision, + ); + + response.results[0].note_id.expect("Expected note_id.") +} diff --git a/packages/elf-service/tests/acceptance/graph_ingestion/tests_helpers/policy.rs b/packages/elf-service/tests/acceptance/graph_ingestion/tests_helpers/policy.rs new file mode 100644 index 00000000..73780158 --- /dev/null +++ b/packages/elf-service/tests/acceptance/graph_ingestion/tests_helpers/policy.rs @@ -0,0 +1,13 @@ +use elf_domain::memory_policy::MemoryPolicyDecision; +use elf_service::NoteOp; + +pub(in crate::acceptance::graph_ingestion) fn assert_graph_policy_from_op( + op: NoteOp, + policy_decision: MemoryPolicyDecision, +) { + match op { + NoteOp::Add => assert_eq!(policy_decision, MemoryPolicyDecision::Remember), + NoteOp::Update => assert_eq!(policy_decision, MemoryPolicyDecision::Update), + _ => {}, + } +} diff --git a/packages/elf-service/tests/acceptance/graph_ingestion/tests_helpers/service_setup.rs b/packages/elf-service/tests/acceptance/graph_ingestion/tests_helpers/service_setup.rs new file mode 100644 index 00000000..e5fd5702 --- /dev/null +++ b/packages/elf-service/tests/acceptance/graph_ingestion/tests_helpers/service_setup.rs @@ -0,0 +1,85 @@ +use std::sync::{Arc, atomic::AtomicUsize}; + +use serde_json::Value; + +use crate::acceptance::{ + self, SpyExtractor, StubEmbedding, StubRerank, + graph_ingestion::tests_helpers::embedding::HashEmbedding, +}; +use elf_service::{ElfService, Providers}; +use elf_testkit::TestDatabase; + +pub(in crate::acceptance::graph_ingestion) async fn build_test_db( + test_name: &str, +) -> Option { + let Some(test_db) = acceptance::test_db().await else { + eprintln!("Skipping {test_name}; set ELF_PG_DSN to run."); + + return None; + }; + let Some(_) = acceptance::test_qdrant_url() else { + eprintln!("Skipping {test_name}; set ELF_QDRANT_URL to run."); + + return None; + }; + + Some(test_db) +} + +pub(in crate::acceptance::graph_ingestion) async fn build_hash_service( + test_db: &TestDatabase, +) -> ElfService { + let qdrant_url = acceptance::test_qdrant_url().expect("Expected Qdrant test URL."); + let providers = Providers::new( + Arc::new(HashEmbedding { vector_dim: 4_096 }), + Arc::new(StubRerank), + Arc::new(SpyExtractor { + calls: Arc::new(AtomicUsize::new(0)), + payload: serde_json::json!({ "notes": [] }), + }), + ); + let collection = test_db.collection_name("elf_acceptance"); + let docs_collection = test_db.collection_name("elf_acceptance_docs"); + let cfg = acceptance::test_config( + test_db.dsn().to_string(), + qdrant_url, + 4_096, + collection, + docs_collection, + ); + + acceptance::build_service(cfg, providers).await.expect("Failed to build service.") +} + +pub(in crate::acceptance::graph_ingestion) async fn build_stub_service( + test_db: &TestDatabase, +) -> ElfService { + build_service_with_extractor_payload(test_db, serde_json::json!({ "notes": [] })).await +} + +pub(in crate::acceptance::graph_ingestion) async fn build_service_with_extractor_payload( + test_db: &TestDatabase, + extractor_payload: Value, +) -> ElfService { + let qdrant_url = acceptance::test_qdrant_url().expect("Expected Qdrant test URL."); + let providers = Providers::new( + Arc::new(StubEmbedding { vector_dim: 4_096 }), + Arc::new(StubRerank), + Arc::new(SpyExtractor { calls: Arc::new(AtomicUsize::new(0)), payload: extractor_payload }), + ); + let collection = test_db.collection_name("elf_acceptance"); + let docs_collection = test_db.collection_name("elf_acceptance_docs"); + let cfg = acceptance::test_config( + test_db.dsn().to_string(), + qdrant_url, + 4_096, + collection, + docs_collection, + ); + + acceptance::build_service(cfg, providers).await.expect("Failed to build service.") +} + +pub(in crate::acceptance::graph_ingestion) async fn reset_service_db(service: &ElfService) { + acceptance::reset_db(&service.db.pool).await.expect("Failed to reset test database."); +} diff --git a/packages/elf-service/tests/acceptance/knowledge_pages/helpers/source_inserts.rs b/packages/elf-service/tests/acceptance/knowledge_pages/helpers/source_inserts.rs index 8977becf..b2f8a75b 100644 --- a/packages/elf-service/tests/acceptance/knowledge_pages/helpers/source_inserts.rs +++ b/packages/elf-service/tests/acceptance/knowledge_pages/helpers/source_inserts.rs @@ -1,341 +1,23 @@ -use time::OffsetDateTime; -use uuid::Uuid; +mod audit; +mod document; +mod graph; +mod note; +mod proposal; -use crate::acceptance::knowledge_pages::helpers::{ - AGENT_ID, KnowledgeSourceIds, PROJECT_ID, TENANT_ID, -}; -use elf_service::{AddNoteInput, AddNoteRequest, ElfService}; +use crate::acceptance::knowledge_pages::helpers::KnowledgeSourceIds; +use elf_service::ElfService; pub(crate) async fn insert_rebuild_sources(service: &ElfService) -> KnowledgeSourceIds { - let note_id = insert_source_note( + let note_id = note::insert_source_note( service, "knowledge_pages_foundation", "Fact: Derived knowledge pages are rebuilt from authoritative source memory and keep citations.", ) .await; - let event_id = insert_event_audit(service, note_id).await; - let (doc_id, chunk_id) = insert_source_document(service).await; - let fact_id = insert_relation(service, note_id).await; - let proposal_id = insert_applied_proposal(service, note_id).await; + let event_id = audit::insert_event_audit(service, note_id).await; + let (doc_id, chunk_id) = document::insert_source_document(service).await; + let fact_id = graph::insert_relation(service, note_id).await; + let proposal_id = proposal::insert_applied_proposal(service, note_id).await; KnowledgeSourceIds { note_id, event_id, doc_id, chunk_id, fact_id, proposal_id } } - -async fn insert_source_note(service: &ElfService, key: &str, text: &str) -> Uuid { - let response = service - .add_note(AddNoteRequest { - tenant_id: TENANT_ID.to_string(), - project_id: PROJECT_ID.to_string(), - agent_id: AGENT_ID.to_string(), - scope: "agent_private".to_string(), - notes: vec![AddNoteInput { - r#type: "fact".to_string(), - key: Some(key.to_string()), - text: text.to_string(), - structured: None, - importance: 0.7, - confidence: 0.9, - ttl_days: None, - source_ref: serde_json::json!({ "schema": "acceptance/v1", "key": key }), - write_policy: None, - }], - }) - .await - .expect("add_note should persist source note"); - - response.results[0].note_id.expect("source note id should be present") -} - -async fn insert_event_audit(service: &ElfService, note_id: Uuid) -> Uuid { - let decision_id = Uuid::new_v4(); - - sqlx::query( - "\ -INSERT INTO memory_ingest_decisions ( - decision_id, - tenant_id, - project_id, - agent_id, - scope, - pipeline, - note_type, - note_key, - note_id, - base_decision, - policy_decision, - note_op, - reason_code, - details, - ts -) -VALUES ($1,$2,$3,$4,'agent_private','add_event','fact','knowledge_event',$5,'remember','remember','ADD',NULL,$6,$7)", - ) - .bind(decision_id) - .bind(TENANT_ID) - .bind(PROJECT_ID) - .bind(AGENT_ID) - .bind(note_id) - .bind(serde_json::json!({ "fixture": "knowledge_page_event_audit" })) - .bind(OffsetDateTime::UNIX_EPOCH) - .execute(&service.db.pool) - .await - .expect("event audit should be inserted"); - - decision_id -} - -async fn insert_source_document(service: &ElfService) -> (Uuid, Uuid) { - let doc_id = Uuid::new_v4(); - let chunk_id = Uuid::new_v4(); - let content = "The Knowledge Workspace compiles Source Library spans into cited derived pages."; - let content_hash = blake3::hash(content.as_bytes()).to_hex().to_string(); - let chunk_hash = blake3::hash(content.as_bytes()).to_hex().to_string(); - let source_ref = serde_json::json!({ - "schema": "doc_source_ref/v1", - "doc_type": "knowledge", - "uri": "docs://knowledge/workspace/source-span-fixture", - "source_record_id": doc_id, - "content_hash": content_hash, - "source_spans": [ - { - "schema": "doc_source_span/v1", - "span_id": Uuid::new_v4(), - "chunk_id": chunk_id, - "status": "captured", - "start_offset": 0, - "end_offset": content.len(), - "content_hash": content_hash, - "chunk_hash": chunk_hash - } - ] - }); - - sqlx::query( - "\ -INSERT INTO doc_documents ( - doc_id, - tenant_id, - project_id, - agent_id, - scope, - doc_type, - status, - title, - source_ref, - content, - content_bytes, - content_hash, - created_at, - updated_at -) -VALUES ($1,$2,$3,$4,'project_shared','knowledge','active','Knowledge Workspace Source',$5,$6,$7,$8,$9,$9)", - ) - .bind(doc_id) - .bind(TENANT_ID) - .bind(PROJECT_ID) - .bind(AGENT_ID) - .bind(source_ref) - .bind(content) - .bind(i32::try_from(content.len()).expect("fixture content length should fit i32")) - .bind(content_hash) - .bind(OffsetDateTime::UNIX_EPOCH) - .execute(&service.db.pool) - .await - .expect("source document should be inserted"); - sqlx::query( - "\ -INSERT INTO doc_chunks ( - chunk_id, - doc_id, - chunk_index, - start_offset, - end_offset, - chunk_text, - chunk_hash, - created_at -) -VALUES ($1,$2,0,0,$3,$4,$5,$6)", - ) - .bind(chunk_id) - .bind(doc_id) - .bind(i32::try_from(content.len()).expect("fixture content length should fit i32")) - .bind(content) - .bind(chunk_hash) - .bind(OffsetDateTime::UNIX_EPOCH) - .execute(&service.db.pool) - .await - .expect("source document chunk should be inserted"); - - (doc_id, chunk_id) -} - -async fn insert_relation(service: &ElfService, note_id: Uuid) -> Uuid { - let subject_id = Uuid::new_v4(); - let fact_id = Uuid::new_v4(); - let evidence_id = Uuid::new_v4(); - - sqlx::query( - "\ -INSERT INTO graph_entities ( - entity_id, - tenant_id, - project_id, - canonical, - canonical_norm, - kind, - created_at, - updated_at -) -VALUES ($1,$2,$3,'ELF knowledge pages','elf knowledge pages','concept',$4,$4)", - ) - .bind(subject_id) - .bind(TENANT_ID) - .bind(PROJECT_ID) - .bind(OffsetDateTime::UNIX_EPOCH) - .execute(&service.db.pool) - .await - .expect("graph entity should be inserted"); - sqlx::query( - "\ -INSERT INTO graph_facts ( - fact_id, - tenant_id, - project_id, - agent_id, - scope, - subject_entity_id, - predicate, - predicate_id, - object_entity_id, - object_value, - valid_from, - valid_to, - created_at, - updated_at -) -VALUES ($1,$2,$3,$4,'project_shared',$5,'compile from',NULL,NULL,'authoritative source memory',$6,NULL,$6,$6)", - ) - .bind(fact_id) - .bind(TENANT_ID) - .bind(PROJECT_ID) - .bind(AGENT_ID) - .bind(subject_id) - .bind(OffsetDateTime::UNIX_EPOCH) - .execute(&service.db.pool) - .await - .expect("graph fact should be inserted"); - sqlx::query( - "\ -INSERT INTO graph_fact_evidence (evidence_id, fact_id, note_id, created_at) -VALUES ($1,$2,$3,$4)", - ) - .bind(evidence_id) - .bind(fact_id) - .bind(note_id) - .bind(OffsetDateTime::UNIX_EPOCH) - .execute(&service.db.pool) - .await - .expect("graph fact evidence should be inserted"); - - fact_id -} - -async fn insert_applied_proposal(service: &ElfService, note_id: Uuid) -> Uuid { - let run_id = Uuid::new_v4(); - let proposal_id = Uuid::new_v4(); - let source_refs = serde_json::json!([ - { - "kind": "note", - "id": note_id, - "snapshot": { - "status": "active", - "updated_at": "1970-01-01T00:00:00Z", - "metadata": { "fixture": "knowledge_pages" }, - "source_ref": {} - } - } - ]); - let lineage = serde_json::json!({ "source_refs": source_refs }); - - sqlx::query( - "\ -INSERT INTO consolidation_runs ( - run_id, - tenant_id, - project_id, - agent_id, - contract_schema, - job_kind, - status, - input_refs, - source_snapshot, - lineage, - error, - created_at, - updated_at, - completed_at -) -VALUES ($1,$2,$3,$4,'elf.consolidation/v1','manual','completed',$5,$6,$7,'{}'::jsonb,$8,$8,$8)", - ) - .bind(run_id) - .bind(TENANT_ID) - .bind(PROJECT_ID) - .bind(AGENT_ID) - .bind(&source_refs) - .bind(serde_json::json!({ "source_count": 1 })) - .bind(&lineage) - .bind(OffsetDateTime::UNIX_EPOCH) - .execute(&service.db.pool) - .await - .expect("consolidation run should be inserted"); - sqlx::query( - "\ -INSERT INTO consolidation_proposals ( - proposal_id, - run_id, - tenant_id, - project_id, - agent_id, - contract_schema, - proposal_kind, - apply_intent, - review_state, - source_refs, - source_snapshot, - lineage, - diff, - confidence, - unsupported_claim_flags, - contradiction_markers, - staleness_markers, - target_ref, - proposed_payload, - reviewer_agent_id, - review_comment, - reviewed_at, - created_at, - updated_at -) -VALUES ($1,$2,$3,$4,$5,'elf.consolidation/v1','knowledge_page','create_derived_knowledge_page','applied',$6,$7,$8,$9,0.9,'[]'::jsonb,'[]'::jsonb,'[]'::jsonb,'{}'::jsonb,$10,$5,'Apply derived page proposal.',$11,$11,$11)", - ) - .bind(proposal_id) - .bind(run_id) - .bind(TENANT_ID) - .bind(PROJECT_ID) - .bind(AGENT_ID) - .bind(&source_refs) - .bind(serde_json::json!({ "source_count": 1 })) - .bind(&lineage) - .bind(serde_json::json!({ - "summary": "Create a derived knowledge page from cited source memory.", - "before": {}, - "after": { "page_key": "knowledge-foundation" } - })) - .bind(serde_json::json!({ "page_key": "knowledge-foundation" })) - .bind(OffsetDateTime::UNIX_EPOCH) - .execute(&service.db.pool) - .await - .expect("consolidation proposal should be inserted"); - - proposal_id -} diff --git a/packages/elf-service/tests/acceptance/knowledge_pages/helpers/source_inserts/audit.rs b/packages/elf-service/tests/acceptance/knowledge_pages/helpers/source_inserts/audit.rs new file mode 100644 index 00000000..39f2c890 --- /dev/null +++ b/packages/elf-service/tests/acceptance/knowledge_pages/helpers/source_inserts/audit.rs @@ -0,0 +1,43 @@ +use time::OffsetDateTime; +use uuid::Uuid; + +use crate::acceptance::knowledge_pages::helpers::{AGENT_ID, PROJECT_ID, TENANT_ID}; +use elf_service::ElfService; + +pub(super) async fn insert_event_audit(service: &ElfService, note_id: Uuid) -> Uuid { + let decision_id = Uuid::new_v4(); + + sqlx::query( + "\ +INSERT INTO memory_ingest_decisions ( + decision_id, + tenant_id, + project_id, + agent_id, + scope, + pipeline, + note_type, + note_key, + note_id, + base_decision, + policy_decision, + note_op, + reason_code, + details, + ts +) +VALUES ($1,$2,$3,$4,'agent_private','add_event','fact','knowledge_event',$5,'remember','remember','ADD',NULL,$6,$7)", + ) + .bind(decision_id) + .bind(TENANT_ID) + .bind(PROJECT_ID) + .bind(AGENT_ID) + .bind(note_id) + .bind(serde_json::json!({ "fixture": "knowledge_page_event_audit" })) + .bind(OffsetDateTime::UNIX_EPOCH) + .execute(&service.db.pool) + .await + .expect("event audit should be inserted"); + + decision_id +} diff --git a/packages/elf-service/tests/acceptance/knowledge_pages/helpers/source_inserts/document.rs b/packages/elf-service/tests/acceptance/knowledge_pages/helpers/source_inserts/document.rs new file mode 100644 index 00000000..f65ca53b --- /dev/null +++ b/packages/elf-service/tests/acceptance/knowledge_pages/helpers/source_inserts/document.rs @@ -0,0 +1,90 @@ +use time::OffsetDateTime; +use uuid::Uuid; + +use crate::acceptance::knowledge_pages::helpers::{AGENT_ID, PROJECT_ID, TENANT_ID}; +use elf_service::ElfService; + +pub(super) async fn insert_source_document(service: &ElfService) -> (Uuid, Uuid) { + let doc_id = Uuid::new_v4(); + let chunk_id = Uuid::new_v4(); + let content = "The Knowledge Workspace compiles Source Library spans into cited derived pages."; + let content_hash = blake3::hash(content.as_bytes()).to_hex().to_string(); + let chunk_hash = blake3::hash(content.as_bytes()).to_hex().to_string(); + let source_ref = serde_json::json!({ + "schema": "doc_source_ref/v1", + "doc_type": "knowledge", + "uri": "docs://knowledge/workspace/source-span-fixture", + "source_record_id": doc_id, + "content_hash": content_hash, + "source_spans": [ + { + "schema": "doc_source_span/v1", + "span_id": Uuid::new_v4(), + "chunk_id": chunk_id, + "status": "captured", + "start_offset": 0, + "end_offset": content.len(), + "content_hash": content_hash, + "chunk_hash": chunk_hash + } + ] + }); + + sqlx::query( + "\ +INSERT INTO doc_documents ( + doc_id, + tenant_id, + project_id, + agent_id, + scope, + doc_type, + status, + title, + source_ref, + content, + content_bytes, + content_hash, + created_at, + updated_at +) +VALUES ($1,$2,$3,$4,'project_shared','knowledge','active','Knowledge Workspace Source',$5,$6,$7,$8,$9,$9)", + ) + .bind(doc_id) + .bind(TENANT_ID) + .bind(PROJECT_ID) + .bind(AGENT_ID) + .bind(source_ref) + .bind(content) + .bind(i32::try_from(content.len()).expect("fixture content length should fit i32")) + .bind(content_hash) + .bind(OffsetDateTime::UNIX_EPOCH) + .execute(&service.db.pool) + .await + .expect("source document should be inserted"); + sqlx::query( + "\ +INSERT INTO doc_chunks ( + chunk_id, + doc_id, + chunk_index, + start_offset, + end_offset, + chunk_text, + chunk_hash, + created_at +) +VALUES ($1,$2,0,0,$3,$4,$5,$6)", + ) + .bind(chunk_id) + .bind(doc_id) + .bind(i32::try_from(content.len()).expect("fixture content length should fit i32")) + .bind(content) + .bind(chunk_hash) + .bind(OffsetDateTime::UNIX_EPOCH) + .execute(&service.db.pool) + .await + .expect("source document chunk should be inserted"); + + (doc_id, chunk_id) +} diff --git a/packages/elf-service/tests/acceptance/knowledge_pages/helpers/source_inserts/graph.rs b/packages/elf-service/tests/acceptance/knowledge_pages/helpers/source_inserts/graph.rs new file mode 100644 index 00000000..3eebb188 --- /dev/null +++ b/packages/elf-service/tests/acceptance/knowledge_pages/helpers/source_inserts/graph.rs @@ -0,0 +1,76 @@ +use time::OffsetDateTime; +use uuid::Uuid; + +use crate::acceptance::knowledge_pages::helpers::{AGENT_ID, PROJECT_ID, TENANT_ID}; +use elf_service::ElfService; + +pub(super) async fn insert_relation(service: &ElfService, note_id: Uuid) -> Uuid { + let subject_id = Uuid::new_v4(); + let fact_id = Uuid::new_v4(); + let evidence_id = Uuid::new_v4(); + + sqlx::query( + "\ +INSERT INTO graph_entities ( + entity_id, + tenant_id, + project_id, + canonical, + canonical_norm, + kind, + created_at, + updated_at +) +VALUES ($1,$2,$3,'ELF knowledge pages','elf knowledge pages','concept',$4,$4)", + ) + .bind(subject_id) + .bind(TENANT_ID) + .bind(PROJECT_ID) + .bind(OffsetDateTime::UNIX_EPOCH) + .execute(&service.db.pool) + .await + .expect("graph entity should be inserted"); + sqlx::query( + "\ +INSERT INTO graph_facts ( + fact_id, + tenant_id, + project_id, + agent_id, + scope, + subject_entity_id, + predicate, + predicate_id, + object_entity_id, + object_value, + valid_from, + valid_to, + created_at, + updated_at +) +VALUES ($1,$2,$3,$4,'project_shared',$5,'compile from',NULL,NULL,'authoritative source memory',$6,NULL,$6,$6)", + ) + .bind(fact_id) + .bind(TENANT_ID) + .bind(PROJECT_ID) + .bind(AGENT_ID) + .bind(subject_id) + .bind(OffsetDateTime::UNIX_EPOCH) + .execute(&service.db.pool) + .await + .expect("graph fact should be inserted"); + sqlx::query( + "\ +INSERT INTO graph_fact_evidence (evidence_id, fact_id, note_id, created_at) +VALUES ($1,$2,$3,$4)", + ) + .bind(evidence_id) + .bind(fact_id) + .bind(note_id) + .bind(OffsetDateTime::UNIX_EPOCH) + .execute(&service.db.pool) + .await + .expect("graph fact evidence should be inserted"); + + fact_id +} diff --git a/packages/elf-service/tests/acceptance/knowledge_pages/helpers/source_inserts/note.rs b/packages/elf-service/tests/acceptance/knowledge_pages/helpers/source_inserts/note.rs new file mode 100644 index 00000000..786447f4 --- /dev/null +++ b/packages/elf-service/tests/acceptance/knowledge_pages/helpers/source_inserts/note.rs @@ -0,0 +1,29 @@ +use uuid::Uuid; + +use crate::acceptance::knowledge_pages::helpers::{AGENT_ID, PROJECT_ID, TENANT_ID}; +use elf_service::{AddNoteInput, AddNoteRequest, ElfService}; + +pub(super) async fn insert_source_note(service: &ElfService, key: &str, text: &str) -> Uuid { + let response = service + .add_note(AddNoteRequest { + tenant_id: TENANT_ID.to_string(), + project_id: PROJECT_ID.to_string(), + agent_id: AGENT_ID.to_string(), + scope: "agent_private".to_string(), + notes: vec![AddNoteInput { + r#type: "fact".to_string(), + key: Some(key.to_string()), + text: text.to_string(), + structured: None, + importance: 0.7, + confidence: 0.9, + ttl_days: None, + source_ref: serde_json::json!({ "schema": "acceptance/v1", "key": key }), + write_policy: None, + }], + }) + .await + .expect("add_note should persist source note"); + + response.results[0].note_id.expect("source note id should be present") +} diff --git a/packages/elf-service/tests/acceptance/knowledge_pages/helpers/source_inserts/proposal.rs b/packages/elf-service/tests/acceptance/knowledge_pages/helpers/source_inserts/proposal.rs new file mode 100644 index 00000000..4579991c --- /dev/null +++ b/packages/elf-service/tests/acceptance/knowledge_pages/helpers/source_inserts/proposal.rs @@ -0,0 +1,105 @@ +use time::OffsetDateTime; +use uuid::Uuid; + +use crate::acceptance::knowledge_pages::helpers::{AGENT_ID, PROJECT_ID, TENANT_ID}; +use elf_service::ElfService; + +pub(super) async fn insert_applied_proposal(service: &ElfService, note_id: Uuid) -> Uuid { + let run_id = Uuid::new_v4(); + let proposal_id = Uuid::new_v4(); + let source_refs = serde_json::json!([ + { + "kind": "note", + "id": note_id, + "snapshot": { + "status": "active", + "updated_at": "1970-01-01T00:00:00Z", + "metadata": { "fixture": "knowledge_pages" }, + "source_ref": {} + } + } + ]); + let lineage = serde_json::json!({ "source_refs": source_refs }); + + sqlx::query( + "\ +INSERT INTO consolidation_runs ( + run_id, + tenant_id, + project_id, + agent_id, + contract_schema, + job_kind, + status, + input_refs, + source_snapshot, + lineage, + error, + created_at, + updated_at, + completed_at +) +VALUES ($1,$2,$3,$4,'elf.consolidation/v1','manual','completed',$5,$6,$7,'{}'::jsonb,$8,$8,$8)", + ) + .bind(run_id) + .bind(TENANT_ID) + .bind(PROJECT_ID) + .bind(AGENT_ID) + .bind(&source_refs) + .bind(serde_json::json!({ "source_count": 1 })) + .bind(&lineage) + .bind(OffsetDateTime::UNIX_EPOCH) + .execute(&service.db.pool) + .await + .expect("consolidation run should be inserted"); + sqlx::query( + "\ +INSERT INTO consolidation_proposals ( + proposal_id, + run_id, + tenant_id, + project_id, + agent_id, + contract_schema, + proposal_kind, + apply_intent, + review_state, + source_refs, + source_snapshot, + lineage, + diff, + confidence, + unsupported_claim_flags, + contradiction_markers, + staleness_markers, + target_ref, + proposed_payload, + reviewer_agent_id, + review_comment, + reviewed_at, + created_at, + updated_at +) +VALUES ($1,$2,$3,$4,$5,'elf.consolidation/v1','knowledge_page','create_derived_knowledge_page','applied',$6,$7,$8,$9,0.9,'[]'::jsonb,'[]'::jsonb,'[]'::jsonb,'{}'::jsonb,$10,$5,'Apply derived page proposal.',$11,$11,$11)", + ) + .bind(proposal_id) + .bind(run_id) + .bind(TENANT_ID) + .bind(PROJECT_ID) + .bind(AGENT_ID) + .bind(&source_refs) + .bind(serde_json::json!({ "source_count": 1 })) + .bind(&lineage) + .bind(serde_json::json!({ + "summary": "Create a derived knowledge page from cited source memory.", + "before": {}, + "after": { "page_key": "knowledge-foundation" } + })) + .bind(serde_json::json!({ "page_key": "knowledge-foundation" })) + .bind(OffsetDateTime::UNIX_EPOCH) + .execute(&service.db.pool) + .await + .expect("consolidation proposal should be inserted"); + + proposal_id +} diff --git a/packages/elf-service/tests/service.rs b/packages/elf-service/tests/service.rs index 7443e882..e7d49409 100644 --- a/packages/elf-service/tests/service.rs +++ b/packages/elf-service/tests/service.rs @@ -2,334 +2,8 @@ //! Integration tests for service-layer note ingestion and policy behavior. -use std::sync::{ - Arc, - atomic::{AtomicUsize, Ordering}, -}; - -use serde_json::{Map, Value}; -use sqlx::PgPool; - -use elf_config::{ - Chunking, Config, EmbeddingProviderConfig, Lifecycle, LlmProviderConfig, Memory, MemoryPolicy, - Postgres, ProviderConfig, Qdrant, Ranking, RankingBlend, RankingBlendSegment, - RankingDeterministic, RankingDeterministicDecay, RankingDeterministicHits, - RankingDeterministicLexical, RankingDiversity, RankingRetrievalSources, ReadProfiles, - ScopePrecedence, ScopeWriteAllowed, Scopes, Search, SearchCache, SearchDynamic, - SearchExpansion, SearchExplain, SearchGraphContext, SearchPrefilter, SearchRecursive, Security, - Service, Storage, TtlDays, -}; -use elf_service::{ - AddNoteInput, AddNoteRequest, BoxFuture, ElfService, EmbeddingProvider, Error, - ExtractorProvider, RerankProvider, Result, -}; -use elf_storage::{db::Db, qdrant::QdrantStore}; - -struct DummyEmbedding; -impl EmbeddingProvider for DummyEmbedding { - fn embed<'a>( - &'a self, - cfg: &'a EmbeddingProviderConfig, - texts: &'a [String], - ) -> BoxFuture<'a, Result>>> { - let dim = (cfg.dimensions as usize).max(1); - let vec = vec![0.0; dim]; - - Box::pin(async move { Ok(vec![vec; texts.len()]) }) - } -} - -struct DummyRerank; -impl RerankProvider for DummyRerank { - fn rerank<'a>( - &'a self, - _cfg: &'a ProviderConfig, - _query: &'a str, - docs: &'a [String], - ) -> BoxFuture<'a, Result>> { - let scores = vec![0.0; docs.len()]; - - Box::pin(async move { Ok(scores) }) - } -} - -struct SpyExtractor { - calls: Arc, -} -impl SpyExtractor { - fn new() -> Self { - Self { calls: Arc::new(AtomicUsize::new(0)) } - } - - fn count(&self) -> usize { - self.calls.load(Ordering::SeqCst) - } -} -impl ExtractorProvider for SpyExtractor { - fn extract<'a>( - &'a self, - _cfg: &'a LlmProviderConfig, - _messages: &'a [Value], - ) -> BoxFuture<'a, Result> { - self.calls.fetch_add(1, Ordering::SeqCst); - - Box::pin(async move { Ok(serde_json::json!({ "notes": [] })) }) - } -} - -fn test_ranking() -> Ranking { - Ranking { - recency_tau_days: 60.0, - tie_breaker_weight: 0.1, - deterministic: RankingDeterministic { - enabled: false, - lexical: RankingDeterministicLexical { - enabled: false, - weight: 0.05, - min_ratio: 0.3, - max_query_terms: 16, - max_text_terms: 1_024, - }, - hits: RankingDeterministicHits { - enabled: false, - weight: 0.05, - half_saturation: 8.0, - last_hit_tau_days: 14.0, - }, - decay: RankingDeterministicDecay { enabled: false, weight: 0.05, tau_days: 30.0 }, - }, - blend: RankingBlend { - enabled: true, - rerank_normalization: "rank".to_string(), - retrieval_normalization: "rank".to_string(), - segments: vec![ - RankingBlendSegment { max_retrieval_rank: 3, retrieval_weight: 0.8 }, - RankingBlendSegment { max_retrieval_rank: 10, retrieval_weight: 0.5 }, - RankingBlendSegment { max_retrieval_rank: 1_000_000, retrieval_weight: 0.2 }, - ], - }, - diversity: RankingDiversity { - enabled: true, - sim_threshold: 0.88, - mmr_lambda: 0.7, - max_skips: 64, - }, - retrieval_sources: RankingRetrievalSources { - fusion_weight: 1.0, - structured_field_weight: 1.0, - fusion_priority: 1, - structured_field_priority: 0, - }, - } -} - -fn test_config() -> Config { - Config { - service: Service { - http_bind: "127.0.0.1:8080".to_string(), - mcp_bind: "127.0.0.1:8082".to_string(), - admin_bind: "127.0.0.1:8081".to_string(), - log_level: "info".to_string(), - }, - storage: Storage { - postgres: Postgres { - dsn: "postgres://user:pass@localhost/db".to_string(), - pool_max_conns: 1, - }, - qdrant: Qdrant { - url: "http://localhost:6334".to_string(), - collection: "mem_notes_v2".to_string(), - docs_collection: "doc_chunks_v1".to_string(), - vector_dim: 4_096, - }, - }, - providers: elf_config::Providers { - embedding: dummy_embedding_provider(), - rerank: dummy_provider(), - llm_extractor: dummy_llm_provider(), - }, - scopes: Scopes { - allowed: vec!["agent_private".to_string()], - read_profiles: ReadProfiles { - private_only: vec!["agent_private".to_string()], - private_plus_project: vec!["agent_private".to_string()], - all_scopes: vec!["agent_private".to_string()], - }, - precedence: ScopePrecedence { agent_private: 30, project_shared: 20, org_shared: 10 }, - write_allowed: ScopeWriteAllowed { - agent_private: true, - project_shared: false, - org_shared: false, - }, - }, - memory: Memory { - max_notes_per_add_event: 3, - max_note_chars: 500, - dup_sim_threshold: 0.9, - update_sim_threshold: 0.8, - candidate_k: 10, - top_k: 5, - policy: MemoryPolicy { rules: vec![] }, - }, - search: Search { - expansion: SearchExpansion { - mode: "off".to_string(), - max_queries: 4, - include_original: true, - }, - dynamic: SearchDynamic { min_candidates: 10, min_top_score: 0.12 }, - prefilter: SearchPrefilter { max_candidates: 0 }, - cache: SearchCache { - enabled: true, - expansion_ttl_days: 7, - rerank_ttl_days: 7, - max_payload_bytes: Some(262_144), - }, - explain: SearchExplain { - retention_days: 7, - capture_candidates: false, - candidate_retention_days: 2, - write_mode: "outbox".to_string(), - }, - recursive: SearchRecursive { - enabled: false, - max_depth: 2, - max_children_per_node: 4, - max_nodes_per_scope: 32, - max_total_nodes: 256, - }, - graph_context: SearchGraphContext { - enabled: false, - max_facts_per_item: 16, - max_evidence_notes_per_fact: 16, - }, - }, - ranking: test_ranking(), - lifecycle: Lifecycle { - ttl_days: TtlDays { - plan: 1, - fact: 2, - preference: 0, - constraint: 0, - decision: 0, - profile: 0, - }, - purge_deleted_after_days: 30, - purge_deprecated_after_days: 180, - }, - chunking: Chunking { - enabled: true, - max_tokens: 512, - overlap_tokens: 128, - tokenizer_repo: "gpt2".to_string(), - }, - security: Security { - bind_localhost_only: true, - reject_non_english: true, - redact_secrets_on_write: true, - evidence_min_quotes: 1, - evidence_max_quotes: 2, - evidence_max_quote_chars: 320, - auth_mode: "off".to_string(), - auth_keys: vec![], - }, - context: None, - mcp: None, - } -} - -fn dummy_embedding_provider() -> EmbeddingProviderConfig { - EmbeddingProviderConfig { - provider_id: "p".to_string(), - api_base: "http://localhost".to_string(), - api_key: "key".to_string(), - path: "/".to_string(), - model: "3".to_string(), - dimensions: 4_096, - timeout_ms: 1_000, - default_headers: Map::new(), - } -} - -fn dummy_provider() -> ProviderConfig { - ProviderConfig { - provider_id: "p".to_string(), - api_base: "http://localhost".to_string(), - api_key: "key".to_string(), - path: "/".to_string(), - model: "3".to_string(), - timeout_ms: 1_000, - default_headers: Map::new(), - } -} - -fn dummy_llm_provider() -> LlmProviderConfig { - LlmProviderConfig { - provider_id: "p".to_string(), - api_base: "http://localhost".to_string(), - api_key: "key".to_string(), - path: "/".to_string(), - model: "m".to_string(), - temperature: 0.1, - timeout_ms: 1_000, - default_headers: Map::new(), - } -} - -#[tokio::test] -async fn add_note_does_not_call_llm() { - let cfg = test_config(); - let pool = - PgPool::connect_lazy(&cfg.storage.postgres.dsn).expect("Failed to create lazy pool."); - let db = Db { pool }; - let qdrant = QdrantStore::new(&cfg.storage.qdrant).expect("Failed to create Qdrant store."); - let spy = Arc::new(SpyExtractor::new()); - let providers = - elf_service::Providers::new(Arc::new(DummyEmbedding), Arc::new(DummyRerank), spy.clone()); - let service = ElfService::with_providers(cfg, db, qdrant, providers); - let req = AddNoteRequest { - tenant_id: "t1".to_string(), - project_id: "p1".to_string(), - agent_id: "a1".to_string(), - scope: "agent_private".to_string(), - notes: vec![AddNoteInput { - r#type: "fact".to_string(), - key: None, - text: "こんにちは".to_string(), - structured: None, - importance: 0.5, - confidence: 0.5, - ttl_days: None, - source_ref: serde_json::json!({}), - write_policy: None, - }], - }; - let result = service.add_note(req).await; - - assert!(matches!(result, Err(Error::NonEnglishInput { .. }))); - assert_eq!(spy.count(), 0); -} - -#[tokio::test] -async fn add_note_rejects_empty_notes() { - let cfg = test_config(); - let pool = - PgPool::connect_lazy(&cfg.storage.postgres.dsn).expect("Failed to create lazy pool."); - let db = Db { pool }; - let qdrant = QdrantStore::new(&cfg.storage.qdrant).expect("Failed to create Qdrant store."); - let spy = Arc::new(SpyExtractor::new()); - let providers = - elf_service::Providers::new(Arc::new(DummyEmbedding), Arc::new(DummyRerank), spy.clone()); - let service = ElfService::with_providers(cfg, db, qdrant, providers); - let req = AddNoteRequest { - tenant_id: "t1".to_string(), - project_id: "p1".to_string(), - agent_id: "a1".to_string(), - scope: "agent_private".to_string(), - notes: vec![], - }; - let result = service.add_note(req).await; - - assert!(matches!(result, Err(Error::InvalidRequest { .. }))); - assert_eq!(spy.count(), 0); +mod service { + mod config; + mod note_ingestion; + mod providers; } diff --git a/packages/elf-service/tests/service/config.rs b/packages/elf-service/tests/service/config.rs new file mode 100644 index 00000000..3e38e01b --- /dev/null +++ b/packages/elf-service/tests/service/config.rs @@ -0,0 +1,210 @@ +use serde_json::Map; + +use elf_config::{ + Chunking, Config, EmbeddingProviderConfig, Lifecycle, LlmProviderConfig, Memory, MemoryPolicy, + Postgres, ProviderConfig, Providers, Qdrant, Ranking, RankingBlend, RankingBlendSegment, + RankingDeterministic, RankingDeterministicDecay, RankingDeterministicHits, + RankingDeterministicLexical, RankingDiversity, RankingRetrievalSources, ReadProfiles, + ScopePrecedence, ScopeWriteAllowed, Scopes, Search, SearchCache, SearchDynamic, + SearchExpansion, SearchExplain, SearchGraphContext, SearchPrefilter, SearchRecursive, Security, + Service, Storage, TtlDays, +}; + +pub(crate) fn test_config() -> Config { + Config { + service: Service { + http_bind: "127.0.0.1:8080".to_string(), + mcp_bind: "127.0.0.1:8082".to_string(), + admin_bind: "127.0.0.1:8081".to_string(), + log_level: "info".to_string(), + }, + storage: Storage { + postgres: Postgres { + dsn: "postgres://user:pass@localhost/db".to_string(), + pool_max_conns: 1, + }, + qdrant: Qdrant { + url: "http://localhost:6334".to_string(), + collection: "mem_notes_v2".to_string(), + docs_collection: "doc_chunks_v1".to_string(), + vector_dim: 4_096, + }, + }, + providers: Providers { + embedding: dummy_embedding_provider(), + rerank: dummy_provider(), + llm_extractor: dummy_llm_provider(), + }, + scopes: Scopes { + allowed: vec!["agent_private".to_string()], + read_profiles: ReadProfiles { + private_only: vec!["agent_private".to_string()], + private_plus_project: vec!["agent_private".to_string()], + all_scopes: vec!["agent_private".to_string()], + }, + precedence: ScopePrecedence { agent_private: 30, project_shared: 20, org_shared: 10 }, + write_allowed: ScopeWriteAllowed { + agent_private: true, + project_shared: false, + org_shared: false, + }, + }, + memory: Memory { + max_notes_per_add_event: 3, + max_note_chars: 500, + dup_sim_threshold: 0.9, + update_sim_threshold: 0.8, + candidate_k: 10, + top_k: 5, + policy: MemoryPolicy { rules: vec![] }, + }, + search: Search { + expansion: SearchExpansion { + mode: "off".to_string(), + max_queries: 4, + include_original: true, + }, + dynamic: SearchDynamic { min_candidates: 10, min_top_score: 0.12 }, + prefilter: SearchPrefilter { max_candidates: 0 }, + cache: SearchCache { + enabled: true, + expansion_ttl_days: 7, + rerank_ttl_days: 7, + max_payload_bytes: Some(262_144), + }, + explain: SearchExplain { + retention_days: 7, + capture_candidates: false, + candidate_retention_days: 2, + write_mode: "outbox".to_string(), + }, + recursive: SearchRecursive { + enabled: false, + max_depth: 2, + max_children_per_node: 4, + max_nodes_per_scope: 32, + max_total_nodes: 256, + }, + graph_context: SearchGraphContext { + enabled: false, + max_facts_per_item: 16, + max_evidence_notes_per_fact: 16, + }, + }, + ranking: test_ranking(), + lifecycle: Lifecycle { + ttl_days: TtlDays { + plan: 1, + fact: 2, + preference: 0, + constraint: 0, + decision: 0, + profile: 0, + }, + purge_deleted_after_days: 30, + purge_deprecated_after_days: 180, + }, + chunking: Chunking { + enabled: true, + max_tokens: 512, + overlap_tokens: 128, + tokenizer_repo: "gpt2".to_string(), + }, + security: Security { + bind_localhost_only: true, + reject_non_english: true, + redact_secrets_on_write: true, + evidence_min_quotes: 1, + evidence_max_quotes: 2, + evidence_max_quote_chars: 320, + auth_mode: "off".to_string(), + auth_keys: vec![], + }, + context: None, + mcp: None, + } +} + +fn test_ranking() -> Ranking { + Ranking { + recency_tau_days: 60.0, + tie_breaker_weight: 0.1, + deterministic: RankingDeterministic { + enabled: false, + lexical: RankingDeterministicLexical { + enabled: false, + weight: 0.05, + min_ratio: 0.3, + max_query_terms: 16, + max_text_terms: 1_024, + }, + hits: RankingDeterministicHits { + enabled: false, + weight: 0.05, + half_saturation: 8.0, + last_hit_tau_days: 14.0, + }, + decay: RankingDeterministicDecay { enabled: false, weight: 0.05, tau_days: 30.0 }, + }, + blend: RankingBlend { + enabled: true, + rerank_normalization: "rank".to_string(), + retrieval_normalization: "rank".to_string(), + segments: vec![ + RankingBlendSegment { max_retrieval_rank: 3, retrieval_weight: 0.8 }, + RankingBlendSegment { max_retrieval_rank: 10, retrieval_weight: 0.5 }, + RankingBlendSegment { max_retrieval_rank: 1_000_000, retrieval_weight: 0.2 }, + ], + }, + diversity: RankingDiversity { + enabled: true, + sim_threshold: 0.88, + mmr_lambda: 0.7, + max_skips: 64, + }, + retrieval_sources: RankingRetrievalSources { + fusion_weight: 1.0, + structured_field_weight: 1.0, + fusion_priority: 1, + structured_field_priority: 0, + }, + } +} + +fn dummy_embedding_provider() -> EmbeddingProviderConfig { + EmbeddingProviderConfig { + provider_id: "p".to_string(), + api_base: "http://localhost".to_string(), + api_key: "key".to_string(), + path: "/".to_string(), + model: "3".to_string(), + dimensions: 4_096, + timeout_ms: 1_000, + default_headers: Map::new(), + } +} + +fn dummy_provider() -> ProviderConfig { + ProviderConfig { + provider_id: "p".to_string(), + api_base: "http://localhost".to_string(), + api_key: "key".to_string(), + path: "/".to_string(), + model: "3".to_string(), + timeout_ms: 1_000, + default_headers: Map::new(), + } +} + +fn dummy_llm_provider() -> LlmProviderConfig { + LlmProviderConfig { + provider_id: "p".to_string(), + api_base: "http://localhost".to_string(), + api_key: "key".to_string(), + path: "/".to_string(), + model: "m".to_string(), + temperature: 0.1, + timeout_ms: 1_000, + default_headers: Map::new(), + } +} diff --git a/packages/elf-service/tests/service/note_ingestion.rs b/packages/elf-service/tests/service/note_ingestion.rs new file mode 100644 index 00000000..eb51f69b --- /dev/null +++ b/packages/elf-service/tests/service/note_ingestion.rs @@ -0,0 +1,67 @@ +use std::sync::Arc; + +use sqlx::PgPool; + +use crate::service::{ + config, + providers::{DummyEmbedding, DummyRerank, SpyExtractor}, +}; +use elf_service::{AddNoteInput, AddNoteRequest, ElfService, Error, Providers}; +use elf_storage::{db::Db, qdrant::QdrantStore}; + +fn build_service_with_spy(spy: Arc) -> ElfService { + let cfg = config::test_config(); + let pool = + PgPool::connect_lazy(&cfg.storage.postgres.dsn).expect("Failed to create lazy pool."); + let db = Db { pool }; + let qdrant = QdrantStore::new(&cfg.storage.qdrant).expect("Failed to create Qdrant store."); + let providers = Providers::new(Arc::new(DummyEmbedding), Arc::new(DummyRerank), spy); + + ElfService::with_providers(cfg, db, qdrant, providers) +} + +#[tokio::test] +async fn add_note_does_not_call_llm() { + let cfg = config::test_config(); + let spy = Arc::new(SpyExtractor::new()); + let service = build_service_with_spy(spy.clone()); + let req = AddNoteRequest { + tenant_id: "t1".to_string(), + project_id: "p1".to_string(), + agent_id: "a1".to_string(), + scope: "agent_private".to_string(), + notes: vec![AddNoteInput { + r#type: "fact".to_string(), + key: None, + text: "こんにちは".to_string(), + structured: None, + importance: 0.5, + confidence: 0.5, + ttl_days: None, + source_ref: serde_json::json!({}), + write_policy: None, + }], + }; + let result = service.add_note(req).await; + + assert!(cfg.security.reject_non_english); + assert!(matches!(result, Err(Error::NonEnglishInput { .. }))); + assert_eq!(spy.count(), 0); +} + +#[tokio::test] +async fn add_note_rejects_empty_notes() { + let spy = Arc::new(SpyExtractor::new()); + let service = build_service_with_spy(spy.clone()); + let req = AddNoteRequest { + tenant_id: "t1".to_string(), + project_id: "p1".to_string(), + agent_id: "a1".to_string(), + scope: "agent_private".to_string(), + notes: vec![], + }; + let result = service.add_note(req).await; + + assert!(matches!(result, Err(Error::InvalidRequest { .. }))); + assert_eq!(spy.count(), 0); +} diff --git a/packages/elf-service/tests/service/providers.rs b/packages/elf-service/tests/service/providers.rs new file mode 100644 index 00000000..ff6aec5c --- /dev/null +++ b/packages/elf-service/tests/service/providers.rs @@ -0,0 +1,61 @@ +use std::sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, +}; + +use serde_json::Value; + +use elf_config::{EmbeddingProviderConfig, LlmProviderConfig, ProviderConfig}; +use elf_service::{BoxFuture, EmbeddingProvider, ExtractorProvider, RerankProvider, Result}; + +pub(crate) struct DummyEmbedding; +impl EmbeddingProvider for DummyEmbedding { + fn embed<'a>( + &'a self, + cfg: &'a EmbeddingProviderConfig, + texts: &'a [String], + ) -> BoxFuture<'a, Result>>> { + let dim = (cfg.dimensions as usize).max(1); + let vec = vec![0.0; dim]; + + Box::pin(async move { Ok(vec![vec; texts.len()]) }) + } +} + +pub(crate) struct DummyRerank; +impl RerankProvider for DummyRerank { + fn rerank<'a>( + &'a self, + _cfg: &'a ProviderConfig, + _query: &'a str, + docs: &'a [String], + ) -> BoxFuture<'a, Result>> { + let scores = vec![0.0; docs.len()]; + + Box::pin(async move { Ok(scores) }) + } +} + +pub(crate) struct SpyExtractor { + calls: Arc, +} +impl SpyExtractor { + pub(crate) fn new() -> Self { + Self { calls: Arc::new(AtomicUsize::new(0)) } + } + + pub(crate) fn count(&self) -> usize { + self.calls.load(Ordering::SeqCst) + } +} +impl ExtractorProvider for SpyExtractor { + fn extract<'a>( + &'a self, + _cfg: &'a LlmProviderConfig, + _messages: &'a [Value], + ) -> BoxFuture<'a, Result> { + self.calls.fetch_add(1, Ordering::SeqCst); + + Box::pin(async move { Ok(serde_json::json!({ "notes": [] })) }) + } +}