diff --git a/packages/elf-service/src/progressive_search.rs b/packages/elf-service/src/progressive_search.rs index 659c40a6..32de5c66 100644 --- a/packages/elf-service/src/progressive_search.rs +++ b/packages/elf-service/src/progressive_search.rs @@ -1,6 +1,7 @@ //! Progressive-search APIs. mod details; +mod followup; mod service; mod storage; mod types; diff --git a/packages/elf-service/src/progressive_search/followup.rs b/packages/elf-service/src/progressive_search/followup.rs new file mode 100644 index 00000000..cd516793 --- /dev/null +++ b/packages/elf-service/src/progressive_search/followup.rs @@ -0,0 +1,228 @@ +use std::collections::{HashMap, hash_set::HashSet}; + +use sqlx; +use time::OffsetDateTime; +use uuid::Uuid; + +use crate::{ + ElfService, Error, PayloadLevel, Result, + access::{self, ORG_PROJECT_ID}, + progressive_search::{ + details::{self, SearchDetailsBuildArgs}, + storage::{self}, + types::{ + SearchDetailsRequest, SearchDetailsResponse, SearchIndexItem, SearchSessionGetRequest, + SearchSessionGetResponse, SearchSessionItemRecord, SearchTimelineGroup, + SearchTimelineRequest, SearchTimelineResponse, + }, + }, + structured_fields, +}; +use elf_storage::models::MemoryNote; + +impl ElfService { + /// Reloads a stored search session and optionally extends its TTL. + pub async fn search_session_get( + &self, + req: SearchSessionGetRequest, + ) -> Result { + let tenant_id = req.tenant_id.trim(); + let project_id = req.project_id.trim(); + let agent_id = req.agent_id.trim(); + + if tenant_id.is_empty() || project_id.is_empty() || agent_id.is_empty() { + return Err(Error::InvalidRequest { + message: "tenant_id, project_id, and agent_id are required.".to_string(), + }); + } + + let now = OffsetDateTime::now_utc(); + let session = + storage::load_search_session(&self.db.pool, req.search_session_id, now).await?; + + details::validate_search_session_access(&session, tenant_id, project_id, agent_id)?; + + let touch = req.touch.unwrap_or(true); + let expires_at = if touch { + storage::touch_search_session(&self.db.pool, &session, now).await? + } else { + session.expires_at + }; + let top_k = req.top_k.unwrap_or(self.cfg.memory.top_k).max(1); + let items: Vec = session + .items + .into_iter() + .take(top_k as usize) + .map(|item| item.to_index_item()) + .collect(); + + Ok(SearchSessionGetResponse { + trace_id: session.trace_id, + search_session_id: session.search_session_id, + expires_at, + items, + mode: session.mode, + query_plan: session.query_plan, + trajectory_summary: session.trajectory_summary, + }) + } + + /// Reprojects a stored search session into timeline groups. + pub async fn search_timeline( + &self, + req: SearchTimelineRequest, + ) -> Result { + let tenant_id = req.tenant_id.trim(); + let project_id = req.project_id.trim(); + let agent_id = req.agent_id.trim(); + + if tenant_id.is_empty() || project_id.is_empty() || agent_id.is_empty() { + return Err(Error::InvalidRequest { + message: "tenant_id, project_id, and agent_id are required.".to_string(), + }); + } + + let now = OffsetDateTime::now_utc(); + let session = + storage::load_search_session(&self.db.pool, req.search_session_id, now).await?; + + details::validate_search_session_access(&session, tenant_id, project_id, agent_id)?; + + let expires_at = storage::touch_search_session(&self.db.pool, &session, now).await?; + let payload_level = req.payload_level; + let group_by = req.group_by.unwrap_or_else(|| { + if payload_level == PayloadLevel::L0 { "none".to_string() } else { "day".to_string() } + }); + + match group_by.as_str() { + "day" => details::build_timeline_by_day( + session.search_session_id, + expires_at, + &session.items, + ), + "none" => Ok(SearchTimelineResponse { + search_session_id: session.search_session_id, + expires_at, + groups: vec![SearchTimelineGroup { + date: "all".to_string(), + items: session + .items + .iter() + .map(SearchSessionItemRecord::to_index_item) + .collect(), + }], + }), + _ => Err(Error::InvalidRequest { + message: "group_by must be one of: day, none.".to_string(), + }), + } + } + + /// Materializes selected note details out of a stored search session. + pub async fn search_details(&self, req: SearchDetailsRequest) -> Result { + let tenant_id = req.tenant_id.trim(); + let project_id = req.project_id.trim(); + let agent_id = req.agent_id.trim(); + + if tenant_id.is_empty() || project_id.is_empty() || agent_id.is_empty() { + return Err(Error::InvalidRequest { + message: "tenant_id, project_id, and agent_id are required.".to_string(), + }); + } + + let now = OffsetDateTime::now_utc(); + let session = + storage::load_search_session(&self.db.pool, req.search_session_id, now).await?; + + details::validate_search_session_access(&session, tenant_id, project_id, agent_id)?; + + let expires_at = storage::touch_search_session(&self.db.pool, &session, now).await?; + let mut by_note_id: HashMap = HashMap::new(); + + for item in &session.items { + by_note_id.insert(item.note_id, item.clone()); + } + + let mut requested_in_session = Vec::new(); + let mut seen = HashSet::new(); + + for note_id in &req.note_ids { + if by_note_id.contains_key(note_id) && seen.insert(*note_id) { + requested_in_session.push(*note_id); + } + } + + let mut notes_by_id = HashMap::new(); + + if !requested_in_session.is_empty() { + let rows: Vec = sqlx::query_as::<_, MemoryNote>( + "\ +SELECT * +FROM memory_notes +WHERE note_id = ANY($1::uuid[]) + AND tenant_id = $2 + AND ( + project_id = $3 + OR (project_id = $4 AND scope = 'org_shared') + )", + ) + .bind(requested_in_session.as_slice()) + .bind(session.tenant_id.as_str()) + .bind(session.project_id.as_str()) + .bind(ORG_PROJECT_ID) + .fetch_all(&self.db.pool) + .await?; + + for note in rows { + notes_by_id.insert(note.note_id, note); + } + } + + let structured_by_note = if req.payload_level == PayloadLevel::L0 { + HashMap::new() + } else { + structured_fields::fetch_structured_fields( + &self.db.pool, + requested_in_session.as_slice(), + ) + .await? + }; + let allowed_scopes = details::resolve_read_scopes(&self.cfg, &session.read_profile)?; + let shared_grants = access::load_shared_read_grants_with_org_shared( + &self.db.pool, + session.tenant_id.as_str(), + session.project_id.as_str(), + agent_id, + allowed_scopes.iter().any(|scope| scope == "org_shared"), + ) + .await?; + let record_hits = req.record_hits.unwrap_or(true); + let details_args = SearchDetailsBuildArgs { + session_items_by_note_id: &by_note_id, + notes_by_id: ¬es_by_id, + structured_by_note: &structured_by_note, + session: &session, + shared_grants: &shared_grants, + allowed_scopes: &allowed_scopes, + now, + record_hits_enabled: record_hits, + payload_level: req.payload_level, + max_note_chars: self.cfg.memory.max_note_chars as usize, + }; + let (results, hits) = details::build_search_details_results(req.note_ids, details_args); + + if !hits.is_empty() { + let mut tx = self.db.pool.begin().await?; + + storage::record_detail_hits(&mut *tx, &session.query, &hits, now).await?; + + tx.commit().await?; + } + + Ok(SearchDetailsResponse { + search_session_id: session.search_session_id, + expires_at, + results, + }) + } +} diff --git a/packages/elf-service/src/progressive_search/service.rs b/packages/elf-service/src/progressive_search/service.rs index 4a90d0ad..04817342 100644 --- a/packages/elf-service/src/progressive_search/service.rs +++ b/packages/elf-service/src/progressive_search/service.rs @@ -1,27 +1,19 @@ -use std::collections::{HashMap, hash_set::HashSet}; - -use sqlx; use time::{Duration, OffsetDateTime}; use uuid::Uuid; use crate::{ - ElfService, Error, PayloadLevel, Result, SearchRequest, - access::{self, ORG_PROJECT_ID}, + ElfService, Error, Result, SearchRequest, progressive_search::{ - details::{self, SearchDetailsBuildArgs}, + details, storage::{self}, types::{ - NewSearchSession, SESSION_SLIDING_TTL_HOURS, SearchDetailsRequest, - SearchDetailsResponse, SearchIndexItem, SearchIndexPlannedResponse, - SearchIndexResponse, SearchSessionGetRequest, SearchSessionGetResponse, - SearchSessionItemRecord, SearchSessionMode, SearchSessionizePath, - SearchSessionizedOutput, SearchTimelineGroup, SearchTimelineRequest, - SearchTimelineResponse, + NewSearchSession, SESSION_SLIDING_TTL_HOURS, SearchIndexItem, + SearchIndexPlannedResponse, SearchIndexResponse, SearchSessionItemRecord, + SearchSessionMode, SearchSessionizePath, SearchSessionizedOutput, }, }, structured_fields, }; -use elf_storage::models::MemoryNote; impl ElfService { /// Runs the default progressive-search path and returns indexed results. @@ -149,209 +141,4 @@ impl ElfService { query_plan, }) } - - /// Reloads a stored search session and optionally extends its TTL. - pub async fn search_session_get( - &self, - req: SearchSessionGetRequest, - ) -> Result { - let tenant_id = req.tenant_id.trim(); - let project_id = req.project_id.trim(); - let agent_id = req.agent_id.trim(); - - if tenant_id.is_empty() || project_id.is_empty() || agent_id.is_empty() { - return Err(Error::InvalidRequest { - message: "tenant_id, project_id, and agent_id are required.".to_string(), - }); - } - - let now = OffsetDateTime::now_utc(); - let session = - storage::load_search_session(&self.db.pool, req.search_session_id, now).await?; - - details::validate_search_session_access(&session, tenant_id, project_id, agent_id)?; - - let touch = req.touch.unwrap_or(true); - let expires_at = if touch { - storage::touch_search_session(&self.db.pool, &session, now).await? - } else { - session.expires_at - }; - let top_k = req.top_k.unwrap_or(self.cfg.memory.top_k).max(1); - let items: Vec = session - .items - .into_iter() - .take(top_k as usize) - .map(|item| item.to_index_item()) - .collect(); - - Ok(SearchSessionGetResponse { - trace_id: session.trace_id, - search_session_id: session.search_session_id, - expires_at, - items, - mode: session.mode, - query_plan: session.query_plan, - trajectory_summary: session.trajectory_summary, - }) - } - - /// Reprojects a stored search session into timeline groups. - pub async fn search_timeline( - &self, - req: SearchTimelineRequest, - ) -> Result { - let tenant_id = req.tenant_id.trim(); - let project_id = req.project_id.trim(); - let agent_id = req.agent_id.trim(); - - if tenant_id.is_empty() || project_id.is_empty() || agent_id.is_empty() { - return Err(Error::InvalidRequest { - message: "tenant_id, project_id, and agent_id are required.".to_string(), - }); - } - - let now = OffsetDateTime::now_utc(); - let session = - storage::load_search_session(&self.db.pool, req.search_session_id, now).await?; - - details::validate_search_session_access(&session, tenant_id, project_id, agent_id)?; - - let expires_at = storage::touch_search_session(&self.db.pool, &session, now).await?; - let payload_level = req.payload_level; - let group_by = req.group_by.unwrap_or_else(|| { - if payload_level == PayloadLevel::L0 { "none".to_string() } else { "day".to_string() } - }); - - match group_by.as_str() { - "day" => details::build_timeline_by_day( - session.search_session_id, - expires_at, - &session.items, - ), - "none" => Ok(SearchTimelineResponse { - search_session_id: session.search_session_id, - expires_at, - groups: vec![SearchTimelineGroup { - date: "all".to_string(), - items: session - .items - .iter() - .map(SearchSessionItemRecord::to_index_item) - .collect(), - }], - }), - _ => Err(Error::InvalidRequest { - message: "group_by must be one of: day, none.".to_string(), - }), - } - } - - /// Materializes selected note details out of a stored search session. - pub async fn search_details(&self, req: SearchDetailsRequest) -> Result { - let tenant_id = req.tenant_id.trim(); - let project_id = req.project_id.trim(); - let agent_id = req.agent_id.trim(); - - if tenant_id.is_empty() || project_id.is_empty() || agent_id.is_empty() { - return Err(Error::InvalidRequest { - message: "tenant_id, project_id, and agent_id are required.".to_string(), - }); - } - - let now = OffsetDateTime::now_utc(); - let session = - storage::load_search_session(&self.db.pool, req.search_session_id, now).await?; - - details::validate_search_session_access(&session, tenant_id, project_id, agent_id)?; - - let expires_at = storage::touch_search_session(&self.db.pool, &session, now).await?; - let mut by_note_id: HashMap = HashMap::new(); - - for item in &session.items { - by_note_id.insert(item.note_id, item.clone()); - } - - let mut requested_in_session = Vec::new(); - let mut seen = HashSet::new(); - - for note_id in &req.note_ids { - if by_note_id.contains_key(note_id) && seen.insert(*note_id) { - requested_in_session.push(*note_id); - } - } - - let mut notes_by_id = HashMap::new(); - - if !requested_in_session.is_empty() { - let rows: Vec = sqlx::query_as::<_, MemoryNote>( - "\ -SELECT * -FROM memory_notes -WHERE note_id = ANY($1::uuid[]) - AND tenant_id = $2 - AND ( - project_id = $3 - OR (project_id = $4 AND scope = 'org_shared') - )", - ) - .bind(requested_in_session.as_slice()) - .bind(session.tenant_id.as_str()) - .bind(session.project_id.as_str()) - .bind(ORG_PROJECT_ID) - .fetch_all(&self.db.pool) - .await?; - - for note in rows { - notes_by_id.insert(note.note_id, note); - } - } - - let structured_by_note = if req.payload_level == PayloadLevel::L0 { - HashMap::new() - } else { - structured_fields::fetch_structured_fields( - &self.db.pool, - requested_in_session.as_slice(), - ) - .await? - }; - let allowed_scopes = details::resolve_read_scopes(&self.cfg, &session.read_profile)?; - let shared_grants = access::load_shared_read_grants_with_org_shared( - &self.db.pool, - session.tenant_id.as_str(), - session.project_id.as_str(), - agent_id, - allowed_scopes.iter().any(|scope| scope == "org_shared"), - ) - .await?; - let record_hits = req.record_hits.unwrap_or(true); - let details_args = SearchDetailsBuildArgs { - session_items_by_note_id: &by_note_id, - notes_by_id: ¬es_by_id, - structured_by_note: &structured_by_note, - session: &session, - shared_grants: &shared_grants, - allowed_scopes: &allowed_scopes, - now, - record_hits_enabled: record_hits, - payload_level: req.payload_level, - max_note_chars: self.cfg.memory.max_note_chars as usize, - }; - let (results, hits) = details::build_search_details_results(req.note_ids, details_args); - - if !hits.is_empty() { - let mut tx = self.db.pool.begin().await?; - - storage::record_detail_hits(&mut *tx, &session.query, &hits, now).await?; - - tx.commit().await?; - } - - Ok(SearchDetailsResponse { - search_session_id: session.search_session_id, - expires_at, - results, - }) - } }