From 65d9e42f16e67c69559595ba12f2de0f4d7a9a12 Mon Sep 17 00:00:00 2001 From: Yvette Carlisle Date: Tue, 30 Jun 2026 08:42:29 -0400 Subject: [PATCH 1/6] {"schema":"decodex/commit/1","summary":"Split knowledge page readback DTO modules","authority":"manual"} --- .../elf-service/src/knowledge/api/readback.rs | 290 +----------------- .../src/knowledge/api/readback/lint.rs | 50 +++ .../src/knowledge/api/readback/page.rs | 17 + .../src/knowledge/api/readback/responses.rs | 26 ++ .../src/knowledge/api/readback/sections.rs | 88 ++++++ .../src/knowledge/api/readback/sources.rs | 45 +++ .../src/knowledge/api/readback/summary.rs | 60 ++++ 7 files changed, 302 insertions(+), 274 deletions(-) create mode 100644 packages/elf-service/src/knowledge/api/readback/lint.rs create mode 100644 packages/elf-service/src/knowledge/api/readback/page.rs create mode 100644 packages/elf-service/src/knowledge/api/readback/responses.rs create mode 100644 packages/elf-service/src/knowledge/api/readback/sections.rs create mode 100644 packages/elf-service/src/knowledge/api/readback/sources.rs create mode 100644 packages/elf-service/src/knowledge/api/readback/summary.rs diff --git a/packages/elf-service/src/knowledge/api/readback.rs b/packages/elf-service/src/knowledge/api/readback.rs index 069eb6d1..6915a43c 100644 --- a/packages/elf-service/src/knowledge/api/readback.rs +++ b/packages/elf-service/src/knowledge/api/readback.rs @@ -1,275 +1,17 @@ -use crate::knowledge::api::{ - self, KnowledgePage, KnowledgePageLintFinding, KnowledgePageSection, KnowledgePageSourceRef, - OffsetDateTime, Serialize, Uuid, Value, +mod lint; +mod page; +mod responses; +mod sections; +mod sources; +mod summary; + +pub use self::{ + lint::KnowledgePageLintFindingResponse, + page::KnowledgePageResponse, + responses::{ + KnowledgePageLintResponse, KnowledgePageRebuildResponse, KnowledgePagesListResponse, + }, + sections::{KnowledgePageSectionResponse, KnowledgePageSectionSourceBacklink}, + sources::KnowledgePageSourceRefResponse, + summary::KnowledgePageSummary, }; - -/// Response returned after rebuilding a derived knowledge page. -#[derive(Clone, Debug, Serialize)] -pub struct KnowledgePageRebuildResponse { - /// Rebuilt page with sections, source refs, and lint findings. - pub page: KnowledgePageResponse, -} - -/// Response returned by derived knowledge page listing. -#[derive(Clone, Debug, Serialize)] -pub struct KnowledgePagesListResponse { - /// Returned pages. - pub pages: Vec, -} - -/// Response returned after linting one knowledge page. -#[derive(Clone, Debug, Serialize)] -pub struct KnowledgePageLintResponse { - /// Page identifier. - pub page_id: Uuid, - /// Current lint findings. - pub findings: Vec, -} - -/// Summary DTO for one derived knowledge page. -#[derive(Clone, Debug, Serialize)] -pub struct KnowledgePageSummary { - /// Page identifier. - pub page_id: Uuid, - /// Tenant that owns the page. - pub tenant_id: String, - /// Project that owns the page. - pub project_id: String, - /// Page kind. - pub page_kind: String, - /// Stable page key. - pub page_key: String, - /// Page title. - pub title: String, - /// Versioned page contract schema. - pub contract_schema: String, - /// Page lifecycle status. - pub status: String, - /// Canonical source snapshot hash. - pub rebuild_source_hash: String, - /// Canonical page content hash. - pub content_hash: String, - /// Source coverage metadata. - pub source_coverage: Value, - /// Rebuild metadata. - pub rebuild_metadata: Value, - /// Previous-version diff metadata, when present. - pub previous_version_diff: Option, - /// Creation timestamp. - pub created_at: OffsetDateTime, - /// Last update timestamp. - pub updated_at: OffsetDateTime, - /// Last rebuild timestamp. - pub rebuilt_at: OffsetDateTime, -} -impl From for KnowledgePageSummary { - fn from(page: KnowledgePage) -> Self { - Self { - page_id: page.page_id, - tenant_id: page.tenant_id, - project_id: page.project_id, - page_kind: page.page_kind, - page_key: page.page_key, - title: page.title, - contract_schema: page.contract_schema, - status: page.status, - rebuild_source_hash: page.rebuild_source_hash, - content_hash: page.content_hash, - source_coverage: page.source_coverage, - previous_version_diff: api::previous_version_diff_from_metadata(&page.rebuild_metadata), - rebuild_metadata: page.rebuild_metadata, - created_at: page.created_at, - updated_at: page.updated_at, - rebuilt_at: page.rebuilt_at, - } - } -} - -/// Full readback DTO for one derived knowledge page. -#[derive(Clone, Debug, Serialize)] -pub struct KnowledgePageResponse { - /// Page summary. - pub page: KnowledgePageSummary, - /// Page sections. - pub sections: Vec, - /// Normalized source refs. - pub source_refs: Vec, - /// Lint findings. - pub lint_findings: Vec, -} - -/// Readback DTO for one page section. -#[derive(Clone, Debug, Serialize)] -pub struct KnowledgePageSectionResponse { - /// Section identifier. - pub section_id: Uuid, - /// Parent page identifier. - pub page_id: Uuid, - /// Stable section key. - pub section_key: String, - /// Section heading. - pub heading: String, - /// Section role. - pub role: String, - /// Section content. - pub content: String, - /// Display order. - pub ordinal: i32, - /// Serialized citation array. - pub citations: Value, - /// Reason this section is intentionally unsupported, when present. - pub unsupported_reason: Option, - /// Count of section-local citations. - pub citation_count: usize, - /// Count of normalized source refs attached to this section. - pub source_ref_count: usize, - /// True when the section has both citations and normalized source backlinks. - pub coverage_complete: bool, - /// Section-local normalized source backlinks. - pub source_backlinks: Vec, - /// Section content hash. - pub content_hash: String, - /// Creation timestamp. - pub created_at: OffsetDateTime, - /// Last update timestamp. - pub updated_at: OffsetDateTime, -} -impl From for KnowledgePageSectionResponse { - fn from(section: KnowledgePageSection) -> Self { - Self { - section_id: section.section_id, - page_id: section.page_id, - section_key: section.section_key, - heading: section.heading, - role: section.role, - content: section.content, - ordinal: section.ordinal, - citations: section.citations, - unsupported_reason: section.unsupported_reason, - citation_count: 0, - source_ref_count: 0, - coverage_complete: false, - source_backlinks: Vec::new(), - content_hash: section.content_hash, - created_at: section.created_at, - updated_at: section.updated_at, - } - } -} - -/// Section-local source backlink used by page readback and viewer provenance. -#[derive(Clone, Debug, Serialize)] -pub struct KnowledgePageSectionSourceBacklink { - /// Source kind. - pub source_kind: String, - /// Authoritative source identifier. - pub source_id: Uuid, - /// Captured source status. - pub source_status: Option, - /// Captured source update timestamp. - pub source_updated_at: Option, - /// Captured source content hash. - pub source_content_hash: Option, -} -impl From<&KnowledgePageSourceRef> for KnowledgePageSectionSourceBacklink { - fn from(source_ref: &KnowledgePageSourceRef) -> Self { - Self { - source_kind: source_ref.source_kind.clone(), - source_id: source_ref.source_id, - source_status: source_ref.source_status.clone(), - source_updated_at: source_ref.source_updated_at, - source_content_hash: source_ref.source_content_hash.clone(), - } - } -} - -/// Readback DTO for one normalized source reference. -#[derive(Clone, Debug, Serialize)] -pub struct KnowledgePageSourceRefResponse { - /// Source-reference row identifier. - pub ref_id: Uuid, - /// Parent page identifier. - pub page_id: Uuid, - /// Citing section, when section-scoped. - pub section_id: Option, - /// Source kind. - pub source_kind: String, - /// Authoritative source identifier. - pub source_id: Uuid, - /// Captured source status. - pub source_status: Option, - /// Captured source update timestamp. - pub source_updated_at: Option, - /// Captured source content hash. - pub source_content_hash: Option, - /// Captured source snapshot. - pub source_snapshot: Value, - /// Citation-local metadata. - pub citation_metadata: Value, - /// Creation timestamp. - pub created_at: OffsetDateTime, -} -impl From for KnowledgePageSourceRefResponse { - fn from(source_ref: KnowledgePageSourceRef) -> Self { - Self { - ref_id: source_ref.ref_id, - page_id: source_ref.page_id, - section_id: source_ref.section_id, - source_kind: source_ref.source_kind, - source_id: source_ref.source_id, - source_status: source_ref.source_status, - source_updated_at: source_ref.source_updated_at, - source_content_hash: source_ref.source_content_hash, - source_snapshot: source_ref.source_snapshot, - citation_metadata: source_ref.citation_metadata, - created_at: source_ref.created_at, - } - } -} - -/// Readback DTO for one knowledge page lint finding. -#[derive(Clone, Debug, Serialize)] -pub struct KnowledgePageLintFindingResponse { - /// Lint finding identifier. - pub finding_id: Uuid, - /// Parent page identifier. - pub page_id: Uuid, - /// Associated section, when available. - pub section_id: Option, - /// Finding type. - pub finding_type: String, - /// Finding severity. - pub severity: String, - /// Source kind associated with the finding, when available. - pub source_kind: Option, - /// Source identifier associated with the finding, when available. - pub source_id: Option, - /// Human-readable finding message. - pub message: String, - /// Structured finding details. - pub details: Value, - /// Operator guidance for repair or rebuild. - pub repair_guidance: String, - /// Creation timestamp. - pub created_at: OffsetDateTime, -} -impl From for KnowledgePageLintFindingResponse { - fn from(finding: KnowledgePageLintFinding) -> Self { - let repair_guidance = - api::repair_guidance_for_finding_type(finding.finding_type.as_str()).to_string(); - - Self { - finding_id: finding.finding_id, - page_id: finding.page_id, - section_id: finding.section_id, - finding_type: finding.finding_type, - severity: finding.severity, - source_kind: finding.source_kind, - source_id: finding.source_id, - message: finding.message, - repair_guidance, - details: finding.details, - created_at: finding.created_at, - } - } -} diff --git a/packages/elf-service/src/knowledge/api/readback/lint.rs b/packages/elf-service/src/knowledge/api/readback/lint.rs new file mode 100644 index 00000000..9f6d1112 --- /dev/null +++ b/packages/elf-service/src/knowledge/api/readback/lint.rs @@ -0,0 +1,50 @@ +use crate::knowledge::api::{ + self, KnowledgePageLintFinding, OffsetDateTime, Serialize, Uuid, Value, +}; + +/// Readback DTO for one knowledge page lint finding. +#[derive(Clone, Debug, Serialize)] +pub struct KnowledgePageLintFindingResponse { + /// Lint finding identifier. + pub finding_id: Uuid, + /// Parent page identifier. + pub page_id: Uuid, + /// Associated section, when available. + pub section_id: Option, + /// Finding type. + pub finding_type: String, + /// Finding severity. + pub severity: String, + /// Source kind associated with the finding, when available. + pub source_kind: Option, + /// Source identifier associated with the finding, when available. + pub source_id: Option, + /// Human-readable finding message. + pub message: String, + /// Structured finding details. + pub details: Value, + /// Operator guidance for repair or rebuild. + pub repair_guidance: String, + /// Creation timestamp. + pub created_at: OffsetDateTime, +} +impl From for KnowledgePageLintFindingResponse { + fn from(finding: KnowledgePageLintFinding) -> Self { + let repair_guidance = + api::repair_guidance_for_finding_type(finding.finding_type.as_str()).to_string(); + + Self { + finding_id: finding.finding_id, + page_id: finding.page_id, + section_id: finding.section_id, + finding_type: finding.finding_type, + severity: finding.severity, + source_kind: finding.source_kind, + source_id: finding.source_id, + message: finding.message, + repair_guidance, + details: finding.details, + created_at: finding.created_at, + } + } +} diff --git a/packages/elf-service/src/knowledge/api/readback/page.rs b/packages/elf-service/src/knowledge/api/readback/page.rs new file mode 100644 index 00000000..b6a02120 --- /dev/null +++ b/packages/elf-service/src/knowledge/api/readback/page.rs @@ -0,0 +1,17 @@ +use crate::knowledge::api::{ + KnowledgePageLintFindingResponse, KnowledgePageSectionResponse, KnowledgePageSourceRefResponse, + KnowledgePageSummary, Serialize, +}; + +/// Full readback DTO for one derived knowledge page. +#[derive(Clone, Debug, Serialize)] +pub struct KnowledgePageResponse { + /// Page summary. + pub page: KnowledgePageSummary, + /// Page sections. + pub sections: Vec, + /// Normalized source refs. + pub source_refs: Vec, + /// Lint findings. + pub lint_findings: Vec, +} diff --git a/packages/elf-service/src/knowledge/api/readback/responses.rs b/packages/elf-service/src/knowledge/api/readback/responses.rs new file mode 100644 index 00000000..89fad786 --- /dev/null +++ b/packages/elf-service/src/knowledge/api/readback/responses.rs @@ -0,0 +1,26 @@ +use crate::knowledge::api::{ + KnowledgePageLintFindingResponse, KnowledgePageResponse, KnowledgePageSummary, Serialize, Uuid, +}; + +/// Response returned after rebuilding a derived knowledge page. +#[derive(Clone, Debug, Serialize)] +pub struct KnowledgePageRebuildResponse { + /// Rebuilt page with sections, source refs, and lint findings. + pub page: KnowledgePageResponse, +} + +/// Response returned by derived knowledge page listing. +#[derive(Clone, Debug, Serialize)] +pub struct KnowledgePagesListResponse { + /// Returned pages. + pub pages: Vec, +} + +/// Response returned after linting one knowledge page. +#[derive(Clone, Debug, Serialize)] +pub struct KnowledgePageLintResponse { + /// Page identifier. + pub page_id: Uuid, + /// Current lint findings. + pub findings: Vec, +} diff --git a/packages/elf-service/src/knowledge/api/readback/sections.rs b/packages/elf-service/src/knowledge/api/readback/sections.rs new file mode 100644 index 00000000..b88902b0 --- /dev/null +++ b/packages/elf-service/src/knowledge/api/readback/sections.rs @@ -0,0 +1,88 @@ +use crate::knowledge::api::{ + KnowledgePageSection, KnowledgePageSourceRef, OffsetDateTime, Serialize, Uuid, Value, +}; + +/// Readback DTO for one page section. +#[derive(Clone, Debug, Serialize)] +pub struct KnowledgePageSectionResponse { + /// Section identifier. + pub section_id: Uuid, + /// Parent page identifier. + pub page_id: Uuid, + /// Stable section key. + pub section_key: String, + /// Section heading. + pub heading: String, + /// Section role. + pub role: String, + /// Section content. + pub content: String, + /// Display order. + pub ordinal: i32, + /// Serialized citation array. + pub citations: Value, + /// Reason this section is intentionally unsupported, when present. + pub unsupported_reason: Option, + /// Count of section-local citations. + pub citation_count: usize, + /// Count of normalized source refs attached to this section. + pub source_ref_count: usize, + /// True when the section has both citations and normalized source backlinks. + pub coverage_complete: bool, + /// Section-local normalized source backlinks. + pub source_backlinks: Vec, + /// Section content hash. + pub content_hash: String, + /// Creation timestamp. + pub created_at: OffsetDateTime, + /// Last update timestamp. + pub updated_at: OffsetDateTime, +} +impl From for KnowledgePageSectionResponse { + fn from(section: KnowledgePageSection) -> Self { + Self { + section_id: section.section_id, + page_id: section.page_id, + section_key: section.section_key, + heading: section.heading, + role: section.role, + content: section.content, + ordinal: section.ordinal, + citations: section.citations, + unsupported_reason: section.unsupported_reason, + citation_count: 0, + source_ref_count: 0, + coverage_complete: false, + source_backlinks: Vec::new(), + content_hash: section.content_hash, + created_at: section.created_at, + updated_at: section.updated_at, + } + } +} + +/// Section-local source backlink used by page readback and viewer provenance. +#[derive(Clone, Debug, Serialize)] +pub struct KnowledgePageSectionSourceBacklink { + /// Source kind. + pub source_kind: String, + /// Authoritative source identifier. + pub source_id: Uuid, + /// Captured source status. + pub source_status: Option, + /// Captured source update timestamp. + pub source_updated_at: Option, + /// Captured source content hash. + pub source_content_hash: Option, +} +impl From<&KnowledgePageSourceRef> for KnowledgePageSectionSourceBacklink { + fn from(source_ref: &KnowledgePageSourceRef) -> Self { + Self { + source_kind: source_ref.source_kind.clone(), + source_id: source_ref.source_id, + source_status: source_ref.source_status.clone(), + source_updated_at: source_ref.source_updated_at, + source_content_hash: source_ref.source_content_hash.clone(), + } + } +} diff --git a/packages/elf-service/src/knowledge/api/readback/sources.rs b/packages/elf-service/src/knowledge/api/readback/sources.rs new file mode 100644 index 00000000..fb696a28 --- /dev/null +++ b/packages/elf-service/src/knowledge/api/readback/sources.rs @@ -0,0 +1,45 @@ +use crate::knowledge::api::{KnowledgePageSourceRef, OffsetDateTime, Serialize, Uuid, Value}; + +/// Readback DTO for one normalized source reference. +#[derive(Clone, Debug, Serialize)] +pub struct KnowledgePageSourceRefResponse { + /// Source-reference row identifier. + pub ref_id: Uuid, + /// Parent page identifier. + pub page_id: Uuid, + /// Citing section, when section-scoped. + pub section_id: Option, + /// Source kind. + pub source_kind: String, + /// Authoritative source identifier. + pub source_id: Uuid, + /// Captured source status. + pub source_status: Option, + /// Captured source update timestamp. + pub source_updated_at: Option, + /// Captured source content hash. + pub source_content_hash: Option, + /// Captured source snapshot. + pub source_snapshot: Value, + /// Citation-local metadata. + pub citation_metadata: Value, + /// Creation timestamp. + pub created_at: OffsetDateTime, +} +impl From for KnowledgePageSourceRefResponse { + fn from(source_ref: KnowledgePageSourceRef) -> Self { + Self { + ref_id: source_ref.ref_id, + page_id: source_ref.page_id, + section_id: source_ref.section_id, + source_kind: source_ref.source_kind, + source_id: source_ref.source_id, + source_status: source_ref.source_status, + source_updated_at: source_ref.source_updated_at, + source_content_hash: source_ref.source_content_hash, + source_snapshot: source_ref.source_snapshot, + citation_metadata: source_ref.citation_metadata, + created_at: source_ref.created_at, + } + } +} diff --git a/packages/elf-service/src/knowledge/api/readback/summary.rs b/packages/elf-service/src/knowledge/api/readback/summary.rs new file mode 100644 index 00000000..b0fb2540 --- /dev/null +++ b/packages/elf-service/src/knowledge/api/readback/summary.rs @@ -0,0 +1,60 @@ +use crate::knowledge::api::{self, KnowledgePage, OffsetDateTime, Serialize, Uuid, Value}; + +/// Summary DTO for one derived knowledge page. +#[derive(Clone, Debug, Serialize)] +pub struct KnowledgePageSummary { + /// Page identifier. + pub page_id: Uuid, + /// Tenant that owns the page. + pub tenant_id: String, + /// Project that owns the page. + pub project_id: String, + /// Page kind. + pub page_kind: String, + /// Stable page key. + pub page_key: String, + /// Page title. + pub title: String, + /// Versioned page contract schema. + pub contract_schema: String, + /// Page lifecycle status. + pub status: String, + /// Canonical source snapshot hash. + pub rebuild_source_hash: String, + /// Canonical page content hash. + pub content_hash: String, + /// Source coverage metadata. + pub source_coverage: Value, + /// Rebuild metadata. + pub rebuild_metadata: Value, + /// Previous-version diff metadata, when present. + pub previous_version_diff: Option, + /// Creation timestamp. + pub created_at: OffsetDateTime, + /// Last update timestamp. + pub updated_at: OffsetDateTime, + /// Last rebuild timestamp. + pub rebuilt_at: OffsetDateTime, +} +impl From for KnowledgePageSummary { + fn from(page: KnowledgePage) -> Self { + Self { + page_id: page.page_id, + tenant_id: page.tenant_id, + project_id: page.project_id, + page_kind: page.page_kind, + page_key: page.page_key, + title: page.title, + contract_schema: page.contract_schema, + status: page.status, + rebuild_source_hash: page.rebuild_source_hash, + content_hash: page.content_hash, + source_coverage: page.source_coverage, + previous_version_diff: api::previous_version_diff_from_metadata(&page.rebuild_metadata), + rebuild_metadata: page.rebuild_metadata, + created_at: page.created_at, + updated_at: page.updated_at, + rebuilt_at: page.rebuilt_at, + } + } +} From e212a661e9c7be1a9cc5ec6c7da4609a7474ac27 Mon Sep 17 00:00:00 2001 From: Yvette Carlisle Date: Tue, 30 Jun 2026 08:50:40 -0400 Subject: [PATCH 2/6] {"schema":"decodex/commit/1","summary":"Split promotion payload modules","authority":"manual"} --- .../src/consolidation/promotion/payload.rs | 302 +----------------- .../consolidation/promotion/payload/decode.rs | 32 ++ .../consolidation/promotion/payload/refs.rs | 62 ++++ .../consolidation/promotion/payload/scope.rs | 34 ++ .../consolidation/promotion/payload/tests.rs | 138 ++++++++ .../promotion/payload/validate.rs | 26 ++ 6 files changed, 305 insertions(+), 289 deletions(-) create mode 100644 packages/elf-service/src/consolidation/promotion/payload/decode.rs create mode 100644 packages/elf-service/src/consolidation/promotion/payload/refs.rs create mode 100644 packages/elf-service/src/consolidation/promotion/payload/scope.rs create mode 100644 packages/elf-service/src/consolidation/promotion/payload/tests.rs create mode 100644 packages/elf-service/src/consolidation/promotion/payload/validate.rs diff --git a/packages/elf-service/src/consolidation/promotion/payload.rs b/packages/elf-service/src/consolidation/promotion/payload.rs index 5487fb0c..9f82dbc6 100644 --- a/packages/elf-service/src/consolidation/promotion/payload.rs +++ b/packages/elf-service/src/consolidation/promotion/payload.rs @@ -1,289 +1,13 @@ -use serde_json::Value; -use time::OffsetDateTime; -use uuid::Uuid; - -use crate::{Error, Result, access::ORG_PROJECT_ID, consolidation::types::PromotedMemoryPayload}; -use elf_config::Config; -use elf_domain::writegate::{self, NoteInput}; -use elf_storage::models::ConsolidationProposal; - -pub(in crate::consolidation) fn promoted_memory_scope( - payload: &PromotedMemoryPayload, - default_scope: &str, -) -> Result { - match payload.scope.as_deref() { - Some(raw) => { - let scope = raw.trim(); - - if scope.is_empty() { - return Err(Error::InvalidRequest { - message: "proposed_payload.scope must not be empty when provided.".to_string(), - }); - } - - Ok(scope.to_string()) - }, - None => Ok(default_scope.to_string()), - } -} - -pub(in crate::consolidation) fn promoted_memory_project_id<'a>( - proposal_project_id: &'a str, - scope: &str, -) -> &'a str { - if scope == "org_shared" { ORG_PROJECT_ID } else { proposal_project_id } -} - -pub(in crate::consolidation) fn promoted_memory_target_ref( - note_id: Uuid, - now: OffsetDateTime, -) -> Value { - serde_json::json!({ - "schema": "elf.memory_record_ref/v1", - "kind": "note", - "id": note_id, - "status": "active", - "applied_at": now, - }) -} - -pub(super) fn decode_promoted_memory_payload( - proposal: &ConsolidationProposal, -) -> Result { - let payload: PromotedMemoryPayload = serde_json::from_value(proposal.proposed_payload.clone()) - .map_err(|err| Error::InvalidRequest { - message: format!("proposed_payload is not a memory note payload: {err}"), - })?; - - if !matches!(payload.source_ref, Value::Object(_)) { - return Err(Error::InvalidRequest { - message: "proposed_payload.source_ref must be a JSON object when provided.".to_string(), - }); - } - if payload.importance.is_some_and(invalid_score) - || payload.confidence.is_some_and(invalid_score) - { - return Err(Error::InvalidRequest { - message: "proposed memory scores must be finite values in 0.0..=1.0.".to_string(), - }); - } - - Ok(payload) -} - -pub(super) fn validate_promoted_memory_payload( - payload: &PromotedMemoryPayload, - effective_scope: &str, - cfg: &Config, -) -> Result<()> { - let gate = NoteInput { - note_type: payload.note_type.clone(), - scope: effective_scope.to_string(), - text: payload.text.clone(), - }; - - if let Err(code) = writegate::writegate(&gate, cfg) { - return Err(Error::InvalidRequest { - message: format!( - "proposed memory failed writegate: {}", - crate::writegate_reason_code(code) - ), - }); - } - - Ok(()) -} - -pub(super) fn target_note_id(proposal: &ConsolidationProposal) -> Result { - let raw = proposal - .target_ref - .get("id") - .or_else(|| proposal.target_ref.get("note_id")) - .and_then(Value::as_str) - .ok_or_else(|| Error::InvalidRequest { - message: "update_derived_note requires target_ref.id or target_ref.note_id." - .to_string(), - })?; - - Uuid::parse_str(raw).map_err(|err| Error::InvalidRequest { - message: format!("target_ref note id is invalid: {err}"), - }) -} - -pub(super) fn promotion_source_ref( - proposal: &ConsolidationProposal, - proposed_source_ref: &Value, - reviewer_agent_id: &str, - review_comment: Option<&str>, - now: OffsetDateTime, -) -> Value { - serde_json::json!({ - "schema": "elf.memory_promotion/v1", - "proposal_id": proposal.proposal_id, - "run_id": proposal.run_id, - "proposal_kind": proposal.proposal_kind, - "apply_intent": proposal.apply_intent, - "source_refs": proposal.source_refs, - "source_snapshot": proposal.source_snapshot, - "lineage": proposal.lineage, - "unsupported_claim_flags": proposal.unsupported_claim_flags, - "review": { - "action": "apply", - "reviewer_agent_id": reviewer_agent_id, - "review_comment": review_comment, - "applied_at": now, - }, - "proposed_source_ref": proposed_source_ref, - }) -} - -pub(super) fn normalized_optional_string(value: Option) -> Option { - value.map(|raw| raw.trim().to_string()).filter(|trimmed| !trimmed.is_empty()) -} - -fn invalid_score(score: f32) -> bool { - !score.is_finite() || !(0.0..=1.0).contains(&score) -} - -#[cfg(test)] -mod tests { - use std::path::PathBuf; - - use serde_json::Value; - use time::OffsetDateTime; - use uuid::Uuid; - - use crate::consolidation::promotion::payload; - use elf_storage::models::ConsolidationProposal; - - fn config() -> elf_config::Config { - let path = PathBuf::from(env!("CARGO_MANIFEST_DIR")) - .join("../elf-config/tests/fixtures/sample_config.template.toml"); - - elf_config::load(path.as_path()).expect("test config should load") - } - - fn proposal(target_ref: Value, proposed_payload: Value) -> ConsolidationProposal { - ConsolidationProposal { - proposal_id: Uuid::from_u128(1), - run_id: Uuid::from_u128(2), - tenant_id: "tenant-a".to_string(), - project_id: "project-a".to_string(), - agent_id: "agent-a".to_string(), - contract_schema: "elf.consolidation.proposal/v1".to_string(), - proposal_kind: "derived_note".to_string(), - apply_intent: "create_derived_note".to_string(), - review_state: "approved".to_string(), - source_refs: serde_json::json!([{"kind": "note", "id": "source-a"}]), - source_snapshot: serde_json::json!({"captured": true}), - lineage: serde_json::json!({"generated_by": "test"}), - diff: serde_json::json!({}), - confidence: 0.75, - unsupported_claim_flags: serde_json::json!([]), - contradiction_markers: serde_json::json!([]), - staleness_markers: serde_json::json!([]), - target_ref, - proposed_payload, - reviewer_agent_id: None, - review_comment: None, - reviewed_at: None, - created_at: OffsetDateTime::UNIX_EPOCH, - updated_at: OffsetDateTime::UNIX_EPOCH, - } - } - - fn valid_payload() -> Value { - serde_json::json!({ - "type": "fact", - "text": "Fact: Promotion payloads keep explicit evidence.", - "source_ref": {"kind": "note", "id": "source-a"}, - "importance": 0.5, - "confidence": 1.0 - }) - } - - #[test] - fn decode_promoted_memory_payload_rejects_non_object_source_ref_and_bad_scores() { - let bad_source = proposal( - serde_json::json!({}), - serde_json::json!({ - "type": "fact", - "text": "Fact: source_ref must stay structured.", - "source_ref": ["not", "object"] - }), - ); - let bad_score = proposal( - serde_json::json!({}), - serde_json::json!({ - "type": "fact", - "text": "Fact: score bounds are enforced.", - "source_ref": {}, - "importance": 1.5 - }), - ); - - assert!(payload::decode_promoted_memory_payload(&bad_source).is_err()); - assert!(payload::decode_promoted_memory_payload(&bad_score).is_err()); - } - - #[test] - fn validate_promoted_memory_payload_maps_writegate_rejections() { - let payload = payload::decode_promoted_memory_payload(&proposal( - serde_json::json!({}), - serde_json::json!({ - "type": "fact", - "text": "", - "source_ref": {} - }), - )) - .expect("payload shape should decode"); - let err = payload::validate_promoted_memory_payload(&payload, "agent_private", &config()) - .expect_err("empty text should fail writegate"); - - assert!( - matches!(err, crate::Error::InvalidRequest { message } if message.contains("REJECT_EMPTY")) - ); - } - - #[test] - fn normalized_optional_string_trims_and_drops_empty_values() { - assert_eq!( - payload::normalized_optional_string(Some(" memory-key ".to_string())), - Some("memory-key".to_string()) - ); - assert_eq!(payload::normalized_optional_string(Some(" ".to_string())), None); - assert_eq!(payload::normalized_optional_string(None), None); - } - - #[test] - fn target_note_id_accepts_id_and_note_id_alias() { - let note_id = Uuid::from_u128(42); - let by_id = proposal(serde_json::json!({"id": note_id}), valid_payload()); - let by_note_id = proposal(serde_json::json!({"note_id": note_id}), valid_payload()); - - assert_eq!(payload::target_note_id(&by_id).expect("id should parse"), note_id); - assert_eq!(payload::target_note_id(&by_note_id).expect("note_id should parse"), note_id); - } - - #[test] - fn promotion_refs_preserve_schema_and_review_context() { - let proposal = proposal(serde_json::json!({}), valid_payload()); - let note_id = Uuid::from_u128(99); - let target_ref = payload::promoted_memory_target_ref(note_id, OffsetDateTime::UNIX_EPOCH); - let source_ref = payload::promotion_source_ref( - &proposal, - &serde_json::json!({"kind": "note", "id": "source-a"}), - "reviewer-a", - Some("approved"), - OffsetDateTime::UNIX_EPOCH, - ); - - assert_eq!(target_ref["schema"], "elf.memory_record_ref/v1"); - assert_eq!(target_ref["kind"], "note"); - assert_eq!(target_ref["id"], note_id.to_string()); - assert_eq!(source_ref["schema"], "elf.memory_promotion/v1"); - assert_eq!(source_ref["review"]["action"], "apply"); - assert_eq!(source_ref["review"]["reviewer_agent_id"], "reviewer-a"); - assert_eq!(source_ref["proposed_source_ref"]["kind"], "note"); - } -} +mod decode; +mod refs; +mod scope; +mod validate; + +pub(in crate::consolidation) use self::{ + decode::decode_promoted_memory_payload, + refs::{promoted_memory_target_ref, promotion_source_ref, target_note_id}, + scope::{normalized_optional_string, promoted_memory_project_id, promoted_memory_scope}, + validate::validate_promoted_memory_payload, +}; + +#[cfg(test)] mod tests; diff --git a/packages/elf-service/src/consolidation/promotion/payload/decode.rs b/packages/elf-service/src/consolidation/promotion/payload/decode.rs new file mode 100644 index 00000000..59fbfac9 --- /dev/null +++ b/packages/elf-service/src/consolidation/promotion/payload/decode.rs @@ -0,0 +1,32 @@ +use serde_json::Value; + +use crate::{Error, Result, consolidation::types::PromotedMemoryPayload}; +use elf_storage::models::ConsolidationProposal; + +pub(in crate::consolidation) fn decode_promoted_memory_payload( + proposal: &ConsolidationProposal, +) -> Result { + let payload: PromotedMemoryPayload = serde_json::from_value(proposal.proposed_payload.clone()) + .map_err(|err| Error::InvalidRequest { + message: format!("proposed_payload is not a memory note payload: {err}"), + })?; + + if !matches!(payload.source_ref, Value::Object(_)) { + return Err(Error::InvalidRequest { + message: "proposed_payload.source_ref must be a JSON object when provided.".to_string(), + }); + } + if payload.importance.is_some_and(invalid_score) + || payload.confidence.is_some_and(invalid_score) + { + return Err(Error::InvalidRequest { + message: "proposed memory scores must be finite values in 0.0..=1.0.".to_string(), + }); + } + + Ok(payload) +} + +fn invalid_score(score: f32) -> bool { + !score.is_finite() || !(0.0..=1.0).contains(&score) +} diff --git a/packages/elf-service/src/consolidation/promotion/payload/refs.rs b/packages/elf-service/src/consolidation/promotion/payload/refs.rs new file mode 100644 index 00000000..45cc88d4 --- /dev/null +++ b/packages/elf-service/src/consolidation/promotion/payload/refs.rs @@ -0,0 +1,62 @@ +use serde_json::Value; +use time::OffsetDateTime; +use uuid::Uuid; + +use crate::{Error, Result}; +use elf_storage::models::ConsolidationProposal; + +pub(in crate::consolidation) fn promoted_memory_target_ref( + note_id: Uuid, + now: OffsetDateTime, +) -> Value { + serde_json::json!({ + "schema": "elf.memory_record_ref/v1", + "kind": "note", + "id": note_id, + "status": "active", + "applied_at": now, + }) +} + +pub(in crate::consolidation) fn target_note_id(proposal: &ConsolidationProposal) -> Result { + let raw = proposal + .target_ref + .get("id") + .or_else(|| proposal.target_ref.get("note_id")) + .and_then(Value::as_str) + .ok_or_else(|| Error::InvalidRequest { + message: "update_derived_note requires target_ref.id or target_ref.note_id." + .to_string(), + })?; + + Uuid::parse_str(raw).map_err(|err| Error::InvalidRequest { + message: format!("target_ref note id is invalid: {err}"), + }) +} + +pub(in crate::consolidation) fn promotion_source_ref( + proposal: &ConsolidationProposal, + proposed_source_ref: &Value, + reviewer_agent_id: &str, + review_comment: Option<&str>, + now: OffsetDateTime, +) -> Value { + serde_json::json!({ + "schema": "elf.memory_promotion/v1", + "proposal_id": proposal.proposal_id, + "run_id": proposal.run_id, + "proposal_kind": proposal.proposal_kind, + "apply_intent": proposal.apply_intent, + "source_refs": proposal.source_refs, + "source_snapshot": proposal.source_snapshot, + "lineage": proposal.lineage, + "unsupported_claim_flags": proposal.unsupported_claim_flags, + "review": { + "action": "apply", + "reviewer_agent_id": reviewer_agent_id, + "review_comment": review_comment, + "applied_at": now, + }, + "proposed_source_ref": proposed_source_ref, + }) +} diff --git a/packages/elf-service/src/consolidation/promotion/payload/scope.rs b/packages/elf-service/src/consolidation/promotion/payload/scope.rs new file mode 100644 index 00000000..aa9c2585 --- /dev/null +++ b/packages/elf-service/src/consolidation/promotion/payload/scope.rs @@ -0,0 +1,34 @@ +use crate::{Error, Result, access::ORG_PROJECT_ID, consolidation::types::PromotedMemoryPayload}; + +pub(in crate::consolidation) fn promoted_memory_scope( + payload: &PromotedMemoryPayload, + default_scope: &str, +) -> Result { + match payload.scope.as_deref() { + Some(raw) => { + let scope = raw.trim(); + + if scope.is_empty() { + return Err(Error::InvalidRequest { + message: "proposed_payload.scope must not be empty when provided.".to_string(), + }); + } + + Ok(scope.to_string()) + }, + None => Ok(default_scope.to_string()), + } +} + +pub(in crate::consolidation) fn promoted_memory_project_id<'a>( + proposal_project_id: &'a str, + scope: &str, +) -> &'a str { + if scope == "org_shared" { ORG_PROJECT_ID } else { proposal_project_id } +} + +pub(in crate::consolidation) fn normalized_optional_string( + value: Option, +) -> Option { + value.map(|raw| raw.trim().to_string()).filter(|trimmed| !trimmed.is_empty()) +} diff --git a/packages/elf-service/src/consolidation/promotion/payload/tests.rs b/packages/elf-service/src/consolidation/promotion/payload/tests.rs new file mode 100644 index 00000000..3ad673e3 --- /dev/null +++ b/packages/elf-service/src/consolidation/promotion/payload/tests.rs @@ -0,0 +1,138 @@ +use std::path::PathBuf; + +use serde_json::Value; +use time::OffsetDateTime; +use uuid::Uuid; + +use crate::Error; +use elf_config::Config; +use elf_storage::models::ConsolidationProposal; + +fn config() -> Config { + let path = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("../elf-config/tests/fixtures/sample_config.template.toml"); + + elf_config::load(path.as_path()).expect("test config should load") +} + +fn proposal(target_ref: Value, proposed_payload: Value) -> ConsolidationProposal { + ConsolidationProposal { + proposal_id: Uuid::from_u128(1), + run_id: Uuid::from_u128(2), + tenant_id: "tenant-a".to_string(), + project_id: "project-a".to_string(), + agent_id: "agent-a".to_string(), + contract_schema: "elf.consolidation.proposal/v1".to_string(), + proposal_kind: "derived_note".to_string(), + apply_intent: "create_derived_note".to_string(), + review_state: "approved".to_string(), + source_refs: serde_json::json!([{"kind": "note", "id": "source-a"}]), + source_snapshot: serde_json::json!({"captured": true}), + lineage: serde_json::json!({"generated_by": "test"}), + diff: serde_json::json!({}), + confidence: 0.75, + unsupported_claim_flags: serde_json::json!([]), + contradiction_markers: serde_json::json!([]), + staleness_markers: serde_json::json!([]), + target_ref, + proposed_payload, + reviewer_agent_id: None, + review_comment: None, + reviewed_at: None, + created_at: OffsetDateTime::UNIX_EPOCH, + updated_at: OffsetDateTime::UNIX_EPOCH, + } +} + +fn valid_payload() -> Value { + serde_json::json!({ + "type": "fact", + "text": "Fact: Promotion payloads keep explicit evidence.", + "source_ref": {"kind": "note", "id": "source-a"}, + "importance": 0.5, + "confidence": 1.0 + }) +} + +#[test] +fn decode_promoted_memory_payload_rejects_non_object_source_ref_and_bad_scores() { + let bad_source = proposal( + serde_json::json!({}), + serde_json::json!({ + "type": "fact", + "text": "Fact: source_ref must stay structured.", + "source_ref": ["not", "object"] + }), + ); + let bad_score = proposal( + serde_json::json!({}), + serde_json::json!({ + "type": "fact", + "text": "Fact: score bounds are enforced.", + "source_ref": {}, + "importance": 1.5 + }), + ); + + assert!(super::decode_promoted_memory_payload(&bad_source).is_err()); + assert!(super::decode_promoted_memory_payload(&bad_score).is_err()); +} + +#[test] +fn validate_promoted_memory_payload_maps_writegate_rejections() { + let payload = super::decode_promoted_memory_payload(&proposal( + serde_json::json!({}), + serde_json::json!({ + "type": "fact", + "text": "", + "source_ref": {} + }), + )) + .expect("payload shape should decode"); + let err = super::validate_promoted_memory_payload(&payload, "agent_private", &config()) + .expect_err("empty text should fail writegate"); + + assert!(matches!(err, Error::InvalidRequest { message } if message.contains("REJECT_EMPTY"))); +} + +#[test] +fn normalized_optional_string_trims_and_drops_empty_values() { + assert_eq!( + super::normalized_optional_string(Some(" memory-key ".to_string())), + Some("memory-key".to_string()) + ); + assert_eq!(super::normalized_optional_string(Some(" ".to_string())), None); + assert_eq!(super::normalized_optional_string(None), None); +} + +#[test] +fn target_note_id_accepts_id_and_note_id_alias() { + let note_id = Uuid::from_u128(42); + let by_id = proposal(serde_json::json!({"id": note_id}), valid_payload()); + let by_note_id = proposal(serde_json::json!({"note_id": note_id}), valid_payload()); + + assert_eq!(super::target_note_id(&by_id).expect("id should parse"), note_id); + assert_eq!(super::target_note_id(&by_note_id).expect("note_id should parse"), note_id); +} + +#[test] +fn promotion_refs_preserve_schema_and_review_context() { + let proposal = proposal(serde_json::json!({}), valid_payload()); + let note_id = Uuid::from_u128(99); + let target_ref = super::promoted_memory_target_ref(note_id, OffsetDateTime::UNIX_EPOCH); + let source_ref = super::promotion_source_ref( + &proposal, + &serde_json::json!({"kind": "note", "id": "source-a"}), + "reviewer-a", + Some("approved"), + OffsetDateTime::UNIX_EPOCH, + ); + + assert_eq!(target_ref["schema"], "elf.memory_record_ref/v1"); + assert_eq!(target_ref["kind"], "note"); + assert_eq!(target_ref["id"], note_id.to_string()); + assert_eq!(source_ref["schema"], "elf.memory_promotion/v1"); + assert_eq!(source_ref["review"]["action"], "apply"); + assert_eq!(source_ref["review"]["reviewer_agent_id"], "reviewer-a"); + assert_eq!(source_ref["proposed_source_ref"]["kind"], "note"); +} diff --git a/packages/elf-service/src/consolidation/promotion/payload/validate.rs b/packages/elf-service/src/consolidation/promotion/payload/validate.rs new file mode 100644 index 00000000..5840cce2 --- /dev/null +++ b/packages/elf-service/src/consolidation/promotion/payload/validate.rs @@ -0,0 +1,26 @@ +use crate::{Error, Result, consolidation::types::PromotedMemoryPayload}; +use elf_config::Config; +use elf_domain::writegate::{self, NoteInput}; + +pub(in crate::consolidation) fn validate_promoted_memory_payload( + payload: &PromotedMemoryPayload, + effective_scope: &str, + cfg: &Config, +) -> Result<()> { + let gate = NoteInput { + note_type: payload.note_type.clone(), + scope: effective_scope.to_string(), + text: payload.text.clone(), + }; + + if let Err(code) = writegate::writegate(&gate, cfg) { + return Err(Error::InvalidRequest { + message: format!( + "proposed memory failed writegate: {}", + crate::writegate_reason_code(code) + ), + }); + } + + Ok(()) +} From be2ea66d4f7e06f624594b91be3349205aa3f9fd Mon Sep 17 00:00:00 2001 From: Yvette Carlisle Date: Tue, 30 Jun 2026 08:55:36 -0400 Subject: [PATCH 3/6] {"schema":"decodex/commit/1","summary":"Split memory correction storage modules","authority":"manual"} --- .../src/memory_corrections/storage.rs | 308 +----------------- .../src/memory_corrections/storage/args.rs | 12 + .../memory_corrections/storage/lifecycle.rs | 63 ++++ .../src/memory_corrections/storage/load.rs | 27 ++ .../memory_corrections/storage/mutations.rs | 154 +++++++++ .../memory_corrections/storage/versions.rs | 72 ++++ 6 files changed, 338 insertions(+), 298 deletions(-) create mode 100644 packages/elf-service/src/memory_corrections/storage/args.rs create mode 100644 packages/elf-service/src/memory_corrections/storage/lifecycle.rs create mode 100644 packages/elf-service/src/memory_corrections/storage/load.rs create mode 100644 packages/elf-service/src/memory_corrections/storage/mutations.rs create mode 100644 packages/elf-service/src/memory_corrections/storage/versions.rs diff --git a/packages/elf-service/src/memory_corrections/storage.rs b/packages/elf-service/src/memory_corrections/storage.rs index 62d46272..2e56a818 100644 --- a/packages/elf-service/src/memory_corrections/storage.rs +++ b/packages/elf-service/src/memory_corrections/storage.rs @@ -1,299 +1,11 @@ -use serde_json::Value; -use sqlx::{Postgres, Transaction}; -use time::OffsetDateTime; -use uuid::Uuid; - -use crate::{ - Error, InsertVersionArgs, Result, - access::ORG_PROJECT_ID, - memory_corrections::{ - types::MemoryCorrectionAction, - validation::{self}, - }, +mod args; +mod lifecycle; +mod load; +mod mutations; +mod versions; + +pub(super) use self::{ + args::RestoreNoteArgs, + load::load_note_for_correction, + mutations::{delete_note, restore_note, supersede_note}, }; -use elf_storage::models::MemoryNote; - -pub(super) struct RestoreNoteArgs<'a> { - pub(super) actor_agent_id: &'a str, - pub(super) reason: &'a str, - pub(super) correction_source_ref: &'a Value, - pub(super) restore_version_id: Option, - pub(super) embedding_version: &'a str, - pub(super) now: OffsetDateTime, -} - -pub(super) async fn load_note_for_correction( - tx: &mut Transaction<'_, Postgres>, - note_id: Uuid, - tenant_id: &str, - project_id: &str, -) -> Result { - sqlx::query_as::<_, MemoryNote>( - "\ -SELECT * -FROM memory_notes -WHERE note_id = $1 AND tenant_id = $2 AND project_id IN ($3, $4) -FOR UPDATE", - ) - .bind(note_id) - .bind(tenant_id) - .bind(project_id) - .bind(ORG_PROJECT_ID) - .fetch_optional(&mut **tx) - .await? - .ok_or_else(|| Error::InvalidRequest { message: "Note not found.".to_string() }) -} - -pub(super) async fn supersede_note( - tx: &mut Transaction<'_, Postgres>, - note: &mut MemoryNote, - actor_agent_id: &str, - reason: &str, - correction_source_ref: &Value, - now: OffsetDateTime, -) -> Result> { - if note.status == "deprecated" { - return Ok(None); - } - if note.status == "deleted" { - return Err(Error::InvalidRequest { - message: "Deleted memory must be restored before it can be superseded.".to_string(), - }); - } - - let prev_snapshot = crate::note_snapshot(note); - - note.status = "deprecated".to_string(); - note.updated_at = now; - note.source_ref = validation::correction_source_ref_for( - MemoryCorrectionAction::Supersede, - &prev_snapshot, - correction_source_ref, - reason, - actor_agent_id, - now, - None, - ); - - update_note_lifecycle(tx, note).await?; - - let version_id = insert_correction_version( - tx, - note, - "DEPRECATE", - prev_snapshot, - actor_agent_id, - reason, - now, - ) - .await?; - - crate::enqueue_outbox_tx(&mut **tx, note.note_id, "DELETE", ¬e.embedding_version, now) - .await?; - - Ok(Some(version_id)) -} - -pub(super) async fn delete_note( - tx: &mut Transaction<'_, Postgres>, - note: &mut MemoryNote, - actor_agent_id: &str, - reason: &str, - correction_source_ref: &Value, - now: OffsetDateTime, -) -> Result> { - if note.status == "deleted" { - return Ok(None); - } - - let prev_snapshot = crate::note_snapshot(note); - - note.status = "deleted".to_string(); - note.updated_at = now; - note.source_ref = validation::correction_source_ref_for( - MemoryCorrectionAction::Delete, - &prev_snapshot, - correction_source_ref, - reason, - actor_agent_id, - now, - None, - ); - - update_note_lifecycle(tx, note).await?; - - let version_id = - insert_correction_version(tx, note, "DELETE", prev_snapshot, actor_agent_id, reason, now) - .await?; - - crate::enqueue_outbox_tx(&mut **tx, note.note_id, "DELETE", ¬e.embedding_version, now) - .await?; - - Ok(Some(version_id)) -} - -pub(super) async fn restore_note( - tx: &mut Transaction<'_, Postgres>, - note: &mut MemoryNote, - args: RestoreNoteArgs<'_>, -) -> Result> { - if note.status == "active" { - return Ok(None); - } - - let (restore_version_id, restore_snapshot) = - load_restore_snapshot(tx, note.note_id, args.restore_version_id).await?; - let prev_snapshot = crate::note_snapshot(note); - - validation::apply_restore_snapshot(note, &restore_snapshot, args.now)?; - - note.embedding_version = args.embedding_version.to_string(); - note.source_ref = validation::correction_source_ref_for( - MemoryCorrectionAction::Restore, - &restore_snapshot, - args.correction_source_ref, - args.reason, - args.actor_agent_id, - args.now, - Some(restore_version_id), - ); - - update_note_restored(tx, note).await?; - - let version_id = insert_correction_version( - tx, - note, - "RESTORE", - prev_snapshot, - args.actor_agent_id, - args.reason, - args.now, - ) - .await?; - - crate::enqueue_outbox_tx(&mut **tx, note.note_id, "UPSERT", ¬e.embedding_version, args.now) - .await?; - - Ok(Some(version_id)) -} - -async fn update_note_lifecycle( - tx: &mut Transaction<'_, Postgres>, - note: &MemoryNote, -) -> Result<()> { - sqlx::query( - "\ -UPDATE memory_notes -SET status = $1, updated_at = $2, source_ref = $3 -WHERE note_id = $4", - ) - .bind(note.status.as_str()) - .bind(note.updated_at) - .bind(¬e.source_ref) - .bind(note.note_id) - .execute(&mut **tx) - .await?; - - Ok(()) -} - -async fn update_note_restored(tx: &mut Transaction<'_, Postgres>, note: &MemoryNote) -> Result<()> { - sqlx::query( - "\ -UPDATE memory_notes -SET - scope = $1, - type = $2, - key = $3, - text = $4, - importance = $5, - confidence = $6, - status = $7, - updated_at = $8, - expires_at = $9, - embedding_version = $10, - source_ref = $11 -WHERE note_id = $12", - ) - .bind(note.scope.as_str()) - .bind(note.r#type.as_str()) - .bind(note.key.as_deref()) - .bind(note.text.as_str()) - .bind(note.importance) - .bind(note.confidence) - .bind(note.status.as_str()) - .bind(note.updated_at) - .bind(note.expires_at) - .bind(note.embedding_version.as_str()) - .bind(¬e.source_ref) - .bind(note.note_id) - .execute(&mut **tx) - .await?; - - Ok(()) -} - -async fn insert_correction_version( - tx: &mut Transaction<'_, Postgres>, - note: &MemoryNote, - op: &str, - prev_snapshot: Value, - actor_agent_id: &str, - reason: &str, - now: OffsetDateTime, -) -> Result { - let reason = format!("memory_correction.{}: {reason}", op.to_ascii_lowercase()); - - crate::insert_version( - &mut **tx, - InsertVersionArgs { - note_id: note.note_id, - op, - prev_snapshot: Some(prev_snapshot), - new_snapshot: Some(crate::note_snapshot(note)), - reason: reason.as_str(), - actor: actor_agent_id, - ts: now, - }, - ) - .await -} - -async fn load_restore_snapshot( - tx: &mut Transaction<'_, Postgres>, - note_id: Uuid, - restore_version_id: Option, -) -> Result<(Uuid, Value)> { - let row: Option<(Uuid, Value)> = if let Some(version_id) = restore_version_id { - sqlx::query_as( - "\ -SELECT version_id, prev_snapshot -FROM memory_note_versions -WHERE note_id = $1 AND version_id = $2 AND prev_snapshot IS NOT NULL -LIMIT 1", - ) - .bind(note_id) - .bind(version_id) - .fetch_optional(&mut **tx) - .await? - } else { - sqlx::query_as( - "\ -SELECT version_id, prev_snapshot -FROM memory_note_versions -WHERE note_id = $1 - AND op IN ('DELETE', 'DEPRECATE') - AND prev_snapshot IS NOT NULL - AND prev_snapshot ->> 'status' = 'active' -ORDER BY ts DESC, version_id DESC -LIMIT 1", - ) - .bind(note_id) - .fetch_optional(&mut **tx) - .await? - }; - - row.ok_or_else(|| Error::InvalidRequest { - message: "No restorable memory snapshot was found.".to_string(), - }) -} diff --git a/packages/elf-service/src/memory_corrections/storage/args.rs b/packages/elf-service/src/memory_corrections/storage/args.rs new file mode 100644 index 00000000..284e362e --- /dev/null +++ b/packages/elf-service/src/memory_corrections/storage/args.rs @@ -0,0 +1,12 @@ +use serde_json::Value; +use time::OffsetDateTime; +use uuid::Uuid; + +pub(in crate::memory_corrections) struct RestoreNoteArgs<'a> { + pub(in crate::memory_corrections) actor_agent_id: &'a str, + pub(in crate::memory_corrections) reason: &'a str, + pub(in crate::memory_corrections) correction_source_ref: &'a Value, + pub(in crate::memory_corrections) restore_version_id: Option, + pub(in crate::memory_corrections) embedding_version: &'a str, + pub(in crate::memory_corrections) now: OffsetDateTime, +} diff --git a/packages/elf-service/src/memory_corrections/storage/lifecycle.rs b/packages/elf-service/src/memory_corrections/storage/lifecycle.rs new file mode 100644 index 00000000..b760ab67 --- /dev/null +++ b/packages/elf-service/src/memory_corrections/storage/lifecycle.rs @@ -0,0 +1,63 @@ +use sqlx::{Postgres, Transaction}; + +use crate::Result; +use elf_storage::models::MemoryNote; + +pub(super) async fn update_note_lifecycle( + tx: &mut Transaction<'_, Postgres>, + note: &MemoryNote, +) -> Result<()> { + sqlx::query( + "\ +UPDATE memory_notes +SET status = $1, updated_at = $2, source_ref = $3 +WHERE note_id = $4", + ) + .bind(note.status.as_str()) + .bind(note.updated_at) + .bind(¬e.source_ref) + .bind(note.note_id) + .execute(&mut **tx) + .await?; + + Ok(()) +} + +pub(super) async fn update_note_restored( + tx: &mut Transaction<'_, Postgres>, + note: &MemoryNote, +) -> Result<()> { + sqlx::query( + "\ +UPDATE memory_notes +SET + scope = $1, + type = $2, + key = $3, + text = $4, + importance = $5, + confidence = $6, + status = $7, + updated_at = $8, + expires_at = $9, + embedding_version = $10, + source_ref = $11 +WHERE note_id = $12", + ) + .bind(note.scope.as_str()) + .bind(note.r#type.as_str()) + .bind(note.key.as_deref()) + .bind(note.text.as_str()) + .bind(note.importance) + .bind(note.confidence) + .bind(note.status.as_str()) + .bind(note.updated_at) + .bind(note.expires_at) + .bind(note.embedding_version.as_str()) + .bind(¬e.source_ref) + .bind(note.note_id) + .execute(&mut **tx) + .await?; + + Ok(()) +} diff --git a/packages/elf-service/src/memory_corrections/storage/load.rs b/packages/elf-service/src/memory_corrections/storage/load.rs new file mode 100644 index 00000000..34fd7544 --- /dev/null +++ b/packages/elf-service/src/memory_corrections/storage/load.rs @@ -0,0 +1,27 @@ +use sqlx::{Postgres, Transaction}; +use uuid::Uuid; + +use crate::{Error, Result, access::ORG_PROJECT_ID}; +use elf_storage::models::MemoryNote; + +pub(in crate::memory_corrections) async fn load_note_for_correction( + tx: &mut Transaction<'_, Postgres>, + note_id: Uuid, + tenant_id: &str, + project_id: &str, +) -> Result { + sqlx::query_as::<_, MemoryNote>( + "\ +SELECT * +FROM memory_notes +WHERE note_id = $1 AND tenant_id = $2 AND project_id IN ($3, $4) +FOR UPDATE", + ) + .bind(note_id) + .bind(tenant_id) + .bind(project_id) + .bind(ORG_PROJECT_ID) + .fetch_optional(&mut **tx) + .await? + .ok_or_else(|| Error::InvalidRequest { message: "Note not found.".to_string() }) +} diff --git a/packages/elf-service/src/memory_corrections/storage/mutations.rs b/packages/elf-service/src/memory_corrections/storage/mutations.rs new file mode 100644 index 00000000..4ddd09cd --- /dev/null +++ b/packages/elf-service/src/memory_corrections/storage/mutations.rs @@ -0,0 +1,154 @@ +use serde_json::Value; +use sqlx::{Postgres, Transaction}; +use time::OffsetDateTime; +use uuid::Uuid; + +use crate::{ + Error, Result, + memory_corrections::{ + storage::{RestoreNoteArgs, lifecycle, versions}, + types::MemoryCorrectionAction, + validation::{self}, + }, +}; +use elf_storage::models::MemoryNote; + +pub(in crate::memory_corrections) async fn supersede_note( + tx: &mut Transaction<'_, Postgres>, + note: &mut MemoryNote, + actor_agent_id: &str, + reason: &str, + correction_source_ref: &Value, + now: OffsetDateTime, +) -> Result> { + if note.status == "deprecated" { + return Ok(None); + } + if note.status == "deleted" { + return Err(Error::InvalidRequest { + message: "Deleted memory must be restored before it can be superseded.".to_string(), + }); + } + + let prev_snapshot = crate::note_snapshot(note); + + note.status = "deprecated".to_string(); + note.updated_at = now; + note.source_ref = validation::correction_source_ref_for( + MemoryCorrectionAction::Supersede, + &prev_snapshot, + correction_source_ref, + reason, + actor_agent_id, + now, + None, + ); + + lifecycle::update_note_lifecycle(tx, note).await?; + + let version_id = versions::insert_correction_version( + tx, + note, + "DEPRECATE", + prev_snapshot, + actor_agent_id, + reason, + now, + ) + .await?; + + crate::enqueue_outbox_tx(&mut **tx, note.note_id, "DELETE", ¬e.embedding_version, now) + .await?; + + Ok(Some(version_id)) +} + +pub(in crate::memory_corrections) async fn delete_note( + tx: &mut Transaction<'_, Postgres>, + note: &mut MemoryNote, + actor_agent_id: &str, + reason: &str, + correction_source_ref: &Value, + now: OffsetDateTime, +) -> Result> { + if note.status == "deleted" { + return Ok(None); + } + + let prev_snapshot = crate::note_snapshot(note); + + note.status = "deleted".to_string(); + note.updated_at = now; + note.source_ref = validation::correction_source_ref_for( + MemoryCorrectionAction::Delete, + &prev_snapshot, + correction_source_ref, + reason, + actor_agent_id, + now, + None, + ); + + lifecycle::update_note_lifecycle(tx, note).await?; + + let version_id = versions::insert_correction_version( + tx, + note, + "DELETE", + prev_snapshot, + actor_agent_id, + reason, + now, + ) + .await?; + + crate::enqueue_outbox_tx(&mut **tx, note.note_id, "DELETE", ¬e.embedding_version, now) + .await?; + + Ok(Some(version_id)) +} + +pub(in crate::memory_corrections) async fn restore_note( + tx: &mut Transaction<'_, Postgres>, + note: &mut MemoryNote, + args: RestoreNoteArgs<'_>, +) -> Result> { + if note.status == "active" { + return Ok(None); + } + + let (restore_version_id, restore_snapshot) = + versions::load_restore_snapshot(tx, note.note_id, args.restore_version_id).await?; + let prev_snapshot = crate::note_snapshot(note); + + validation::apply_restore_snapshot(note, &restore_snapshot, args.now)?; + + note.embedding_version = args.embedding_version.to_string(); + note.source_ref = validation::correction_source_ref_for( + MemoryCorrectionAction::Restore, + &restore_snapshot, + args.correction_source_ref, + args.reason, + args.actor_agent_id, + args.now, + Some(restore_version_id), + ); + + lifecycle::update_note_restored(tx, note).await?; + + let version_id = versions::insert_correction_version( + tx, + note, + "RESTORE", + prev_snapshot, + args.actor_agent_id, + args.reason, + args.now, + ) + .await?; + + crate::enqueue_outbox_tx(&mut **tx, note.note_id, "UPSERT", ¬e.embedding_version, args.now) + .await?; + + Ok(Some(version_id)) +} diff --git a/packages/elf-service/src/memory_corrections/storage/versions.rs b/packages/elf-service/src/memory_corrections/storage/versions.rs new file mode 100644 index 00000000..3a102514 --- /dev/null +++ b/packages/elf-service/src/memory_corrections/storage/versions.rs @@ -0,0 +1,72 @@ +use serde_json::Value; +use sqlx::{Postgres, Transaction}; +use time::OffsetDateTime; +use uuid::Uuid; + +use crate::{Error, InsertVersionArgs, Result}; +use elf_storage::models::MemoryNote; + +pub(super) async fn insert_correction_version( + tx: &mut Transaction<'_, Postgres>, + note: &MemoryNote, + op: &str, + prev_snapshot: Value, + actor_agent_id: &str, + reason: &str, + now: OffsetDateTime, +) -> Result { + let reason = format!("memory_correction.{}: {reason}", op.to_ascii_lowercase()); + + crate::insert_version( + &mut **tx, + InsertVersionArgs { + note_id: note.note_id, + op, + prev_snapshot: Some(prev_snapshot), + new_snapshot: Some(crate::note_snapshot(note)), + reason: reason.as_str(), + actor: actor_agent_id, + ts: now, + }, + ) + .await +} + +pub(super) async fn load_restore_snapshot( + tx: &mut Transaction<'_, Postgres>, + note_id: Uuid, + restore_version_id: Option, +) -> Result<(Uuid, Value)> { + let row: Option<(Uuid, Value)> = if let Some(version_id) = restore_version_id { + sqlx::query_as( + "\ +SELECT version_id, prev_snapshot +FROM memory_note_versions +WHERE note_id = $1 AND version_id = $2 AND prev_snapshot IS NOT NULL +LIMIT 1", + ) + .bind(note_id) + .bind(version_id) + .fetch_optional(&mut **tx) + .await? + } else { + sqlx::query_as( + "\ +SELECT version_id, prev_snapshot +FROM memory_note_versions +WHERE note_id = $1 + AND op IN ('DELETE', 'DEPRECATE') + AND prev_snapshot IS NOT NULL + AND prev_snapshot ->> 'status' = 'active' +ORDER BY ts DESC, version_id DESC +LIMIT 1", + ) + .bind(note_id) + .fetch_optional(&mut **tx) + .await? + }; + + row.ok_or_else(|| Error::InvalidRequest { + message: "No restorable memory snapshot was found.".to_string(), + }) +} From 599094557977b9bb9a90ba6efd77fa3ac67ff981 Mon Sep 17 00:00:00 2001 From: Yvette Carlisle Date: Tue, 30 Jun 2026 09:01:20 -0400 Subject: [PATCH 4/6] {"schema":"decodex/commit/1","summary":"Split consolidation proposal storage modules","authority":"manual"} --- .../src/consolidation/proposals.rs | 322 +----------------- .../src/consolidation/proposals/insert.rs | 71 ++++ .../src/consolidation/proposals/read.rs | 135 ++++++++ .../src/consolidation/proposals/update.rs | 116 +++++++ 4 files changed, 330 insertions(+), 314 deletions(-) create mode 100644 packages/elf-storage/src/consolidation/proposals/insert.rs create mode 100644 packages/elf-storage/src/consolidation/proposals/read.rs create mode 100644 packages/elf-storage/src/consolidation/proposals/update.rs diff --git a/packages/elf-storage/src/consolidation/proposals.rs b/packages/elf-storage/src/consolidation/proposals.rs index 0633b03e..39a61ca5 100644 --- a/packages/elf-storage/src/consolidation/proposals.rs +++ b/packages/elf-storage/src/consolidation/proposals.rs @@ -1,315 +1,9 @@ -use sqlx::PgExecutor; -use uuid::Uuid; - -use crate::{ - Result, - consolidation::{ - sql::CONSOLIDATION_PROPOSAL_SELECT, - types::{ConsolidationProposalReviewUpdate, ConsolidationProposalTargetRefUpdate}, - }, - models::ConsolidationProposal, +mod insert; +mod read; +mod update; + +pub use self::{ + insert::insert_consolidation_proposal, + read::{get_consolidation_proposal, list_consolidation_proposals, lock_consolidation_proposal}, + update::{update_consolidation_proposal_review, update_consolidation_proposal_target_ref}, }; - -/// Inserts one consolidation proposal. -pub async fn insert_consolidation_proposal<'e, E>( - executor: E, - proposal: &ConsolidationProposal, -) -> Result<()> -where - E: PgExecutor<'e>, -{ - sqlx::query( - "\ -INSERT INTO consolidation_proposals ( - proposal_id, - run_id, - tenant_id, - project_id, - agent_id, - contract_schema, - proposal_kind, - apply_intent, - review_state, - source_refs, - source_snapshot, - lineage, - diff, - confidence, - unsupported_claim_flags, - contradiction_markers, - staleness_markers, - target_ref, - proposed_payload, - reviewer_agent_id, - review_comment, - reviewed_at, - created_at, - updated_at -) -VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21,$22,$23,$24)", - ) - .bind(proposal.proposal_id) - .bind(proposal.run_id) - .bind(proposal.tenant_id.as_str()) - .bind(proposal.project_id.as_str()) - .bind(proposal.agent_id.as_str()) - .bind(proposal.contract_schema.as_str()) - .bind(proposal.proposal_kind.as_str()) - .bind(proposal.apply_intent.as_str()) - .bind(proposal.review_state.as_str()) - .bind(&proposal.source_refs) - .bind(&proposal.source_snapshot) - .bind(&proposal.lineage) - .bind(&proposal.diff) - .bind(proposal.confidence) - .bind(&proposal.unsupported_claim_flags) - .bind(&proposal.contradiction_markers) - .bind(&proposal.staleness_markers) - .bind(&proposal.target_ref) - .bind(&proposal.proposed_payload) - .bind(proposal.reviewer_agent_id.as_deref()) - .bind(proposal.review_comment.as_deref()) - .bind(proposal.reviewed_at) - .bind(proposal.created_at) - .bind(proposal.updated_at) - .execute(executor) - .await?; - - Ok(()) -} - -/// Fetches one consolidation proposal by tenant and proposal identifier. -pub async fn get_consolidation_proposal<'e, E>( - executor: E, - tenant_id: &str, - project_id: &str, - proposal_id: Uuid, -) -> Result> -where - E: PgExecutor<'e>, -{ - let row = sqlx::query_as::<_, ConsolidationProposal>(CONSOLIDATION_PROPOSAL_SELECT) - .bind(tenant_id) - .bind(project_id) - .bind(proposal_id) - .fetch_optional(executor) - .await?; - - Ok(row) -} - -/// Locks one consolidation proposal by tenant and proposal identifier. -pub async fn lock_consolidation_proposal<'e, E>( - executor: E, - tenant_id: &str, - project_id: &str, - proposal_id: Uuid, -) -> Result> -where - E: PgExecutor<'e>, -{ - let row = sqlx::query_as::<_, ConsolidationProposal>( - "\ -SELECT - proposal_id, - run_id, - tenant_id, - project_id, - agent_id, - contract_schema, - proposal_kind, - apply_intent, - review_state, - source_refs, - source_snapshot, - lineage, - diff, - confidence, - COALESCE(unsupported_claim_flags, '[]'::jsonb) AS unsupported_claim_flags, - COALESCE(contradiction_markers, '[]'::jsonb) AS contradiction_markers, - COALESCE(staleness_markers, '[]'::jsonb) AS staleness_markers, - COALESCE(target_ref, '{}'::jsonb) AS target_ref, - COALESCE(proposed_payload, '{}'::jsonb) AS proposed_payload, - reviewer_agent_id, - review_comment, - reviewed_at, - created_at, - updated_at -FROM consolidation_proposals -WHERE tenant_id = $1 AND project_id = $2 AND proposal_id = $3 -LIMIT 1 -FOR UPDATE", - ) - .bind(tenant_id) - .bind(project_id) - .bind(proposal_id) - .fetch_optional(executor) - .await?; - - Ok(row) -} - -/// Lists consolidation proposals for one tenant and project. -pub async fn list_consolidation_proposals<'e, E>( - executor: E, - tenant_id: &str, - project_id: &str, - run_id: Option, - review_state: Option<&str>, - limit: i64, -) -> Result> -where - E: PgExecutor<'e>, -{ - let rows = sqlx::query_as::<_, ConsolidationProposal>( - "\ -SELECT - proposal_id, - run_id, - tenant_id, - project_id, - agent_id, - contract_schema, - proposal_kind, - apply_intent, - review_state, - source_refs, - source_snapshot, - lineage, - diff, - confidence, - COALESCE(unsupported_claim_flags, '[]'::jsonb) AS unsupported_claim_flags, - COALESCE(contradiction_markers, '[]'::jsonb) AS contradiction_markers, - COALESCE(staleness_markers, '[]'::jsonb) AS staleness_markers, - COALESCE(target_ref, '{}'::jsonb) AS target_ref, - COALESCE(proposed_payload, '{}'::jsonb) AS proposed_payload, - reviewer_agent_id, - review_comment, - reviewed_at, - created_at, - updated_at -FROM consolidation_proposals -WHERE tenant_id = $1 - AND project_id = $2 - AND ($3::uuid IS NULL OR run_id = $3) - AND ($4::text IS NULL OR review_state = $4) -ORDER BY created_at DESC, proposal_id DESC -LIMIT $5", - ) - .bind(tenant_id) - .bind(project_id) - .bind(run_id) - .bind(review_state) - .bind(limit) - .fetch_all(executor) - .await?; - - Ok(rows) -} - -/// Updates one proposal review state. -pub async fn update_consolidation_proposal_review<'e, E>( - executor: E, - args: ConsolidationProposalReviewUpdate<'_>, -) -> Result> -where - E: PgExecutor<'e>, -{ - let row = sqlx::query_as::<_, ConsolidationProposal>( - "\ -UPDATE consolidation_proposals -SET - review_state = $1, - reviewer_agent_id = $2, - review_comment = $3, - reviewed_at = $4, - updated_at = $4 -WHERE tenant_id = $5 AND project_id = $6 AND proposal_id = $7 -RETURNING - proposal_id, - run_id, - tenant_id, - project_id, - agent_id, - contract_schema, - proposal_kind, - apply_intent, - review_state, - source_refs, - source_snapshot, - lineage, - diff, - confidence, - COALESCE(unsupported_claim_flags, '[]'::jsonb) AS unsupported_claim_flags, - COALESCE(contradiction_markers, '[]'::jsonb) AS contradiction_markers, - COALESCE(staleness_markers, '[]'::jsonb) AS staleness_markers, - COALESCE(target_ref, '{}'::jsonb) AS target_ref, - COALESCE(proposed_payload, '{}'::jsonb) AS proposed_payload, - reviewer_agent_id, - review_comment, - reviewed_at, - created_at, - updated_at", - ) - .bind(args.review_state) - .bind(args.reviewer_agent_id) - .bind(args.review_comment) - .bind(args.now) - .bind(args.tenant_id) - .bind(args.project_id) - .bind(args.proposal_id) - .fetch_optional(executor) - .await?; - - Ok(row) -} - -/// Updates one proposal target reference. -pub async fn update_consolidation_proposal_target_ref<'e, E>( - executor: E, - args: ConsolidationProposalTargetRefUpdate<'_>, -) -> Result> -where - E: PgExecutor<'e>, -{ - let row = sqlx::query_as::<_, ConsolidationProposal>( - "\ -UPDATE consolidation_proposals -SET target_ref = $1, updated_at = $2 -WHERE tenant_id = $3 AND project_id = $4 AND proposal_id = $5 -RETURNING - proposal_id, - run_id, - tenant_id, - project_id, - agent_id, - contract_schema, - proposal_kind, - apply_intent, - review_state, - source_refs, - source_snapshot, - lineage, - diff, - confidence, - COALESCE(unsupported_claim_flags, '[]'::jsonb) AS unsupported_claim_flags, - COALESCE(contradiction_markers, '[]'::jsonb) AS contradiction_markers, - COALESCE(staleness_markers, '[]'::jsonb) AS staleness_markers, - COALESCE(target_ref, '{}'::jsonb) AS target_ref, - COALESCE(proposed_payload, '{}'::jsonb) AS proposed_payload, - reviewer_agent_id, - review_comment, - reviewed_at, - created_at, - updated_at", - ) - .bind(args.target_ref) - .bind(args.now) - .bind(args.tenant_id) - .bind(args.project_id) - .bind(args.proposal_id) - .fetch_optional(executor) - .await?; - - Ok(row) -} diff --git a/packages/elf-storage/src/consolidation/proposals/insert.rs b/packages/elf-storage/src/consolidation/proposals/insert.rs new file mode 100644 index 00000000..bc4d5d2b --- /dev/null +++ b/packages/elf-storage/src/consolidation/proposals/insert.rs @@ -0,0 +1,71 @@ +use sqlx::PgExecutor; + +use crate::{Result, models::ConsolidationProposal}; + +/// Inserts one consolidation proposal. +pub async fn insert_consolidation_proposal<'e, E>( + executor: E, + proposal: &ConsolidationProposal, +) -> Result<()> +where + E: PgExecutor<'e>, +{ + sqlx::query( + "\ +INSERT INTO consolidation_proposals ( + proposal_id, + run_id, + tenant_id, + project_id, + agent_id, + contract_schema, + proposal_kind, + apply_intent, + review_state, + source_refs, + source_snapshot, + lineage, + diff, + confidence, + unsupported_claim_flags, + contradiction_markers, + staleness_markers, + target_ref, + proposed_payload, + reviewer_agent_id, + review_comment, + reviewed_at, + created_at, + updated_at +) +VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21,$22,$23,$24)", + ) + .bind(proposal.proposal_id) + .bind(proposal.run_id) + .bind(proposal.tenant_id.as_str()) + .bind(proposal.project_id.as_str()) + .bind(proposal.agent_id.as_str()) + .bind(proposal.contract_schema.as_str()) + .bind(proposal.proposal_kind.as_str()) + .bind(proposal.apply_intent.as_str()) + .bind(proposal.review_state.as_str()) + .bind(&proposal.source_refs) + .bind(&proposal.source_snapshot) + .bind(&proposal.lineage) + .bind(&proposal.diff) + .bind(proposal.confidence) + .bind(&proposal.unsupported_claim_flags) + .bind(&proposal.contradiction_markers) + .bind(&proposal.staleness_markers) + .bind(&proposal.target_ref) + .bind(&proposal.proposed_payload) + .bind(proposal.reviewer_agent_id.as_deref()) + .bind(proposal.review_comment.as_deref()) + .bind(proposal.reviewed_at) + .bind(proposal.created_at) + .bind(proposal.updated_at) + .execute(executor) + .await?; + + Ok(()) +} diff --git a/packages/elf-storage/src/consolidation/proposals/read.rs b/packages/elf-storage/src/consolidation/proposals/read.rs new file mode 100644 index 00000000..be5eba31 --- /dev/null +++ b/packages/elf-storage/src/consolidation/proposals/read.rs @@ -0,0 +1,135 @@ +use sqlx::PgExecutor; +use uuid::Uuid; + +use crate::{ + Result, consolidation::sql::CONSOLIDATION_PROPOSAL_SELECT, models::ConsolidationProposal, +}; + +/// Fetches one consolidation proposal by tenant and proposal identifier. +pub async fn get_consolidation_proposal<'e, E>( + executor: E, + tenant_id: &str, + project_id: &str, + proposal_id: Uuid, +) -> Result> +where + E: PgExecutor<'e>, +{ + let row = sqlx::query_as::<_, ConsolidationProposal>(CONSOLIDATION_PROPOSAL_SELECT) + .bind(tenant_id) + .bind(project_id) + .bind(proposal_id) + .fetch_optional(executor) + .await?; + + Ok(row) +} + +/// Locks one consolidation proposal by tenant and proposal identifier. +pub async fn lock_consolidation_proposal<'e, E>( + executor: E, + tenant_id: &str, + project_id: &str, + proposal_id: Uuid, +) -> Result> +where + E: PgExecutor<'e>, +{ + let row = sqlx::query_as::<_, ConsolidationProposal>( + "\ +SELECT + proposal_id, + run_id, + tenant_id, + project_id, + agent_id, + contract_schema, + proposal_kind, + apply_intent, + review_state, + source_refs, + source_snapshot, + lineage, + diff, + confidence, + COALESCE(unsupported_claim_flags, '[]'::jsonb) AS unsupported_claim_flags, + COALESCE(contradiction_markers, '[]'::jsonb) AS contradiction_markers, + COALESCE(staleness_markers, '[]'::jsonb) AS staleness_markers, + COALESCE(target_ref, '{}'::jsonb) AS target_ref, + COALESCE(proposed_payload, '{}'::jsonb) AS proposed_payload, + reviewer_agent_id, + review_comment, + reviewed_at, + created_at, + updated_at +FROM consolidation_proposals +WHERE tenant_id = $1 AND project_id = $2 AND proposal_id = $3 +LIMIT 1 +FOR UPDATE", + ) + .bind(tenant_id) + .bind(project_id) + .bind(proposal_id) + .fetch_optional(executor) + .await?; + + Ok(row) +} + +/// Lists consolidation proposals for one tenant and project. +pub async fn list_consolidation_proposals<'e, E>( + executor: E, + tenant_id: &str, + project_id: &str, + run_id: Option, + review_state: Option<&str>, + limit: i64, +) -> Result> +where + E: PgExecutor<'e>, +{ + let rows = sqlx::query_as::<_, ConsolidationProposal>( + "\ +SELECT + proposal_id, + run_id, + tenant_id, + project_id, + agent_id, + contract_schema, + proposal_kind, + apply_intent, + review_state, + source_refs, + source_snapshot, + lineage, + diff, + confidence, + COALESCE(unsupported_claim_flags, '[]'::jsonb) AS unsupported_claim_flags, + COALESCE(contradiction_markers, '[]'::jsonb) AS contradiction_markers, + COALESCE(staleness_markers, '[]'::jsonb) AS staleness_markers, + COALESCE(target_ref, '{}'::jsonb) AS target_ref, + COALESCE(proposed_payload, '{}'::jsonb) AS proposed_payload, + reviewer_agent_id, + review_comment, + reviewed_at, + created_at, + updated_at +FROM consolidation_proposals +WHERE tenant_id = $1 + AND project_id = $2 + AND ($3::uuid IS NULL OR run_id = $3) + AND ($4::text IS NULL OR review_state = $4) +ORDER BY created_at DESC, proposal_id DESC +LIMIT $5", + ) + .bind(tenant_id) + .bind(project_id) + .bind(run_id) + .bind(review_state) + .bind(limit) + .fetch_all(executor) + .await?; + + Ok(rows) +} diff --git a/packages/elf-storage/src/consolidation/proposals/update.rs b/packages/elf-storage/src/consolidation/proposals/update.rs new file mode 100644 index 00000000..169563b2 --- /dev/null +++ b/packages/elf-storage/src/consolidation/proposals/update.rs @@ -0,0 +1,116 @@ +use sqlx::PgExecutor; + +use crate::{ + Result, + consolidation::types::{ + ConsolidationProposalReviewUpdate, ConsolidationProposalTargetRefUpdate, + }, + models::ConsolidationProposal, +}; + +/// Updates one proposal review state. +pub async fn update_consolidation_proposal_review<'e, E>( + executor: E, + args: ConsolidationProposalReviewUpdate<'_>, +) -> Result> +where + E: PgExecutor<'e>, +{ + let row = sqlx::query_as::<_, ConsolidationProposal>( + "\ +UPDATE consolidation_proposals +SET + review_state = $1, + reviewer_agent_id = $2, + review_comment = $3, + reviewed_at = $4, + updated_at = $4 +WHERE tenant_id = $5 AND project_id = $6 AND proposal_id = $7 +RETURNING + proposal_id, + run_id, + tenant_id, + project_id, + agent_id, + contract_schema, + proposal_kind, + apply_intent, + review_state, + source_refs, + source_snapshot, + lineage, + diff, + confidence, + COALESCE(unsupported_claim_flags, '[]'::jsonb) AS unsupported_claim_flags, + COALESCE(contradiction_markers, '[]'::jsonb) AS contradiction_markers, + COALESCE(staleness_markers, '[]'::jsonb) AS staleness_markers, + COALESCE(target_ref, '{}'::jsonb) AS target_ref, + COALESCE(proposed_payload, '{}'::jsonb) AS proposed_payload, + reviewer_agent_id, + review_comment, + reviewed_at, + created_at, + updated_at", + ) + .bind(args.review_state) + .bind(args.reviewer_agent_id) + .bind(args.review_comment) + .bind(args.now) + .bind(args.tenant_id) + .bind(args.project_id) + .bind(args.proposal_id) + .fetch_optional(executor) + .await?; + + Ok(row) +} + +/// Updates one proposal target reference. +pub async fn update_consolidation_proposal_target_ref<'e, E>( + executor: E, + args: ConsolidationProposalTargetRefUpdate<'_>, +) -> Result> +where + E: PgExecutor<'e>, +{ + let row = sqlx::query_as::<_, ConsolidationProposal>( + "\ +UPDATE consolidation_proposals +SET target_ref = $1, updated_at = $2 +WHERE tenant_id = $3 AND project_id = $4 AND proposal_id = $5 +RETURNING + proposal_id, + run_id, + tenant_id, + project_id, + agent_id, + contract_schema, + proposal_kind, + apply_intent, + review_state, + source_refs, + source_snapshot, + lineage, + diff, + confidence, + COALESCE(unsupported_claim_flags, '[]'::jsonb) AS unsupported_claim_flags, + COALESCE(contradiction_markers, '[]'::jsonb) AS contradiction_markers, + COALESCE(staleness_markers, '[]'::jsonb) AS staleness_markers, + COALESCE(target_ref, '{}'::jsonb) AS target_ref, + COALESCE(proposed_payload, '{}'::jsonb) AS proposed_payload, + reviewer_agent_id, + review_comment, + reviewed_at, + created_at, + updated_at", + ) + .bind(args.target_ref) + .bind(args.now) + .bind(args.tenant_id) + .bind(args.project_id) + .bind(args.proposal_id) + .fetch_optional(executor) + .await?; + + Ok(row) +} From 5b133dbfe39e5a2394b432fcc47ba1d6a0c46533 Mon Sep 17 00:00:00 2001 From: Yvette Carlisle Date: Tue, 30 Jun 2026 09:04:05 -0400 Subject: [PATCH 5/6] {"schema":"decodex/commit/1","summary":"Split knowledge source storage modules","authority":"manual"} --- packages/elf-storage/src/knowledge/sources.rs | 313 +----------------- .../src/knowledge/sources/events.rs | 63 ++++ .../src/knowledge/sources/notes.rs | 58 ++++ .../src/knowledge/sources/proposals.rs | 54 +++ .../src/knowledge/sources/relations.rs | 133 ++++++++ 5 files changed, 319 insertions(+), 302 deletions(-) create mode 100644 packages/elf-storage/src/knowledge/sources/events.rs create mode 100644 packages/elf-storage/src/knowledge/sources/notes.rs create mode 100644 packages/elf-storage/src/knowledge/sources/proposals.rs create mode 100644 packages/elf-storage/src/knowledge/sources/relations.rs diff --git a/packages/elf-storage/src/knowledge/sources.rs b/packages/elf-storage/src/knowledge/sources.rs index 314fe97a..88bffc1c 100644 --- a/packages/elf-storage/src/knowledge/sources.rs +++ b/packages/elf-storage/src/knowledge/sources.rs @@ -1,304 +1,13 @@ mod docs; - -pub use self::docs::{fetch_knowledge_doc_chunk_sources, fetch_knowledge_doc_sources}; - -use sqlx::PgExecutor; -use uuid::Uuid; - -use crate::{ - Result, - knowledge::types::{ - KnowledgeEventSource, KnowledgeNoteSource, KnowledgeProposalSource, - KnowledgeRelationSource, KnowledgeRelationSourcesFetch, - }, +mod events; +mod notes; +mod proposals; +mod relations; + +pub use self::{ + docs::{fetch_knowledge_doc_chunk_sources, fetch_knowledge_doc_sources}, + events::fetch_knowledge_event_sources, + notes::fetch_knowledge_note_sources, + proposals::fetch_knowledge_proposal_sources, + relations::fetch_knowledge_relation_sources, }; - -/// Fetches note sources by identifier for a knowledge page rebuild. -pub async fn fetch_knowledge_note_sources<'e, E>( - executor: E, - tenant_id: &str, - project_id: &str, - agent_id: Option<&str>, - allowed_scopes: &[String], - note_ids: &[Uuid], -) -> Result> -where - E: PgExecutor<'e>, -{ - if note_ids.is_empty() { - return Ok(Vec::new()); - } - - let rows = sqlx::query_as::<_, KnowledgeNoteSource>( - "\ -SELECT - note_id, - agent_id, - scope, - type AS note_type, - key, - text, - importance, - confidence, - status, - created_at, - updated_at, - expires_at, - embedding_version, - source_ref -FROM memory_notes -WHERE tenant_id = $1 - AND project_id = $2 - AND ($3::text IS NULL OR scope <> 'agent_private' OR agent_id = $3) - AND scope = ANY($4::text[]) - AND note_id = ANY($5::uuid[]) - AND status = 'active' - AND (expires_at IS NULL OR expires_at > now()) -ORDER BY updated_at ASC, note_id ASC", - ) - .bind(tenant_id) - .bind(project_id) - .bind(agent_id) - .bind(allowed_scopes) - .bind(note_ids) - .fetch_all(executor) - .await?; - - Ok(rows) -} - -/// Fetches durable add_event audit sources by decision identifier. -pub async fn fetch_knowledge_event_sources<'e, E>( - executor: E, - tenant_id: &str, - project_id: &str, - agent_id: Option<&str>, - allowed_scopes: &[String], - decision_ids: &[Uuid], -) -> Result> -where - E: PgExecutor<'e>, -{ - if decision_ids.is_empty() { - return Ok(Vec::new()); - } - - let rows = sqlx::query_as::<_, KnowledgeEventSource>( - "\ -SELECT - memory_ingest_decisions.decision_id, - memory_ingest_decisions.agent_id, - memory_ingest_decisions.scope, - memory_ingest_decisions.pipeline, - memory_ingest_decisions.note_type, - memory_ingest_decisions.note_key, - memory_ingest_decisions.note_id, - memory_ingest_decisions.policy_decision, - memory_ingest_decisions.note_op, - memory_ingest_decisions.reason_code, - memory_ingest_decisions.details, - memory_ingest_decisions.ts -FROM memory_ingest_decisions -JOIN memory_notes note ON note.note_id = memory_ingest_decisions.note_id -WHERE memory_ingest_decisions.tenant_id = $1 - AND memory_ingest_decisions.project_id = $2 - AND ($3::text IS NULL OR memory_ingest_decisions.scope <> 'agent_private' OR memory_ingest_decisions.agent_id = $3) - AND memory_ingest_decisions.scope = ANY($4::text[]) - AND memory_ingest_decisions.decision_id = ANY($5::uuid[]) - AND memory_ingest_decisions.pipeline = 'add_event' - AND memory_ingest_decisions.policy_decision IN ('remember', 'update') - AND note.tenant_id = memory_ingest_decisions.tenant_id - AND note.project_id = memory_ingest_decisions.project_id - AND note.status = 'active' - AND (note.expires_at IS NULL OR note.expires_at > now()) - AND ($3::text IS NULL OR note.scope <> 'agent_private' OR note.agent_id = $3) - AND note.scope = ANY($4::text[]) -ORDER BY memory_ingest_decisions.ts ASC, memory_ingest_decisions.decision_id ASC", - ) - .bind(tenant_id) - .bind(project_id) - .bind(agent_id) - .bind(allowed_scopes) - .bind(decision_ids) - .fetch_all(executor) - .await?; - - Ok(rows) -} - -/// Fetches relation sources by graph fact identifier for a knowledge page rebuild. -pub async fn fetch_knowledge_relation_sources<'e, E>( - executor: E, - params: KnowledgeRelationSourcesFetch<'_>, -) -> Result> -where - E: PgExecutor<'e>, -{ - if params.fact_ids.is_empty() { - return Ok(Vec::new()); - } - - let rows = sqlx::query_as::<_, KnowledgeRelationSource>( - "\ -SELECT - gf.fact_id, - gf.agent_id, - gf.scope, - subject.canonical AS subject, - subject.kind AS subject_kind, - gf.predicate, - object_entity.canonical AS object_entity, - object_entity.kind AS object_kind, - gf.object_value, - gf.valid_from, - gf.valid_to, - gf.updated_at, - COALESCE( - jsonb_agg( - jsonb_build_object( - 'note_id', evidence.note_id, - 'status', note.status, - 'updated_at', note.updated_at - ) - ORDER BY evidence.created_at ASC, evidence.note_id ASC - ) FILTER ( - WHERE evidence.note_id IS NOT NULL - AND note.tenant_id = gf.tenant_id - AND note.project_id = gf.project_id - AND note.status = 'active' - AND (note.expires_at IS NULL OR note.expires_at > now()) - AND note.scope = ANY($4::text[]) - AND ( - $3::text IS NULL - OR ($6 AND note.scope = 'agent_private' AND note.agent_id = $3) - OR ( - note.scope <> 'agent_private' - AND ( - note.agent_id = $3 - OR concat(note.scope, ':', note.agent_id) = ANY($5::text[]) - ) - ) - ) - ), - '[]'::jsonb - ) AS evidence_notes -FROM graph_facts gf -JOIN graph_entities subject ON subject.entity_id = gf.subject_entity_id -LEFT JOIN graph_entities object_entity ON object_entity.entity_id = gf.object_entity_id -LEFT JOIN graph_fact_evidence evidence ON evidence.fact_id = gf.fact_id -LEFT JOIN memory_notes note ON note.note_id = evidence.note_id -WHERE gf.tenant_id = $1 - AND gf.project_id = $2 - AND gf.scope = ANY($4::text[]) - AND ( - $3::text IS NULL - OR ($6 AND gf.scope = 'agent_private' AND gf.agent_id = $3) - OR ( - gf.scope <> 'agent_private' - AND ( - gf.agent_id = $3 - OR concat(gf.scope, ':', gf.agent_id) = ANY($5::text[]) - ) - ) - ) - AND gf.fact_id = ANY($7::uuid[]) - AND EXISTS ( - SELECT 1 - FROM graph_fact_evidence readable_evidence - JOIN memory_notes readable_note - ON readable_note.note_id = readable_evidence.note_id - WHERE readable_evidence.fact_id = gf.fact_id - AND readable_note.tenant_id = gf.tenant_id - AND readable_note.project_id = gf.project_id - AND readable_note.status = 'active' - AND (readable_note.expires_at IS NULL OR readable_note.expires_at > now()) - AND readable_note.scope = ANY($4::text[]) - AND ( - $3::text IS NULL - OR ($6 AND readable_note.scope = 'agent_private' AND readable_note.agent_id = $3) - OR ( - readable_note.scope <> 'agent_private' - AND ( - readable_note.agent_id = $3 - OR concat(readable_note.scope, ':', readable_note.agent_id) = ANY($5::text[]) - ) - ) - ) - ) -GROUP BY - gf.fact_id, - gf.agent_id, - gf.scope, - subject.canonical, - subject.kind, - gf.predicate, - object_entity.canonical, - object_entity.kind, - gf.object_value, - gf.valid_from, - gf.valid_to, - gf.updated_at -ORDER BY gf.updated_at ASC, gf.fact_id ASC", - ) - .bind(params.tenant_id) - .bind(params.project_id) - .bind(params.agent_id) - .bind(params.allowed_scopes) - .bind(params.shared_scope_keys) - .bind(params.private_allowed) - .bind(params.fact_ids) - .fetch_all(executor) - .await?; - - Ok(rows) -} - -/// Fetches applied proposal sources by identifier for a knowledge page rebuild. -pub async fn fetch_knowledge_proposal_sources<'e, E>( - executor: E, - tenant_id: &str, - project_id: &str, - proposal_ids: &[Uuid], -) -> Result> -where - E: PgExecutor<'e>, -{ - if proposal_ids.is_empty() { - return Ok(Vec::new()); - } - - let rows = sqlx::query_as::<_, KnowledgeProposalSource>( - "\ -SELECT - proposal_id, - run_id, - agent_id, - proposal_kind, - apply_intent, - review_state, - source_refs, - source_snapshot, - lineage, - diff, - confidence, - COALESCE(unsupported_claim_flags, '[]'::jsonb) AS unsupported_claim_flags, - COALESCE(contradiction_markers, '[]'::jsonb) AS contradiction_markers, - COALESCE(staleness_markers, '[]'::jsonb) AS staleness_markers, - COALESCE(target_ref, '{}'::jsonb) AS target_ref, - COALESCE(proposed_payload, '{}'::jsonb) AS proposed_payload, - updated_at -FROM consolidation_proposals -WHERE tenant_id = $1 - AND project_id = $2 - AND proposal_id = ANY($3::uuid[]) - AND review_state = 'applied' -ORDER BY updated_at ASC, proposal_id ASC", - ) - .bind(tenant_id) - .bind(project_id) - .bind(proposal_ids) - .fetch_all(executor) - .await?; - - Ok(rows) -} diff --git a/packages/elf-storage/src/knowledge/sources/events.rs b/packages/elf-storage/src/knowledge/sources/events.rs new file mode 100644 index 00000000..4939dcd0 --- /dev/null +++ b/packages/elf-storage/src/knowledge/sources/events.rs @@ -0,0 +1,63 @@ +use sqlx::PgExecutor; +use uuid::Uuid; + +use crate::{Result, knowledge::types::KnowledgeEventSource}; + +/// Fetches durable add_event audit sources by decision identifier. +pub async fn fetch_knowledge_event_sources<'e, E>( + executor: E, + tenant_id: &str, + project_id: &str, + agent_id: Option<&str>, + allowed_scopes: &[String], + decision_ids: &[Uuid], +) -> Result> +where + E: PgExecutor<'e>, +{ + if decision_ids.is_empty() { + return Ok(Vec::new()); + } + + let rows = sqlx::query_as::<_, KnowledgeEventSource>( + "\ +SELECT + memory_ingest_decisions.decision_id, + memory_ingest_decisions.agent_id, + memory_ingest_decisions.scope, + memory_ingest_decisions.pipeline, + memory_ingest_decisions.note_type, + memory_ingest_decisions.note_key, + memory_ingest_decisions.note_id, + memory_ingest_decisions.policy_decision, + memory_ingest_decisions.note_op, + memory_ingest_decisions.reason_code, + memory_ingest_decisions.details, + memory_ingest_decisions.ts +FROM memory_ingest_decisions +JOIN memory_notes note ON note.note_id = memory_ingest_decisions.note_id +WHERE memory_ingest_decisions.tenant_id = $1 + AND memory_ingest_decisions.project_id = $2 + AND ($3::text IS NULL OR memory_ingest_decisions.scope <> 'agent_private' OR memory_ingest_decisions.agent_id = $3) + AND memory_ingest_decisions.scope = ANY($4::text[]) + AND memory_ingest_decisions.decision_id = ANY($5::uuid[]) + AND memory_ingest_decisions.pipeline = 'add_event' + AND memory_ingest_decisions.policy_decision IN ('remember', 'update') + AND note.tenant_id = memory_ingest_decisions.tenant_id + AND note.project_id = memory_ingest_decisions.project_id + AND note.status = 'active' + AND (note.expires_at IS NULL OR note.expires_at > now()) + AND ($3::text IS NULL OR note.scope <> 'agent_private' OR note.agent_id = $3) + AND note.scope = ANY($4::text[]) +ORDER BY memory_ingest_decisions.ts ASC, memory_ingest_decisions.decision_id ASC", + ) + .bind(tenant_id) + .bind(project_id) + .bind(agent_id) + .bind(allowed_scopes) + .bind(decision_ids) + .fetch_all(executor) + .await?; + + Ok(rows) +} diff --git a/packages/elf-storage/src/knowledge/sources/notes.rs b/packages/elf-storage/src/knowledge/sources/notes.rs new file mode 100644 index 00000000..12c03a6f --- /dev/null +++ b/packages/elf-storage/src/knowledge/sources/notes.rs @@ -0,0 +1,58 @@ +use sqlx::PgExecutor; +use uuid::Uuid; + +use crate::{Result, knowledge::types::KnowledgeNoteSource}; + +/// Fetches note sources by identifier for a knowledge page rebuild. +pub async fn fetch_knowledge_note_sources<'e, E>( + executor: E, + tenant_id: &str, + project_id: &str, + agent_id: Option<&str>, + allowed_scopes: &[String], + note_ids: &[Uuid], +) -> Result> +where + E: PgExecutor<'e>, +{ + if note_ids.is_empty() { + return Ok(Vec::new()); + } + + let rows = sqlx::query_as::<_, KnowledgeNoteSource>( + "\ +SELECT + note_id, + agent_id, + scope, + type AS note_type, + key, + text, + importance, + confidence, + status, + created_at, + updated_at, + expires_at, + embedding_version, + source_ref +FROM memory_notes +WHERE tenant_id = $1 + AND project_id = $2 + AND ($3::text IS NULL OR scope <> 'agent_private' OR agent_id = $3) + AND scope = ANY($4::text[]) + AND note_id = ANY($5::uuid[]) + AND status = 'active' + AND (expires_at IS NULL OR expires_at > now()) +ORDER BY updated_at ASC, note_id ASC", + ) + .bind(tenant_id) + .bind(project_id) + .bind(agent_id) + .bind(allowed_scopes) + .bind(note_ids) + .fetch_all(executor) + .await?; + + Ok(rows) +} diff --git a/packages/elf-storage/src/knowledge/sources/proposals.rs b/packages/elf-storage/src/knowledge/sources/proposals.rs new file mode 100644 index 00000000..304543ae --- /dev/null +++ b/packages/elf-storage/src/knowledge/sources/proposals.rs @@ -0,0 +1,54 @@ +use sqlx::PgExecutor; +use uuid::Uuid; + +use crate::{Result, knowledge::types::KnowledgeProposalSource}; + +/// Fetches applied proposal sources by identifier for a knowledge page rebuild. +pub async fn fetch_knowledge_proposal_sources<'e, E>( + executor: E, + tenant_id: &str, + project_id: &str, + proposal_ids: &[Uuid], +) -> Result> +where + E: PgExecutor<'e>, +{ + if proposal_ids.is_empty() { + return Ok(Vec::new()); + } + + let rows = sqlx::query_as::<_, KnowledgeProposalSource>( + "\ +SELECT + proposal_id, + run_id, + agent_id, + proposal_kind, + apply_intent, + review_state, + source_refs, + source_snapshot, + lineage, + diff, + confidence, + COALESCE(unsupported_claim_flags, '[]'::jsonb) AS unsupported_claim_flags, + COALESCE(contradiction_markers, '[]'::jsonb) AS contradiction_markers, + COALESCE(staleness_markers, '[]'::jsonb) AS staleness_markers, + COALESCE(target_ref, '{}'::jsonb) AS target_ref, + COALESCE(proposed_payload, '{}'::jsonb) AS proposed_payload, + updated_at +FROM consolidation_proposals +WHERE tenant_id = $1 + AND project_id = $2 + AND proposal_id = ANY($3::uuid[]) + AND review_state = 'applied' +ORDER BY updated_at ASC, proposal_id ASC", + ) + .bind(tenant_id) + .bind(project_id) + .bind(proposal_ids) + .fetch_all(executor) + .await?; + + Ok(rows) +} diff --git a/packages/elf-storage/src/knowledge/sources/relations.rs b/packages/elf-storage/src/knowledge/sources/relations.rs new file mode 100644 index 00000000..600632a5 --- /dev/null +++ b/packages/elf-storage/src/knowledge/sources/relations.rs @@ -0,0 +1,133 @@ +use sqlx::PgExecutor; + +use crate::{ + Result, + knowledge::types::{KnowledgeRelationSource, KnowledgeRelationSourcesFetch}, +}; + +/// Fetches relation sources by graph fact identifier for a knowledge page rebuild. +pub async fn fetch_knowledge_relation_sources<'e, E>( + executor: E, + params: KnowledgeRelationSourcesFetch<'_>, +) -> Result> +where + E: PgExecutor<'e>, +{ + if params.fact_ids.is_empty() { + return Ok(Vec::new()); + } + + let rows = sqlx::query_as::<_, KnowledgeRelationSource>( + "\ +SELECT + gf.fact_id, + gf.agent_id, + gf.scope, + subject.canonical AS subject, + subject.kind AS subject_kind, + gf.predicate, + object_entity.canonical AS object_entity, + object_entity.kind AS object_kind, + gf.object_value, + gf.valid_from, + gf.valid_to, + gf.updated_at, + COALESCE( + jsonb_agg( + jsonb_build_object( + 'note_id', evidence.note_id, + 'status', note.status, + 'updated_at', note.updated_at + ) + ORDER BY evidence.created_at ASC, evidence.note_id ASC + ) FILTER ( + WHERE evidence.note_id IS NOT NULL + AND note.tenant_id = gf.tenant_id + AND note.project_id = gf.project_id + AND note.status = 'active' + AND (note.expires_at IS NULL OR note.expires_at > now()) + AND note.scope = ANY($4::text[]) + AND ( + $3::text IS NULL + OR ($6 AND note.scope = 'agent_private' AND note.agent_id = $3) + OR ( + note.scope <> 'agent_private' + AND ( + note.agent_id = $3 + OR concat(note.scope, ':', note.agent_id) = ANY($5::text[]) + ) + ) + ) + ), + '[]'::jsonb + ) AS evidence_notes +FROM graph_facts gf +JOIN graph_entities subject ON subject.entity_id = gf.subject_entity_id +LEFT JOIN graph_entities object_entity ON object_entity.entity_id = gf.object_entity_id +LEFT JOIN graph_fact_evidence evidence ON evidence.fact_id = gf.fact_id +LEFT JOIN memory_notes note ON note.note_id = evidence.note_id +WHERE gf.tenant_id = $1 + AND gf.project_id = $2 + AND gf.scope = ANY($4::text[]) + AND ( + $3::text IS NULL + OR ($6 AND gf.scope = 'agent_private' AND gf.agent_id = $3) + OR ( + gf.scope <> 'agent_private' + AND ( + gf.agent_id = $3 + OR concat(gf.scope, ':', gf.agent_id) = ANY($5::text[]) + ) + ) + ) + AND gf.fact_id = ANY($7::uuid[]) + AND EXISTS ( + SELECT 1 + FROM graph_fact_evidence readable_evidence + JOIN memory_notes readable_note + ON readable_note.note_id = readable_evidence.note_id + WHERE readable_evidence.fact_id = gf.fact_id + AND readable_note.tenant_id = gf.tenant_id + AND readable_note.project_id = gf.project_id + AND readable_note.status = 'active' + AND (readable_note.expires_at IS NULL OR readable_note.expires_at > now()) + AND readable_note.scope = ANY($4::text[]) + AND ( + $3::text IS NULL + OR ($6 AND readable_note.scope = 'agent_private' AND readable_note.agent_id = $3) + OR ( + readable_note.scope <> 'agent_private' + AND ( + readable_note.agent_id = $3 + OR concat(readable_note.scope, ':', readable_note.agent_id) = ANY($5::text[]) + ) + ) + ) + ) +GROUP BY + gf.fact_id, + gf.agent_id, + gf.scope, + subject.canonical, + subject.kind, + gf.predicate, + object_entity.canonical, + object_entity.kind, + gf.object_value, + gf.valid_from, + gf.valid_to, + gf.updated_at +ORDER BY gf.updated_at ASC, gf.fact_id ASC", + ) + .bind(params.tenant_id) + .bind(params.project_id) + .bind(params.agent_id) + .bind(params.allowed_scopes) + .bind(params.shared_scope_keys) + .bind(params.private_allowed) + .bind(params.fact_ids) + .fetch_all(executor) + .await?; + + Ok(rows) +} From ee3774bd59153e553de5420e9d1e4690b75e7855 Mon Sep 17 00:00:00 2001 From: Yvette Carlisle Date: Tue, 30 Jun 2026 09:08:20 -0400 Subject: [PATCH 6/6] {"schema":"decodex/commit/1","summary":"Split knowledge source snapshot modules","authority":"manual"} --- .../src/knowledge/support/snapshots.rs | 322 +----------------- .../src/knowledge/support/snapshots/docs.rs | 103 ++++++ .../src/knowledge/support/snapshots/events.rs | 40 +++ .../src/knowledge/support/snapshots/notes.rs | 36 ++ .../knowledge/support/snapshots/proposals.rs | 45 +++ .../knowledge/support/snapshots/relations.rs | 41 +++ .../knowledge/support/snapshots/sanitize.rs | 56 +++ 7 files changed, 335 insertions(+), 308 deletions(-) create mode 100644 packages/elf-service/src/knowledge/support/snapshots/docs.rs create mode 100644 packages/elf-service/src/knowledge/support/snapshots/events.rs create mode 100644 packages/elf-service/src/knowledge/support/snapshots/notes.rs create mode 100644 packages/elf-service/src/knowledge/support/snapshots/proposals.rs create mode 100644 packages/elf-service/src/knowledge/support/snapshots/relations.rs create mode 100644 packages/elf-service/src/knowledge/support/snapshots/sanitize.rs diff --git a/packages/elf-service/src/knowledge/support/snapshots.rs b/packages/elf-service/src/knowledge/support/snapshots.rs index 23d0f576..27cd1e8c 100644 --- a/packages/elf-service/src/knowledge/support/snapshots.rs +++ b/packages/elf-service/src/knowledge/support/snapshots.rs @@ -1,309 +1,15 @@ -use crate::knowledge::support::{ - self, KnowledgeDocChunkSource, KnowledgeDocSource, KnowledgeEventSource, KnowledgeNoteSource, - KnowledgeProposalSource, KnowledgeRelationSource, KnowledgeSourceKind, Map, Number, - SourceSnapshot, Value, serde_json, +mod docs; +mod events; +mod notes; +mod proposals; +mod relations; +mod sanitize; + +pub(in crate::knowledge) use self::{ + docs::{doc_chunk_source_snapshot, doc_source_snapshot}, + events::event_source_snapshot, + notes::note_source_snapshot, + proposals::proposal_source_snapshot, + relations::relation_source_snapshot, + sanitize::sanitize_proposal_snapshot, }; - -pub(in crate::knowledge) fn doc_source_snapshot(row: KnowledgeDocSource) -> SourceSnapshot { - let title = row.title.clone().unwrap_or_else(|| "Untitled source document".to_string()); - let excerpt = - support::truncate_chars(support::normalize_whitespace(row.content.as_str()).as_str(), 240); - let line = format!("[doc:{}] {title}: {excerpt}", row.doc_type); - let snapshot = serde_json::json!({ - "kind": "doc", - "doc_id": row.doc_id, - "agent_id": row.agent_id.clone(), - "scope": row.scope.clone(), - "doc_type": row.doc_type.clone(), - "status": row.status.clone(), - "title": row.title.clone(), - "content_bytes": row.content_bytes, - "content_hash": row.content_hash.clone(), - "source_ref": row.source_ref.clone(), - "created_at": row.created_at, - "updated_at": row.updated_at, - }); - - SourceSnapshot { - kind: KnowledgeSourceKind::Doc, - id: row.doc_id, - status: Some(row.status), - updated_at: Some(row.updated_at), - content_hash: Some(row.content_hash), - snapshot, - citation_metadata: serde_json::json!({ "section_role": "source_document" }), - line, - } -} - -pub(in crate::knowledge) fn doc_chunk_source_snapshot( - row: KnowledgeDocChunkSource, -) -> SourceSnapshot { - let title = row.title.clone().unwrap_or_else(|| "Untitled source document".to_string()); - let excerpt = support::truncate_chars( - support::normalize_whitespace(row.chunk_text.as_str()).as_str(), - 240, - ); - let span_id = support::source_span_id( - row.doc_content_hash.as_str(), - row.start_offset.max(0) as usize, - row.end_offset.max(row.start_offset).max(0) as usize, - "captured", - ); - let line = format!( - "[doc_chunk:{}:{}-{}] {title}: {excerpt}", - row.chunk_index, row.start_offset, row.end_offset - ); - let source_span = serde_json::json!({ - "schema": "doc_source_span/v1", - "span_id": span_id, - "chunk_id": row.chunk_id, - "status": "captured", - "reason_code": null, - "start_offset": row.start_offset, - "end_offset": row.end_offset, - "content_hash": row.doc_content_hash.clone(), - "chunk_hash": row.chunk_hash.clone(), - }); - let snapshot = serde_json::json!({ - "kind": "doc_chunk", - "chunk_id": row.chunk_id, - "doc_id": row.doc_id, - "agent_id": row.agent_id.clone(), - "scope": row.scope.clone(), - "doc_type": row.doc_type.clone(), - "status": row.status.clone(), - "title": row.title.clone(), - "source_ref": row.source_ref.clone(), - "doc_content_hash": row.doc_content_hash.clone(), - "doc_updated_at": row.doc_updated_at, - "chunk_index": row.chunk_index, - "start_offset": row.start_offset, - "end_offset": row.end_offset, - "chunk_hash": row.chunk_hash.clone(), - "chunk_created_at": row.chunk_created_at, - "source_span": source_span, - }); - - SourceSnapshot { - kind: KnowledgeSourceKind::DocChunk, - id: row.chunk_id, - status: Some(row.status), - updated_at: Some(row.doc_updated_at), - content_hash: Some(row.chunk_hash), - snapshot, - citation_metadata: serde_json::json!({ - "section_role": "source_span", - "doc_id": row.doc_id, - "span_id": span_id, - "start_offset": row.start_offset, - "end_offset": row.end_offset, - }), - line, - } -} - -pub(in crate::knowledge) fn note_source_snapshot(row: KnowledgeNoteSource) -> SourceSnapshot { - let content_hash = support::hash_text(row.text.as_str()); - let line = format!("{}{}", support::note_prefix(&row), row.text); - let snapshot = serde_json::json!({ - "kind": "note", - "note_id": row.note_id, - "agent_id": row.agent_id.clone(), - "scope": row.scope.clone(), - "type": row.note_type.clone(), - "key": row.key.clone(), - "status": row.status.clone(), - "updated_at": row.updated_at, - "created_at": row.created_at, - "expires_at": row.expires_at, - "embedding_version": row.embedding_version.clone(), - "content_hash": content_hash, - "source_ref": row.source_ref.clone(), - "importance": row.importance, - "confidence": row.confidence, - }); - - SourceSnapshot { - kind: KnowledgeSourceKind::Note, - id: row.note_id, - status: Some(row.status), - updated_at: Some(row.updated_at), - content_hash: Some(content_hash), - snapshot, - citation_metadata: serde_json::json!({ "section_role": "source_note" }), - line, - } -} - -pub(in crate::knowledge) fn event_source_snapshot(row: KnowledgeEventSource) -> SourceSnapshot { - let content_hash = support::hash_json_lossy(&row.details); - let line = format!( - "add_event audit {} {} for {}{}", - row.note_op, - row.policy_decision, - row.note_type, - row.note_key.as_ref().map(|key| format!(" key {key}")).unwrap_or_default() - ); - let snapshot = serde_json::json!({ - "kind": "event", - "decision_id": row.decision_id, - "agent_id": row.agent_id.clone(), - "scope": row.scope.clone(), - "pipeline": row.pipeline.clone(), - "note_type": row.note_type.clone(), - "note_key": row.note_key.clone(), - "note_id": row.note_id, - "policy_decision": row.policy_decision.clone(), - "note_op": row.note_op.clone(), - "reason_code": row.reason_code.clone(), - "details_hash": content_hash, - "ts": row.ts, - }); - - SourceSnapshot { - kind: KnowledgeSourceKind::Event, - id: row.decision_id, - status: Some(row.policy_decision), - updated_at: Some(row.ts), - content_hash: Some(content_hash), - snapshot, - citation_metadata: serde_json::json!({ "section_role": "event_audit" }), - line, - } -} - -pub(in crate::knowledge) fn relation_source_snapshot( - row: KnowledgeRelationSource, -) -> SourceSnapshot { - let object = row.object_entity.clone().or(row.object_value.clone()).unwrap_or_default(); - let temporal_status = if row.valid_to.is_some() { "historical" } else { "current" }; - let line = format!("{} {} {} ({temporal_status}).", row.subject, row.predicate, object); - let content_hash = support::hash_text(line.as_str()); - let snapshot = serde_json::json!({ - "kind": "relation", - "fact_id": row.fact_id, - "agent_id": row.agent_id.clone(), - "scope": row.scope.clone(), - "subject": { "canonical": row.subject.clone(), "kind": row.subject_kind.clone() }, - "predicate": row.predicate.clone(), - "object": { - "entity": row.object_entity.clone(), - "kind": row.object_kind.clone(), - "value": row.object_value.clone() - }, - "valid_from": row.valid_from, - "valid_to": row.valid_to, - "updated_at": row.updated_at, - "content_hash": content_hash, - "evidence_notes": row.evidence_notes.clone(), - }); - - SourceSnapshot { - kind: KnowledgeSourceKind::Relation, - id: row.fact_id, - status: Some(temporal_status.to_string()), - updated_at: Some(row.updated_at), - content_hash: Some(content_hash), - snapshot, - citation_metadata: serde_json::json!({ "section_role": "relation_fact" }), - line, - } -} - -pub(in crate::knowledge) fn proposal_source_snapshot( - row: KnowledgeProposalSource, -) -> SourceSnapshot { - let content_hash = support::hash_json_lossy(&serde_json::json!({ - "diff": row.diff.clone(), - "proposed_payload": row.proposed_payload.clone(), - "review_state": row.review_state.clone(), - })); - let line = format!("Applied proposal {}", row.proposal_kind); - let snapshot = sanitize_proposal_snapshot(&serde_json::json!({ - "kind": "proposal", - "proposal_id": row.proposal_id, - "run_id": row.run_id, - "agent_id": row.agent_id.clone(), - "proposal_kind": row.proposal_kind.clone(), - "apply_intent": row.apply_intent.clone(), - "review_state": row.review_state.clone(), - "source_refs": row.source_refs.clone(), - "source_snapshot": row.source_snapshot.clone(), - "lineage": row.lineage.clone(), - "diff": row.diff.clone(), - "confidence": row.confidence, - "unsupported_claim_flags": row.unsupported_claim_flags.clone(), - "contradiction_markers": row.contradiction_markers.clone(), - "staleness_markers": row.staleness_markers.clone(), - "target_ref": row.target_ref.clone(), - "proposed_payload_hash": content_hash, - "updated_at": row.updated_at, - })); - - SourceSnapshot { - kind: KnowledgeSourceKind::Proposal, - id: row.proposal_id, - status: Some(row.review_state), - updated_at: Some(row.updated_at), - content_hash: Some(content_hash), - snapshot, - citation_metadata: serde_json::json!({ "section_role": "reviewed_proposal" }), - line, - } -} - -pub(in crate::knowledge) fn sanitize_proposal_snapshot(source_snapshot: &Value) -> Value { - let Some(object) = source_snapshot.as_object() else { - return serde_json::json!({ - "kind": "proposal", - "sanitized": true, - "source_visibility": "proposal_metadata_only", - }); - }; - let nested_source_count = - object.get("source_refs").and_then(Value::as_array).map(Vec::len).unwrap_or_default(); - let mut sanitized = Map::new(); - - for key in [ - "kind", - "proposal_id", - "run_id", - "agent_id", - "proposal_kind", - "apply_intent", - "review_state", - "confidence", - "proposed_payload_hash", - "updated_at", - ] { - if let Some(value) = object.get(key) { - sanitized.insert(key.to_string(), value.clone()); - } - } - - sanitized.insert("sanitized".to_string(), Value::Bool(true)); - sanitized.insert( - "source_visibility".to_string(), - Value::String("proposal_metadata_only".to_string()), - ); - sanitized.insert( - "omitted_fields".to_string(), - serde_json::json!([ - "source_refs", - "source_snapshot", - "lineage", - "diff", - "unsupported_claim_flags", - "contradiction_markers", - "staleness_markers", - "target_ref" - ]), - ); - sanitized.insert( - "nested_source_ref_count".to_string(), - Value::Number(Number::from(nested_source_count)), - ); - - Value::Object(sanitized) -} diff --git a/packages/elf-service/src/knowledge/support/snapshots/docs.rs b/packages/elf-service/src/knowledge/support/snapshots/docs.rs new file mode 100644 index 00000000..f988ac68 --- /dev/null +++ b/packages/elf-service/src/knowledge/support/snapshots/docs.rs @@ -0,0 +1,103 @@ +use crate::knowledge::support::{ + self, KnowledgeDocChunkSource, KnowledgeDocSource, KnowledgeSourceKind, SourceSnapshot, + serde_json, +}; + +pub(in crate::knowledge) fn doc_source_snapshot(row: KnowledgeDocSource) -> SourceSnapshot { + let title = row.title.clone().unwrap_or_else(|| "Untitled source document".to_string()); + let excerpt = + support::truncate_chars(support::normalize_whitespace(row.content.as_str()).as_str(), 240); + let line = format!("[doc:{}] {title}: {excerpt}", row.doc_type); + let snapshot = serde_json::json!({ + "kind": "doc", + "doc_id": row.doc_id, + "agent_id": row.agent_id.clone(), + "scope": row.scope.clone(), + "doc_type": row.doc_type.clone(), + "status": row.status.clone(), + "title": row.title.clone(), + "content_bytes": row.content_bytes, + "content_hash": row.content_hash.clone(), + "source_ref": row.source_ref.clone(), + "created_at": row.created_at, + "updated_at": row.updated_at, + }); + + SourceSnapshot { + kind: KnowledgeSourceKind::Doc, + id: row.doc_id, + status: Some(row.status), + updated_at: Some(row.updated_at), + content_hash: Some(row.content_hash), + snapshot, + citation_metadata: serde_json::json!({ "section_role": "source_document" }), + line, + } +} + +pub(in crate::knowledge) fn doc_chunk_source_snapshot( + row: KnowledgeDocChunkSource, +) -> SourceSnapshot { + let title = row.title.clone().unwrap_or_else(|| "Untitled source document".to_string()); + let excerpt = support::truncate_chars( + support::normalize_whitespace(row.chunk_text.as_str()).as_str(), + 240, + ); + let span_id = support::source_span_id( + row.doc_content_hash.as_str(), + row.start_offset.max(0) as usize, + row.end_offset.max(row.start_offset).max(0) as usize, + "captured", + ); + let line = format!( + "[doc_chunk:{}:{}-{}] {title}: {excerpt}", + row.chunk_index, row.start_offset, row.end_offset + ); + let source_span = serde_json::json!({ + "schema": "doc_source_span/v1", + "span_id": span_id, + "chunk_id": row.chunk_id, + "status": "captured", + "reason_code": null, + "start_offset": row.start_offset, + "end_offset": row.end_offset, + "content_hash": row.doc_content_hash.clone(), + "chunk_hash": row.chunk_hash.clone(), + }); + let snapshot = serde_json::json!({ + "kind": "doc_chunk", + "chunk_id": row.chunk_id, + "doc_id": row.doc_id, + "agent_id": row.agent_id.clone(), + "scope": row.scope.clone(), + "doc_type": row.doc_type.clone(), + "status": row.status.clone(), + "title": row.title.clone(), + "source_ref": row.source_ref.clone(), + "doc_content_hash": row.doc_content_hash.clone(), + "doc_updated_at": row.doc_updated_at, + "chunk_index": row.chunk_index, + "start_offset": row.start_offset, + "end_offset": row.end_offset, + "chunk_hash": row.chunk_hash.clone(), + "chunk_created_at": row.chunk_created_at, + "source_span": source_span, + }); + + SourceSnapshot { + kind: KnowledgeSourceKind::DocChunk, + id: row.chunk_id, + status: Some(row.status), + updated_at: Some(row.doc_updated_at), + content_hash: Some(row.chunk_hash), + snapshot, + citation_metadata: serde_json::json!({ + "section_role": "source_span", + "doc_id": row.doc_id, + "span_id": span_id, + "start_offset": row.start_offset, + "end_offset": row.end_offset, + }), + line, + } +} diff --git a/packages/elf-service/src/knowledge/support/snapshots/events.rs b/packages/elf-service/src/knowledge/support/snapshots/events.rs new file mode 100644 index 00000000..419d36dd --- /dev/null +++ b/packages/elf-service/src/knowledge/support/snapshots/events.rs @@ -0,0 +1,40 @@ +use crate::knowledge::support::{ + self, KnowledgeEventSource, KnowledgeSourceKind, SourceSnapshot, serde_json, +}; + +pub(in crate::knowledge) fn event_source_snapshot(row: KnowledgeEventSource) -> SourceSnapshot { + let content_hash = support::hash_json_lossy(&row.details); + let line = format!( + "add_event audit {} {} for {}{}", + row.note_op, + row.policy_decision, + row.note_type, + row.note_key.as_ref().map(|key| format!(" key {key}")).unwrap_or_default() + ); + let snapshot = serde_json::json!({ + "kind": "event", + "decision_id": row.decision_id, + "agent_id": row.agent_id.clone(), + "scope": row.scope.clone(), + "pipeline": row.pipeline.clone(), + "note_type": row.note_type.clone(), + "note_key": row.note_key.clone(), + "note_id": row.note_id, + "policy_decision": row.policy_decision.clone(), + "note_op": row.note_op.clone(), + "reason_code": row.reason_code.clone(), + "details_hash": content_hash, + "ts": row.ts, + }); + + SourceSnapshot { + kind: KnowledgeSourceKind::Event, + id: row.decision_id, + status: Some(row.policy_decision), + updated_at: Some(row.ts), + content_hash: Some(content_hash), + snapshot, + citation_metadata: serde_json::json!({ "section_role": "event_audit" }), + line, + } +} diff --git a/packages/elf-service/src/knowledge/support/snapshots/notes.rs b/packages/elf-service/src/knowledge/support/snapshots/notes.rs new file mode 100644 index 00000000..9a42e0aa --- /dev/null +++ b/packages/elf-service/src/knowledge/support/snapshots/notes.rs @@ -0,0 +1,36 @@ +use crate::knowledge::support::{ + self, KnowledgeNoteSource, KnowledgeSourceKind, SourceSnapshot, serde_json, +}; + +pub(in crate::knowledge) fn note_source_snapshot(row: KnowledgeNoteSource) -> SourceSnapshot { + let content_hash = support::hash_text(row.text.as_str()); + let line = format!("{}{}", support::note_prefix(&row), row.text); + let snapshot = serde_json::json!({ + "kind": "note", + "note_id": row.note_id, + "agent_id": row.agent_id.clone(), + "scope": row.scope.clone(), + "type": row.note_type.clone(), + "key": row.key.clone(), + "status": row.status.clone(), + "updated_at": row.updated_at, + "created_at": row.created_at, + "expires_at": row.expires_at, + "embedding_version": row.embedding_version.clone(), + "content_hash": content_hash, + "source_ref": row.source_ref.clone(), + "importance": row.importance, + "confidence": row.confidence, + }); + + SourceSnapshot { + kind: KnowledgeSourceKind::Note, + id: row.note_id, + status: Some(row.status), + updated_at: Some(row.updated_at), + content_hash: Some(content_hash), + snapshot, + citation_metadata: serde_json::json!({ "section_role": "source_note" }), + line, + } +} diff --git a/packages/elf-service/src/knowledge/support/snapshots/proposals.rs b/packages/elf-service/src/knowledge/support/snapshots/proposals.rs new file mode 100644 index 00000000..6c5711a5 --- /dev/null +++ b/packages/elf-service/src/knowledge/support/snapshots/proposals.rs @@ -0,0 +1,45 @@ +use crate::knowledge::support::{ + self, KnowledgeProposalSource, KnowledgeSourceKind, SourceSnapshot, serde_json, +}; + +pub(in crate::knowledge) fn proposal_source_snapshot( + row: KnowledgeProposalSource, +) -> SourceSnapshot { + let content_hash = support::hash_json_lossy(&serde_json::json!({ + "diff": row.diff.clone(), + "proposed_payload": row.proposed_payload.clone(), + "review_state": row.review_state.clone(), + })); + let line = format!("Applied proposal {}", row.proposal_kind); + let snapshot = support::sanitize_proposal_snapshot(&serde_json::json!({ + "kind": "proposal", + "proposal_id": row.proposal_id, + "run_id": row.run_id, + "agent_id": row.agent_id.clone(), + "proposal_kind": row.proposal_kind.clone(), + "apply_intent": row.apply_intent.clone(), + "review_state": row.review_state.clone(), + "source_refs": row.source_refs.clone(), + "source_snapshot": row.source_snapshot.clone(), + "lineage": row.lineage.clone(), + "diff": row.diff.clone(), + "confidence": row.confidence, + "unsupported_claim_flags": row.unsupported_claim_flags.clone(), + "contradiction_markers": row.contradiction_markers.clone(), + "staleness_markers": row.staleness_markers.clone(), + "target_ref": row.target_ref.clone(), + "proposed_payload_hash": content_hash, + "updated_at": row.updated_at, + })); + + SourceSnapshot { + kind: KnowledgeSourceKind::Proposal, + id: row.proposal_id, + status: Some(row.review_state), + updated_at: Some(row.updated_at), + content_hash: Some(content_hash), + snapshot, + citation_metadata: serde_json::json!({ "section_role": "reviewed_proposal" }), + line, + } +} diff --git a/packages/elf-service/src/knowledge/support/snapshots/relations.rs b/packages/elf-service/src/knowledge/support/snapshots/relations.rs new file mode 100644 index 00000000..eb9472ed --- /dev/null +++ b/packages/elf-service/src/knowledge/support/snapshots/relations.rs @@ -0,0 +1,41 @@ +use crate::knowledge::support::{ + self, KnowledgeRelationSource, KnowledgeSourceKind, SourceSnapshot, serde_json, +}; + +pub(in crate::knowledge) fn relation_source_snapshot( + row: KnowledgeRelationSource, +) -> SourceSnapshot { + let object = row.object_entity.clone().or(row.object_value.clone()).unwrap_or_default(); + let temporal_status = if row.valid_to.is_some() { "historical" } else { "current" }; + let line = format!("{} {} {} ({temporal_status}).", row.subject, row.predicate, object); + let content_hash = support::hash_text(line.as_str()); + let snapshot = serde_json::json!({ + "kind": "relation", + "fact_id": row.fact_id, + "agent_id": row.agent_id.clone(), + "scope": row.scope.clone(), + "subject": { "canonical": row.subject.clone(), "kind": row.subject_kind.clone() }, + "predicate": row.predicate.clone(), + "object": { + "entity": row.object_entity.clone(), + "kind": row.object_kind.clone(), + "value": row.object_value.clone() + }, + "valid_from": row.valid_from, + "valid_to": row.valid_to, + "updated_at": row.updated_at, + "content_hash": content_hash, + "evidence_notes": row.evidence_notes.clone(), + }); + + SourceSnapshot { + kind: KnowledgeSourceKind::Relation, + id: row.fact_id, + status: Some(temporal_status.to_string()), + updated_at: Some(row.updated_at), + content_hash: Some(content_hash), + snapshot, + citation_metadata: serde_json::json!({ "section_role": "relation_fact" }), + line, + } +} diff --git a/packages/elf-service/src/knowledge/support/snapshots/sanitize.rs b/packages/elf-service/src/knowledge/support/snapshots/sanitize.rs new file mode 100644 index 00000000..c43c15c7 --- /dev/null +++ b/packages/elf-service/src/knowledge/support/snapshots/sanitize.rs @@ -0,0 +1,56 @@ +use crate::knowledge::support::{Map, Number, Value, serde_json}; + +pub(in crate::knowledge) fn sanitize_proposal_snapshot(source_snapshot: &Value) -> Value { + let Some(object) = source_snapshot.as_object() else { + return serde_json::json!({ + "kind": "proposal", + "sanitized": true, + "source_visibility": "proposal_metadata_only", + }); + }; + let nested_source_count = + object.get("source_refs").and_then(Value::as_array).map(Vec::len).unwrap_or_default(); + let mut sanitized = Map::new(); + + for key in [ + "kind", + "proposal_id", + "run_id", + "agent_id", + "proposal_kind", + "apply_intent", + "review_state", + "confidence", + "proposed_payload_hash", + "updated_at", + ] { + if let Some(value) = object.get(key) { + sanitized.insert(key.to_string(), value.clone()); + } + } + + sanitized.insert("sanitized".to_string(), Value::Bool(true)); + sanitized.insert( + "source_visibility".to_string(), + Value::String("proposal_metadata_only".to_string()), + ); + sanitized.insert( + "omitted_fields".to_string(), + serde_json::json!([ + "source_refs", + "source_snapshot", + "lineage", + "diff", + "unsupported_claim_flags", + "contradiction_markers", + "staleness_markers", + "target_ref" + ]), + ); + sanitized.insert( + "nested_source_ref_count".to_string(), + Value::Number(Number::from(nested_source_count)), + ); + + Value::Object(sanitized) +}