From ea6a20c9d337fc54d6e86e2ac4905a62679973fa Mon Sep 17 00:00:00 2001 From: Yvette Carlisle Date: Tue, 30 Jun 2026 16:22:05 +0800 Subject: [PATCH 1/6] {"schema":"decodex/commit/1","summary":"Split search service modules","authority":"manual"} --- packages/elf-service/src/search/service.rs | 309 +----------------- .../elf-service/src/search/service/context.rs | 88 +++++ .../src/search/service/entrypoint.rs | 30 ++ .../src/search/service/execution.rs | 163 +++++++++ .../src/search/service/response.rs | 40 +++ 5 files changed, 325 insertions(+), 305 deletions(-) create mode 100644 packages/elf-service/src/search/service/context.rs create mode 100644 packages/elf-service/src/search/service/entrypoint.rs create mode 100644 packages/elf-service/src/search/service/execution.rs create mode 100644 packages/elf-service/src/search/service/response.rs diff --git a/packages/elf-service/src/search/service.rs b/packages/elf-service/src/search/service.rs index dbabcbef..db6dffb8 100644 --- a/packages/elf-service/src/search/service.rs +++ b/packages/elf-service/src/search/service.rs @@ -1,305 +1,4 @@ -use crate::{ - Error, - search::{ - self, BuildQueryPlanArgs, DynamicGateSummary, ElfService, ExpansionMode, FinishSearchArgs, - HashMap, MAX_CANDIDATE_K, MaybeDynamicSearchArgs, RawSearchExecutionContext, RawSearchPath, - Result, SearchFilter, SearchRawPlannedResponse, SearchRequest, SearchResponse, - SearchRetrievalArgs, Uuid, ranking, - }, -}; - -impl ElfService { - /// Runs the quick raw-search path and returns ranked items without a query plan. - pub async fn search_raw_quick(&self, req: SearchRequest) -> Result { - self.execute_search_raw_path(req, RawSearchPath::Quick).await.map(|response| { - SearchResponse { - trace_id: response.trace_id, - items: response.items, - trajectory_summary: response.trajectory_summary, - } - }) - } - - /// Runs the planned raw-search path and returns ranked items plus a query plan. - pub async fn search_raw_planned(&self, req: SearchRequest) -> Result { - self.execute_search_raw_path(req, RawSearchPath::Planned).await - } - - /// Runs the default raw-search path and returns ranked items. - pub async fn search_raw(&self, req: SearchRequest) -> Result { - self.search_raw_planned(req).await.map(|response| SearchResponse { - trace_id: response.trace_id, - items: response.items, - trajectory_summary: response.trajectory_summary, - }) - } - - async fn execute_search_raw_path( - &self, - req: SearchRequest, - path: RawSearchPath, - ) -> Result { - let context = self.prepare_raw_search_execution(req, path)?; - - if context.allowed_scopes.is_empty() { - return self.execute_search_raw_no_allowed_scopes(&context, path).await; - } - - let dynamic_gate_enabled = - path == RawSearchPath::Planned && context.expansion_mode == ExpansionMode::Dynamic; - - self.execute_search_raw_with_allowed_scopes(&context, path, dynamic_gate_enabled).await - } - - async fn execute_search_raw_no_allowed_scopes( - &self, - context: &RawSearchExecutionContext, - path: RawSearchPath, - ) -> Result { - let expanded_queries = vec![context.query.clone()]; - let response = self - .finish_search(FinishSearchArgs { - path, - trace_id: context.trace_id, - query: context.query.as_str(), - tenant_id: context.tenant_id.as_str(), - project_id: context.project_id.as_str(), - agent_id: context.agent_id.as_str(), - token_id: context.token_id.as_deref(), - read_profile: context.read_profile.as_str(), - allowed_scopes: &context.allowed_scopes, - expanded_queries: expanded_queries.clone(), - expansion_mode: context.expansion_mode, - candidates: Vec::new(), - structured_matches: HashMap::new(), - recursive_retrieval: None, - top_k: context.top_k, - record_hits_enabled: context.record_hits_enabled, - ranking_override: context.ranking_override.clone(), - payload_level: context.payload_level, - filter: context.filter.as_ref(), - requested_candidate_k: context.requested_candidate_k, - effective_candidate_k: context.effective_candidate_k, - }) - .await?; - - Ok(self.build_raw_planned_response( - context, - path, - response, - expanded_queries, - DynamicGateSummary::default(), - )) - } - - async fn execute_search_raw_with_allowed_scopes( - &self, - context: &RawSearchExecutionContext, - path: RawSearchPath, - dynamic_gate_enabled: bool, - ) -> Result { - let filter = search::build_search_filter( - context.tenant_id.as_str(), - context.project_id.as_str(), - context.agent_id.as_str(), - &context.allowed_scopes, - ); - let retrieval_candidate_k = if context.filter.is_some() { - context.effective_candidate_k - } else { - context.candidate_k - }; - let (baseline_vector, early_response, dynamic_gate) = self - .maybe_finish_dynamic_search(MaybeDynamicSearchArgs { - path, - enabled: dynamic_gate_enabled, - trace_id: context.trace_id, - query: context.query.as_str(), - tenant_id: context.tenant_id.as_str(), - project_id: context.project_id.as_str(), - agent_id: context.agent_id.as_str(), - token_id: context.token_id.as_deref(), - read_profile: context.read_profile.as_str(), - allowed_scopes: &context.allowed_scopes, - project_context_description: context.project_context_description.as_deref(), - filter: &filter, - service_filter: context.filter.as_ref(), - candidate_k: retrieval_candidate_k, - requested_candidate_k: context.requested_candidate_k, - effective_candidate_k: context.effective_candidate_k, - top_k: context.top_k, - record_hits_enabled: context.record_hits_enabled, - ranking_override: context.ranking_override.as_ref(), - retrieval_sources_policy: &context.retrieval_sources_policy, - payload_level: context.payload_level, - }) - .await?; - - if let Some(response) = early_response { - return Ok(self.build_raw_planned_response( - context, - path, - response, - vec![context.query.clone()], - dynamic_gate, - )); - } - - let retrieval = self - .retrieve_search_candidates(SearchRetrievalArgs { - query: context.query.as_str(), - expansion_mode: context.expansion_mode, - project_context_description: context.project_context_description.as_deref(), - filter: &filter, - candidate_k: retrieval_candidate_k, - baseline_vector: baseline_vector.as_ref(), - tenant_id: context.tenant_id.as_str(), - project_id: context.project_id.as_str(), - agent_id: context.agent_id.as_str(), - allowed_scopes: &context.allowed_scopes, - retrieval_sources_policy: &context.retrieval_sources_policy, - }) - .await?; - let expanded_queries = retrieval.expanded_queries.clone(); - let response = self - .finish_search(FinishSearchArgs { - path, - trace_id: context.trace_id, - query: context.query.as_str(), - tenant_id: context.tenant_id.as_str(), - project_id: context.project_id.as_str(), - agent_id: context.agent_id.as_str(), - token_id: context.token_id.as_deref(), - read_profile: context.read_profile.as_str(), - allowed_scopes: &context.allowed_scopes, - expanded_queries: retrieval.expanded_queries, - expansion_mode: context.expansion_mode, - candidates: retrieval.candidates, - structured_matches: retrieval.structured_matches, - recursive_retrieval: retrieval.recursive, - top_k: context.top_k, - record_hits_enabled: context.record_hits_enabled, - ranking_override: context.ranking_override.clone(), - payload_level: context.payload_level, - filter: context.filter.as_ref(), - requested_candidate_k: context.requested_candidate_k, - effective_candidate_k: context.effective_candidate_k, - }) - .await?; - - Ok(self.build_raw_planned_response(context, path, response, expanded_queries, dynamic_gate)) - } - - fn prepare_raw_search_execution( - &self, - req: SearchRequest, - path: RawSearchPath, - ) -> Result { - let tenant_id = req.tenant_id.trim().to_string(); - let project_id = req.project_id.trim().to_string(); - let agent_id = req.agent_id.trim().to_string(); - let token_id = req - .token_id - .as_deref() - .map(str::trim) - .filter(|value| !value.is_empty()) - .map(|value| value.to_string()); - - search::validate_search_request_inputs( - tenant_id.as_str(), - project_id.as_str(), - agent_id.as_str(), - req.query.as_str(), - )?; - - let top_k = req.top_k.unwrap_or(self.cfg.memory.top_k).max(1); - let candidate_k = req.candidate_k.unwrap_or(self.cfg.memory.candidate_k).max(top_k); - let requested_candidate_k = candidate_k; - let filter = req - .filter - .as_ref() - .map(SearchFilter::parse) - .transpose() - .map_err(|err| Error::InvalidRequest { message: err.to_string() })?; - let effective_candidate_k = if filter.is_some() { - requested_candidate_k.saturating_mul(3).min(MAX_CANDIDATE_K).max(top_k) - } else { - requested_candidate_k - }; - let query = req.query; - let read_profile = req.read_profile; - let record_hits_enabled = req.record_hits.unwrap_or(false); - let ranking_override = req.ranking; - let retrieval_sources_policy = ranking::resolve_retrieval_sources_policy( - &self.cfg.ranking.retrieval_sources, - ranking_override.as_ref().and_then(|override_| override_.retrieval_sources.as_ref()), - )?; - let expansion_mode = match path { - RawSearchPath::Quick => ExpansionMode::Off, - RawSearchPath::Planned => ranking::resolve_expansion_mode(&self.cfg), - }; - let trace_id = Uuid::new_v4(); - let project_context_description = self - .resolve_project_context_description(tenant_id.as_str(), project_id.as_str()) - .map(|value| value.to_string()); - let allowed_scopes = ranking::resolve_scopes(&self.cfg, read_profile.as_str())?; - let policies = self.resolve_finish_search_policies(ranking_override.as_ref())?; - - Ok(RawSearchExecutionContext { - tenant_id, - project_id, - agent_id, - token_id, - top_k, - candidate_k, - requested_candidate_k, - effective_candidate_k, - filter, - query, - read_profile, - payload_level: req.payload_level, - record_hits_enabled, - ranking_override, - retrieval_sources_policy, - expansion_mode, - trace_id, - project_context_description, - allowed_scopes, - policies, - }) - } - - fn build_raw_planned_response( - &self, - context: &RawSearchExecutionContext, - path: RawSearchPath, - response: SearchResponse, - expanded_queries: Vec, - dynamic_gate: DynamicGateSummary, - ) -> SearchRawPlannedResponse { - let query_plan = self.build_query_plan(BuildQueryPlanArgs { - path, - query: context.query.as_str(), - tenant_id: context.tenant_id.as_str(), - project_id: context.project_id.as_str(), - agent_id: context.agent_id.as_str(), - read_profile: context.read_profile.as_str(), - allowed_scopes: &context.allowed_scopes, - expansion_mode: context.expansion_mode, - expanded_queries, - top_k: context.top_k, - candidate_k: context.candidate_k, - retrieval_sources_policy: &context.retrieval_sources_policy, - recursive_enabled: self.cfg.search.recursive.enabled, - policies: &context.policies, - dynamic_gate, - }); - - SearchRawPlannedResponse { - trace_id: response.trace_id, - items: response.items, - trajectory_summary: response.trajectory_summary, - query_plan, - } - } -} +mod context; +mod entrypoint; +mod execution; +mod response; diff --git a/packages/elf-service/src/search/service/context.rs b/packages/elf-service/src/search/service/context.rs new file mode 100644 index 00000000..3e310136 --- /dev/null +++ b/packages/elf-service/src/search/service/context.rs @@ -0,0 +1,88 @@ +use crate::{ + Error, + search::{ + self, ElfService, ExpansionMode, MAX_CANDIDATE_K, RawSearchExecutionContext, RawSearchPath, + Result, SearchFilter, SearchRequest, Uuid, ranking, + }, +}; + +impl ElfService { + pub(in crate::search) fn prepare_raw_search_execution( + &self, + req: SearchRequest, + path: RawSearchPath, + ) -> Result { + let tenant_id = req.tenant_id.trim().to_string(); + let project_id = req.project_id.trim().to_string(); + let agent_id = req.agent_id.trim().to_string(); + let token_id = req + .token_id + .as_deref() + .map(str::trim) + .filter(|value| !value.is_empty()) + .map(|value| value.to_string()); + + search::validate_search_request_inputs( + tenant_id.as_str(), + project_id.as_str(), + agent_id.as_str(), + req.query.as_str(), + )?; + + let top_k = req.top_k.unwrap_or(self.cfg.memory.top_k).max(1); + let candidate_k = req.candidate_k.unwrap_or(self.cfg.memory.candidate_k).max(top_k); + let requested_candidate_k = candidate_k; + let filter = req + .filter + .as_ref() + .map(SearchFilter::parse) + .transpose() + .map_err(|err| Error::InvalidRequest { message: err.to_string() })?; + let effective_candidate_k = if filter.is_some() { + requested_candidate_k.saturating_mul(3).min(MAX_CANDIDATE_K).max(top_k) + } else { + requested_candidate_k + }; + let query = req.query; + let read_profile = req.read_profile; + let record_hits_enabled = req.record_hits.unwrap_or(false); + let ranking_override = req.ranking; + let retrieval_sources_policy = ranking::resolve_retrieval_sources_policy( + &self.cfg.ranking.retrieval_sources, + ranking_override.as_ref().and_then(|override_| override_.retrieval_sources.as_ref()), + )?; + let expansion_mode = match path { + RawSearchPath::Quick => ExpansionMode::Off, + RawSearchPath::Planned => ranking::resolve_expansion_mode(&self.cfg), + }; + let trace_id = Uuid::new_v4(); + let project_context_description = self + .resolve_project_context_description(tenant_id.as_str(), project_id.as_str()) + .map(|value| value.to_string()); + let allowed_scopes = ranking::resolve_scopes(&self.cfg, read_profile.as_str())?; + let policies = self.resolve_finish_search_policies(ranking_override.as_ref())?; + + Ok(RawSearchExecutionContext { + tenant_id, + project_id, + agent_id, + token_id, + top_k, + candidate_k, + requested_candidate_k, + effective_candidate_k, + filter, + query, + read_profile, + payload_level: req.payload_level, + record_hits_enabled, + ranking_override, + retrieval_sources_policy, + expansion_mode, + trace_id, + project_context_description, + allowed_scopes, + policies, + }) + } +} diff --git a/packages/elf-service/src/search/service/entrypoint.rs b/packages/elf-service/src/search/service/entrypoint.rs new file mode 100644 index 00000000..7101737a --- /dev/null +++ b/packages/elf-service/src/search/service/entrypoint.rs @@ -0,0 +1,30 @@ +use crate::search::{ + ElfService, RawSearchPath, Result, SearchRawPlannedResponse, SearchRequest, SearchResponse, +}; + +impl ElfService { + /// Runs the quick raw-search path and returns ranked items without a query plan. + pub async fn search_raw_quick(&self, req: SearchRequest) -> Result { + self.execute_search_raw_path(req, RawSearchPath::Quick).await.map(|response| { + SearchResponse { + trace_id: response.trace_id, + items: response.items, + trajectory_summary: response.trajectory_summary, + } + }) + } + + /// Runs the planned raw-search path and returns ranked items plus a query plan. + pub async fn search_raw_planned(&self, req: SearchRequest) -> Result { + self.execute_search_raw_path(req, RawSearchPath::Planned).await + } + + /// Runs the default raw-search path and returns ranked items. + pub async fn search_raw(&self, req: SearchRequest) -> Result { + self.search_raw_planned(req).await.map(|response| SearchResponse { + trace_id: response.trace_id, + items: response.items, + trajectory_summary: response.trajectory_summary, + }) + } +} diff --git a/packages/elf-service/src/search/service/execution.rs b/packages/elf-service/src/search/service/execution.rs new file mode 100644 index 00000000..4df432b3 --- /dev/null +++ b/packages/elf-service/src/search/service/execution.rs @@ -0,0 +1,163 @@ +use crate::search::{ + self, DynamicGateSummary, ElfService, ExpansionMode, FinishSearchArgs, HashMap, + MaybeDynamicSearchArgs, RawSearchExecutionContext, RawSearchPath, Result, + SearchRawPlannedResponse, SearchRequest, SearchRetrievalArgs, +}; + +impl ElfService { + pub(in crate::search) async fn execute_search_raw_path( + &self, + req: SearchRequest, + path: RawSearchPath, + ) -> Result { + let context = self.prepare_raw_search_execution(req, path)?; + + if context.allowed_scopes.is_empty() { + return self.execute_search_raw_no_allowed_scopes(&context, path).await; + } + + let dynamic_gate_enabled = + path == RawSearchPath::Planned && context.expansion_mode == ExpansionMode::Dynamic; + + self.execute_search_raw_with_allowed_scopes(&context, path, dynamic_gate_enabled).await + } + + async fn execute_search_raw_no_allowed_scopes( + &self, + context: &RawSearchExecutionContext, + path: RawSearchPath, + ) -> Result { + let expanded_queries = vec![context.query.clone()]; + let response = self + .finish_search(FinishSearchArgs { + path, + trace_id: context.trace_id, + query: context.query.as_str(), + tenant_id: context.tenant_id.as_str(), + project_id: context.project_id.as_str(), + agent_id: context.agent_id.as_str(), + token_id: context.token_id.as_deref(), + read_profile: context.read_profile.as_str(), + allowed_scopes: &context.allowed_scopes, + expanded_queries: expanded_queries.clone(), + expansion_mode: context.expansion_mode, + candidates: Vec::new(), + structured_matches: HashMap::new(), + recursive_retrieval: None, + top_k: context.top_k, + record_hits_enabled: context.record_hits_enabled, + ranking_override: context.ranking_override.clone(), + payload_level: context.payload_level, + filter: context.filter.as_ref(), + requested_candidate_k: context.requested_candidate_k, + effective_candidate_k: context.effective_candidate_k, + }) + .await?; + + Ok(self.build_raw_planned_response( + context, + path, + response, + expanded_queries, + DynamicGateSummary::default(), + )) + } + + async fn execute_search_raw_with_allowed_scopes( + &self, + context: &RawSearchExecutionContext, + path: RawSearchPath, + dynamic_gate_enabled: bool, + ) -> Result { + let filter = search::build_search_filter( + context.tenant_id.as_str(), + context.project_id.as_str(), + context.agent_id.as_str(), + &context.allowed_scopes, + ); + let retrieval_candidate_k = if context.filter.is_some() { + context.effective_candidate_k + } else { + context.candidate_k + }; + let (baseline_vector, early_response, dynamic_gate) = self + .maybe_finish_dynamic_search(MaybeDynamicSearchArgs { + path, + enabled: dynamic_gate_enabled, + trace_id: context.trace_id, + query: context.query.as_str(), + tenant_id: context.tenant_id.as_str(), + project_id: context.project_id.as_str(), + agent_id: context.agent_id.as_str(), + token_id: context.token_id.as_deref(), + read_profile: context.read_profile.as_str(), + allowed_scopes: &context.allowed_scopes, + project_context_description: context.project_context_description.as_deref(), + filter: &filter, + service_filter: context.filter.as_ref(), + candidate_k: retrieval_candidate_k, + requested_candidate_k: context.requested_candidate_k, + effective_candidate_k: context.effective_candidate_k, + top_k: context.top_k, + record_hits_enabled: context.record_hits_enabled, + ranking_override: context.ranking_override.as_ref(), + retrieval_sources_policy: &context.retrieval_sources_policy, + payload_level: context.payload_level, + }) + .await?; + + if let Some(response) = early_response { + return Ok(self.build_raw_planned_response( + context, + path, + response, + vec![context.query.clone()], + dynamic_gate, + )); + } + + let retrieval = self + .retrieve_search_candidates(SearchRetrievalArgs { + query: context.query.as_str(), + expansion_mode: context.expansion_mode, + project_context_description: context.project_context_description.as_deref(), + filter: &filter, + candidate_k: retrieval_candidate_k, + baseline_vector: baseline_vector.as_ref(), + tenant_id: context.tenant_id.as_str(), + project_id: context.project_id.as_str(), + agent_id: context.agent_id.as_str(), + allowed_scopes: &context.allowed_scopes, + retrieval_sources_policy: &context.retrieval_sources_policy, + }) + .await?; + let expanded_queries = retrieval.expanded_queries.clone(); + let response = self + .finish_search(FinishSearchArgs { + path, + trace_id: context.trace_id, + query: context.query.as_str(), + tenant_id: context.tenant_id.as_str(), + project_id: context.project_id.as_str(), + agent_id: context.agent_id.as_str(), + token_id: context.token_id.as_deref(), + read_profile: context.read_profile.as_str(), + allowed_scopes: &context.allowed_scopes, + expanded_queries: retrieval.expanded_queries, + expansion_mode: context.expansion_mode, + candidates: retrieval.candidates, + structured_matches: retrieval.structured_matches, + recursive_retrieval: retrieval.recursive, + top_k: context.top_k, + record_hits_enabled: context.record_hits_enabled, + ranking_override: context.ranking_override.clone(), + payload_level: context.payload_level, + filter: context.filter.as_ref(), + requested_candidate_k: context.requested_candidate_k, + effective_candidate_k: context.effective_candidate_k, + }) + .await?; + + Ok(self.build_raw_planned_response(context, path, response, expanded_queries, dynamic_gate)) + } +} diff --git a/packages/elf-service/src/search/service/response.rs b/packages/elf-service/src/search/service/response.rs new file mode 100644 index 00000000..b952eb8d --- /dev/null +++ b/packages/elf-service/src/search/service/response.rs @@ -0,0 +1,40 @@ +use crate::search::{ + BuildQueryPlanArgs, DynamicGateSummary, ElfService, RawSearchExecutionContext, RawSearchPath, + SearchRawPlannedResponse, SearchResponse, +}; + +impl ElfService { + pub(in crate::search) fn build_raw_planned_response( + &self, + context: &RawSearchExecutionContext, + path: RawSearchPath, + response: SearchResponse, + expanded_queries: Vec, + dynamic_gate: DynamicGateSummary, + ) -> SearchRawPlannedResponse { + let query_plan = self.build_query_plan(BuildQueryPlanArgs { + path, + query: context.query.as_str(), + tenant_id: context.tenant_id.as_str(), + project_id: context.project_id.as_str(), + agent_id: context.agent_id.as_str(), + read_profile: context.read_profile.as_str(), + allowed_scopes: &context.allowed_scopes, + expansion_mode: context.expansion_mode, + expanded_queries, + top_k: context.top_k, + candidate_k: context.candidate_k, + retrieval_sources_policy: &context.retrieval_sources_policy, + recursive_enabled: self.cfg.search.recursive.enabled, + policies: &context.policies, + dynamic_gate, + }); + + SearchRawPlannedResponse { + trace_id: response.trace_id, + items: response.items, + trajectory_summary: response.trajectory_summary, + query_plan, + } + } +} From 4755e72d71f88349a49e564b5c09cccc1fa1ae86 Mon Sep 17 00:00:00 2001 From: Yvette Carlisle Date: Tue, 30 Jun 2026 16:34:40 +0800 Subject: [PATCH 2/6] {"schema":"decodex/commit/1","summary":"Split search rerank modules","authority":"manual"} --- .../elf-service/src/search/finish/rerank.rs | 303 +----------------- .../src/search/finish/rerank/cache.rs | 154 +++++++++ .../src/search/finish/rerank/scoring.rs | 91 ++++++ .../src/search/finish/rerank/snippets.rs | 66 ++++ 4 files changed, 314 insertions(+), 300 deletions(-) create mode 100644 packages/elf-service/src/search/finish/rerank/cache.rs create mode 100644 packages/elf-service/src/search/finish/rerank/scoring.rs create mode 100644 packages/elf-service/src/search/finish/rerank/snippets.rs diff --git a/packages/elf-service/src/search/finish/rerank.rs b/packages/elf-service/src/search/finish/rerank.rs index b6e74834..817589a7 100644 --- a/packages/elf-service/src/search/finish/rerank.rs +++ b/packages/elf-service/src/search/finish/rerank.rs @@ -1,300 +1,3 @@ -use crate::{ - Error, - search::{ - self, CacheKind, ChunkCandidate, ChunkMeta, ChunkSnippet, Duration, ElfService, HashMap, - NoteMeta, OffsetDateTime, RerankCacheCandidate, RerankCacheItem, RerankCachePayload, - Result, SearchCache, Uuid, ranking, - }, -}; - -impl ElfService { - pub(in crate::search) async fn build_snippet_items( - &self, - filtered_candidates: &[ChunkCandidate], - note_meta: &HashMap, - ) -> Result> { - if filtered_candidates.is_empty() { - return Ok(Vec::new()); - } - - let pairs = ranking::collect_neighbor_pairs(filtered_candidates); - let chunk_rows = search::fetch_chunks_by_pair(&self.db.pool, &pairs).await?; - let mut chunk_by_id = HashMap::new(); - let mut chunk_by_note_index = HashMap::new(); - - for row in chunk_rows { - chunk_by_note_index.insert((row.note_id, row.chunk_index), row.clone()); - chunk_by_id.insert(row.chunk_id, row); - } - - let mut items = Vec::new(); - - for candidate in filtered_candidates { - let Some(chunk_row) = chunk_by_id.get(&candidate.chunk_id) else { - tracing::warn!( - chunk_id = %candidate.chunk_id, - "Chunk metadata missing for candidate." - ); - - continue; - }; - let snippet = ranking::stitch_snippet( - candidate.note_id, - chunk_row.chunk_index, - &chunk_by_note_index, - ); - - if snippet.is_empty() { - continue; - } - - let Some(note) = note_meta.get(&candidate.note_id) else { continue }; - let chunk = ChunkMeta { - chunk_id: chunk_row.chunk_id, - chunk_index: chunk_row.chunk_index, - start_offset: chunk_row.start_offset, - end_offset: chunk_row.end_offset, - }; - - items.push(ChunkSnippet { - note: note.clone(), - chunk, - snippet, - retrieval_rank: candidate.retrieval_rank, - retrieval_score: candidate.retrieval_score, - }); - } - - Ok(items) - } - - pub(in crate::search) async fn rerank_snippet_items( - &self, - query: &str, - snippet_items: &[ChunkSnippet], - cache_cfg: &SearchCache, - now: OffsetDateTime, - ) -> Result> { - if snippet_items.is_empty() { - return Ok(Vec::new()); - } - - let (cache_candidates, signature) = Self::build_rerank_cache_signature(snippet_items); - let mut cache_key: Option = None; - let mut cached_scores: Option> = None; - - if cache_cfg.enabled { - match ranking::build_rerank_cache_key( - query, - self.cfg.providers.rerank.provider_id.as_str(), - self.cfg.providers.rerank.model.as_str(), - &signature, - ) { - Ok(key) => { - cache_key = Some(key.clone()); - cached_scores = self - .read_rerank_cache_scores(&key, cache_candidates.as_slice(), cache_cfg, now) - .await; - }, - Err(err) => { - tracing::warn!( - error = %err, - cache_kind = CacheKind::Rerank.as_str(), - "Cache key build failed." - ); - }, - } - } - - if let Some(scores) = cached_scores { - return Ok(scores); - } - - let docs: Vec = snippet_items.iter().map(|item| item.snippet.clone()).collect(); - let scores = self.providers.rerank.rerank(&self.cfg.providers.rerank, query, &docs).await?; - - if scores.len() != snippet_items.len() { - return Err(Error::Provider { - message: "Rerank provider returned mismatched score count.".to_string(), - }); - } - if cache_cfg.enabled - && let Some(key) = cache_key.as_ref() - && !cache_candidates.is_empty() - { - self.store_rerank_cache_scores( - key, - cache_candidates.as_slice(), - scores.as_slice(), - cache_cfg, - ) - .await; - } - - Ok(scores) - } - - pub(in crate::search) fn build_rerank_cache_signature( - snippet_items: &[ChunkSnippet], - ) -> (Vec, Vec<(Uuid, OffsetDateTime)>) { - let candidates: Vec = snippet_items - .iter() - .map(|item| RerankCacheCandidate { - chunk_id: item.chunk.chunk_id, - updated_at: item.note.updated_at, - }) - .collect(); - let signature: Vec<(Uuid, OffsetDateTime)> = - candidates.iter().map(|candidate| (candidate.chunk_id, candidate.updated_at)).collect(); - - (candidates, signature) - } - - pub(in crate::search) async fn read_rerank_cache_scores( - &self, - key: &str, - cache_candidates: &[RerankCacheCandidate], - cache_cfg: &SearchCache, - now: OffsetDateTime, - ) -> Option> { - match search::fetch_cache_payload(&self.db.pool, CacheKind::Rerank, key, now).await { - Ok(Some(payload)) => { - let decoded: RerankCachePayload = match serde_json::from_value(payload.value) { - Ok(value) => value, - Err(err) => { - tracing::warn!( - error = %err, - cache_kind = CacheKind::Rerank.as_str(), - cache_key_prefix = ranking::cache_key_prefix(key), - "Cache payload decode failed." - ); - - RerankCachePayload { items: Vec::new() } - }, - }; - - if let Some(scores) = ranking::build_cached_scores(&decoded, cache_candidates) { - tracing::info!( - cache_kind = CacheKind::Rerank.as_str(), - cache_key_prefix = ranking::cache_key_prefix(key), - hit = true, - payload_size = payload.size_bytes, - ttl_days = cache_cfg.rerank_ttl_days, - "Cache hit." - ); - - Some(scores) - } else { - tracing::warn!( - cache_kind = CacheKind::Rerank.as_str(), - cache_key_prefix = ranking::cache_key_prefix(key), - hit = false, - payload_size = payload.size_bytes, - ttl_days = cache_cfg.rerank_ttl_days, - "Cache payload did not match candidates." - ); - - None - } - }, - Ok(None) => { - tracing::info!( - cache_kind = CacheKind::Rerank.as_str(), - cache_key_prefix = ranking::cache_key_prefix(key), - hit = false, - payload_size = 0_u64, - ttl_days = cache_cfg.rerank_ttl_days, - "Cache miss." - ); - - None - }, - Err(err) => { - tracing::warn!( - error = %err, - cache_kind = CacheKind::Rerank.as_str(), - cache_key_prefix = ranking::cache_key_prefix(key), - "Cache read failed." - ); - - None - }, - } - } - - pub(in crate::search) async fn store_rerank_cache_scores( - &self, - key: &str, - cache_candidates: &[RerankCacheCandidate], - scores: &[f32], - cache_cfg: &SearchCache, - ) { - let payload = RerankCachePayload { - items: cache_candidates - .iter() - .zip(scores.iter()) - .map(|(candidate, score)| RerankCacheItem { - chunk_id: candidate.chunk_id, - updated_at: candidate.updated_at, - score: *score, - }) - .collect(), - }; - - match serde_json::to_value(&payload) { - Ok(payload_json) => { - let stored_at = OffsetDateTime::now_utc(); - let expires_at = stored_at + Duration::days(cache_cfg.rerank_ttl_days); - - match search::store_cache_payload( - &self.db.pool, - CacheKind::Rerank, - key, - payload_json, - stored_at, - expires_at, - cache_cfg.max_payload_bytes, - ) - .await - { - Ok(Some(payload_size)) => { - tracing::info!( - cache_kind = CacheKind::Rerank.as_str(), - cache_key_prefix = ranking::cache_key_prefix(key), - hit = false, - payload_size, - ttl_days = cache_cfg.rerank_ttl_days, - "Cache stored." - ); - }, - Ok(None) => { - tracing::warn!( - cache_kind = CacheKind::Rerank.as_str(), - cache_key_prefix = ranking::cache_key_prefix(key), - hit = false, - payload_size = 0_u64, - ttl_days = cache_cfg.rerank_ttl_days, - "Cache payload skipped due to size." - ); - }, - Err(err) => { - tracing::warn!( - error = %err, - cache_kind = CacheKind::Rerank.as_str(), - cache_key_prefix = ranking::cache_key_prefix(key), - "Cache write failed." - ); - }, - } - }, - Err(err) => { - tracing::warn!( - error = %err, - cache_kind = CacheKind::Rerank.as_str(), - cache_key_prefix = ranking::cache_key_prefix(key), - "Cache payload encode failed." - ); - }, - } - } -} +mod cache; +mod scoring; +mod snippets; diff --git a/packages/elf-service/src/search/finish/rerank/cache.rs b/packages/elf-service/src/search/finish/rerank/cache.rs new file mode 100644 index 00000000..365b6db1 --- /dev/null +++ b/packages/elf-service/src/search/finish/rerank/cache.rs @@ -0,0 +1,154 @@ +use crate::search::{ + self, CacheKind, Duration, ElfService, OffsetDateTime, RerankCacheCandidate, RerankCacheItem, + RerankCachePayload, SearchCache, ranking, +}; + +impl ElfService { + pub(in crate::search) async fn read_rerank_cache_scores( + &self, + key: &str, + cache_candidates: &[RerankCacheCandidate], + cache_cfg: &SearchCache, + now: OffsetDateTime, + ) -> Option> { + match search::fetch_cache_payload(&self.db.pool, CacheKind::Rerank, key, now).await { + Ok(Some(payload)) => { + let decoded: RerankCachePayload = match serde_json::from_value(payload.value) { + Ok(value) => value, + Err(err) => { + tracing::warn!( + error = %err, + cache_kind = CacheKind::Rerank.as_str(), + cache_key_prefix = ranking::cache_key_prefix(key), + "Cache payload decode failed." + ); + + RerankCachePayload { items: Vec::new() } + }, + }; + + if let Some(scores) = ranking::build_cached_scores(&decoded, cache_candidates) { + tracing::info!( + cache_kind = CacheKind::Rerank.as_str(), + cache_key_prefix = ranking::cache_key_prefix(key), + hit = true, + payload_size = payload.size_bytes, + ttl_days = cache_cfg.rerank_ttl_days, + "Cache hit." + ); + + Some(scores) + } else { + tracing::warn!( + cache_kind = CacheKind::Rerank.as_str(), + cache_key_prefix = ranking::cache_key_prefix(key), + hit = false, + payload_size = payload.size_bytes, + ttl_days = cache_cfg.rerank_ttl_days, + "Cache payload did not match candidates." + ); + + None + } + }, + Ok(None) => { + tracing::info!( + cache_kind = CacheKind::Rerank.as_str(), + cache_key_prefix = ranking::cache_key_prefix(key), + hit = false, + payload_size = 0_u64, + ttl_days = cache_cfg.rerank_ttl_days, + "Cache miss." + ); + + None + }, + Err(err) => { + tracing::warn!( + error = %err, + cache_kind = CacheKind::Rerank.as_str(), + cache_key_prefix = ranking::cache_key_prefix(key), + "Cache read failed." + ); + + None + }, + } + } + + pub(in crate::search) async fn store_rerank_cache_scores( + &self, + key: &str, + cache_candidates: &[RerankCacheCandidate], + scores: &[f32], + cache_cfg: &SearchCache, + ) { + let payload = RerankCachePayload { + items: cache_candidates + .iter() + .zip(scores.iter()) + .map(|(candidate, score)| RerankCacheItem { + chunk_id: candidate.chunk_id, + updated_at: candidate.updated_at, + score: *score, + }) + .collect(), + }; + + match serde_json::to_value(&payload) { + Ok(payload_json) => { + let stored_at = OffsetDateTime::now_utc(); + let expires_at = stored_at + Duration::days(cache_cfg.rerank_ttl_days); + + match search::store_cache_payload( + &self.db.pool, + CacheKind::Rerank, + key, + payload_json, + stored_at, + expires_at, + cache_cfg.max_payload_bytes, + ) + .await + { + Ok(Some(payload_size)) => { + tracing::info!( + cache_kind = CacheKind::Rerank.as_str(), + cache_key_prefix = ranking::cache_key_prefix(key), + hit = false, + payload_size, + ttl_days = cache_cfg.rerank_ttl_days, + "Cache stored." + ); + }, + Ok(None) => { + tracing::warn!( + cache_kind = CacheKind::Rerank.as_str(), + cache_key_prefix = ranking::cache_key_prefix(key), + hit = false, + payload_size = 0_u64, + ttl_days = cache_cfg.rerank_ttl_days, + "Cache payload skipped due to size." + ); + }, + Err(err) => { + tracing::warn!( + error = %err, + cache_kind = CacheKind::Rerank.as_str(), + cache_key_prefix = ranking::cache_key_prefix(key), + "Cache write failed." + ); + }, + } + }, + Err(err) => { + tracing::warn!( + error = %err, + cache_kind = CacheKind::Rerank.as_str(), + cache_key_prefix = ranking::cache_key_prefix(key), + "Cache payload encode failed." + ); + }, + } + } +} diff --git a/packages/elf-service/src/search/finish/rerank/scoring.rs b/packages/elf-service/src/search/finish/rerank/scoring.rs new file mode 100644 index 00000000..181b26ce --- /dev/null +++ b/packages/elf-service/src/search/finish/rerank/scoring.rs @@ -0,0 +1,91 @@ +use crate::{ + Error, + search::{ + CacheKind, ChunkSnippet, ElfService, OffsetDateTime, RerankCacheCandidate, Result, + SearchCache, Uuid, ranking, + }, +}; + +impl ElfService { + pub(in crate::search) async fn rerank_snippet_items( + &self, + query: &str, + snippet_items: &[ChunkSnippet], + cache_cfg: &SearchCache, + now: OffsetDateTime, + ) -> Result> { + if snippet_items.is_empty() { + return Ok(Vec::new()); + } + + let (cache_candidates, signature) = Self::build_rerank_cache_signature(snippet_items); + let mut cache_key: Option = None; + let mut cached_scores: Option> = None; + + if cache_cfg.enabled { + match ranking::build_rerank_cache_key( + query, + self.cfg.providers.rerank.provider_id.as_str(), + self.cfg.providers.rerank.model.as_str(), + &signature, + ) { + Ok(key) => { + cache_key = Some(key.clone()); + cached_scores = self + .read_rerank_cache_scores(&key, cache_candidates.as_slice(), cache_cfg, now) + .await; + }, + Err(err) => { + tracing::warn!( + error = %err, + cache_kind = CacheKind::Rerank.as_str(), + "Cache key build failed." + ); + }, + } + } + + if let Some(scores) = cached_scores { + return Ok(scores); + } + + let docs: Vec = snippet_items.iter().map(|item| item.snippet.clone()).collect(); + let scores = self.providers.rerank.rerank(&self.cfg.providers.rerank, query, &docs).await?; + + if scores.len() != snippet_items.len() { + return Err(Error::Provider { + message: "Rerank provider returned mismatched score count.".to_string(), + }); + } + if cache_cfg.enabled + && let Some(key) = cache_key.as_ref() + && !cache_candidates.is_empty() + { + self.store_rerank_cache_scores( + key, + cache_candidates.as_slice(), + scores.as_slice(), + cache_cfg, + ) + .await; + } + + Ok(scores) + } + + pub(in crate::search) fn build_rerank_cache_signature( + snippet_items: &[ChunkSnippet], + ) -> (Vec, Vec<(Uuid, OffsetDateTime)>) { + let candidates: Vec = snippet_items + .iter() + .map(|item| RerankCacheCandidate { + chunk_id: item.chunk.chunk_id, + updated_at: item.note.updated_at, + }) + .collect(); + let signature: Vec<(Uuid, OffsetDateTime)> = + candidates.iter().map(|candidate| (candidate.chunk_id, candidate.updated_at)).collect(); + + (candidates, signature) + } +} diff --git a/packages/elf-service/src/search/finish/rerank/snippets.rs b/packages/elf-service/src/search/finish/rerank/snippets.rs new file mode 100644 index 00000000..d03474eb --- /dev/null +++ b/packages/elf-service/src/search/finish/rerank/snippets.rs @@ -0,0 +1,66 @@ +use crate::search::{ + self, ChunkCandidate, ChunkMeta, ChunkSnippet, ElfService, HashMap, NoteMeta, Result, Uuid, + ranking, +}; + +impl ElfService { + pub(in crate::search) async fn build_snippet_items( + &self, + filtered_candidates: &[ChunkCandidate], + note_meta: &HashMap, + ) -> Result> { + if filtered_candidates.is_empty() { + return Ok(Vec::new()); + } + + let pairs = ranking::collect_neighbor_pairs(filtered_candidates); + let chunk_rows = search::fetch_chunks_by_pair(&self.db.pool, &pairs).await?; + let mut chunk_by_id = HashMap::new(); + let mut chunk_by_note_index = HashMap::new(); + + for row in chunk_rows { + chunk_by_note_index.insert((row.note_id, row.chunk_index), row.clone()); + chunk_by_id.insert(row.chunk_id, row); + } + + let mut items = Vec::new(); + + for candidate in filtered_candidates { + let Some(chunk_row) = chunk_by_id.get(&candidate.chunk_id) else { + tracing::warn!( + chunk_id = %candidate.chunk_id, + "Chunk metadata missing for candidate." + ); + + continue; + }; + let snippet = ranking::stitch_snippet( + candidate.note_id, + chunk_row.chunk_index, + &chunk_by_note_index, + ); + + if snippet.is_empty() { + continue; + } + + let Some(note) = note_meta.get(&candidate.note_id) else { continue }; + let chunk = ChunkMeta { + chunk_id: chunk_row.chunk_id, + chunk_index: chunk_row.chunk_index, + start_offset: chunk_row.start_offset, + end_offset: chunk_row.end_offset, + }; + + items.push(ChunkSnippet { + note: note.clone(), + chunk, + snippet, + retrieval_rank: candidate.retrieval_rank, + retrieval_score: candidate.retrieval_score, + }); + } + + Ok(items) + } +} From 8ced38789d63aef27fba18fa91228b44f26927cc Mon Sep 17 00:00:00 2001 From: Yvette Carlisle Date: Tue, 30 Jun 2026 16:38:11 +0800 Subject: [PATCH 3/6] {"schema":"decodex/commit/1","summary":"Split search scoring modules","authority":"manual"} --- .../elf-service/src/search/finish/scoring.rs | 263 +----------------- .../src/search/finish/scoring/build.rs | 72 +++++ .../src/search/finish/scoring/candidates.rs | 33 +++ .../src/search/finish/scoring/diversity.rs | 23 ++ .../src/search/finish/scoring/hits.rs | 23 ++ .../src/search/finish/scoring/policies.rs | 38 +++ .../src/search/finish/scoring/snippets.rs | 90 ++++++ 7 files changed, 285 insertions(+), 257 deletions(-) create mode 100644 packages/elf-service/src/search/finish/scoring/build.rs create mode 100644 packages/elf-service/src/search/finish/scoring/candidates.rs create mode 100644 packages/elf-service/src/search/finish/scoring/diversity.rs create mode 100644 packages/elf-service/src/search/finish/scoring/hits.rs create mode 100644 packages/elf-service/src/search/finish/scoring/policies.rs create mode 100644 packages/elf-service/src/search/finish/scoring/snippets.rs diff --git a/packages/elf-service/src/search/finish/scoring.rs b/packages/elf-service/src/search/finish/scoring.rs index bafd16ff..d6d5cc31 100644 --- a/packages/elf-service/src/search/finish/scoring.rs +++ b/packages/elf-service/src/search/finish/scoring.rs @@ -1,257 +1,6 @@ -use crate::search::{ - self, ChunkCandidate, ChunkSnippet, DiversityDecision, ElfService, FinishSearchPolicies, - FinishSearchScoringResult, HashMap, MAX_MATCHED_TERMS, NoteMeta, OffsetDateTime, Ordering, - RankingRequestOverride, ResolvedDiversityPolicy, Result, ScoreCandidateCtx, ScoreSnippetArgs, - ScoredChunk, SearchFilter, SearchFilterImpact, Uuid, ranking, structured, -}; - -impl ElfService { - #[allow(clippy::too_many_arguments)] - pub(in crate::search) async fn build_finish_search_scoring( - &self, - query: &str, - candidates: Vec, - note_meta: &HashMap, - policies: &FinishSearchPolicies, - top_k: u32, - candidate_count: usize, - filter: Option<&SearchFilter>, - requested_candidate_k: u32, - effective_candidate_k: u32, - now: OffsetDateTime, - skip_rerank: bool, - ) -> Result { - let (filtered_candidates, filter_impact) = self.apply_filter_to_candidates( - candidates, - note_meta, - filter, - requested_candidate_k, - effective_candidate_k, - ); - let filtered_candidate_count = filtered_candidates.len(); - let snippet_items = self.build_snippet_items(&filtered_candidates, note_meta).await?; - let snippet_count = snippet_items.len(); - let query_tokens = ranking::tokenize_query(query, MAX_MATCHED_TERMS); - let scope_context_boost_by_scope = - ranking::build_scope_context_boost_by_scope(&query_tokens, self.cfg.context.as_ref()); - let det_query_tokens = structured::build_deterministic_query_tokens(&self.cfg, query); - let scored = self - .score_snippet_items(ScoreSnippetArgs { - query, - snippet_items, - scope_context_boost_by_scope: &scope_context_boost_by_scope, - det_query_tokens: det_query_tokens.as_slice(), - blend_policy: &policies.blend_policy, - cache_cfg: &self.cfg.search.cache, - now, - candidate_count, - skip_rerank, - }) - .await?; - let scored_count = scored.len(); - let trace_candidates = self.build_trace_candidates(&scored, now); - let results = search::select_best_scored_chunks(scored); - let fused_results = results.clone(); - let (selected_results, diversity_decisions) = - self.apply_diversity_policy(results, top_k, &policies.diversity_policy).await?; - let selected_count = selected_results.len(); - - Ok(FinishSearchScoringResult { - query_tokens, - filtered_candidates, - scored_count, - snippet_count, - filtered_candidate_count, - filter_impact, - trace_candidates, - fused_results, - selected_results, - diversity_decisions, - selected_count, - }) - } - - pub(in crate::search) fn apply_filter_to_candidates( - &self, - candidates: Vec, - note_meta: &HashMap, - filter: Option<&SearchFilter>, - requested_candidate_k: u32, - effective_candidate_k: u32, - ) -> (Vec, Option) { - let filtered_candidates: Vec = candidates - .into_iter() - .filter(|candidate| ranking::candidate_matches_note(note_meta, candidate)) - .collect(); - - match filter { - Some(filter) => { - let (candidates, filter_impact) = filter.eval( - filtered_candidates, - note_meta, - requested_candidate_k, - effective_candidate_k, - ); - - (candidates, Some(filter_impact)) - }, - None => (filtered_candidates, None), - } - } - - pub(in crate::search) fn resolve_finish_search_policies( - &self, - ranking_override: Option<&RankingRequestOverride>, - ) -> Result { - let blend_policy = ranking::resolve_blend_policy( - &self.cfg.ranking.blend, - ranking_override.and_then(|override_| override_.blend.as_ref()), - )?; - let diversity_policy = ranking::resolve_diversity_policy( - &self.cfg.ranking.diversity, - ranking_override.and_then(|override_| override_.diversity.as_ref()), - )?; - let retrieval_sources_policy = ranking::resolve_retrieval_sources_policy( - &self.cfg.ranking.retrieval_sources, - ranking_override.and_then(|override_| override_.retrieval_sources.as_ref()), - )?; - let policy_snapshot = ranking::build_policy_snapshot( - &self.cfg, - &blend_policy, - &diversity_policy, - &retrieval_sources_policy, - ranking_override, - ); - let policy_hash = ranking::hash_policy_snapshot(&policy_snapshot)?; - let policy_id = format!("ranking_v2:{}", &policy_hash[..12.min(policy_hash.len())]); - - Ok(FinishSearchPolicies { - blend_policy, - diversity_policy, - retrieval_sources_policy, - policy_snapshot, - policy_id, - }) - } - - pub(in crate::search) async fn score_snippet_items( - &self, - args: ScoreSnippetArgs<'_, '_>, - ) -> Result> { - let ScoreSnippetArgs { - query, - snippet_items, - scope_context_boost_by_scope, - det_query_tokens, - blend_policy, - cache_cfg, - now, - candidate_count, - skip_rerank, - } = args; - - if snippet_items.is_empty() { - return Ok(Vec::new()); - } - - let scores = if skip_rerank { - Self::build_quick_find_rerank_scores(&snippet_items) - } else { - self.rerank_snippet_items(query, snippet_items.as_slice(), cache_cfg, now).await? - }; - let rerank_ranks = ranking::build_rerank_ranks(&snippet_items, &scores); - let total_rerank = u32::try_from(scores.len()).unwrap_or(1).max(1); - let total_retrieval = u32::try_from(candidate_count).unwrap_or(1).max(1); - let score_ctx = ScoreCandidateCtx { - cfg: &self.cfg, - blend_policy, - scope_context_boost_by_scope, - det_query_tokens, - now, - total_rerank, - total_retrieval, - }; - let mut scored = Vec::with_capacity(snippet_items.len()); - - for ((item, rerank_score), rerank_rank) in - snippet_items.into_iter().zip(scores).zip(rerank_ranks) - { - scored.push(search::score_chunk_candidate(&score_ctx, item, rerank_score, rerank_rank)); - } - - Ok(scored) - } - - pub(in crate::search) fn build_quick_find_rerank_scores( - snippet_items: &[ChunkSnippet], - ) -> Vec { - let mut idxs: Vec = (0..snippet_items.len()).collect(); - - idxs.sort_by(|&a, &b| { - let ord = snippet_items[a].retrieval_rank.cmp(&snippet_items[b].retrieval_rank); - - if ord != Ordering::Equal { - return ord; - } - - let ord = snippet_items[a].chunk.chunk_index.cmp(&snippet_items[b].chunk.chunk_index); - - if ord != Ordering::Equal { - return ord; - } - - snippet_items[a].chunk.chunk_id.cmp(&snippet_items[b].chunk.chunk_id) - }); - - let total = idxs.len(); - - if total == 0 { - return Vec::new(); - } - - let mut scores = vec![0_f32; total]; - - for (rank, idx) in idxs.into_iter().enumerate() { - scores[idx] = 1.0 / (rank as f32 + 1.0); - } - - scores - } - - pub(in crate::search) async fn apply_diversity_policy( - &self, - results: Vec, - top_k: u32, - diversity_policy: &ResolvedDiversityPolicy, - ) -> Result<(Vec, HashMap)> { - let note_vectors = if diversity_policy.enabled { - search::fetch_note_vectors_for_diversity(&self.db.pool, results.as_slice()).await? - } else { - HashMap::new() - }; - let (selected_results, diversity_decisions) = - ranking::select_diverse_results(results, top_k, diversity_policy, ¬e_vectors); - - Ok((selected_results, diversity_decisions)) - } - - pub(in crate::search) async fn record_hits_if_enabled( - &self, - enabled: bool, - query: &str, - selected_results: &[ScoredChunk], - now: OffsetDateTime, - ) -> Result<()> { - if !enabled || selected_results.is_empty() { - return Ok(()); - } - - let mut tx = self.db.pool.begin().await?; - - search::record_hits(&mut *tx, query, selected_results, now).await?; - - tx.commit().await?; - - Ok(()) - } -} +mod build; +mod candidates; +mod diversity; +mod hits; +mod policies; +mod snippets; diff --git a/packages/elf-service/src/search/finish/scoring/build.rs b/packages/elf-service/src/search/finish/scoring/build.rs new file mode 100644 index 00000000..ca64aca5 --- /dev/null +++ b/packages/elf-service/src/search/finish/scoring/build.rs @@ -0,0 +1,72 @@ +use crate::search::{ + self, ChunkCandidate, ElfService, FinishSearchPolicies, FinishSearchScoringResult, HashMap, + MAX_MATCHED_TERMS, NoteMeta, OffsetDateTime, Result, ScoreSnippetArgs, SearchFilter, Uuid, + ranking, structured, +}; + +impl ElfService { + #[allow(clippy::too_many_arguments)] + pub(in crate::search) async fn build_finish_search_scoring( + &self, + query: &str, + candidates: Vec, + note_meta: &HashMap, + policies: &FinishSearchPolicies, + top_k: u32, + candidate_count: usize, + filter: Option<&SearchFilter>, + requested_candidate_k: u32, + effective_candidate_k: u32, + now: OffsetDateTime, + skip_rerank: bool, + ) -> Result { + let (filtered_candidates, filter_impact) = self.apply_filter_to_candidates( + candidates, + note_meta, + filter, + requested_candidate_k, + effective_candidate_k, + ); + let filtered_candidate_count = filtered_candidates.len(); + let snippet_items = self.build_snippet_items(&filtered_candidates, note_meta).await?; + let snippet_count = snippet_items.len(); + let query_tokens = ranking::tokenize_query(query, MAX_MATCHED_TERMS); + let scope_context_boost_by_scope = + ranking::build_scope_context_boost_by_scope(&query_tokens, self.cfg.context.as_ref()); + let det_query_tokens = structured::build_deterministic_query_tokens(&self.cfg, query); + let scored = self + .score_snippet_items(ScoreSnippetArgs { + query, + snippet_items, + scope_context_boost_by_scope: &scope_context_boost_by_scope, + det_query_tokens: det_query_tokens.as_slice(), + blend_policy: &policies.blend_policy, + cache_cfg: &self.cfg.search.cache, + now, + candidate_count, + skip_rerank, + }) + .await?; + let scored_count = scored.len(); + let trace_candidates = self.build_trace_candidates(&scored, now); + let results = search::select_best_scored_chunks(scored); + let fused_results = results.clone(); + let (selected_results, diversity_decisions) = + self.apply_diversity_policy(results, top_k, &policies.diversity_policy).await?; + let selected_count = selected_results.len(); + + Ok(FinishSearchScoringResult { + query_tokens, + filtered_candidates, + scored_count, + snippet_count, + filtered_candidate_count, + filter_impact, + trace_candidates, + fused_results, + selected_results, + diversity_decisions, + selected_count, + }) + } +} diff --git a/packages/elf-service/src/search/finish/scoring/candidates.rs b/packages/elf-service/src/search/finish/scoring/candidates.rs new file mode 100644 index 00000000..16915224 --- /dev/null +++ b/packages/elf-service/src/search/finish/scoring/candidates.rs @@ -0,0 +1,33 @@ +use crate::search::{ + ChunkCandidate, ElfService, HashMap, NoteMeta, SearchFilter, SearchFilterImpact, Uuid, ranking, +}; + +impl ElfService { + pub(in crate::search) fn apply_filter_to_candidates( + &self, + candidates: Vec, + note_meta: &HashMap, + filter: Option<&SearchFilter>, + requested_candidate_k: u32, + effective_candidate_k: u32, + ) -> (Vec, Option) { + let filtered_candidates: Vec = candidates + .into_iter() + .filter(|candidate| ranking::candidate_matches_note(note_meta, candidate)) + .collect(); + + match filter { + Some(filter) => { + let (candidates, filter_impact) = filter.eval( + filtered_candidates, + note_meta, + requested_candidate_k, + effective_candidate_k, + ); + + (candidates, Some(filter_impact)) + }, + None => (filtered_candidates, None), + } + } +} diff --git a/packages/elf-service/src/search/finish/scoring/diversity.rs b/packages/elf-service/src/search/finish/scoring/diversity.rs new file mode 100644 index 00000000..a38e5f6b --- /dev/null +++ b/packages/elf-service/src/search/finish/scoring/diversity.rs @@ -0,0 +1,23 @@ +use crate::search::{ + self, DiversityDecision, ElfService, HashMap, ResolvedDiversityPolicy, Result, ScoredChunk, + Uuid, ranking, +}; + +impl ElfService { + pub(in crate::search) async fn apply_diversity_policy( + &self, + results: Vec, + top_k: u32, + diversity_policy: &ResolvedDiversityPolicy, + ) -> Result<(Vec, HashMap)> { + let note_vectors = if diversity_policy.enabled { + search::fetch_note_vectors_for_diversity(&self.db.pool, results.as_slice()).await? + } else { + HashMap::new() + }; + let (selected_results, diversity_decisions) = + ranking::select_diverse_results(results, top_k, diversity_policy, ¬e_vectors); + + Ok((selected_results, diversity_decisions)) + } +} diff --git a/packages/elf-service/src/search/finish/scoring/hits.rs b/packages/elf-service/src/search/finish/scoring/hits.rs new file mode 100644 index 00000000..814d29d9 --- /dev/null +++ b/packages/elf-service/src/search/finish/scoring/hits.rs @@ -0,0 +1,23 @@ +use crate::search::{self, ElfService, OffsetDateTime, Result, ScoredChunk}; + +impl ElfService { + pub(in crate::search) async fn record_hits_if_enabled( + &self, + enabled: bool, + query: &str, + selected_results: &[ScoredChunk], + now: OffsetDateTime, + ) -> Result<()> { + if !enabled || selected_results.is_empty() { + return Ok(()); + } + + let mut tx = self.db.pool.begin().await?; + + search::record_hits(&mut *tx, query, selected_results, now).await?; + + tx.commit().await?; + + Ok(()) + } +} diff --git a/packages/elf-service/src/search/finish/scoring/policies.rs b/packages/elf-service/src/search/finish/scoring/policies.rs new file mode 100644 index 00000000..ec5687b0 --- /dev/null +++ b/packages/elf-service/src/search/finish/scoring/policies.rs @@ -0,0 +1,38 @@ +use crate::search::{ElfService, FinishSearchPolicies, RankingRequestOverride, Result, ranking}; + +impl ElfService { + pub(in crate::search) fn resolve_finish_search_policies( + &self, + ranking_override: Option<&RankingRequestOverride>, + ) -> Result { + let blend_policy = ranking::resolve_blend_policy( + &self.cfg.ranking.blend, + ranking_override.and_then(|override_| override_.blend.as_ref()), + )?; + let diversity_policy = ranking::resolve_diversity_policy( + &self.cfg.ranking.diversity, + ranking_override.and_then(|override_| override_.diversity.as_ref()), + )?; + let retrieval_sources_policy = ranking::resolve_retrieval_sources_policy( + &self.cfg.ranking.retrieval_sources, + ranking_override.and_then(|override_| override_.retrieval_sources.as_ref()), + )?; + let policy_snapshot = ranking::build_policy_snapshot( + &self.cfg, + &blend_policy, + &diversity_policy, + &retrieval_sources_policy, + ranking_override, + ); + let policy_hash = ranking::hash_policy_snapshot(&policy_snapshot)?; + let policy_id = format!("ranking_v2:{}", &policy_hash[..12.min(policy_hash.len())]); + + Ok(FinishSearchPolicies { + blend_policy, + diversity_policy, + retrieval_sources_policy, + policy_snapshot, + policy_id, + }) + } +} diff --git a/packages/elf-service/src/search/finish/scoring/snippets.rs b/packages/elf-service/src/search/finish/scoring/snippets.rs new file mode 100644 index 00000000..70ee95ce --- /dev/null +++ b/packages/elf-service/src/search/finish/scoring/snippets.rs @@ -0,0 +1,90 @@ +use crate::search::{ + self, ChunkSnippet, ElfService, Ordering, Result, ScoreCandidateCtx, ScoreSnippetArgs, + ScoredChunk, ranking, +}; + +impl ElfService { + pub(in crate::search) async fn score_snippet_items( + &self, + args: ScoreSnippetArgs<'_, '_>, + ) -> Result> { + let ScoreSnippetArgs { + query, + snippet_items, + scope_context_boost_by_scope, + det_query_tokens, + blend_policy, + cache_cfg, + now, + candidate_count, + skip_rerank, + } = args; + + if snippet_items.is_empty() { + return Ok(Vec::new()); + } + + let scores = if skip_rerank { + Self::build_quick_find_rerank_scores(&snippet_items) + } else { + self.rerank_snippet_items(query, snippet_items.as_slice(), cache_cfg, now).await? + }; + let rerank_ranks = ranking::build_rerank_ranks(&snippet_items, &scores); + let total_rerank = u32::try_from(scores.len()).unwrap_or(1).max(1); + let total_retrieval = u32::try_from(candidate_count).unwrap_or(1).max(1); + let score_ctx = ScoreCandidateCtx { + cfg: &self.cfg, + blend_policy, + scope_context_boost_by_scope, + det_query_tokens, + now, + total_rerank, + total_retrieval, + }; + let mut scored = Vec::with_capacity(snippet_items.len()); + + for ((item, rerank_score), rerank_rank) in + snippet_items.into_iter().zip(scores).zip(rerank_ranks) + { + scored.push(search::score_chunk_candidate(&score_ctx, item, rerank_score, rerank_rank)); + } + + Ok(scored) + } + + pub(in crate::search) fn build_quick_find_rerank_scores( + snippet_items: &[ChunkSnippet], + ) -> Vec { + let mut idxs: Vec = (0..snippet_items.len()).collect(); + + idxs.sort_by(|&a, &b| { + let ord = snippet_items[a].retrieval_rank.cmp(&snippet_items[b].retrieval_rank); + + if ord != Ordering::Equal { + return ord; + } + + let ord = snippet_items[a].chunk.chunk_index.cmp(&snippet_items[b].chunk.chunk_index); + + if ord != Ordering::Equal { + return ord; + } + + snippet_items[a].chunk.chunk_id.cmp(&snippet_items[b].chunk.chunk_id) + }); + + let total = idxs.len(); + + if total == 0 { + return Vec::new(); + } + + let mut scores = vec![0_f32; total]; + + for (rank, idx) in idxs.into_iter().enumerate() { + scores[idx] = 1.0 / (rank as f32 + 1.0); + } + + scores + } +} From bf397e6d3ba08e2cd6ff9bc2f99911fa7ffefa78 Mon Sep 17 00:00:00 2001 From: Yvette Carlisle Date: Tue, 30 Jun 2026 16:48:45 +0800 Subject: [PATCH 4/6] {"schema":"decodex/commit/1","summary":"Split search trace persistence modules","authority":"manual"} --- .../src/search/trace_persistence.rs | 287 +----------------- .../search/trace_persistence/candidates.rs | 56 ++++ .../src/search/trace_persistence/enqueue.rs | 41 +++ .../src/search/trace_persistence/header.rs | 74 +++++ .../src/search/trace_persistence/items.rs | 42 +++ .../src/search/trace_persistence/stages.rs | 68 +++++ 6 files changed, 292 insertions(+), 276 deletions(-) create mode 100644 packages/elf-service/src/search/trace_persistence/candidates.rs create mode 100644 packages/elf-service/src/search/trace_persistence/enqueue.rs create mode 100644 packages/elf-service/src/search/trace_persistence/header.rs create mode 100644 packages/elf-service/src/search/trace_persistence/items.rs create mode 100644 packages/elf-service/src/search/trace_persistence/stages.rs diff --git a/packages/elf-service/src/search/trace_persistence.rs b/packages/elf-service/src/search/trace_persistence.rs index 461b3516..bb355412 100644 --- a/packages/elf-service/src/search/trace_persistence.rs +++ b/packages/elf-service/src/search/trace_persistence.rs @@ -1,44 +1,12 @@ -use crate::{ - Error, - search::{ - OffsetDateTime, PgConnection, PgExecutor, QueryBuilder, Result, TraceCandidateRecord, - TraceItemRecord, TracePayload, TraceRecord, TraceTrajectoryStageRecord, Uuid, - }, -}; +mod candidates; +mod enqueue; +mod header; +mod items; +mod stages; -pub(super) async fn enqueue_trace<'e, E>(executor: E, payload: TracePayload) -> Result<()> -where - E: PgExecutor<'e>, -{ - let now = OffsetDateTime::now_utc(); - let payload_json = serde_json::to_value(&payload).map_err(|err| Error::Storage { - message: format!("Failed to encode search trace payload: {err}"), - })?; +pub(super) use self::enqueue::enqueue_trace; - sqlx::query( - "\ -INSERT INTO search_trace_outbox ( - outbox_id, - trace_id, - status, - attempts, - last_error, - available_at, - payload, - created_at, - updated_at -) -VALUES ($1, $2, 'PENDING', 0, NULL, $3, $4, $3, $3)", - ) - .bind(Uuid::new_v4()) - .bind(payload.trace.trace_id) - .bind(now) - .bind(payload_json) - .execute(executor) - .await?; - - Ok(()) -} +use crate::search::{PgConnection, Result, TracePayload}; pub(super) async fn persist_trace_inline( executor: &mut PgConnection, @@ -50,243 +18,10 @@ pub(super) async fn persist_trace_inline( let stages = payload.stages; let trace_id = trace.trace_id; - persist_trace_inline_header(executor, &trace).await?; - persist_trace_inline_items(executor, trace_id, items).await?; - persist_trace_inline_stages(executor, trace_id, stages).await?; - persist_trace_inline_candidates(executor, trace_id, candidates).await?; - - Ok(()) -} - -pub(super) async fn persist_trace_inline_stages( - executor: &mut PgConnection, - trace_id: Uuid, - stages: Vec, -) -> Result<()> { - if stages.is_empty() { - return Ok(()); - } - - let mut item_records = Vec::new(); - let mut stage_builder = QueryBuilder::new( - "\ -INSERT INTO search_trace_stages ( - stage_id, - trace_id, - stage_order, - stage_name, - stage_payload, - created_at -) ", - ); - - stage_builder.push_values(stages, |mut b, stage| { - for item in stage.items { - item_records.push((stage.stage_id, item)); - } - - b.push_bind(stage.stage_id) - .push_bind(trace_id) - .push_bind(stage.stage_order as i32) - .push_bind(stage.stage_name) - .push_bind(stage.stage_payload) - .push_bind(stage.created_at); - }); - stage_builder.push(" ON CONFLICT (stage_id) DO NOTHING"); - stage_builder.build().execute(&mut *executor).await?; - - if item_records.is_empty() { - return Ok(()); - } - - let mut item_builder = QueryBuilder::new( - "\ -INSERT INTO search_trace_stage_items ( - id, - stage_id, - item_id, - note_id, - chunk_id, - metrics -) ", - ); - - item_builder.push_values(item_records, |mut b, (stage_id, item)| { - b.push_bind(item.id) - .push_bind(stage_id) - .push_bind(item.item_id) - .push_bind(item.note_id) - .push_bind(item.chunk_id) - .push_bind(item.metrics); - }); - item_builder.push(" ON CONFLICT (id) DO NOTHING"); - item_builder.build().execute(executor).await?; - - Ok(()) -} - -pub(super) async fn persist_trace_inline_header( - executor: &mut PgConnection, - trace: &TraceRecord, -) -> Result<()> { - let expanded_queries_json = serde_json::to_value(&trace.expanded_queries).map_err(|err| { - Error::Storage { message: format!("Failed to encode expanded_queries: {err}") } - })?; - let allowed_scopes_json = serde_json::to_value(&trace.allowed_scopes).map_err(|err| { - Error::Storage { message: format!("Failed to encode allowed_scopes: {err}") } - })?; - - sqlx::query( - "\ -INSERT INTO search_traces ( - trace_id, - tenant_id, - project_id, - agent_id, - read_profile, - query, - expansion_mode, - expanded_queries, - allowed_scopes, - candidate_count, - top_k, - config_snapshot, - trace_version, - created_at, - expires_at -) -VALUES ( - $1, - $2, - $3, - $4, - $5, - $6, - $7, - $8, - $9, - $10, - $11, - $12, - $13, - $14, - $15 -) - ON CONFLICT (trace_id) DO NOTHING", - ) - .bind(trace.trace_id) - .bind(trace.tenant_id.as_str()) - .bind(trace.project_id.as_str()) - .bind(trace.agent_id.as_str()) - .bind(trace.read_profile.as_str()) - .bind(trace.query.as_str()) - .bind(trace.expansion_mode.as_str()) - .bind(expanded_queries_json) - .bind(allowed_scopes_json) - .bind(trace.candidate_count as i32) - .bind(trace.top_k as i32) - .bind(trace.config_snapshot.clone()) - .bind(trace.trace_version) - .bind(trace.created_at) - .bind(trace.expires_at) - .execute(executor) - .await?; - - Ok(()) -} - -pub(super) async fn persist_trace_inline_items( - executor: &mut PgConnection, - trace_id: Uuid, - items: Vec, -) -> Result<()> { - if items.is_empty() { - return Ok(()); - } - - let mut builder = QueryBuilder::new( - "\ -INSERT INTO search_trace_items ( - item_id, - trace_id, - note_id, - chunk_id, - rank, - final_score, - explain -) ", - ); - - builder.push_values(items, |mut b, item| { - let explain_json = - serde_json::to_value(item.explain).expect("SearchExplain must be JSON-serializable."); - - b.push_bind(item.item_id) - .push_bind(trace_id) - .push_bind(item.note_id) - .push_bind(item.chunk_id) - .push_bind(item.rank as i32) - .push_bind(item.final_score) - .push_bind(explain_json); - }); - - builder.push(" ON CONFLICT (item_id) DO NOTHING"); - builder.build().execute(executor).await?; - - Ok(()) -} - -pub(super) async fn persist_trace_inline_candidates( - executor: &mut PgConnection, - trace_id: Uuid, - candidates: Vec, -) -> Result<()> { - if candidates.is_empty() { - return Ok(()); - } - - let mut builder = QueryBuilder::new( - "\ -INSERT INTO search_trace_candidates ( - candidate_id, - trace_id, - note_id, - chunk_id, - chunk_index, - snippet, - candidate_snapshot, - retrieval_rank, - rerank_score, - note_scope, - note_importance, - note_updated_at, - note_hit_count, - note_last_hit_at, - created_at, - expires_at -) ", - ); - - builder.push_values(candidates, |mut b, candidate| { - b.push_bind(candidate.candidate_id) - .push_bind(trace_id) - .push_bind(candidate.note_id) - .push_bind(candidate.chunk_id) - .push_bind(candidate.chunk_index) - .push_bind(candidate.snippet) - .push_bind(candidate.candidate_snapshot) - .push_bind(candidate.retrieval_rank as i32) - .push_bind(candidate.rerank_score) - .push_bind(candidate.note_scope) - .push_bind(candidate.note_importance) - .push_bind(candidate.note_updated_at) - .push_bind(candidate.note_hit_count) - .push_bind(candidate.note_last_hit_at) - .push_bind(candidate.created_at) - .push_bind(candidate.expires_at); - }); - builder.push(" ON CONFLICT (candidate_id) DO NOTHING"); - builder.build().execute(executor).await?; + header::persist_trace_inline_header(executor, &trace).await?; + items::persist_trace_inline_items(executor, trace_id, items).await?; + stages::persist_trace_inline_stages(executor, trace_id, stages).await?; + candidates::persist_trace_inline_candidates(executor, trace_id, candidates).await?; Ok(()) } diff --git a/packages/elf-service/src/search/trace_persistence/candidates.rs b/packages/elf-service/src/search/trace_persistence/candidates.rs new file mode 100644 index 00000000..e5e814b0 --- /dev/null +++ b/packages/elf-service/src/search/trace_persistence/candidates.rs @@ -0,0 +1,56 @@ +use crate::search::{PgConnection, QueryBuilder, Result, TraceCandidateRecord, Uuid}; + +pub(in crate::search::trace_persistence) async fn persist_trace_inline_candidates( + executor: &mut PgConnection, + trace_id: Uuid, + candidates: Vec, +) -> Result<()> { + if candidates.is_empty() { + return Ok(()); + } + + let mut builder = QueryBuilder::new( + "\ +INSERT INTO search_trace_candidates ( + candidate_id, + trace_id, + note_id, + chunk_id, + chunk_index, + snippet, + candidate_snapshot, + retrieval_rank, + rerank_score, + note_scope, + note_importance, + note_updated_at, + note_hit_count, + note_last_hit_at, + created_at, + expires_at +) ", + ); + + builder.push_values(candidates, |mut b, candidate| { + b.push_bind(candidate.candidate_id) + .push_bind(trace_id) + .push_bind(candidate.note_id) + .push_bind(candidate.chunk_id) + .push_bind(candidate.chunk_index) + .push_bind(candidate.snippet) + .push_bind(candidate.candidate_snapshot) + .push_bind(candidate.retrieval_rank as i32) + .push_bind(candidate.rerank_score) + .push_bind(candidate.note_scope) + .push_bind(candidate.note_importance) + .push_bind(candidate.note_updated_at) + .push_bind(candidate.note_hit_count) + .push_bind(candidate.note_last_hit_at) + .push_bind(candidate.created_at) + .push_bind(candidate.expires_at); + }); + builder.push(" ON CONFLICT (candidate_id) DO NOTHING"); + builder.build().execute(executor).await?; + + Ok(()) +} diff --git a/packages/elf-service/src/search/trace_persistence/enqueue.rs b/packages/elf-service/src/search/trace_persistence/enqueue.rs new file mode 100644 index 00000000..3f5efb4c --- /dev/null +++ b/packages/elf-service/src/search/trace_persistence/enqueue.rs @@ -0,0 +1,41 @@ +use crate::{ + Error, + search::{OffsetDateTime, PgExecutor, Result, TracePayload, Uuid}, +}; + +pub(in crate::search) async fn enqueue_trace<'e, E>( + executor: E, + payload: TracePayload, +) -> Result<()> +where + E: PgExecutor<'e>, +{ + let now = OffsetDateTime::now_utc(); + let payload_json = serde_json::to_value(&payload).map_err(|err| Error::Storage { + message: format!("Failed to encode search trace payload: {err}"), + })?; + + sqlx::query( + "\ +INSERT INTO search_trace_outbox ( + outbox_id, + trace_id, + status, + attempts, + last_error, + available_at, + payload, + created_at, + updated_at +) +VALUES ($1, $2, 'PENDING', 0, NULL, $3, $4, $3, $3)", + ) + .bind(Uuid::new_v4()) + .bind(payload.trace.trace_id) + .bind(now) + .bind(payload_json) + .execute(executor) + .await?; + + Ok(()) +} diff --git a/packages/elf-service/src/search/trace_persistence/header.rs b/packages/elf-service/src/search/trace_persistence/header.rs new file mode 100644 index 00000000..cbdc763d --- /dev/null +++ b/packages/elf-service/src/search/trace_persistence/header.rs @@ -0,0 +1,74 @@ +use crate::{ + Error, + search::{PgConnection, Result, TraceRecord}, +}; + +pub(in crate::search::trace_persistence) async fn persist_trace_inline_header( + executor: &mut PgConnection, + trace: &TraceRecord, +) -> Result<()> { + let expanded_queries_json = serde_json::to_value(&trace.expanded_queries).map_err(|err| { + Error::Storage { message: format!("Failed to encode expanded_queries: {err}") } + })?; + let allowed_scopes_json = serde_json::to_value(&trace.allowed_scopes).map_err(|err| { + Error::Storage { message: format!("Failed to encode allowed_scopes: {err}") } + })?; + + sqlx::query( + "\ +INSERT INTO search_traces ( + trace_id, + tenant_id, + project_id, + agent_id, + read_profile, + query, + expansion_mode, + expanded_queries, + allowed_scopes, + candidate_count, + top_k, + config_snapshot, + trace_version, + created_at, + expires_at +) +VALUES ( + $1, + $2, + $3, + $4, + $5, + $6, + $7, + $8, + $9, + $10, + $11, + $12, + $13, + $14, + $15 +) + ON CONFLICT (trace_id) DO NOTHING", + ) + .bind(trace.trace_id) + .bind(trace.tenant_id.as_str()) + .bind(trace.project_id.as_str()) + .bind(trace.agent_id.as_str()) + .bind(trace.read_profile.as_str()) + .bind(trace.query.as_str()) + .bind(trace.expansion_mode.as_str()) + .bind(expanded_queries_json) + .bind(allowed_scopes_json) + .bind(trace.candidate_count as i32) + .bind(trace.top_k as i32) + .bind(trace.config_snapshot.clone()) + .bind(trace.trace_version) + .bind(trace.created_at) + .bind(trace.expires_at) + .execute(executor) + .await?; + + Ok(()) +} diff --git a/packages/elf-service/src/search/trace_persistence/items.rs b/packages/elf-service/src/search/trace_persistence/items.rs new file mode 100644 index 00000000..d10bf391 --- /dev/null +++ b/packages/elf-service/src/search/trace_persistence/items.rs @@ -0,0 +1,42 @@ +use crate::search::{PgConnection, QueryBuilder, Result, TraceItemRecord, Uuid}; + +pub(in crate::search::trace_persistence) async fn persist_trace_inline_items( + executor: &mut PgConnection, + trace_id: Uuid, + items: Vec, +) -> Result<()> { + if items.is_empty() { + return Ok(()); + } + + let mut builder = QueryBuilder::new( + "\ +INSERT INTO search_trace_items ( + item_id, + trace_id, + note_id, + chunk_id, + rank, + final_score, + explain +) ", + ); + + builder.push_values(items, |mut b, item| { + let explain_json = + serde_json::to_value(item.explain).expect("SearchExplain must be JSON-serializable."); + + b.push_bind(item.item_id) + .push_bind(trace_id) + .push_bind(item.note_id) + .push_bind(item.chunk_id) + .push_bind(item.rank as i32) + .push_bind(item.final_score) + .push_bind(explain_json); + }); + + builder.push(" ON CONFLICT (item_id) DO NOTHING"); + builder.build().execute(executor).await?; + + Ok(()) +} diff --git a/packages/elf-service/src/search/trace_persistence/stages.rs b/packages/elf-service/src/search/trace_persistence/stages.rs new file mode 100644 index 00000000..4f617904 --- /dev/null +++ b/packages/elf-service/src/search/trace_persistence/stages.rs @@ -0,0 +1,68 @@ +use crate::search::{PgConnection, QueryBuilder, Result, TraceTrajectoryStageRecord, Uuid}; + +pub(in crate::search::trace_persistence) async fn persist_trace_inline_stages( + executor: &mut PgConnection, + trace_id: Uuid, + stages: Vec, +) -> Result<()> { + if stages.is_empty() { + return Ok(()); + } + + let mut item_records = Vec::new(); + let mut stage_builder = QueryBuilder::new( + "\ +INSERT INTO search_trace_stages ( + stage_id, + trace_id, + stage_order, + stage_name, + stage_payload, + created_at +) ", + ); + + stage_builder.push_values(stages, |mut b, stage| { + for item in stage.items { + item_records.push((stage.stage_id, item)); + } + + b.push_bind(stage.stage_id) + .push_bind(trace_id) + .push_bind(stage.stage_order as i32) + .push_bind(stage.stage_name) + .push_bind(stage.stage_payload) + .push_bind(stage.created_at); + }); + stage_builder.push(" ON CONFLICT (stage_id) DO NOTHING"); + stage_builder.build().execute(&mut *executor).await?; + + if item_records.is_empty() { + return Ok(()); + } + + let mut item_builder = QueryBuilder::new( + "\ +INSERT INTO search_trace_stage_items ( + id, + stage_id, + item_id, + note_id, + chunk_id, + metrics +) ", + ); + + item_builder.push_values(item_records, |mut b, (stage_id, item)| { + b.push_bind(item.id) + .push_bind(stage_id) + .push_bind(item.item_id) + .push_bind(item.note_id) + .push_bind(item.chunk_id) + .push_bind(item.metrics); + }); + item_builder.push(" ON CONFLICT (id) DO NOTHING"); + item_builder.build().execute(executor).await?; + + Ok(()) +} From b1f843b7d6a162a8c915184b364360c200d8ffd0 Mon Sep 17 00:00:00 2001 From: Yvette Carlisle Date: Tue, 30 Jun 2026 16:50:56 +0800 Subject: [PATCH 5/6] {"schema":"decodex/commit/1","summary":"Split structured retrieval modules","authority":"manual"} --- .../src/search/retrieval/structured.rs | 250 +----------------- .../retrieval/structured/best_chunks.rs | 36 +++ .../search/retrieval/structured/fetch_hits.rs | 142 ++++++++++ .../search/retrieval/structured/service.rs | 76 ++++++ 4 files changed, 257 insertions(+), 247 deletions(-) create mode 100644 packages/elf-service/src/search/retrieval/structured/best_chunks.rs create mode 100644 packages/elf-service/src/search/retrieval/structured/fetch_hits.rs create mode 100644 packages/elf-service/src/search/retrieval/structured/service.rs diff --git a/packages/elf-service/src/search/retrieval/structured.rs b/packages/elf-service/src/search/retrieval/structured.rs index a7763e79..b4590e6b 100644 --- a/packages/elf-service/src/search/retrieval/structured.rs +++ b/packages/elf-service/src/search/retrieval/structured.rs @@ -1,247 +1,3 @@ -use crate::search::{ - self, BestChunkForNoteRow, ElfService, FieldHit, HashMap, ORG_PROJECT_ID, Result, - StructuredFieldHitArgs, StructuredFieldHitRow, StructuredFieldRetrievalArgs, - StructuredFieldRetrievalResult, Uuid, -}; - -impl ElfService { - pub(in crate::search::retrieval) async fn retrieve_structured_field_candidates( - &self, - args: StructuredFieldRetrievalArgs<'_>, - ) -> Result { - let StructuredFieldRetrievalArgs { - tenant_id, - project_id, - agent_id, - allowed_scopes, - query_vec, - candidate_k, - now, - } = args; - - if query_vec.is_empty() { - return Ok(StructuredFieldRetrievalResult { - candidates: Vec::new(), - structured_matches: HashMap::new(), - }); - } - - let embed_version = crate::embedding_version(&self.cfg); - let vec_text = crate::vector_to_pg(query_vec); - let private_allowed = allowed_scopes.iter().any(|scope| scope == "agent_private"); - let non_private_scopes: Vec = - allowed_scopes.iter().filter(|scope| *scope != "agent_private").cloned().collect(); - let retrieval_limit = i64::from(candidate_k.saturating_mul(4).clamp(16, 400)); - let rows = self - .fetch_structured_field_hits(StructuredFieldHitArgs { - embed_version: embed_version.as_str(), - tenant_id, - project_id, - agent_id, - now, - vec_text: vec_text.as_str(), - retrieval_limit, - private_allowed, - non_private_scopes: non_private_scopes.as_slice(), - }) - .await?; - let (ordered_note_ids, structured_matches_out) = - search::build_structured_field_matches(rows); - - if ordered_note_ids.is_empty() { - return Ok(StructuredFieldRetrievalResult { - candidates: Vec::new(), - structured_matches: structured_matches_out, - }); - } - - let best_by_note = self - .fetch_best_chunks_for_notes( - embed_version.as_str(), - ordered_note_ids.as_slice(), - vec_text.as_str(), - ) - .await?; - let structured_candidates = search::build_structured_field_candidates( - candidate_k, - ordered_note_ids, - best_by_note, - embed_version.as_str(), - ); - - Ok(StructuredFieldRetrievalResult { - candidates: structured_candidates, - structured_matches: structured_matches_out, - }) - } - - async fn fetch_structured_field_hits( - &self, - args: StructuredFieldHitArgs<'_>, - ) -> Result> { - if args.private_allowed && args.non_private_scopes.is_empty() { - self.fetch_structured_field_hits_private_only(args).await - } else if !args.private_allowed { - self.fetch_structured_field_hits_non_private_only(args).await - } else { - self.fetch_structured_field_hits_mixed(args).await - } - } - - async fn fetch_structured_field_hits_private_only( - &self, - args: StructuredFieldHitArgs<'_>, - ) -> Result> { - let rows = sqlx::query_as::<_, StructuredFieldHitRow>( - "\ -SELECT - f.note_id, - f.field_kind -FROM memory_note_fields f -JOIN note_field_embeddings e - ON e.field_id = f.field_id - AND e.embedding_version = $1 -JOIN memory_notes n - ON n.note_id = f.note_id -WHERE n.tenant_id = $2 - AND n.project_id = $3 - AND n.status = 'active' - AND (n.expires_at IS NULL OR n.expires_at > $4) - AND n.scope = 'agent_private' - AND n.agent_id = $5 -ORDER BY e.vec <=> $6::text::vector ASC -LIMIT $7", - ) - .bind(args.embed_version) - .bind(args.tenant_id) - .bind(args.project_id) - .bind(args.now) - .bind(args.agent_id) - .bind(args.vec_text) - .bind(args.retrieval_limit) - .fetch_all(&self.db.pool) - .await?; - - Ok(rows - .into_iter() - .map(|row| FieldHit { note_id: row.note_id, field_kind: row.field_kind }) - .collect()) - } - - async fn fetch_structured_field_hits_non_private_only( - &self, - args: StructuredFieldHitArgs<'_>, - ) -> Result> { - let rows = sqlx::query_as::<_, StructuredFieldHitRow>( - "\ -SELECT - f.note_id, - f.field_kind -FROM memory_note_fields f -JOIN note_field_embeddings e - ON e.field_id = f.field_id - AND e.embedding_version = $1 -JOIN memory_notes n - ON n.note_id = f.note_id -WHERE n.tenant_id = $2 - AND (n.project_id = $3 OR (n.project_id = $8 AND n.scope = 'org_shared')) - AND n.status = 'active' - AND (n.expires_at IS NULL OR n.expires_at > $4) - AND n.scope = ANY($5::text[]) -ORDER BY e.vec <=> $6::text::vector ASC -LIMIT $7", - ) - .bind(args.embed_version) - .bind(args.tenant_id) - .bind(args.project_id) - .bind(args.now) - .bind(args.non_private_scopes) - .bind(args.vec_text) - .bind(args.retrieval_limit) - .bind(ORG_PROJECT_ID) - .fetch_all(&self.db.pool) - .await?; - - Ok(rows - .into_iter() - .map(|row| FieldHit { note_id: row.note_id, field_kind: row.field_kind }) - .collect()) - } - - async fn fetch_structured_field_hits_mixed( - &self, - args: StructuredFieldHitArgs<'_>, - ) -> Result> { - let rows = sqlx::query_as::<_, StructuredFieldHitRow>( - "\ -SELECT - f.note_id, - f.field_kind -FROM memory_note_fields f -JOIN note_field_embeddings e - ON e.field_id = f.field_id - AND e.embedding_version = $1 -JOIN memory_notes n - ON n.note_id = f.note_id -WHERE n.tenant_id = $2 - AND (n.project_id = $3 OR (n.project_id = $9 AND n.scope = 'org_shared')) - AND n.status = 'active' - AND (n.expires_at IS NULL OR n.expires_at > $4) - AND ( - (n.scope = 'agent_private' AND n.agent_id = $5) - OR n.scope = ANY($6::text[]) - ) -ORDER BY e.vec <=> $7::text::vector ASC -LIMIT $8", - ) - .bind(args.embed_version) - .bind(args.tenant_id) - .bind(args.project_id) - .bind(args.now) - .bind(args.agent_id) - .bind(args.non_private_scopes) - .bind(args.vec_text) - .bind(args.retrieval_limit) - .bind(ORG_PROJECT_ID) - .fetch_all(&self.db.pool) - .await?; - - Ok(rows - .into_iter() - .map(|row| FieldHit { note_id: row.note_id, field_kind: row.field_kind }) - .collect()) - } - - async fn fetch_best_chunks_for_notes( - &self, - embed_version: &str, - ordered_note_ids: &[Uuid], - vec_text: &str, - ) -> Result> { - let best_chunks = sqlx::query_as::<_, BestChunkForNoteRow>( - "\ -SELECT DISTINCT ON (c.note_id) - c.note_id, - c.chunk_id, - c.chunk_index -FROM memory_note_chunks c -JOIN note_chunk_embeddings e - ON e.chunk_id = c.chunk_id - AND e.embedding_version = $1 -WHERE c.note_id = ANY($2::uuid[]) -ORDER BY c.note_id ASC, e.vec <=> $3::text::vector ASC", - ) - .bind(embed_version) - .bind(ordered_note_ids) - .bind(vec_text) - .fetch_all(&self.db.pool) - .await?; - let mut best_by_note = HashMap::new(); - - for row in best_chunks { - best_by_note.insert(row.note_id, (row.chunk_id, row.chunk_index)); - } - - Ok(best_by_note) - } -} +mod best_chunks; +mod fetch_hits; +mod service; diff --git a/packages/elf-service/src/search/retrieval/structured/best_chunks.rs b/packages/elf-service/src/search/retrieval/structured/best_chunks.rs new file mode 100644 index 00000000..f3f67309 --- /dev/null +++ b/packages/elf-service/src/search/retrieval/structured/best_chunks.rs @@ -0,0 +1,36 @@ +use crate::search::{BestChunkForNoteRow, ElfService, HashMap, Result, Uuid}; + +impl ElfService { + pub(in crate::search::retrieval) async fn fetch_best_chunks_for_notes( + &self, + embed_version: &str, + ordered_note_ids: &[Uuid], + vec_text: &str, + ) -> Result> { + let best_chunks = sqlx::query_as::<_, BestChunkForNoteRow>( + "\ +SELECT DISTINCT ON (c.note_id) + c.note_id, + c.chunk_id, + c.chunk_index +FROM memory_note_chunks c +JOIN note_chunk_embeddings e + ON e.chunk_id = c.chunk_id + AND e.embedding_version = $1 +WHERE c.note_id = ANY($2::uuid[]) +ORDER BY c.note_id ASC, e.vec <=> $3::text::vector ASC", + ) + .bind(embed_version) + .bind(ordered_note_ids) + .bind(vec_text) + .fetch_all(&self.db.pool) + .await?; + let mut best_by_note = HashMap::new(); + + for row in best_chunks { + best_by_note.insert(row.note_id, (row.chunk_id, row.chunk_index)); + } + + Ok(best_by_note) + } +} diff --git a/packages/elf-service/src/search/retrieval/structured/fetch_hits.rs b/packages/elf-service/src/search/retrieval/structured/fetch_hits.rs new file mode 100644 index 00000000..1a89091a --- /dev/null +++ b/packages/elf-service/src/search/retrieval/structured/fetch_hits.rs @@ -0,0 +1,142 @@ +use crate::search::{ + ElfService, FieldHit, ORG_PROJECT_ID, Result, StructuredFieldHitArgs, StructuredFieldHitRow, +}; + +impl ElfService { + pub(in crate::search::retrieval) async fn fetch_structured_field_hits( + &self, + args: StructuredFieldHitArgs<'_>, + ) -> Result> { + if args.private_allowed && args.non_private_scopes.is_empty() { + self.fetch_structured_field_hits_private_only(args).await + } else if !args.private_allowed { + self.fetch_structured_field_hits_non_private_only(args).await + } else { + self.fetch_structured_field_hits_mixed(args).await + } + } + + async fn fetch_structured_field_hits_private_only( + &self, + args: StructuredFieldHitArgs<'_>, + ) -> Result> { + let rows = sqlx::query_as::<_, StructuredFieldHitRow>( + "\ +SELECT + f.note_id, + f.field_kind +FROM memory_note_fields f +JOIN note_field_embeddings e + ON e.field_id = f.field_id + AND e.embedding_version = $1 +JOIN memory_notes n + ON n.note_id = f.note_id +WHERE n.tenant_id = $2 + AND n.project_id = $3 + AND n.status = 'active' + AND (n.expires_at IS NULL OR n.expires_at > $4) + AND n.scope = 'agent_private' + AND n.agent_id = $5 +ORDER BY e.vec <=> $6::text::vector ASC +LIMIT $7", + ) + .bind(args.embed_version) + .bind(args.tenant_id) + .bind(args.project_id) + .bind(args.now) + .bind(args.agent_id) + .bind(args.vec_text) + .bind(args.retrieval_limit) + .fetch_all(&self.db.pool) + .await?; + + Ok(rows + .into_iter() + .map(|row| FieldHit { note_id: row.note_id, field_kind: row.field_kind }) + .collect()) + } + + async fn fetch_structured_field_hits_non_private_only( + &self, + args: StructuredFieldHitArgs<'_>, + ) -> Result> { + let rows = sqlx::query_as::<_, StructuredFieldHitRow>( + "\ +SELECT + f.note_id, + f.field_kind +FROM memory_note_fields f +JOIN note_field_embeddings e + ON e.field_id = f.field_id + AND e.embedding_version = $1 +JOIN memory_notes n + ON n.note_id = f.note_id +WHERE n.tenant_id = $2 + AND (n.project_id = $3 OR (n.project_id = $8 AND n.scope = 'org_shared')) + AND n.status = 'active' + AND (n.expires_at IS NULL OR n.expires_at > $4) + AND n.scope = ANY($5::text[]) +ORDER BY e.vec <=> $6::text::vector ASC +LIMIT $7", + ) + .bind(args.embed_version) + .bind(args.tenant_id) + .bind(args.project_id) + .bind(args.now) + .bind(args.non_private_scopes) + .bind(args.vec_text) + .bind(args.retrieval_limit) + .bind(ORG_PROJECT_ID) + .fetch_all(&self.db.pool) + .await?; + + Ok(rows + .into_iter() + .map(|row| FieldHit { note_id: row.note_id, field_kind: row.field_kind }) + .collect()) + } + + async fn fetch_structured_field_hits_mixed( + &self, + args: StructuredFieldHitArgs<'_>, + ) -> Result> { + let rows = sqlx::query_as::<_, StructuredFieldHitRow>( + "\ +SELECT + f.note_id, + f.field_kind +FROM memory_note_fields f +JOIN note_field_embeddings e + ON e.field_id = f.field_id + AND e.embedding_version = $1 +JOIN memory_notes n + ON n.note_id = f.note_id +WHERE n.tenant_id = $2 + AND (n.project_id = $3 OR (n.project_id = $9 AND n.scope = 'org_shared')) + AND n.status = 'active' + AND (n.expires_at IS NULL OR n.expires_at > $4) + AND ( + (n.scope = 'agent_private' AND n.agent_id = $5) + OR n.scope = ANY($6::text[]) + ) +ORDER BY e.vec <=> $7::text::vector ASC +LIMIT $8", + ) + .bind(args.embed_version) + .bind(args.tenant_id) + .bind(args.project_id) + .bind(args.now) + .bind(args.agent_id) + .bind(args.non_private_scopes) + .bind(args.vec_text) + .bind(args.retrieval_limit) + .bind(ORG_PROJECT_ID) + .fetch_all(&self.db.pool) + .await?; + + Ok(rows + .into_iter() + .map(|row| FieldHit { note_id: row.note_id, field_kind: row.field_kind }) + .collect()) + } +} diff --git a/packages/elf-service/src/search/retrieval/structured/service.rs b/packages/elf-service/src/search/retrieval/structured/service.rs new file mode 100644 index 00000000..5242e919 --- /dev/null +++ b/packages/elf-service/src/search/retrieval/structured/service.rs @@ -0,0 +1,76 @@ +use crate::search::{ + self, ElfService, HashMap, Result, StructuredFieldHitArgs, StructuredFieldRetrievalArgs, + StructuredFieldRetrievalResult, +}; + +impl ElfService { + pub(in crate::search::retrieval) async fn retrieve_structured_field_candidates( + &self, + args: StructuredFieldRetrievalArgs<'_>, + ) -> Result { + let StructuredFieldRetrievalArgs { + tenant_id, + project_id, + agent_id, + allowed_scopes, + query_vec, + candidate_k, + now, + } = args; + + if query_vec.is_empty() { + return Ok(StructuredFieldRetrievalResult { + candidates: Vec::new(), + structured_matches: HashMap::new(), + }); + } + + let embed_version = crate::embedding_version(&self.cfg); + let vec_text = crate::vector_to_pg(query_vec); + let private_allowed = allowed_scopes.iter().any(|scope| scope == "agent_private"); + let non_private_scopes: Vec = + allowed_scopes.iter().filter(|scope| *scope != "agent_private").cloned().collect(); + let retrieval_limit = i64::from(candidate_k.saturating_mul(4).clamp(16, 400)); + let rows = self + .fetch_structured_field_hits(StructuredFieldHitArgs { + embed_version: embed_version.as_str(), + tenant_id, + project_id, + agent_id, + now, + vec_text: vec_text.as_str(), + retrieval_limit, + private_allowed, + non_private_scopes: non_private_scopes.as_slice(), + }) + .await?; + let (ordered_note_ids, structured_matches_out) = + search::build_structured_field_matches(rows); + + if ordered_note_ids.is_empty() { + return Ok(StructuredFieldRetrievalResult { + candidates: Vec::new(), + structured_matches: structured_matches_out, + }); + } + + let best_by_note = self + .fetch_best_chunks_for_notes( + embed_version.as_str(), + ordered_note_ids.as_slice(), + vec_text.as_str(), + ) + .await?; + let structured_candidates = search::build_structured_field_candidates( + candidate_k, + ordered_note_ids, + best_by_note, + embed_version.as_str(), + ); + + Ok(StructuredFieldRetrievalResult { + candidates: structured_candidates, + structured_matches: structured_matches_out, + }) + } +} From 23b16606c728f2fdafa5909156e1e4f216fc39d5 Mon Sep 17 00:00:00 2001 From: Yvette Carlisle Date: Tue, 30 Jun 2026 16:53:31 +0800 Subject: [PATCH 6/6] {"schema":"decodex/commit/1","summary":"Split search trace stage builders","authority":"manual"} --- .../elf-service/src/search/trace_stages.rs | 231 +----------------- .../src/search/trace_stages/audit.rs | 8 + .../src/search/trace_stages/trajectory.rs | 222 +++++++++++++++++ 3 files changed, 233 insertions(+), 228 deletions(-) create mode 100644 packages/elf-service/src/search/trace_stages/audit.rs create mode 100644 packages/elf-service/src/search/trace_stages/trajectory.rs diff --git a/packages/elf-service/src/search/trace_stages.rs b/packages/elf-service/src/search/trace_stages.rs index 4b559dc7..7c8b21a5 100644 --- a/packages/elf-service/src/search/trace_stages.rs +++ b/packages/elf-service/src/search/trace_stages.rs @@ -1,229 +1,4 @@ -use crate::search::{ - self, BuildTraceArgs, MAX_TRAJECTORY_STAGE_ITEMS, SEARCH_RETRIEVAL_TRAJECTORY_SCHEMA_V1, - TraceTrajectoryStageItemRecord, TraceTrajectoryStageRecord, Uuid, Value, ranking, -}; +mod audit; +mod trajectory; -pub(super) fn build_trace_audit(actor_id: &str, token_id: Option<&str>) -> Value { - match token_id.map(str::trim).filter(|value| !value.is_empty()) { - Some(token_id) => serde_json::json!({ "actor_id": actor_id, "token_id": token_id }), - None => serde_json::json!({ "actor_id": actor_id }), - } -} - -pub(super) fn build_trace_trajectory_stages( - args: &BuildTraceArgs<'_>, -) -> Vec { - let path_label = search::raw_search_path_label(args.path); - - vec![ - build_trace_rewrite_stage(args, path_label), - build_trace_recall_stage(args, path_label), - build_trace_fusion_stage(args, path_label), - build_trace_rerank_stage(args, path_label), - build_trace_final_stage(args, path_label), - ] -} - -pub(super) fn build_trace_rewrite_stage( - args: &BuildTraceArgs<'_>, - path_label: &str, -) -> TraceTrajectoryStageRecord { - let expanded_queries = search::sorted_unique_strings(args.expanded_queries.clone()); - - TraceTrajectoryStageRecord { - stage_id: Uuid::new_v4(), - stage_order: 1, - stage_name: "rewrite.expansion".to_string(), - stage_payload: serde_json::json!({ - "schema": SEARCH_RETRIEVAL_TRAJECTORY_SCHEMA_V1, - "path": path_label, - "inputs": { - "query": args.query, - "expansion_mode": ranking::expansion_mode_label(args.expansion_mode), - }, - "outputs": { - "expanded_queries": expanded_queries, - }, - "stats": { - "expanded_query_count": args.expanded_queries.len(), - }, - }), - created_at: args.now, - items: Vec::new(), - } -} - -pub(super) fn build_trace_recall_stage( - args: &BuildTraceArgs<'_>, - path_label: &str, -) -> TraceTrajectoryStageRecord { - let mut stage_payload = serde_json::json!({ - "schema": SEARCH_RETRIEVAL_TRAJECTORY_SCHEMA_V1, - "path": path_label, - "stats": { - "candidate_count_before_filter": args.candidate_count, - "candidate_count_after_filter": args.filtered_candidate_count, - "snippet_count": args.snippet_count, - }, - }); - - if let Some(filter_impact) = &args.filter_impact - && let Some(payload) = stage_payload.as_object_mut() - { - payload.insert("filter_impact".to_string(), filter_impact.to_stage_payload()); - } - if let Some(recursive_retrieval) = args.recursive_retrieval - && recursive_retrieval.enabled - && let Some(payload) = stage_payload.as_object_mut() - { - payload.insert( - "recursive".to_string(), - serde_json::json!({ - "enabled": true, - "scopes_seeded": recursive_retrieval.scopes_seeded, - "scopes_queried": recursive_retrieval.scopes_queried, - "candidates_before": recursive_retrieval.candidates_before, - "candidates_added": recursive_retrieval.candidates_added, - "candidates_after": recursive_retrieval.candidates_after, - "rounds_executed": recursive_retrieval.rounds_executed, - "total_queries": recursive_retrieval.total_queries, - "stop_reason": recursive_retrieval - .stop_reason - .clone() - .unwrap_or_else(|| "converged".to_string()), - }), - ); - } - - let items: Vec = args - .recall_candidates - .iter() - .take(MAX_TRAJECTORY_STAGE_ITEMS) - .map(|candidate| TraceTrajectoryStageItemRecord { - id: Uuid::new_v4(), - item_id: None, - note_id: Some(candidate.note_id), - chunk_id: Some(candidate.chunk_id), - metrics: serde_json::json!({ - "retrieval_rank": candidate.retrieval_rank, - "chunk_index": candidate.chunk_index, - }), - }) - .collect(); - - TraceTrajectoryStageRecord { - stage_id: Uuid::new_v4(), - stage_order: 2, - stage_name: "recall.candidates".to_string(), - stage_payload, - created_at: args.now, - items, - } -} - -pub(super) fn build_trace_fusion_stage( - args: &BuildTraceArgs<'_>, - path_label: &str, -) -> TraceTrajectoryStageRecord { - let items: Vec = args - .fused_results - .iter() - .take(MAX_TRAJECTORY_STAGE_ITEMS) - .map(|scored| TraceTrajectoryStageItemRecord { - id: Uuid::new_v4(), - item_id: None, - note_id: Some(scored.item.note.note_id), - chunk_id: Some(scored.item.chunk.chunk_id), - metrics: serde_json::json!({ - "retrieval_rank": scored.item.retrieval_rank, - "final_score": scored.final_score, - }), - }) - .collect(); - - TraceTrajectoryStageRecord { - stage_id: Uuid::new_v4(), - stage_order: 3, - stage_name: "fusion.merge".to_string(), - stage_payload: serde_json::json!({ - "schema": SEARCH_RETRIEVAL_TRAJECTORY_SCHEMA_V1, - "path": path_label, - "stats": { - "scored_count": args.scored_count, - "fused_count": args.fused_count, - }, - "decisions": { - "fusion_weight": args.policies.retrieval_sources_policy.fusion_weight, - "structured_field_weight": args.policies.retrieval_sources_policy.structured_field_weight, - "fusion_priority": args.policies.retrieval_sources_policy.fusion_priority, - "structured_field_priority": args.policies.retrieval_sources_policy.structured_field_priority, - }, - }), - created_at: args.now, - items, - } -} - -pub(super) fn build_trace_rerank_stage( - args: &BuildTraceArgs<'_>, - path_label: &str, -) -> TraceTrajectoryStageRecord { - let items: Vec = args - .fused_results - .iter() - .take(MAX_TRAJECTORY_STAGE_ITEMS) - .map(|scored| TraceTrajectoryStageItemRecord { - id: Uuid::new_v4(), - item_id: None, - note_id: Some(scored.item.note.note_id), - chunk_id: Some(scored.item.chunk.chunk_id), - metrics: serde_json::json!({ - "rerank_score": scored.rerank_score, - "rerank_rank": scored.rerank_rank, - "rerank_norm": scored.rerank_norm, - "retrieval_norm": scored.retrieval_norm, - "final_score": scored.final_score, - }), - }) - .collect(); - - TraceTrajectoryStageRecord { - stage_id: Uuid::new_v4(), - stage_order: 4, - stage_name: "rerank.score".to_string(), - stage_payload: serde_json::json!({ - "schema": SEARCH_RETRIEVAL_TRAJECTORY_SCHEMA_V1, - "path": path_label, - "stats": { - "reranked_count": args.scored_count, - }, - "decisions": { - "blend_enabled": args.policies.blend_policy.enabled, - "diversity_enabled": args.policies.diversity_policy.enabled, - }, - }), - created_at: args.now, - items, - } -} - -pub(super) fn build_trace_final_stage( - args: &BuildTraceArgs<'_>, - path_label: &str, -) -> TraceTrajectoryStageRecord { - TraceTrajectoryStageRecord { - stage_id: Uuid::new_v4(), - stage_order: 5, - stage_name: "selection.final".to_string(), - stage_payload: serde_json::json!({ - "schema": SEARCH_RETRIEVAL_TRAJECTORY_SCHEMA_V1, - "path": path_label, - "stats": { - "selected_count": args.selected_count, - "top_k": args.top_k, - }, - }), - created_at: args.now, - items: Vec::new(), - } -} +pub(super) use self::{audit::build_trace_audit, trajectory::build_trace_trajectory_stages}; diff --git a/packages/elf-service/src/search/trace_stages/audit.rs b/packages/elf-service/src/search/trace_stages/audit.rs new file mode 100644 index 00000000..ee24125f --- /dev/null +++ b/packages/elf-service/src/search/trace_stages/audit.rs @@ -0,0 +1,8 @@ +use crate::search::Value; + +pub(in crate::search) fn build_trace_audit(actor_id: &str, token_id: Option<&str>) -> Value { + match token_id.map(str::trim).filter(|value| !value.is_empty()) { + Some(token_id) => serde_json::json!({ "actor_id": actor_id, "token_id": token_id }), + None => serde_json::json!({ "actor_id": actor_id }), + } +} diff --git a/packages/elf-service/src/search/trace_stages/trajectory.rs b/packages/elf-service/src/search/trace_stages/trajectory.rs new file mode 100644 index 00000000..dfc16c7e --- /dev/null +++ b/packages/elf-service/src/search/trace_stages/trajectory.rs @@ -0,0 +1,222 @@ +use crate::search::{ + self, BuildTraceArgs, MAX_TRAJECTORY_STAGE_ITEMS, SEARCH_RETRIEVAL_TRAJECTORY_SCHEMA_V1, + TraceTrajectoryStageItemRecord, TraceTrajectoryStageRecord, Uuid, ranking, +}; + +pub(in crate::search) fn build_trace_trajectory_stages( + args: &BuildTraceArgs<'_>, +) -> Vec { + let path_label = search::raw_search_path_label(args.path); + + vec![ + build_trace_rewrite_stage(args, path_label), + build_trace_recall_stage(args, path_label), + build_trace_fusion_stage(args, path_label), + build_trace_rerank_stage(args, path_label), + build_trace_final_stage(args, path_label), + ] +} + +fn build_trace_rewrite_stage( + args: &BuildTraceArgs<'_>, + path_label: &str, +) -> TraceTrajectoryStageRecord { + let expanded_queries = search::sorted_unique_strings(args.expanded_queries.clone()); + + TraceTrajectoryStageRecord { + stage_id: Uuid::new_v4(), + stage_order: 1, + stage_name: "rewrite.expansion".to_string(), + stage_payload: serde_json::json!({ + "schema": SEARCH_RETRIEVAL_TRAJECTORY_SCHEMA_V1, + "path": path_label, + "inputs": { + "query": args.query, + "expansion_mode": ranking::expansion_mode_label(args.expansion_mode), + }, + "outputs": { + "expanded_queries": expanded_queries, + }, + "stats": { + "expanded_query_count": args.expanded_queries.len(), + }, + }), + created_at: args.now, + items: Vec::new(), + } +} + +fn build_trace_recall_stage( + args: &BuildTraceArgs<'_>, + path_label: &str, +) -> TraceTrajectoryStageRecord { + let mut stage_payload = serde_json::json!({ + "schema": SEARCH_RETRIEVAL_TRAJECTORY_SCHEMA_V1, + "path": path_label, + "stats": { + "candidate_count_before_filter": args.candidate_count, + "candidate_count_after_filter": args.filtered_candidate_count, + "snippet_count": args.snippet_count, + }, + }); + + if let Some(filter_impact) = &args.filter_impact + && let Some(payload) = stage_payload.as_object_mut() + { + payload.insert("filter_impact".to_string(), filter_impact.to_stage_payload()); + } + if let Some(recursive_retrieval) = args.recursive_retrieval + && recursive_retrieval.enabled + && let Some(payload) = stage_payload.as_object_mut() + { + payload.insert( + "recursive".to_string(), + serde_json::json!({ + "enabled": true, + "scopes_seeded": recursive_retrieval.scopes_seeded, + "scopes_queried": recursive_retrieval.scopes_queried, + "candidates_before": recursive_retrieval.candidates_before, + "candidates_added": recursive_retrieval.candidates_added, + "candidates_after": recursive_retrieval.candidates_after, + "rounds_executed": recursive_retrieval.rounds_executed, + "total_queries": recursive_retrieval.total_queries, + "stop_reason": recursive_retrieval + .stop_reason + .clone() + .unwrap_or_else(|| "converged".to_string()), + }), + ); + } + + let items: Vec = args + .recall_candidates + .iter() + .take(MAX_TRAJECTORY_STAGE_ITEMS) + .map(|candidate| TraceTrajectoryStageItemRecord { + id: Uuid::new_v4(), + item_id: None, + note_id: Some(candidate.note_id), + chunk_id: Some(candidate.chunk_id), + metrics: serde_json::json!({ + "retrieval_rank": candidate.retrieval_rank, + "chunk_index": candidate.chunk_index, + }), + }) + .collect(); + + TraceTrajectoryStageRecord { + stage_id: Uuid::new_v4(), + stage_order: 2, + stage_name: "recall.candidates".to_string(), + stage_payload, + created_at: args.now, + items, + } +} + +fn build_trace_fusion_stage( + args: &BuildTraceArgs<'_>, + path_label: &str, +) -> TraceTrajectoryStageRecord { + let items: Vec = args + .fused_results + .iter() + .take(MAX_TRAJECTORY_STAGE_ITEMS) + .map(|scored| TraceTrajectoryStageItemRecord { + id: Uuid::new_v4(), + item_id: None, + note_id: Some(scored.item.note.note_id), + chunk_id: Some(scored.item.chunk.chunk_id), + metrics: serde_json::json!({ + "retrieval_rank": scored.item.retrieval_rank, + "final_score": scored.final_score, + }), + }) + .collect(); + + TraceTrajectoryStageRecord { + stage_id: Uuid::new_v4(), + stage_order: 3, + stage_name: "fusion.merge".to_string(), + stage_payload: serde_json::json!({ + "schema": SEARCH_RETRIEVAL_TRAJECTORY_SCHEMA_V1, + "path": path_label, + "stats": { + "scored_count": args.scored_count, + "fused_count": args.fused_count, + }, + "decisions": { + "fusion_weight": args.policies.retrieval_sources_policy.fusion_weight, + "structured_field_weight": args.policies.retrieval_sources_policy.structured_field_weight, + "fusion_priority": args.policies.retrieval_sources_policy.fusion_priority, + "structured_field_priority": args.policies.retrieval_sources_policy.structured_field_priority, + }, + }), + created_at: args.now, + items, + } +} + +fn build_trace_rerank_stage( + args: &BuildTraceArgs<'_>, + path_label: &str, +) -> TraceTrajectoryStageRecord { + let items: Vec = args + .fused_results + .iter() + .take(MAX_TRAJECTORY_STAGE_ITEMS) + .map(|scored| TraceTrajectoryStageItemRecord { + id: Uuid::new_v4(), + item_id: None, + note_id: Some(scored.item.note.note_id), + chunk_id: Some(scored.item.chunk.chunk_id), + metrics: serde_json::json!({ + "rerank_score": scored.rerank_score, + "rerank_rank": scored.rerank_rank, + "rerank_norm": scored.rerank_norm, + "retrieval_norm": scored.retrieval_norm, + "final_score": scored.final_score, + }), + }) + .collect(); + + TraceTrajectoryStageRecord { + stage_id: Uuid::new_v4(), + stage_order: 4, + stage_name: "rerank.score".to_string(), + stage_payload: serde_json::json!({ + "schema": SEARCH_RETRIEVAL_TRAJECTORY_SCHEMA_V1, + "path": path_label, + "stats": { + "reranked_count": args.scored_count, + }, + "decisions": { + "blend_enabled": args.policies.blend_policy.enabled, + "diversity_enabled": args.policies.diversity_policy.enabled, + }, + }), + created_at: args.now, + items, + } +} + +fn build_trace_final_stage( + args: &BuildTraceArgs<'_>, + path_label: &str, +) -> TraceTrajectoryStageRecord { + TraceTrajectoryStageRecord { + stage_id: Uuid::new_v4(), + stage_order: 5, + stage_name: "selection.final".to_string(), + stage_payload: serde_json::json!({ + "schema": SEARCH_RETRIEVAL_TRAJECTORY_SCHEMA_V1, + "path": path_label, + "stats": { + "selected_count": args.selected_count, + "top_k": args.top_k, + }, + }), + created_at: args.now, + items: Vec::new(), + } +}