From 2e51ad65202d08621cf3699b2b4728e7928861e6 Mon Sep 17 00:00:00 2001 From: Yvette Carlisle Date: Tue, 30 Jun 2026 10:48:27 +0800 Subject: [PATCH 1/3] {"schema":"decodex/commit/1","summary":"Split add_note validation non-English traversal.","authority":"manual"} --- .../elf-service/src/add_note/validation.rs | 185 +----------------- .../src/add_note/validation/non_english.rs | 178 +++++++++++++++++ .../add_note/validation/non_english/tests.rs | 45 +++++ 3 files changed, 230 insertions(+), 178 deletions(-) create mode 100644 packages/elf-service/src/add_note/validation/non_english.rs create mode 100644 packages/elf-service/src/add_note/validation/non_english/tests.rs diff --git a/packages/elf-service/src/add_note/validation.rs b/packages/elf-service/src/add_note/validation.rs index 05dd2c3f..f8d7b5f8 100644 --- a/packages/elf-service/src/add_note/validation.rs +++ b/packages/elf-service/src/add_note/validation.rs @@ -1,7 +1,7 @@ -use serde_json::Value; +mod non_english; use crate::{ - Error, NoteOp, Result, StructuredFields, + Error, NoteOp, Result, add_note::types::{self, AddNoteInput, AddNoteRequest, AddNoteResult}, structured_fields, }; @@ -53,15 +53,16 @@ pub(super) fn validate_add_note_request(req: &AddNoteRequest) -> Result<()> { { return Err(Error::NonEnglishInput { field: format!("$.notes[{idx}].key") }); } - if let Some(path) = find_non_english_path_in_structured( + if let Some(path) = non_english::find_non_english_path_in_structured( note.structured.as_ref(), &format!("$.notes[{idx}].structured"), ) { return Err(Error::NonEnglishInput { field: path }); } - if let Some(path) = - find_non_english_path(¬e.source_ref, &format!("$.notes[{idx}].source_ref")) - { + if let Some(path) = non_english::find_non_english_path( + ¬e.source_ref, + &format!("$.notes[{idx}].source_ref"), + ) { return Err(Error::NonEnglishInput { field: path }); } } @@ -135,178 +136,6 @@ pub(super) fn apply_write_policy_to_note( Ok((result.transformed, policy.is_some().then_some(result.audit))) } -fn find_non_english_path_in_structured( - structured: Option<&StructuredFields>, - base: &str, -) -> Option { - let structured = structured?; - - if let Some(summary) = structured.summary.as_ref() - && !english_gate::is_english_natural_language(summary) - { - return Some(format!("{base}.summary")); - } - if let Some(items) = structured.facts.as_ref() { - for (idx, item) in items.iter().enumerate() { - if !english_gate::is_english_natural_language(item) { - return Some(format!("{base}.facts[{idx}]")); - } - } - } - if let Some(items) = structured.concepts.as_ref() { - for (idx, item) in items.iter().enumerate() { - if !english_gate::is_english_natural_language(item) { - return Some(format!("{base}.concepts[{idx}]")); - } - } - } - if let Some(items) = structured.entities.as_ref() { - for (idx, entity) in items.iter().enumerate() { - let base = format!("{base}.entities[{idx}]"); - - if let Some(canonical) = entity.canonical.as_ref() - && !english_gate::is_english_natural_language(canonical) - { - return Some(format!("{base}.canonical")); - } - if let Some(kind) = entity.kind.as_ref() - && !english_gate::is_english_natural_language(kind) - { - return Some(format!("{base}.kind")); - } - if let Some(aliases) = entity.aliases.as_ref() { - for (alias_idx, alias) in aliases.iter().enumerate() { - if !english_gate::is_english_natural_language(alias) { - return Some(format!("{base}.aliases[{alias_idx}]")); - } - } - } - } - } - if let Some(items) = structured.relations.as_ref() { - for (idx, relation) in items.iter().enumerate() { - let base = format!("{base}.relations[{idx}]"); - - if let Some(subject) = relation.subject.as_ref() { - let subject_base = format!("{base}.subject"); - - if let Some(canonical) = subject.canonical.as_ref() - && !english_gate::is_english_natural_language(canonical) - { - return Some(format!("{subject_base}.canonical")); - } - if let Some(kind) = subject.kind.as_ref() - && !english_gate::is_english_natural_language(kind) - { - return Some(format!("{subject_base}.kind")); - } - if let Some(aliases) = subject.aliases.as_ref() { - for (alias_idx, alias) in aliases.iter().enumerate() { - if !english_gate::is_english_natural_language(alias) { - return Some(format!("{subject_base}.aliases[{alias_idx}]")); - } - } - } - } - if let Some(predicate) = relation.predicate.as_ref() - && !english_gate::is_english_natural_language(predicate) - { - return Some(format!("{base}.predicate")); - } - if let Some(object) = relation.object.as_ref() { - if let Some(entity) = object.entity.as_ref() { - let object_base = format!("{base}.object.entity"); - - if let Some(canonical) = entity.canonical.as_ref() - && !english_gate::is_english_natural_language(canonical) - { - return Some(format!("{object_base}.canonical")); - } - if let Some(kind) = entity.kind.as_ref() - && !english_gate::is_english_natural_language(kind) - { - return Some(format!("{object_base}.kind")); - } - if let Some(aliases) = entity.aliases.as_ref() { - for (alias_idx, alias) in aliases.iter().enumerate() { - if !english_gate::is_english_natural_language(alias) { - return Some(format!("{object_base}.aliases[{alias_idx}]")); - } - } - } - } - if let Some(value) = object.value.as_ref() - && !english_gate::is_english_natural_language(value) - { - return Some(format!("{base}.object.value")); - } - } - } - } - - None -} - -fn find_non_english_path(value: &Value, path: &str) -> Option { - find_non_english_path_inner(value, path, true) -} - -fn find_non_english_path_inner( - value: &Value, - path: &str, - is_identifier_lane: bool, -) -> Option { - fn has_english_gate(text: &str, is_identifier_lane: bool) -> bool { - if is_identifier_lane { - return english_gate::is_english_identifier(text); - } - - english_gate::is_english_natural_language(text) - } - - match value { - Value::String(text) => - if !has_english_gate(text, is_identifier_lane) { - Some(path.to_string()) - } else { - None - }, - Value::Array(items) => { - for (idx, item) in items.iter().enumerate() { - let child_path = format!("{path}[{idx}]"); - - if let Some(found) = - find_non_english_path_inner(item, &child_path, is_identifier_lane) - { - return Some(found); - } - } - - None - }, - Value::Object(map) => { - for (key, value) in map.iter() { - let identifier_lane = is_identifier_lane - || matches!(key.as_str(), "ref" | "schema" | "resolver" | "hashes" | "state"); - let child_path = format!("{path}[\"{}\"]", escape_json_path_key(key)); - - if let Some(found) = - find_non_english_path_inner(value, &child_path, identifier_lane) - { - return Some(found); - } - } - - None - }, - _ => None, - } -} - -fn escape_json_path_key(key: &str) -> String { - key.replace('\\', "\\\\").replace('"', "\\\"") -} - fn extract_structured_rejection_field_path(err: &Error) -> Option { match err { Error::NonEnglishInput { field } => Some(field.clone()), diff --git a/packages/elf-service/src/add_note/validation/non_english.rs b/packages/elf-service/src/add_note/validation/non_english.rs new file mode 100644 index 00000000..d3ffc251 --- /dev/null +++ b/packages/elf-service/src/add_note/validation/non_english.rs @@ -0,0 +1,178 @@ +use serde_json::Value; + +use crate::structured_fields::StructuredFields; +use elf_domain::english_gate; + +pub(super) fn find_non_english_path_in_structured( + structured: Option<&StructuredFields>, + base: &str, +) -> Option { + let structured = structured?; + + if let Some(summary) = structured.summary.as_ref() + && !english_gate::is_english_natural_language(summary) + { + return Some(format!("{base}.summary")); + } + if let Some(items) = structured.facts.as_ref() { + for (idx, item) in items.iter().enumerate() { + if !english_gate::is_english_natural_language(item) { + return Some(format!("{base}.facts[{idx}]")); + } + } + } + if let Some(items) = structured.concepts.as_ref() { + for (idx, item) in items.iter().enumerate() { + if !english_gate::is_english_natural_language(item) { + return Some(format!("{base}.concepts[{idx}]")); + } + } + } + if let Some(items) = structured.entities.as_ref() { + for (idx, entity) in items.iter().enumerate() { + let base = format!("{base}.entities[{idx}]"); + + if let Some(canonical) = entity.canonical.as_ref() + && !english_gate::is_english_natural_language(canonical) + { + return Some(format!("{base}.canonical")); + } + if let Some(kind) = entity.kind.as_ref() + && !english_gate::is_english_natural_language(kind) + { + return Some(format!("{base}.kind")); + } + if let Some(aliases) = entity.aliases.as_ref() { + for (alias_idx, alias) in aliases.iter().enumerate() { + if !english_gate::is_english_natural_language(alias) { + return Some(format!("{base}.aliases[{alias_idx}]")); + } + } + } + } + } + if let Some(items) = structured.relations.as_ref() { + for (idx, relation) in items.iter().enumerate() { + let base = format!("{base}.relations[{idx}]"); + + if let Some(subject) = relation.subject.as_ref() { + let subject_base = format!("{base}.subject"); + + if let Some(canonical) = subject.canonical.as_ref() + && !english_gate::is_english_natural_language(canonical) + { + return Some(format!("{subject_base}.canonical")); + } + if let Some(kind) = subject.kind.as_ref() + && !english_gate::is_english_natural_language(kind) + { + return Some(format!("{subject_base}.kind")); + } + if let Some(aliases) = subject.aliases.as_ref() { + for (alias_idx, alias) in aliases.iter().enumerate() { + if !english_gate::is_english_natural_language(alias) { + return Some(format!("{subject_base}.aliases[{alias_idx}]")); + } + } + } + } + if let Some(predicate) = relation.predicate.as_ref() + && !english_gate::is_english_natural_language(predicate) + { + return Some(format!("{base}.predicate")); + } + if let Some(object) = relation.object.as_ref() { + if let Some(entity) = object.entity.as_ref() { + let object_base = format!("{base}.object.entity"); + + if let Some(canonical) = entity.canonical.as_ref() + && !english_gate::is_english_natural_language(canonical) + { + return Some(format!("{object_base}.canonical")); + } + if let Some(kind) = entity.kind.as_ref() + && !english_gate::is_english_natural_language(kind) + { + return Some(format!("{object_base}.kind")); + } + if let Some(aliases) = entity.aliases.as_ref() { + for (alias_idx, alias) in aliases.iter().enumerate() { + if !english_gate::is_english_natural_language(alias) { + return Some(format!("{object_base}.aliases[{alias_idx}]")); + } + } + } + } + if let Some(value) = object.value.as_ref() + && !english_gate::is_english_natural_language(value) + { + return Some(format!("{base}.object.value")); + } + } + } + } + + None +} + +pub(super) fn find_non_english_path(value: &Value, path: &str) -> Option { + find_non_english_path_inner(value, path, true) +} + +fn find_non_english_path_inner( + value: &Value, + path: &str, + is_identifier_lane: bool, +) -> Option { + fn has_english_gate(text: &str, is_identifier_lane: bool) -> bool { + if is_identifier_lane { + return english_gate::is_english_identifier(text); + } + + english_gate::is_english_natural_language(text) + } + + match value { + Value::String(text) => + if !has_english_gate(text, is_identifier_lane) { + Some(path.to_string()) + } else { + None + }, + Value::Array(items) => { + for (idx, item) in items.iter().enumerate() { + let child_path = format!("{path}[{idx}]"); + + if let Some(found) = + find_non_english_path_inner(item, &child_path, is_identifier_lane) + { + return Some(found); + } + } + + None + }, + Value::Object(map) => { + for (key, value) in map.iter() { + let identifier_lane = is_identifier_lane + || matches!(key.as_str(), "ref" | "schema" | "resolver" | "hashes" | "state"); + let child_path = format!("{path}[\"{}\"]", escape_json_path_key(key)); + + if let Some(found) = + find_non_english_path_inner(value, &child_path, identifier_lane) + { + return Some(found); + } + } + + None + }, + _ => None, + } +} + +fn escape_json_path_key(key: &str) -> String { + key.replace('\\', "\\\\").replace('"', "\\\"") +} + +#[cfg(test)] mod tests; diff --git a/packages/elf-service/src/add_note/validation/non_english/tests.rs b/packages/elf-service/src/add_note/validation/non_english/tests.rs new file mode 100644 index 00000000..960ab9c6 --- /dev/null +++ b/packages/elf-service/src/add_note/validation/non_english/tests.rs @@ -0,0 +1,45 @@ +use crate::{ + add_note::validation::non_english, + structured_fields::{ + StructuredEntity, StructuredFields, StructuredRelation, StructuredRelationObject, + }, +}; + +#[test] +fn source_ref_path_escapes_quotes_and_backslashes() { + let value = serde_json::json!({ + "hint\"s": { + "quote\\path": "你好世界", + }, + }); + + assert_eq!( + non_english::find_non_english_path(&value, "$.source_ref"), + Some("$.source_ref[\"hint\\\"s\"][\"quote\\\\path\"]".to_string()) + ); +} + +#[test] +fn structured_relation_object_entity_alias_reports_precise_path() { + let structured = StructuredFields { + relations: Some(vec![StructuredRelation { + object: Some(StructuredRelationObject { + entity: Some(StructuredEntity { + aliases: Some(vec!["English alias".to_string(), "你好世界".to_string()]), + ..StructuredEntity::default() + }), + ..StructuredRelationObject::default() + }), + ..StructuredRelation::default() + }]), + ..StructuredFields::default() + }; + + assert_eq!( + non_english::find_non_english_path_in_structured( + Some(&structured), + "$.notes[0].structured", + ), + Some("$.notes[0].structured.relations[0].object.entity.aliases[1]".to_string()) + ); +} From 050ce4b79468f869a7d35eff06f9c46726937359 Mon Sep 17 00:00:00 2001 From: Yvette Carlisle Date: Tue, 30 Jun 2026 11:04:08 +0800 Subject: [PATCH 2/3] {"schema":"decodex/commit/1","summary":"Split add_note materialization paths.","authority":"manual"} --- .../elf-service/src/add_note/materialize.rs | 305 +----------------- .../src/add_note/materialize/add.rs | 91 ++++++ .../src/add_note/materialize/none.rs | 107 ++++++ .../src/add_note/materialize/update.rs | 145 +++++++++ .../src/add_note/materialize/update/tests.rs | 75 +++++ 5 files changed, 423 insertions(+), 300 deletions(-) create mode 100644 packages/elf-service/src/add_note/materialize/add.rs create mode 100644 packages/elf-service/src/add_note/materialize/none.rs create mode 100644 packages/elf-service/src/add_note/materialize/update.rs create mode 100644 packages/elf-service/src/add_note/materialize/update/tests.rs diff --git a/packages/elf-service/src/add_note/materialize.rs b/packages/elf-service/src/add_note/materialize.rs index a85afe29..77945cf7 100644 --- a/packages/elf-service/src/add_note/materialize.rs +++ b/packages/elf-service/src/add_note/materialize.rs @@ -1,303 +1,8 @@ +mod add; +mod none; mod structured_materialization; +mod update; -use sqlx::{Postgres, Transaction}; -use time::OffsetDateTime; -use uuid::Uuid; - -use crate::{ - ElfService, InsertVersionArgs, NoteOp, Result, access, - add_note::{ - persistence::{self}, - types::{AddNoteContext, AddNoteInput, AddNoteResult}, - }, - structured_fields, +pub(super) use self::{ + add::handle_add_note_add, none::handle_add_note_none, update::handle_add_note_update, }; -use elf_domain::{memory_policy::MemoryPolicyDecision, ttl}; -use elf_storage::models::MemoryNote; - -pub(super) async fn handle_add_note_add( - service: &ElfService, - tx: &mut Transaction<'_, Postgres>, - ctx: &AddNoteContext<'_>, - note: &AddNoteInput, - note_id: Uuid, -) -> Result { - access::ensure_active_project_scope_grant( - &mut **tx, - ctx.tenant_id, - ctx.project_id, - ctx.scope, - ctx.agent_id, - ) - .await?; - - let expires_at = - ttl::compute_expires_at(note.ttl_days, note.r#type.as_str(), &service.cfg, ctx.now); - let memory_note = MemoryNote { - note_id, - tenant_id: ctx.tenant_id.to_string(), - project_id: ctx.project_id.to_string(), - agent_id: ctx.agent_id.to_string(), - scope: ctx.scope.to_string(), - r#type: note.r#type.clone(), - key: note.key.clone(), - text: note.text.clone(), - importance: note.importance, - confidence: note.confidence, - status: "active".to_string(), - created_at: ctx.now, - updated_at: ctx.now, - expires_at, - embedding_version: ctx.embed_version.to_string(), - source_ref: note.source_ref.clone(), - hit_count: 0, - last_hit_at: None, - }; - - persistence::insert_memory_note_tx(tx, &memory_note).await?; - - let note_version_id = crate::insert_version( - &mut **tx, - InsertVersionArgs { - note_id: memory_note.note_id, - op: "ADD", - prev_snapshot: None, - new_snapshot: Some(crate::note_snapshot(&memory_note)), - reason: "add_note", - actor: ctx.agent_id, - ts: ctx.now, - }, - ) - .await?; - - structured_materialization::upsert_structured_and_enqueue_outbox( - tx, - note, - memory_note.note_id, - ctx.embed_version, - ctx.now, - ) - .await?; - structured_materialization::persist_graph_fields_if_present( - tx, - ctx.tenant_id, - ctx.project_id, - ctx.agent_id, - ctx.scope, - memory_note.note_id, - ctx.now, - note.structured.as_ref(), - ) - .await?; - - Ok(note_version_id) -} - -pub(super) async fn handle_add_note_update( - service: &ElfService, - tx: &mut Transaction<'_, Postgres>, - note: &AddNoteInput, - note_id: Uuid, - agent_id: &str, - now: OffsetDateTime, - policy_decision: MemoryPolicyDecision, -) -> Result<(AddNoteResult, Option)> { - let mut existing: MemoryNote = - sqlx::query_as::<_, MemoryNote>("SELECT * FROM memory_notes WHERE note_id = $1 FOR UPDATE") - .bind(note_id) - .fetch_one(&mut **tx) - .await?; - let prev_snapshot = crate::note_snapshot(&existing); - let requested_ttl = note.ttl_days.filter(|days| *days > 0); - let expires_at = match requested_ttl { - Some(ttl) => ttl::compute_expires_at(Some(ttl), note.r#type.as_str(), &service.cfg, now), - None => existing.expires_at, - }; - let expires_match = requested_ttl.map_or(existing.expires_at == expires_at, |ttl_days| { - match existing.expires_at { - Some(existing_expires_at) => { - let existing_ttl = (existing_expires_at - existing.updated_at).whole_days() as i64; - - existing_ttl == ttl_days - }, - None => false, - } - }); - let float_eps = 1e-6_f32; - let unchanged = existing.text == note.text - && (existing.importance - note.importance).abs() <= float_eps - && (existing.confidence - note.confidence).abs() <= float_eps - && expires_match - && existing.source_ref == note.source_ref; - - if unchanged { - return Ok(( - AddNoteResult { - note_id: Some(note_id), - op: NoteOp::None, - policy_decision: MemoryPolicyDecision::Ignore, - reason_code: None, - field_path: None, - write_policy_audit: None, - }, - None, - )); - } - - access::ensure_active_project_scope_grant( - &mut **tx, - existing.tenant_id.as_str(), - existing.project_id.as_str(), - existing.scope.as_str(), - existing.agent_id.as_str(), - ) - .await?; - - existing.text = note.text.clone(); - existing.importance = note.importance; - existing.confidence = note.confidence; - existing.updated_at = now; - existing.expires_at = expires_at; - existing.source_ref = note.source_ref.clone(); - - persistence::update_memory_note_tx(tx, &existing).await?; - - let note_version_id = crate::insert_version( - &mut **tx, - InsertVersionArgs { - note_id: existing.note_id, - op: "UPDATE", - prev_snapshot: Some(prev_snapshot), - new_snapshot: Some(crate::note_snapshot(&existing)), - reason: "add_note", - actor: agent_id, - ts: now, - }, - ) - .await?; - - structured_materialization::persist_graph_fields_if_present( - tx, - existing.tenant_id.as_str(), - existing.project_id.as_str(), - existing.agent_id.as_str(), - existing.scope.as_str(), - existing.note_id, - now, - note.structured.as_ref(), - ) - .await?; - structured_materialization::upsert_structured_and_enqueue_outbox( - tx, - note, - existing.note_id, - existing.embedding_version.as_str(), - now, - ) - .await?; - - Ok(( - AddNoteResult { - note_id: Some(note_id), - op: NoteOp::Update, - policy_decision, - reason_code: None, - field_path: None, - write_policy_audit: None, - }, - Some(note_version_id), - )) -} - -#[allow(clippy::too_many_arguments)] -pub(super) async fn handle_add_note_none( - tx: &mut Transaction<'_, Postgres>, - ctx: &AddNoteContext<'_>, - note: &AddNoteInput, - note_id: Uuid, - now: OffsetDateTime, - embed_version: &str, - policy_decision: MemoryPolicyDecision, -) -> Result<(AddNoteResult, Option)> { - let mut should_update = false; - - if let Some(structured) = note.structured.as_ref() { - if !structured.is_effectively_empty() { - structured_fields::upsert_structured_fields_tx(tx, note_id, structured, now).await?; - crate::enqueue_outbox_tx(&mut **tx, note_id, "UPSERT", embed_version, now).await?; - - should_update = true; - } - if structured.has_graph_fields() { - structured_materialization::persist_graph_fields_if_present( - tx, - ctx.tenant_id, - ctx.project_id, - ctx.agent_id, - ctx.scope, - note_id, - now, - Some(structured), - ) - .await?; - - should_update = true; - } - } - - if should_update { - let note_row: MemoryNote = sqlx::query_as("SELECT * FROM memory_notes WHERE note_id = $1") - .bind(note_id) - .fetch_one(&mut **tx) - .await?; - let snapshot = crate::note_snapshot(¬e_row); - let note_version_id = crate::insert_version( - &mut **tx, - InsertVersionArgs { - note_id, - op: "UPDATE", - prev_snapshot: Some(snapshot.clone()), - new_snapshot: Some(snapshot), - reason: "add_note_structured", - actor: ctx.agent_id, - ts: now, - }, - ) - .await?; - - if matches!(ctx.scope, "project_shared" | "org_shared") { - access::ensure_active_project_scope_grant( - &mut **tx, - ctx.tenant_id, - ctx.project_id, - ctx.scope, - ctx.agent_id, - ) - .await?; - } - - return Ok(( - AddNoteResult { - note_id: Some(note_id), - op: NoteOp::Update, - policy_decision, - reason_code: None, - field_path: None, - write_policy_audit: None, - }, - Some(note_version_id), - )); - } - - Ok(( - AddNoteResult { - note_id: Some(note_id), - op: NoteOp::None, - policy_decision, - reason_code: None, - field_path: None, - write_policy_audit: None, - }, - None, - )) -} diff --git a/packages/elf-service/src/add_note/materialize/add.rs b/packages/elf-service/src/add_note/materialize/add.rs new file mode 100644 index 00000000..a9a4d6f0 --- /dev/null +++ b/packages/elf-service/src/add_note/materialize/add.rs @@ -0,0 +1,91 @@ +use sqlx::{Postgres, Transaction}; +use uuid::Uuid; + +use crate::{ + ElfService, InsertVersionArgs, Result, access, + add_note::{ + materialize::structured_materialization, + persistence::{self}, + types::{AddNoteContext, AddNoteInput}, + }, +}; +use elf_domain::ttl; +use elf_storage::models::MemoryNote; + +pub(in crate::add_note) async fn handle_add_note_add( + service: &ElfService, + tx: &mut Transaction<'_, Postgres>, + ctx: &AddNoteContext<'_>, + note: &AddNoteInput, + note_id: Uuid, +) -> Result { + access::ensure_active_project_scope_grant( + &mut **tx, + ctx.tenant_id, + ctx.project_id, + ctx.scope, + ctx.agent_id, + ) + .await?; + + let expires_at = + ttl::compute_expires_at(note.ttl_days, note.r#type.as_str(), &service.cfg, ctx.now); + let memory_note = MemoryNote { + note_id, + tenant_id: ctx.tenant_id.to_string(), + project_id: ctx.project_id.to_string(), + agent_id: ctx.agent_id.to_string(), + scope: ctx.scope.to_string(), + r#type: note.r#type.clone(), + key: note.key.clone(), + text: note.text.clone(), + importance: note.importance, + confidence: note.confidence, + status: "active".to_string(), + created_at: ctx.now, + updated_at: ctx.now, + expires_at, + embedding_version: ctx.embed_version.to_string(), + source_ref: note.source_ref.clone(), + hit_count: 0, + last_hit_at: None, + }; + + persistence::insert_memory_note_tx(tx, &memory_note).await?; + + let note_version_id = crate::insert_version( + &mut **tx, + InsertVersionArgs { + note_id: memory_note.note_id, + op: "ADD", + prev_snapshot: None, + new_snapshot: Some(crate::note_snapshot(&memory_note)), + reason: "add_note", + actor: ctx.agent_id, + ts: ctx.now, + }, + ) + .await?; + + structured_materialization::upsert_structured_and_enqueue_outbox( + tx, + note, + memory_note.note_id, + ctx.embed_version, + ctx.now, + ) + .await?; + structured_materialization::persist_graph_fields_if_present( + tx, + ctx.tenant_id, + ctx.project_id, + ctx.agent_id, + ctx.scope, + memory_note.note_id, + ctx.now, + note.structured.as_ref(), + ) + .await?; + + Ok(note_version_id) +} diff --git a/packages/elf-service/src/add_note/materialize/none.rs b/packages/elf-service/src/add_note/materialize/none.rs new file mode 100644 index 00000000..ecb22b5f --- /dev/null +++ b/packages/elf-service/src/add_note/materialize/none.rs @@ -0,0 +1,107 @@ +use sqlx::{Postgres, Transaction}; +use time::OffsetDateTime; +use uuid::Uuid; + +use crate::{ + InsertVersionArgs, NoteOp, Result, access, + add_note::{ + materialize::structured_materialization, + types::{AddNoteContext, AddNoteInput, AddNoteResult}, + }, + structured_fields, +}; +use elf_domain::memory_policy::MemoryPolicyDecision; +use elf_storage::models::MemoryNote; + +#[allow(clippy::too_many_arguments)] +pub(in crate::add_note) async fn handle_add_note_none( + tx: &mut Transaction<'_, Postgres>, + ctx: &AddNoteContext<'_>, + note: &AddNoteInput, + note_id: Uuid, + now: OffsetDateTime, + embed_version: &str, + policy_decision: MemoryPolicyDecision, +) -> Result<(AddNoteResult, Option)> { + let mut should_update = false; + + if let Some(structured) = note.structured.as_ref() { + if !structured.is_effectively_empty() { + structured_fields::upsert_structured_fields_tx(tx, note_id, structured, now).await?; + crate::enqueue_outbox_tx(&mut **tx, note_id, "UPSERT", embed_version, now).await?; + + should_update = true; + } + if structured.has_graph_fields() { + structured_materialization::persist_graph_fields_if_present( + tx, + ctx.tenant_id, + ctx.project_id, + ctx.agent_id, + ctx.scope, + note_id, + now, + Some(structured), + ) + .await?; + + should_update = true; + } + } + + if should_update { + let note_row: MemoryNote = sqlx::query_as("SELECT * FROM memory_notes WHERE note_id = $1") + .bind(note_id) + .fetch_one(&mut **tx) + .await?; + let snapshot = crate::note_snapshot(¬e_row); + let note_version_id = crate::insert_version( + &mut **tx, + InsertVersionArgs { + note_id, + op: "UPDATE", + prev_snapshot: Some(snapshot.clone()), + new_snapshot: Some(snapshot), + reason: "add_note_structured", + actor: ctx.agent_id, + ts: now, + }, + ) + .await?; + + if matches!(ctx.scope, "project_shared" | "org_shared") { + access::ensure_active_project_scope_grant( + &mut **tx, + ctx.tenant_id, + ctx.project_id, + ctx.scope, + ctx.agent_id, + ) + .await?; + } + + return Ok(( + AddNoteResult { + note_id: Some(note_id), + op: NoteOp::Update, + policy_decision, + reason_code: None, + field_path: None, + write_policy_audit: None, + }, + Some(note_version_id), + )); + } + + Ok(( + AddNoteResult { + note_id: Some(note_id), + op: NoteOp::None, + policy_decision, + reason_code: None, + field_path: None, + write_policy_audit: None, + }, + None, + )) +} diff --git a/packages/elf-service/src/add_note/materialize/update.rs b/packages/elf-service/src/add_note/materialize/update.rs new file mode 100644 index 00000000..9a9266d8 --- /dev/null +++ b/packages/elf-service/src/add_note/materialize/update.rs @@ -0,0 +1,145 @@ +use sqlx::{Postgres, Transaction}; +use time::OffsetDateTime; +use uuid::Uuid; + +use crate::{ + ElfService, InsertVersionArgs, NoteOp, Result, access, + add_note::{ + materialize::structured_materialization, + persistence::{self}, + types::{AddNoteInput, AddNoteResult}, + }, +}; +use elf_domain::{memory_policy::MemoryPolicyDecision, ttl}; +use elf_storage::models::MemoryNote; + +pub(in crate::add_note) async fn handle_add_note_update( + service: &ElfService, + tx: &mut Transaction<'_, Postgres>, + note: &AddNoteInput, + note_id: Uuid, + agent_id: &str, + now: OffsetDateTime, + policy_decision: MemoryPolicyDecision, +) -> Result<(AddNoteResult, Option)> { + let mut existing: MemoryNote = + sqlx::query_as::<_, MemoryNote>("SELECT * FROM memory_notes WHERE note_id = $1 FOR UPDATE") + .bind(note_id) + .fetch_one(&mut **tx) + .await?; + let prev_snapshot = crate::note_snapshot(&existing); + let requested_ttl = requested_ttl_days(note); + let expires_at = match requested_ttl { + Some(ttl) => ttl::compute_expires_at(Some(ttl), note.r#type.as_str(), &service.cfg, now), + None => existing.expires_at, + }; + + if note_update_is_unchanged(&existing, note, expires_at, requested_ttl) { + return Ok(( + AddNoteResult { + note_id: Some(note_id), + op: NoteOp::None, + policy_decision: MemoryPolicyDecision::Ignore, + reason_code: None, + field_path: None, + write_policy_audit: None, + }, + None, + )); + } + + access::ensure_active_project_scope_grant( + &mut **tx, + existing.tenant_id.as_str(), + existing.project_id.as_str(), + existing.scope.as_str(), + existing.agent_id.as_str(), + ) + .await?; + + existing.text = note.text.clone(); + existing.importance = note.importance; + existing.confidence = note.confidence; + existing.updated_at = now; + existing.expires_at = expires_at; + existing.source_ref = note.source_ref.clone(); + + persistence::update_memory_note_tx(tx, &existing).await?; + + let note_version_id = crate::insert_version( + &mut **tx, + InsertVersionArgs { + note_id: existing.note_id, + op: "UPDATE", + prev_snapshot: Some(prev_snapshot), + new_snapshot: Some(crate::note_snapshot(&existing)), + reason: "add_note", + actor: agent_id, + ts: now, + }, + ) + .await?; + + structured_materialization::persist_graph_fields_if_present( + tx, + existing.tenant_id.as_str(), + existing.project_id.as_str(), + existing.agent_id.as_str(), + existing.scope.as_str(), + existing.note_id, + now, + note.structured.as_ref(), + ) + .await?; + structured_materialization::upsert_structured_and_enqueue_outbox( + tx, + note, + existing.note_id, + existing.embedding_version.as_str(), + now, + ) + .await?; + + Ok(( + AddNoteResult { + note_id: Some(note_id), + op: NoteOp::Update, + policy_decision, + reason_code: None, + field_path: None, + write_policy_audit: None, + }, + Some(note_version_id), + )) +} + +fn requested_ttl_days(note: &AddNoteInput) -> Option { + note.ttl_days.filter(|days| *days > 0) +} + +fn note_update_is_unchanged( + existing: &MemoryNote, + note: &AddNoteInput, + expires_at: Option, + requested_ttl: Option, +) -> bool { + let expires_match = requested_ttl.map_or(existing.expires_at == expires_at, |ttl_days| { + match existing.expires_at { + Some(existing_expires_at) => { + let existing_ttl = (existing_expires_at - existing.updated_at).whole_days(); + + existing_ttl == ttl_days + }, + None => false, + } + }); + let float_eps = 1e-6_f32; + + existing.text == note.text + && (existing.importance - note.importance).abs() <= float_eps + && (existing.confidence - note.confidence).abs() <= float_eps + && expires_match + && existing.source_ref == note.source_ref +} + +#[cfg(test)] mod tests; diff --git a/packages/elf-service/src/add_note/materialize/update/tests.rs b/packages/elf-service/src/add_note/materialize/update/tests.rs new file mode 100644 index 00000000..ba4548d7 --- /dev/null +++ b/packages/elf-service/src/add_note/materialize/update/tests.rs @@ -0,0 +1,75 @@ +use time::{Duration, OffsetDateTime}; +use uuid::Uuid; + +use crate::{ + add_note::{materialize::update, types::AddNoteInput}, + structured_fields::StructuredFields, +}; +use elf_storage::models::MemoryNote; + +fn memory_note(now: OffsetDateTime) -> MemoryNote { + MemoryNote { + note_id: Uuid::from_u128(1), + tenant_id: "t".to_string(), + project_id: "p".to_string(), + agent_id: "a".to_string(), + scope: "agent_private".to_string(), + r#type: "fact".to_string(), + key: Some("k".to_string()), + text: "English text.".to_string(), + importance: 0.5, + confidence: 0.9, + status: "active".to_string(), + created_at: now, + updated_at: now, + expires_at: Some(now + Duration::days(7)), + embedding_version: "v1".to_string(), + source_ref: serde_json::json!({"source": "test"}), + hit_count: 0, + last_hit_at: None, + } +} + +fn add_note_input() -> AddNoteInput { + AddNoteInput { + r#type: "fact".to_string(), + key: Some("k".to_string()), + text: "English text.".to_string(), + structured: Some(StructuredFields::default()), + importance: 0.5, + confidence: 0.9, + ttl_days: Some(7), + source_ref: serde_json::json!({"source": "test"}), + write_policy: None, + } +} + +#[test] +fn unchanged_update_accepts_same_effective_ttl() { + let now = OffsetDateTime::UNIX_EPOCH; + let existing = memory_note(now); + let note = add_note_input(); + + assert!(update::note_update_is_unchanged( + &existing, + ¬e, + Some(now + Duration::days(7)), + Some(7), + )); +} + +#[test] +fn unchanged_update_detects_source_ref_drift() { + let now = OffsetDateTime::UNIX_EPOCH; + let existing = memory_note(now); + let mut note = add_note_input(); + + note.source_ref = serde_json::json!({"source": "changed"}); + + assert!(!update::note_update_is_unchanged( + &existing, + ¬e, + Some(now + Duration::days(7)), + Some(7), + )); +} From 52727b4e083141db8d938dfc93c68689e8dac980 Mon Sep 17 00:00:00 2001 From: Yvette Carlisle Date: Tue, 30 Jun 2026 11:13:20 +0800 Subject: [PATCH 3/3] {"schema":"decodex/commit/1","summary":"Split add_event materialization paths.","authority":"manual"} --- .../elf-service/src/add_event/materialize.rs | 294 +----------------- .../src/add_event/materialize/add.rs | 106 +++++++ .../src/add_event/materialize/none.rs | 112 +++++++ .../src/add_event/materialize/none/tests.rs | 30 ++ .../src/add_event/materialize/update.rs | 100 ++++++ 5 files changed, 357 insertions(+), 285 deletions(-) create mode 100644 packages/elf-service/src/add_event/materialize/add.rs create mode 100644 packages/elf-service/src/add_event/materialize/none.rs create mode 100644 packages/elf-service/src/add_event/materialize/none/tests.rs create mode 100644 packages/elf-service/src/add_event/materialize/update.rs diff --git a/packages/elf-service/src/add_event/materialize.rs b/packages/elf-service/src/add_event/materialize.rs index b80df7c6..c5517cd3 100644 --- a/packages/elf-service/src/add_event/materialize.rs +++ b/packages/elf-service/src/add_event/materialize.rs @@ -1,16 +1,14 @@ +mod add; +mod none; +mod update; + use sqlx::{Postgres, Transaction}; -use uuid::Uuid; use crate::{ - InsertVersionArgs, NoteOp, Result, UpdateDecision, access, - add_event::{ - persistence::{self}, - types::{AddEventPersistOutput, AddEventResult, PersistExtractedNoteArgs}, - }, - graph_ingestion, structured_fields, + Result, UpdateDecision, + add_event::types::{AddEventPersistOutput, PersistExtractedNoteArgs}, }; use elf_domain::memory_policy::MemoryPolicyDecision; -use elf_storage::models::MemoryNote; pub(super) async fn persist_extracted_note_decision( tx: &mut Transaction<'_, Postgres>, @@ -20,284 +18,10 @@ pub(super) async fn persist_extracted_note_decision( ) -> Result { match (decision, args) { (UpdateDecision::Add { note_id, .. }, args) => - persist_extracted_note_add(tx, args, note_id, policy_decision).await, + add::persist_extracted_note_add(tx, args, note_id, policy_decision).await, (UpdateDecision::Update { note_id, .. }, args) => - persist_extracted_note_update(tx, args, note_id, policy_decision).await, + update::persist_extracted_note_update(tx, args, note_id, policy_decision).await, (UpdateDecision::None { note_id, .. }, args) => - persist_extracted_note_none(tx, args, note_id, policy_decision).await, + none::persist_extracted_note_none(tx, args, note_id, policy_decision).await, } } - -async fn persist_extracted_note_add( - tx: &mut Transaction<'_, Postgres>, - args: PersistExtractedNoteArgs<'_>, - note_id: Uuid, - policy_decision: MemoryPolicyDecision, -) -> Result { - access::ensure_active_project_scope_grant( - &mut **tx, - args.req.tenant_id.as_str(), - args.project_id, - args.scope, - args.req.agent_id.as_str(), - ) - .await?; - - let memory_note = MemoryNote { - note_id, - tenant_id: args.req.tenant_id.clone(), - project_id: args.project_id.to_string(), - agent_id: args.req.agent_id.clone(), - scope: args.scope.to_string(), - r#type: args.note_type.to_string(), - key: args.key.map(ToString::to_string), - text: args.text.to_string(), - importance: args.importance, - confidence: args.confidence, - status: "active".to_string(), - created_at: args.now, - updated_at: args.now, - expires_at: args.expires_at, - embedding_version: args.embed_version.to_string(), - source_ref: args.source_ref, - hit_count: 0, - last_hit_at: None, - }; - - persistence::insert_memory_note_tx(tx, &memory_note).await?; - - let note_version_id = crate::insert_version( - &mut **tx, - InsertVersionArgs { - note_id: memory_note.note_id, - op: "ADD", - prev_snapshot: None, - new_snapshot: Some(crate::note_snapshot(&memory_note)), - reason: "add_event", - actor: args.req.agent_id.as_str(), - ts: args.now, - }, - ) - .await?; - - crate::enqueue_outbox_tx( - &mut **tx, - memory_note.note_id, - "UPSERT", - args.embed_version, - args.now, - ) - .await?; - persistence::upsert_structured_fields_tx(tx, args.structured, memory_note.note_id, args.now) - .await?; - - if let Some(structured) = args.structured - && structured.has_graph_fields() - { - graph_ingestion::persist_graph_fields_tx( - tx, - args.req.tenant_id.as_str(), - args.project_id, - args.req.agent_id.as_str(), - args.scope, - memory_note.note_id, - structured, - args.now, - ) - .await?; - } - - Ok(( - AddEventResult { - note_id: Some(note_id), - op: NoteOp::Add, - policy_decision, - reason_code: None, - reason: args.reason.cloned(), - field_path: None, - write_policy_audits: None, - }, - Some(note_version_id), - )) -} - -async fn persist_extracted_note_update( - tx: &mut Transaction<'_, Postgres>, - args: PersistExtractedNoteArgs<'_>, - note_id: Uuid, - policy_decision: MemoryPolicyDecision, -) -> Result { - let mut existing: MemoryNote = - sqlx::query_as::<_, MemoryNote>("SELECT * FROM memory_notes WHERE note_id = $1 FOR UPDATE") - .bind(note_id) - .fetch_one(&mut **tx) - .await?; - - access::ensure_active_project_scope_grant( - &mut **tx, - existing.tenant_id.as_str(), - existing.project_id.as_str(), - existing.scope.as_str(), - existing.agent_id.as_str(), - ) - .await?; - - let prev_snapshot = crate::note_snapshot(&existing); - - existing.text = args.text.to_string(); - existing.importance = args.importance; - existing.confidence = args.confidence; - existing.updated_at = args.now; - existing.expires_at = args.expires_at; - existing.source_ref = args.source_ref; - - persistence::update_memory_note_tx(tx, &existing).await?; - - let note_version_id = crate::insert_version( - &mut **tx, - InsertVersionArgs { - note_id: existing.note_id, - op: "UPDATE", - prev_snapshot: Some(prev_snapshot), - new_snapshot: Some(crate::note_snapshot(&existing)), - reason: "add_event", - actor: args.req.agent_id.as_str(), - ts: args.now, - }, - ) - .await?; - - crate::enqueue_outbox_tx( - &mut **tx, - existing.note_id, - "UPSERT", - existing.embedding_version.as_str(), - args.now, - ) - .await?; - persistence::upsert_structured_fields_tx(tx, args.structured, existing.note_id, args.now) - .await?; - - if let Some(structured) = args.structured - && structured.has_graph_fields() - { - graph_ingestion::persist_graph_fields_tx( - tx, - args.req.tenant_id.as_str(), - existing.project_id.as_str(), - args.req.agent_id.as_str(), - args.scope, - existing.note_id, - structured, - args.now, - ) - .await?; - } - - Ok(( - AddEventResult { - note_id: Some(note_id), - op: NoteOp::Update, - policy_decision, - reason_code: None, - reason: args.reason.cloned(), - field_path: None, - write_policy_audits: None, - }, - Some(note_version_id), - )) -} - -async fn persist_extracted_note_none( - tx: &mut Transaction<'_, Postgres>, - args: PersistExtractedNoteArgs<'_>, - note_id: Uuid, - policy_decision: MemoryPolicyDecision, -) -> Result { - let mut did_update = false; - - if let Some(structured) = args.structured - && !structured.is_effectively_empty() - { - structured_fields::upsert_structured_fields_tx(tx, note_id, structured, args.now).await?; - crate::enqueue_outbox_tx(&mut **tx, note_id, "UPSERT", args.embed_version, args.now) - .await?; - - did_update = true; - } - if let Some(structured) = args.structured - && structured.has_graph_fields() - { - graph_ingestion::persist_graph_fields_tx( - tx, - args.req.tenant_id.as_str(), - args.project_id, - args.req.agent_id.as_str(), - args.scope, - note_id, - structured, - args.now, - ) - .await?; - - did_update = true; - } - - if did_update { - let note_row: MemoryNote = sqlx::query_as("SELECT * FROM memory_notes WHERE note_id = $1") - .bind(note_id) - .fetch_one(&mut **tx) - .await?; - let snapshot = crate::note_snapshot(¬e_row); - let note_version_id = crate::insert_version( - &mut **tx, - InsertVersionArgs { - note_id, - op: "UPDATE", - prev_snapshot: Some(snapshot.clone()), - new_snapshot: Some(snapshot), - reason: "add_event_structured", - actor: args.req.agent_id.as_str(), - ts: args.now, - }, - ) - .await?; - - if matches!(args.scope, "project_shared" | "org_shared") { - access::ensure_active_project_scope_grant( - &mut **tx, - args.req.tenant_id.as_str(), - args.project_id, - args.scope, - args.req.agent_id.as_str(), - ) - .await?; - } - - return Ok(( - AddEventResult { - note_id: Some(note_id), - op: NoteOp::Update, - policy_decision, - reason_code: None, - reason: args.reason.cloned(), - field_path: None, - write_policy_audits: None, - }, - Some(note_version_id), - )); - } - - Ok(( - AddEventResult { - note_id: Some(note_id), - op: NoteOp::None, - policy_decision, - reason_code: None, - reason: args.reason.cloned(), - field_path: None, - write_policy_audits: None, - }, - None, - )) -} diff --git a/packages/elf-service/src/add_event/materialize/add.rs b/packages/elf-service/src/add_event/materialize/add.rs new file mode 100644 index 00000000..5a14f39f --- /dev/null +++ b/packages/elf-service/src/add_event/materialize/add.rs @@ -0,0 +1,106 @@ +use sqlx::{Postgres, Transaction}; +use uuid::Uuid; + +use crate::{ + InsertVersionArgs, NoteOp, Result, access, + add_event::{ + persistence::{self}, + types::{AddEventPersistOutput, AddEventResult, PersistExtractedNoteArgs}, + }, + graph_ingestion, +}; +use elf_domain::memory_policy::MemoryPolicyDecision; +use elf_storage::models::MemoryNote; + +pub(super) async fn persist_extracted_note_add( + tx: &mut Transaction<'_, Postgres>, + args: PersistExtractedNoteArgs<'_>, + note_id: Uuid, + policy_decision: MemoryPolicyDecision, +) -> Result { + access::ensure_active_project_scope_grant( + &mut **tx, + args.req.tenant_id.as_str(), + args.project_id, + args.scope, + args.req.agent_id.as_str(), + ) + .await?; + + let memory_note = MemoryNote { + note_id, + tenant_id: args.req.tenant_id.clone(), + project_id: args.project_id.to_string(), + agent_id: args.req.agent_id.clone(), + scope: args.scope.to_string(), + r#type: args.note_type.to_string(), + key: args.key.map(ToString::to_string), + text: args.text.to_string(), + importance: args.importance, + confidence: args.confidence, + status: "active".to_string(), + created_at: args.now, + updated_at: args.now, + expires_at: args.expires_at, + embedding_version: args.embed_version.to_string(), + source_ref: args.source_ref, + hit_count: 0, + last_hit_at: None, + }; + + persistence::insert_memory_note_tx(tx, &memory_note).await?; + + let note_version_id = crate::insert_version( + &mut **tx, + InsertVersionArgs { + note_id: memory_note.note_id, + op: "ADD", + prev_snapshot: None, + new_snapshot: Some(crate::note_snapshot(&memory_note)), + reason: "add_event", + actor: args.req.agent_id.as_str(), + ts: args.now, + }, + ) + .await?; + + crate::enqueue_outbox_tx( + &mut **tx, + memory_note.note_id, + "UPSERT", + args.embed_version, + args.now, + ) + .await?; + persistence::upsert_structured_fields_tx(tx, args.structured, memory_note.note_id, args.now) + .await?; + + if let Some(structured) = args.structured + && structured.has_graph_fields() + { + graph_ingestion::persist_graph_fields_tx( + tx, + args.req.tenant_id.as_str(), + args.project_id, + args.req.agent_id.as_str(), + args.scope, + memory_note.note_id, + structured, + args.now, + ) + .await?; + } + + Ok(( + AddEventResult { + note_id: Some(note_id), + op: NoteOp::Add, + policy_decision, + reason_code: None, + reason: args.reason.cloned(), + field_path: None, + write_policy_audits: None, + }, + Some(note_version_id), + )) +} diff --git a/packages/elf-service/src/add_event/materialize/none.rs b/packages/elf-service/src/add_event/materialize/none.rs new file mode 100644 index 00000000..8c43f756 --- /dev/null +++ b/packages/elf-service/src/add_event/materialize/none.rs @@ -0,0 +1,112 @@ +use sqlx::{Postgres, Transaction}; +use uuid::Uuid; + +use crate::{ + InsertVersionArgs, NoteOp, Result, access, + add_event::types::{AddEventPersistOutput, AddEventResult, PersistExtractedNoteArgs}, + graph_ingestion, + structured_fields::{self, StructuredFields}, +}; +use elf_domain::memory_policy::MemoryPolicyDecision; +use elf_storage::models::MemoryNote; + +pub(super) async fn persist_extracted_note_none( + tx: &mut Transaction<'_, Postgres>, + args: PersistExtractedNoteArgs<'_>, + note_id: Uuid, + policy_decision: MemoryPolicyDecision, +) -> Result { + let Some(structured) = args.structured else { + return Ok(none_result(note_id, policy_decision, args.reason.cloned())); + }; + + if !structured_requires_update(structured) { + return Ok(none_result(note_id, policy_decision, args.reason.cloned())); + } + if !structured.is_effectively_empty() { + structured_fields::upsert_structured_fields_tx(tx, note_id, structured, args.now).await?; + crate::enqueue_outbox_tx(&mut **tx, note_id, "UPSERT", args.embed_version, args.now) + .await?; + } + if structured.has_graph_fields() { + graph_ingestion::persist_graph_fields_tx( + tx, + args.req.tenant_id.as_str(), + args.project_id, + args.req.agent_id.as_str(), + args.scope, + note_id, + structured, + args.now, + ) + .await?; + } + + let note_row: MemoryNote = sqlx::query_as("SELECT * FROM memory_notes WHERE note_id = $1") + .bind(note_id) + .fetch_one(&mut **tx) + .await?; + let snapshot = crate::note_snapshot(¬e_row); + let note_version_id = crate::insert_version( + &mut **tx, + InsertVersionArgs { + note_id, + op: "UPDATE", + prev_snapshot: Some(snapshot.clone()), + new_snapshot: Some(snapshot), + reason: "add_event_structured", + actor: args.req.agent_id.as_str(), + ts: args.now, + }, + ) + .await?; + + if matches!(args.scope, "project_shared" | "org_shared") { + access::ensure_active_project_scope_grant( + &mut **tx, + args.req.tenant_id.as_str(), + args.project_id, + args.scope, + args.req.agent_id.as_str(), + ) + .await?; + } + + Ok(( + AddEventResult { + note_id: Some(note_id), + op: NoteOp::Update, + policy_decision, + reason_code: None, + reason: args.reason.cloned(), + field_path: None, + write_policy_audits: None, + }, + Some(note_version_id), + )) +} + +fn none_result( + note_id: Uuid, + policy_decision: MemoryPolicyDecision, + reason: Option, +) -> AddEventPersistOutput { + ( + AddEventResult { + note_id: Some(note_id), + op: NoteOp::None, + policy_decision, + reason_code: None, + reason, + field_path: None, + write_policy_audits: None, + }, + None, + ) +} + +fn structured_requires_update(structured: &StructuredFields) -> bool { + !structured.is_effectively_empty() || structured.has_graph_fields() +} + +#[cfg(test)] mod tests; diff --git a/packages/elf-service/src/add_event/materialize/none/tests.rs b/packages/elf-service/src/add_event/materialize/none/tests.rs new file mode 100644 index 00000000..8119ad37 --- /dev/null +++ b/packages/elf-service/src/add_event/materialize/none/tests.rs @@ -0,0 +1,30 @@ +use crate::{ + add_event::materialize::none, + structured_fields::{ + StructuredEntity, StructuredFields, StructuredRelation, StructuredRelationObject, + }, +}; + +#[test] +fn structured_requires_update_rejects_empty_structured_fields() { + assert!(!none::structured_requires_update(&StructuredFields::default())); +} + +#[test] +fn structured_requires_update_accepts_graph_only_fields() { + let structured = StructuredFields { + relations: Some(vec![StructuredRelation { + object: Some(StructuredRelationObject { + entity: Some(StructuredEntity { + canonical: Some("Entity".to_string()), + ..StructuredEntity::default() + }), + ..StructuredRelationObject::default() + }), + ..StructuredRelation::default() + }]), + ..StructuredFields::default() + }; + + assert!(none::structured_requires_update(&structured)); +} diff --git a/packages/elf-service/src/add_event/materialize/update.rs b/packages/elf-service/src/add_event/materialize/update.rs new file mode 100644 index 00000000..cac6e657 --- /dev/null +++ b/packages/elf-service/src/add_event/materialize/update.rs @@ -0,0 +1,100 @@ +use sqlx::{Postgres, Transaction}; +use uuid::Uuid; + +use crate::{ + InsertVersionArgs, NoteOp, Result, access, + add_event::{ + persistence::{self}, + types::{AddEventPersistOutput, AddEventResult, PersistExtractedNoteArgs}, + }, + graph_ingestion, +}; +use elf_domain::memory_policy::MemoryPolicyDecision; +use elf_storage::models::MemoryNote; + +pub(super) async fn persist_extracted_note_update( + tx: &mut Transaction<'_, Postgres>, + args: PersistExtractedNoteArgs<'_>, + note_id: Uuid, + policy_decision: MemoryPolicyDecision, +) -> Result { + let mut existing: MemoryNote = + sqlx::query_as::<_, MemoryNote>("SELECT * FROM memory_notes WHERE note_id = $1 FOR UPDATE") + .bind(note_id) + .fetch_one(&mut **tx) + .await?; + + access::ensure_active_project_scope_grant( + &mut **tx, + existing.tenant_id.as_str(), + existing.project_id.as_str(), + existing.scope.as_str(), + existing.agent_id.as_str(), + ) + .await?; + + let prev_snapshot = crate::note_snapshot(&existing); + + existing.text = args.text.to_string(); + existing.importance = args.importance; + existing.confidence = args.confidence; + existing.updated_at = args.now; + existing.expires_at = args.expires_at; + existing.source_ref = args.source_ref; + + persistence::update_memory_note_tx(tx, &existing).await?; + + let note_version_id = crate::insert_version( + &mut **tx, + InsertVersionArgs { + note_id: existing.note_id, + op: "UPDATE", + prev_snapshot: Some(prev_snapshot), + new_snapshot: Some(crate::note_snapshot(&existing)), + reason: "add_event", + actor: args.req.agent_id.as_str(), + ts: args.now, + }, + ) + .await?; + + crate::enqueue_outbox_tx( + &mut **tx, + existing.note_id, + "UPSERT", + existing.embedding_version.as_str(), + args.now, + ) + .await?; + persistence::upsert_structured_fields_tx(tx, args.structured, existing.note_id, args.now) + .await?; + + if let Some(structured) = args.structured + && structured.has_graph_fields() + { + graph_ingestion::persist_graph_fields_tx( + tx, + args.req.tenant_id.as_str(), + existing.project_id.as_str(), + args.req.agent_id.as_str(), + args.scope, + existing.note_id, + structured, + args.now, + ) + .await?; + } + + Ok(( + AddEventResult { + note_id: Some(note_id), + op: NoteOp::Update, + policy_decision, + reason_code: None, + reason: args.reason.cloned(), + field_path: None, + write_policy_audits: None, + }, + Some(note_version_id), + )) +}