Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/elf-service/src/progressive_search.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Progressive-search APIs.

mod details;
mod followup;
mod service;
mod storage;
mod types;
Expand Down
228 changes: 228 additions & 0 deletions packages/elf-service/src/progressive_search/followup.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
use std::collections::{HashMap, hash_set::HashSet};

use sqlx;
use time::OffsetDateTime;
use uuid::Uuid;

use crate::{
ElfService, Error, PayloadLevel, Result,
access::{self, ORG_PROJECT_ID},
progressive_search::{
details::{self, SearchDetailsBuildArgs},
storage::{self},
types::{
SearchDetailsRequest, SearchDetailsResponse, SearchIndexItem, SearchSessionGetRequest,
SearchSessionGetResponse, SearchSessionItemRecord, SearchTimelineGroup,
SearchTimelineRequest, SearchTimelineResponse,
},
},
structured_fields,
};
use elf_storage::models::MemoryNote;

impl ElfService {
/// Reloads a stored search session and optionally extends its TTL.
pub async fn search_session_get(
&self,
req: SearchSessionGetRequest,
) -> Result<SearchSessionGetResponse> {
let tenant_id = req.tenant_id.trim();
let project_id = req.project_id.trim();
let agent_id = req.agent_id.trim();

if tenant_id.is_empty() || project_id.is_empty() || agent_id.is_empty() {
return Err(Error::InvalidRequest {
message: "tenant_id, project_id, and agent_id are required.".to_string(),
});
}

let now = OffsetDateTime::now_utc();
let session =
storage::load_search_session(&self.db.pool, req.search_session_id, now).await?;

details::validate_search_session_access(&session, tenant_id, project_id, agent_id)?;

let touch = req.touch.unwrap_or(true);
let expires_at = if touch {
storage::touch_search_session(&self.db.pool, &session, now).await?
} else {
session.expires_at
};
let top_k = req.top_k.unwrap_or(self.cfg.memory.top_k).max(1);
let items: Vec<SearchIndexItem> = session
.items
.into_iter()
.take(top_k as usize)
.map(|item| item.to_index_item())
.collect();

Ok(SearchSessionGetResponse {
trace_id: session.trace_id,
search_session_id: session.search_session_id,
expires_at,
items,
mode: session.mode,
query_plan: session.query_plan,
trajectory_summary: session.trajectory_summary,
})
}

/// Reprojects a stored search session into timeline groups.
pub async fn search_timeline(
&self,
req: SearchTimelineRequest,
) -> Result<SearchTimelineResponse> {
let tenant_id = req.tenant_id.trim();
let project_id = req.project_id.trim();
let agent_id = req.agent_id.trim();

if tenant_id.is_empty() || project_id.is_empty() || agent_id.is_empty() {
return Err(Error::InvalidRequest {
message: "tenant_id, project_id, and agent_id are required.".to_string(),
});
}

let now = OffsetDateTime::now_utc();
let session =
storage::load_search_session(&self.db.pool, req.search_session_id, now).await?;

details::validate_search_session_access(&session, tenant_id, project_id, agent_id)?;

let expires_at = storage::touch_search_session(&self.db.pool, &session, now).await?;
let payload_level = req.payload_level;
let group_by = req.group_by.unwrap_or_else(|| {
if payload_level == PayloadLevel::L0 { "none".to_string() } else { "day".to_string() }
});

match group_by.as_str() {
"day" => details::build_timeline_by_day(
session.search_session_id,
expires_at,
&session.items,
),
"none" => Ok(SearchTimelineResponse {
search_session_id: session.search_session_id,
expires_at,
groups: vec![SearchTimelineGroup {
date: "all".to_string(),
items: session
.items
.iter()
.map(SearchSessionItemRecord::to_index_item)
.collect(),
}],
}),
_ => Err(Error::InvalidRequest {
message: "group_by must be one of: day, none.".to_string(),
}),
}
}

/// Materializes selected note details out of a stored search session.
pub async fn search_details(&self, req: SearchDetailsRequest) -> Result<SearchDetailsResponse> {
let tenant_id = req.tenant_id.trim();
let project_id = req.project_id.trim();
let agent_id = req.agent_id.trim();

if tenant_id.is_empty() || project_id.is_empty() || agent_id.is_empty() {
return Err(Error::InvalidRequest {
message: "tenant_id, project_id, and agent_id are required.".to_string(),
});
}

let now = OffsetDateTime::now_utc();
let session =
storage::load_search_session(&self.db.pool, req.search_session_id, now).await?;

details::validate_search_session_access(&session, tenant_id, project_id, agent_id)?;

let expires_at = storage::touch_search_session(&self.db.pool, &session, now).await?;
let mut by_note_id: HashMap<Uuid, SearchSessionItemRecord> = HashMap::new();

for item in &session.items {
by_note_id.insert(item.note_id, item.clone());
}

let mut requested_in_session = Vec::new();
let mut seen = HashSet::new();

for note_id in &req.note_ids {
if by_note_id.contains_key(note_id) && seen.insert(*note_id) {
requested_in_session.push(*note_id);
}
}

let mut notes_by_id = HashMap::new();

if !requested_in_session.is_empty() {
let rows: Vec<MemoryNote> = sqlx::query_as::<_, MemoryNote>(
"\
SELECT *
FROM memory_notes
WHERE note_id = ANY($1::uuid[])
AND tenant_id = $2
AND (
project_id = $3
OR (project_id = $4 AND scope = 'org_shared')
)",
)
.bind(requested_in_session.as_slice())
.bind(session.tenant_id.as_str())
.bind(session.project_id.as_str())
.bind(ORG_PROJECT_ID)
.fetch_all(&self.db.pool)
.await?;

for note in rows {
notes_by_id.insert(note.note_id, note);
}
}

let structured_by_note = if req.payload_level == PayloadLevel::L0 {
HashMap::new()
} else {
structured_fields::fetch_structured_fields(
&self.db.pool,
requested_in_session.as_slice(),
)
.await?
};
let allowed_scopes = details::resolve_read_scopes(&self.cfg, &session.read_profile)?;
let shared_grants = access::load_shared_read_grants_with_org_shared(
&self.db.pool,
session.tenant_id.as_str(),
session.project_id.as_str(),
agent_id,
allowed_scopes.iter().any(|scope| scope == "org_shared"),
)
.await?;
let record_hits = req.record_hits.unwrap_or(true);
let details_args = SearchDetailsBuildArgs {
session_items_by_note_id: &by_note_id,
notes_by_id: &notes_by_id,
structured_by_note: &structured_by_note,
session: &session,
shared_grants: &shared_grants,
allowed_scopes: &allowed_scopes,
now,
record_hits_enabled: record_hits,
payload_level: req.payload_level,
max_note_chars: self.cfg.memory.max_note_chars as usize,
};
let (results, hits) = details::build_search_details_results(req.note_ids, details_args);

if !hits.is_empty() {
let mut tx = self.db.pool.begin().await?;

storage::record_detail_hits(&mut *tx, &session.query, &hits, now).await?;

tx.commit().await?;
}

Ok(SearchDetailsResponse {
search_session_id: session.search_session_id,
expires_at,
results,
})
}
}
Loading