diff --git a/packages/elf-service/src/core_blocks/persistence/attachments.rs b/packages/elf-service/src/core_blocks/persistence/attachments.rs index 2aa3e3c7..9176720d 100644 --- a/packages/elf-service/src/core_blocks/persistence/attachments.rs +++ b/packages/elf-service/src/core_blocks/persistence/attachments.rs @@ -7,7 +7,7 @@ use crate::{ access::ORG_PROJECT_ID, core_blocks::{ rows::{CoreBlockAttachmentRow, CoreBlockRow}, - types::{PreparedAttachRequest, PreparedDetachRequest}, + types::prepared::{PreparedAttachRequest, PreparedDetachRequest}, }, }; diff --git a/packages/elf-service/src/core_blocks/persistence/audit.rs b/packages/elf-service/src/core_blocks/persistence/audit.rs index 40dd0400..7f4637cb 100644 --- a/packages/elf-service/src/core_blocks/persistence/audit.rs +++ b/packages/elf-service/src/core_blocks/persistence/audit.rs @@ -7,7 +7,7 @@ use crate::{ Result, core_blocks::{ rows::CoreBlockEventRow, - types::{CoreBlockAuditEvent, CoreBlockEventInput}, + types::{CoreBlockAuditEvent, events::CoreBlockEventInput}, }, }; diff --git a/packages/elf-service/src/core_blocks/persistence/blocks.rs b/packages/elf-service/src/core_blocks/persistence/blocks.rs index 1b45eec4..e56c6d0b 100644 --- a/packages/elf-service/src/core_blocks/persistence/blocks.rs +++ b/packages/elf-service/src/core_blocks/persistence/blocks.rs @@ -5,7 +5,7 @@ use uuid::Uuid; use crate::{ Error, Result, - core_blocks::{rows::CoreBlockRow, types::PreparedUpsertRequest, validation}, + core_blocks::{rows::CoreBlockRow, types::prepared::PreparedUpsertRequest, validation}, }; pub(in crate::core_blocks) async fn insert_core_block( diff --git a/packages/elf-service/src/core_blocks/service.rs b/packages/elf-service/src/core_blocks/service.rs index 15cc25fa..015722d1 100644 --- a/packages/elf-service/src/core_blocks/service.rs +++ b/packages/elf-service/src/core_blocks/service.rs @@ -6,9 +6,9 @@ use crate::{ persistence::{self}, types::{ CoreBlockAttachRequest, CoreBlockAttachResponse, 
CoreBlockDetachRequest, - CoreBlockDetachResponse, CoreBlockEventInput, CoreBlockUpsertRequest, - CoreBlockUpsertResponse, CoreBlocksGetRequest, CoreBlocksResponse, - ELF_CORE_MEMORY_BLOCKS_SCHEMA_V1, + CoreBlockDetachResponse, CoreBlockUpsertRequest, CoreBlockUpsertResponse, + CoreBlocksGetRequest, CoreBlocksResponse, ELF_CORE_MEMORY_BLOCKS_SCHEMA_V1, + events::CoreBlockEventInput, }, validation::{self}, }, diff --git a/packages/elf-service/src/core_blocks/types.rs b/packages/elf-service/src/core_blocks/types.rs index 8161b4a4..66d9b4f5 100644 --- a/packages/elf-service/src/core_blocks/types.rs +++ b/packages/elf-service/src/core_blocks/types.rs @@ -1,283 +1,19 @@ -use serde::{Deserialize, Serialize}; -use serde_json::Value; -use time::OffsetDateTime; -use uuid::Uuid; - -/// Core memory blocks response schema identifier. -pub const ELF_CORE_MEMORY_BLOCKS_SCHEMA_V1: &str = "elf.core_memory_blocks/v1"; - -pub(super) const MAX_CORE_BLOCK_CONTENT_CHARS: usize = 2_000; - -/// Request payload for attached core block readback. -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct CoreBlocksGetRequest { - /// Tenant that owns the request. - pub tenant_id: String, - /// Project context for attachment lookup. - pub project_id: String, - /// Agent requesting attached blocks. - pub agent_id: String, - /// Read profile whose exact attachments should be returned. - pub read_profile: String, -} - -/// Response payload for attached core block readback. -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct CoreBlocksResponse { - /// Response schema identifier. - pub schema: String, - /// Tenant that owns the request. - pub tenant_id: String, - /// Project context for attachment lookup. - pub project_id: String, - /// Agent requesting attached blocks. - pub agent_id: String, - /// Read profile used for attachment lookup. - pub read_profile: String, - /// Attached core blocks visible to the caller. - pub items: Vec, -} - -/// One attached core memory block. 
-#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct CoreBlockItem { - /// Core block identifier. - pub block_id: Uuid, - /// Active attachment identifier that made the block visible. - pub attachment_id: Uuid, - /// Tenant that owns the block. - pub tenant_id: String, - /// Project that owns the block. - pub project_id: String, - /// Agent that owns the block's scope. - pub agent_id: String, - /// Scope key for the block. - pub scope: String, - /// Stable block key. - pub key: String, - /// Human-readable block title. - pub title: String, - /// Small always-attached context payload. - pub content: String, - /// Structured source/provenance metadata for the block. - pub source_ref: Value, - /// Lifecycle status for the block. - pub status: String, - #[serde(with = "crate::time_serde")] - /// Last block update timestamp. - pub updated_at: OffsetDateTime, - #[serde(with = "crate::time_serde")] - /// Attachment creation timestamp. - pub attached_at: OffsetDateTime, - /// Agent that created the attachment. - pub attached_by_agent_id: String, - /// Append-only block and attachment audit events. - pub audit_history: Vec, -} - -/// One core block audit event. -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct CoreBlockAuditEvent { - /// Audit event identifier. - pub event_id: Uuid, - /// Block identifier affected by the event. - pub block_id: Uuid, - /// Attachment identifier affected by the event, when applicable. - pub attachment_id: Option, - /// Agent that performed the event. - pub actor_agent_id: String, - /// Event type. - pub event_type: String, - /// Attachment target agent, when applicable. - pub target_agent_id: Option, - /// Attachment read profile, when applicable. - pub read_profile: Option, - /// Optional previous state snapshot. - pub prev_snapshot: Option, - /// Optional new state snapshot. - pub new_snapshot: Option, - /// Human-readable event reason. - pub reason: String, - #[serde(with = "crate::time_serde")] - /// Event timestamp. 
- pub ts: OffsetDateTime, -} - -/// Request payload for creating or updating a core block through admin APIs. -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct CoreBlockUpsertRequest { - /// Tenant that owns the request. - pub tenant_id: String, - /// Project context for the block. - pub project_id: String, - /// Agent creating or updating the block. - pub agent_id: String, - /// Existing block id to update. Omit to create. - pub block_id: Option, - /// Scope key for the block. - pub scope: String, - /// Stable block key. - pub key: String, - /// Human-readable block title. - pub title: String, - /// Small always-attached context payload. - pub content: String, - /// Structured source/provenance metadata for the block. - pub source_ref: Value, - /// Optional audit reason. - pub reason: Option, -} - -/// Response payload for core block creation or update. -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct CoreBlockUpsertResponse { - /// Stored block record. - pub block: CoreBlockRecord, -} - -/// Core block record returned by admin mutation APIs. -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct CoreBlockRecord { - /// Core block identifier. - pub block_id: Uuid, - /// Tenant that owns the block. - pub tenant_id: String, - /// Project that owns the block. - pub project_id: String, - /// Agent that owns the block's scope. - pub agent_id: String, - /// Scope key for the block. - pub scope: String, - /// Stable block key. - pub key: String, - /// Human-readable block title. - pub title: String, - /// Small always-attached context payload. - pub content: String, - /// Structured source/provenance metadata for the block. - pub source_ref: Value, - /// Lifecycle status for the block. - pub status: String, - #[serde(with = "crate::time_serde")] - /// Creation timestamp. - pub created_at: OffsetDateTime, - #[serde(with = "crate::time_serde")] - /// Last update timestamp. 
- pub updated_at: OffsetDateTime, -} - -/// Request payload for attaching a block to an agent/read-profile pair. -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct CoreBlockAttachRequest { - /// Tenant that owns the request. - pub tenant_id: String, - /// Project context for the attachment. - pub project_id: String, - /// Agent creating the attachment. - pub agent_id: String, - /// Block to attach. - pub block_id: Uuid, - /// Target agent that should receive the block. - pub target_agent_id: String, - /// Exact read profile for the attachment. - pub read_profile: String, - /// Optional audit reason. - pub reason: Option, -} - -/// Response payload for attaching a core block. -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct CoreBlockAttachResponse { - /// Attachment identifier. - pub attachment_id: Uuid, - /// Block identifier. - pub block_id: Uuid, - /// Target agent for the attachment. - pub target_agent_id: String, - /// Exact read profile for the attachment. - pub read_profile: String, - /// Agent that created the attachment. - pub attached_by_agent_id: String, - #[serde(with = "crate::time_serde")] - /// Attachment timestamp. - pub attached_at: OffsetDateTime, -} - -/// Request payload for detaching a block attachment. -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct CoreBlockDetachRequest { - /// Tenant that owns the request. - pub tenant_id: String, - /// Project context for the attachment. - pub project_id: String, - /// Agent detaching the block. - pub agent_id: String, - /// Attachment to detach. - pub attachment_id: Uuid, - /// Optional audit reason. - pub reason: Option, -} - -/// Response payload for detaching a core block. -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct CoreBlockDetachResponse { - /// Attachment identifier. - pub attachment_id: Uuid, - /// Whether an active attachment was detached. 
- pub detached: bool, -} - -pub(super) struct PreparedGetRequest { - pub(super) tenant_id: String, - pub(super) project_id: String, - pub(super) agent_id: String, - pub(super) read_profile: String, - pub(super) allowed_scopes: Vec, -} - -pub(super) struct PreparedUpsertRequest { - pub(super) tenant_id: String, - pub(super) project_id: String, - pub(super) agent_id: String, - pub(super) block_id: Option, - pub(super) scope: String, - pub(super) key: String, - pub(super) title: String, - pub(super) content: String, - pub(super) source_ref: Value, - pub(super) reason: String, -} - -pub(super) struct PreparedAttachRequest { - pub(super) tenant_id: String, - pub(super) project_id: String, - pub(super) agent_id: String, - pub(super) block_id: Uuid, - pub(super) target_agent_id: String, - pub(super) read_profile: String, - pub(super) allowed_scopes: Vec, - pub(super) reason: String, -} - -pub(super) struct PreparedDetachRequest { - pub(super) tenant_id: String, - pub(super) project_id: String, - pub(super) agent_id: String, - pub(super) attachment_id: Uuid, - pub(super) reason: String, -} - -pub(super) struct CoreBlockEventInput<'a> { - pub(super) block_id: Uuid, - pub(super) attachment_id: Option, - pub(super) tenant_id: &'a str, - pub(super) project_id: &'a str, - pub(super) actor_agent_id: &'a str, - pub(super) event_type: &'a str, - pub(super) target_agent_id: Option<&'a str>, - pub(super) read_profile: Option<&'a str>, - pub(super) prev_snapshot: Option, - pub(super) new_snapshot: Option, - pub(super) reason: &'a str, - pub(super) ts: OffsetDateTime, -} +pub(in crate::core_blocks) mod constants; +pub(in crate::core_blocks) mod events; +pub(in crate::core_blocks) mod prepared; + +mod requests; +mod responses; + +pub use self::{ + constants::ELF_CORE_MEMORY_BLOCKS_SCHEMA_V1, + events::CoreBlockAuditEvent, + requests::{ + CoreBlockAttachRequest, CoreBlockDetachRequest, CoreBlockUpsertRequest, + CoreBlocksGetRequest, + }, + responses::{ + CoreBlockAttachResponse, 
CoreBlockDetachResponse, CoreBlockItem, CoreBlockRecord, + CoreBlockUpsertResponse, CoreBlocksResponse, + }, +}; diff --git a/packages/elf-service/src/core_blocks/types/constants.rs b/packages/elf-service/src/core_blocks/types/constants.rs new file mode 100644 index 00000000..a2847e6c --- /dev/null +++ b/packages/elf-service/src/core_blocks/types/constants.rs @@ -0,0 +1,4 @@ +/// Core memory blocks response schema identifier. +pub const ELF_CORE_MEMORY_BLOCKS_SCHEMA_V1: &str = "elf.core_memory_blocks/v1"; + +pub(in crate::core_blocks) const MAX_CORE_BLOCK_CONTENT_CHARS: usize = 2_000; diff --git a/packages/elf-service/src/core_blocks/types/events.rs b/packages/elf-service/src/core_blocks/types/events.rs new file mode 100644 index 00000000..b40fe159 --- /dev/null +++ b/packages/elf-service/src/core_blocks/types/events.rs @@ -0,0 +1,47 @@ +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use time::OffsetDateTime; +use uuid::Uuid; + +/// One core block audit event. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct CoreBlockAuditEvent { + /// Audit event identifier. + pub event_id: Uuid, + /// Block identifier affected by the event. + pub block_id: Uuid, + /// Attachment identifier affected by the event, when applicable. + pub attachment_id: Option, + /// Agent that performed the event. + pub actor_agent_id: String, + /// Event type. + pub event_type: String, + /// Attachment target agent, when applicable. + pub target_agent_id: Option, + /// Attachment read profile, when applicable. + pub read_profile: Option, + /// Optional previous state snapshot. + pub prev_snapshot: Option, + /// Optional new state snapshot. + pub new_snapshot: Option, + /// Human-readable event reason. + pub reason: String, + #[serde(with = "crate::time_serde")] + /// Event timestamp. 
+ pub ts: OffsetDateTime, +} + +pub(in crate::core_blocks) struct CoreBlockEventInput<'a> { + pub(in crate::core_blocks) block_id: Uuid, + pub(in crate::core_blocks) attachment_id: Option, + pub(in crate::core_blocks) tenant_id: &'a str, + pub(in crate::core_blocks) project_id: &'a str, + pub(in crate::core_blocks) actor_agent_id: &'a str, + pub(in crate::core_blocks) event_type: &'a str, + pub(in crate::core_blocks) target_agent_id: Option<&'a str>, + pub(in crate::core_blocks) read_profile: Option<&'a str>, + pub(in crate::core_blocks) prev_snapshot: Option, + pub(in crate::core_blocks) new_snapshot: Option, + pub(in crate::core_blocks) reason: &'a str, + pub(in crate::core_blocks) ts: OffsetDateTime, +} diff --git a/packages/elf-service/src/core_blocks/types/prepared.rs b/packages/elf-service/src/core_blocks/types/prepared.rs new file mode 100644 index 00000000..58c3f4e4 --- /dev/null +++ b/packages/elf-service/src/core_blocks/types/prepared.rs @@ -0,0 +1,42 @@ +use serde_json::Value; +use uuid::Uuid; + +pub(in crate::core_blocks) struct PreparedGetRequest { + pub(in crate::core_blocks) tenant_id: String, + pub(in crate::core_blocks) project_id: String, + pub(in crate::core_blocks) agent_id: String, + pub(in crate::core_blocks) read_profile: String, + pub(in crate::core_blocks) allowed_scopes: Vec, +} + +pub(in crate::core_blocks) struct PreparedUpsertRequest { + pub(in crate::core_blocks) tenant_id: String, + pub(in crate::core_blocks) project_id: String, + pub(in crate::core_blocks) agent_id: String, + pub(in crate::core_blocks) block_id: Option, + pub(in crate::core_blocks) scope: String, + pub(in crate::core_blocks) key: String, + pub(in crate::core_blocks) title: String, + pub(in crate::core_blocks) content: String, + pub(in crate::core_blocks) source_ref: Value, + pub(in crate::core_blocks) reason: String, +} + +pub(in crate::core_blocks) struct PreparedAttachRequest { + pub(in crate::core_blocks) tenant_id: String, + pub(in crate::core_blocks) project_id: 
String, + pub(in crate::core_blocks) agent_id: String, + pub(in crate::core_blocks) block_id: Uuid, + pub(in crate::core_blocks) target_agent_id: String, + pub(in crate::core_blocks) read_profile: String, + pub(in crate::core_blocks) allowed_scopes: Vec, + pub(in crate::core_blocks) reason: String, +} + +pub(in crate::core_blocks) struct PreparedDetachRequest { + pub(in crate::core_blocks) tenant_id: String, + pub(in crate::core_blocks) project_id: String, + pub(in crate::core_blocks) agent_id: String, + pub(in crate::core_blocks) attachment_id: Uuid, + pub(in crate::core_blocks) reason: String, +} diff --git a/packages/elf-service/src/core_blocks/types/requests.rs b/packages/elf-service/src/core_blocks/types/requests.rs new file mode 100644 index 00000000..aee48f53 --- /dev/null +++ b/packages/elf-service/src/core_blocks/types/requests.rs @@ -0,0 +1,75 @@ +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use uuid::Uuid; + +/// Request payload for attached core block readback. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct CoreBlocksGetRequest { + /// Tenant that owns the request. + pub tenant_id: String, + /// Project context for attachment lookup. + pub project_id: String, + /// Agent requesting attached blocks. + pub agent_id: String, + /// Read profile whose exact attachments should be returned. + pub read_profile: String, +} + +/// Request payload for creating or updating a core block through admin APIs. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct CoreBlockUpsertRequest { + /// Tenant that owns the request. + pub tenant_id: String, + /// Project context for the block. + pub project_id: String, + /// Agent creating or updating the block. + pub agent_id: String, + /// Existing block id to update. Omit to create. + pub block_id: Option, + /// Scope key for the block. + pub scope: String, + /// Stable block key. + pub key: String, + /// Human-readable block title. 
+ pub title: String, + /// Small always-attached context payload. + pub content: String, + /// Structured source/provenance metadata for the block. + pub source_ref: Value, + /// Optional audit reason. + pub reason: Option, +} + +/// Request payload for attaching a block to an agent/read-profile pair. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct CoreBlockAttachRequest { + /// Tenant that owns the request. + pub tenant_id: String, + /// Project context for the attachment. + pub project_id: String, + /// Agent creating the attachment. + pub agent_id: String, + /// Block to attach. + pub block_id: Uuid, + /// Target agent that should receive the block. + pub target_agent_id: String, + /// Exact read profile for the attachment. + pub read_profile: String, + /// Optional audit reason. + pub reason: Option, +} + +/// Request payload for detaching a block attachment. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct CoreBlockDetachRequest { + /// Tenant that owns the request. + pub tenant_id: String, + /// Project context for the attachment. + pub project_id: String, + /// Agent detaching the block. + pub agent_id: String, + /// Attachment to detach. + pub attachment_id: Uuid, + /// Optional audit reason. + pub reason: Option, +} diff --git a/packages/elf-service/src/core_blocks/types/responses.rs b/packages/elf-service/src/core_blocks/types/responses.rs new file mode 100644 index 00000000..d6fd6fd7 --- /dev/null +++ b/packages/elf-service/src/core_blocks/types/responses.rs @@ -0,0 +1,125 @@ +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use time::OffsetDateTime; +use uuid::Uuid; + +use crate::core_blocks::types::events::CoreBlockAuditEvent; + +/// Response payload for attached core block readback. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct CoreBlocksResponse { + /// Response schema identifier. + pub schema: String, + /// Tenant that owns the request. 
+ pub tenant_id: String, + /// Project context for attachment lookup. + pub project_id: String, + /// Agent requesting attached blocks. + pub agent_id: String, + /// Read profile used for attachment lookup. + pub read_profile: String, + /// Attached core blocks visible to the caller. + pub items: Vec, +} + +/// One attached core memory block. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct CoreBlockItem { + /// Core block identifier. + pub block_id: Uuid, + /// Active attachment identifier that made the block visible. + pub attachment_id: Uuid, + /// Tenant that owns the block. + pub tenant_id: String, + /// Project that owns the block. + pub project_id: String, + /// Agent that owns the block's scope. + pub agent_id: String, + /// Scope key for the block. + pub scope: String, + /// Stable block key. + pub key: String, + /// Human-readable block title. + pub title: String, + /// Small always-attached context payload. + pub content: String, + /// Structured source/provenance metadata for the block. + pub source_ref: Value, + /// Lifecycle status for the block. + pub status: String, + #[serde(with = "crate::time_serde")] + /// Last block update timestamp. + pub updated_at: OffsetDateTime, + #[serde(with = "crate::time_serde")] + /// Attachment creation timestamp. + pub attached_at: OffsetDateTime, + /// Agent that created the attachment. + pub attached_by_agent_id: String, + /// Append-only block and attachment audit events. + pub audit_history: Vec, +} + +/// Response payload for core block creation or update. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct CoreBlockUpsertResponse { + /// Stored block record. + pub block: CoreBlockRecord, +} + +/// Core block record returned by admin mutation APIs. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct CoreBlockRecord { + /// Core block identifier. + pub block_id: Uuid, + /// Tenant that owns the block. + pub tenant_id: String, + /// Project that owns the block. 
+ pub project_id: String, + /// Agent that owns the block's scope. + pub agent_id: String, + /// Scope key for the block. + pub scope: String, + /// Stable block key. + pub key: String, + /// Human-readable block title. + pub title: String, + /// Small always-attached context payload. + pub content: String, + /// Structured source/provenance metadata for the block. + pub source_ref: Value, + /// Lifecycle status for the block. + pub status: String, + #[serde(with = "crate::time_serde")] + /// Creation timestamp. + pub created_at: OffsetDateTime, + #[serde(with = "crate::time_serde")] + /// Last update timestamp. + pub updated_at: OffsetDateTime, +} + +/// Response payload for attaching a core block. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct CoreBlockAttachResponse { + /// Attachment identifier. + pub attachment_id: Uuid, + /// Block identifier. + pub block_id: Uuid, + /// Target agent for the attachment. + pub target_agent_id: String, + /// Exact read profile for the attachment. + pub read_profile: String, + /// Agent that created the attachment. + pub attached_by_agent_id: String, + #[serde(with = "crate::time_serde")] + /// Attachment timestamp. + pub attached_at: OffsetDateTime, +} + +/// Response payload for detaching a core block. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct CoreBlockDetachResponse { + /// Attachment identifier. + pub attachment_id: Uuid, + /// Whether an active attachment was detached. 
+ pub detached: bool, +} diff --git a/packages/elf-service/src/core_blocks/validation.rs b/packages/elf-service/src/core_blocks/validation.rs index 3258808c..12933e69 100644 --- a/packages/elf-service/src/core_blocks/validation.rs +++ b/packages/elf-service/src/core_blocks/validation.rs @@ -1,263 +1,12 @@ -use std::collections::HashSet; - -use serde_json::Value; - -use crate::{ - Error, Result, - access::{self, ORG_PROJECT_ID}, - core_blocks::{ - rows::{CoreBlockAttachmentRow, CoreBlockJoinedRow, CoreBlockRow}, - types::{ - CoreBlockAttachRequest, CoreBlockDetachRequest, CoreBlockUpsertRequest, - CoreBlocksGetRequest, MAX_CORE_BLOCK_CONTENT_CHARS, PreparedAttachRequest, - PreparedDetachRequest, PreparedGetRequest, PreparedUpsertRequest, - }, +mod normalize; +mod prepare; +mod snapshots; +mod visibility; + +pub(super) use self::{ + prepare::{ + prepare_attach_request, prepare_detach_request, prepare_get_request, prepare_upsert_request, }, - search, + snapshots::{attachment_snapshot, block_snapshot}, + visibility::{block_read_allowed, filter_visible_rows}, }; -use elf_config::Config; -use elf_domain::english_gate::{self, EnglishGateKind}; - -pub(super) fn prepare_get_request( - cfg: &Config, - req: CoreBlocksGetRequest, -) -> Result { - let tenant_id = normalize_required(req.tenant_id.as_str(), "tenant_id")?; - let project_id = normalize_required(req.project_id.as_str(), "project_id")?; - let agent_id = normalize_required(req.agent_id.as_str(), "agent_id")?; - let read_profile = normalize_required(req.read_profile.as_str(), "read_profile")?; - let allowed_scopes = search::resolve_read_profile_scopes(cfg, read_profile.as_str())?; - - Ok(PreparedGetRequest { tenant_id, project_id, agent_id, read_profile, allowed_scopes }) -} - -pub(super) fn prepare_upsert_request( - cfg: &Config, - req: CoreBlockUpsertRequest, -) -> Result { - let tenant_id = normalize_required(req.tenant_id.as_str(), "tenant_id")?; - let requested_project_id = 
normalize_required(req.project_id.as_str(), "project_id")?; - let agent_id = normalize_required(req.agent_id.as_str(), "agent_id")?; - let scope = normalize_required(req.scope.as_str(), "scope")?; - let key = normalize_required(req.key.as_str(), "key")?; - let title = normalize_required(req.title.as_str(), "title")?; - let content = normalize_required(req.content.as_str(), "content")?; - let reason = req - .reason - .as_deref() - .map(|value| normalize_required(value, "reason")) - .transpose()? - .unwrap_or_else(|| "core block upsert".to_string()); - let project_id = - if scope == "org_shared" { ORG_PROJECT_ID.to_string() } else { requested_project_id }; - - validate_write_scope(cfg, scope.as_str())?; - validate_english(key.as_str(), EnglishGateKind::Identifier, "$.key")?; - validate_english(title.as_str(), EnglishGateKind::NaturalLanguage, "$.title")?; - validate_english(content.as_str(), EnglishGateKind::NaturalLanguage, "$.content")?; - validate_source_ref(&req.source_ref)?; - - if content.chars().count() > MAX_CORE_BLOCK_CONTENT_CHARS { - return Err(Error::InvalidRequest { message: "content is too long.".to_string() }); - } - - Ok(PreparedUpsertRequest { - tenant_id, - project_id, - agent_id, - block_id: req.block_id, - scope, - key, - title, - content, - source_ref: req.source_ref, - reason, - }) -} - -pub(super) fn prepare_attach_request( - cfg: &Config, - req: CoreBlockAttachRequest, -) -> Result { - let tenant_id = normalize_required(req.tenant_id.as_str(), "tenant_id")?; - let project_id = normalize_required(req.project_id.as_str(), "project_id")?; - let agent_id = normalize_required(req.agent_id.as_str(), "agent_id")?; - let target_agent_id = normalize_required(req.target_agent_id.as_str(), "target_agent_id")?; - let read_profile = normalize_required(req.read_profile.as_str(), "read_profile")?; - let allowed_scopes = search::resolve_read_profile_scopes(cfg, read_profile.as_str())?; - let reason = req - .reason - .as_deref() - .map(|value| 
normalize_required(value, "reason")) - .transpose()? - .unwrap_or_else(|| "core block attachment".to_string()); - - validate_english(target_agent_id.as_str(), EnglishGateKind::Identifier, "$.target_agent_id")?; - - Ok(PreparedAttachRequest { - tenant_id, - project_id, - agent_id, - block_id: req.block_id, - target_agent_id, - read_profile, - allowed_scopes, - reason, - }) -} - -pub(super) fn prepare_detach_request(req: CoreBlockDetachRequest) -> Result { - let tenant_id = normalize_required(req.tenant_id.as_str(), "tenant_id")?; - let project_id = normalize_required(req.project_id.as_str(), "project_id")?; - let agent_id = normalize_required(req.agent_id.as_str(), "agent_id")?; - let reason = req - .reason - .as_deref() - .map(|value| normalize_required(value, "reason")) - .transpose()? - .unwrap_or_else(|| "core block detach".to_string()); - - Ok(PreparedDetachRequest { - tenant_id, - project_id, - agent_id, - attachment_id: req.attachment_id, - reason, - }) -} - -pub(super) fn filter_visible_rows( - rows: Vec, - allowed_scopes: &[String], - shared_grants: &HashSet, -) -> Vec { - rows.into_iter() - .filter(|row| { - let block = CoreBlockRow { - block_id: row.block_id, - tenant_id: row.tenant_id.clone(), - project_id: row.project_id.clone(), - agent_id: row.agent_id.clone(), - scope: row.scope.clone(), - key: row.key.clone(), - title: row.title.clone(), - content: row.content.clone(), - source_ref: row.source_ref.clone(), - status: row.status.clone(), - created_at: row.created_at, - updated_at: row.updated_at, - }; - - block_read_allowed( - &block, - row.attachment_agent_id.as_str(), - allowed_scopes, - shared_grants, - ) - }) - .collect() -} - -pub(super) fn block_read_allowed( - block: &CoreBlockRow, - requester_agent_id: &str, - allowed_scopes: &[String], - shared_grants: &HashSet, -) -> bool { - if block.status != "active" { - return false; - } - if !allowed_scopes.iter().any(|scope| scope == &block.scope) { - return false; - } - if block.scope == 
"agent_private" { - return block.agent_id == requester_agent_id; - } - if !matches!(block.scope.as_str(), "project_shared" | "org_shared") { - return false; - } - if block.agent_id == requester_agent_id { - return true; - } - - shared_grants.contains(&access::SharedSpaceGrantKey { - scope: block.scope.clone(), - space_owner_agent_id: block.agent_id.clone(), - }) -} - -pub(super) fn block_snapshot(block: &CoreBlockRow) -> Value { - serde_json::json!({ - "block_id": block.block_id, - "tenant_id": block.tenant_id, - "project_id": block.project_id, - "agent_id": block.agent_id, - "scope": block.scope, - "key": block.key, - "title": block.title, - "content": block.content, - "source_ref": block.source_ref, - "status": block.status, - "created_at": block.created_at, - "updated_at": block.updated_at, - }) -} - -pub(super) fn attachment_snapshot(attachment: &CoreBlockAttachmentRow) -> Value { - serde_json::json!({ - "attachment_id": attachment.attachment_id, - "block_id": attachment.block_id, - "tenant_id": attachment.tenant_id, - "project_id": attachment.project_id, - "agent_id": attachment.agent_id, - "read_profile": attachment.read_profile, - "attached_by_agent_id": attachment.attached_by_agent_id, - "attached_at": attachment.attached_at, - "detached_by_agent_id": attachment.detached_by_agent_id, - "detached_at": attachment.detached_at, - }) -} - -fn normalize_required(raw: &str, field: &str) -> Result { - let trimmed = raw.trim(); - - if trimmed.is_empty() { - return Err(Error::InvalidRequest { message: format!("{field} is required.") }); - } - - Ok(trimmed.to_string()) -} - -fn validate_write_scope(cfg: &Config, scope: &str) -> Result<()> { - if !cfg.scopes.allowed.iter().any(|allowed| allowed == scope) { - return Err(Error::ScopeDenied { message: "Scope is not allowed.".to_string() }); - } - - let write_allowed = match scope { - "agent_private" => cfg.scopes.write_allowed.agent_private, - "project_shared" => cfg.scopes.write_allowed.project_shared, - "org_shared" => 
cfg.scopes.write_allowed.org_shared, - _ => false, - }; - - if !write_allowed { - return Err(Error::ScopeDenied { message: "Scope is not allowed.".to_string() }); - } - - Ok(()) -} - -fn validate_english(input: &str, kind: EnglishGateKind, field: &str) -> Result<()> { - english_gate::english_gate(input, kind) - .map_err(|_| Error::NonEnglishInput { field: field.to_string() }) -} - -fn validate_source_ref(source_ref: &Value) -> Result<()> { - if !source_ref.is_object() { - return Err(Error::InvalidRequest { - message: "source_ref must be a JSON object.".to_string(), - }); - } - - Ok(()) -} diff --git a/packages/elf-service/src/core_blocks/validation/normalize.rs b/packages/elf-service/src/core_blocks/validation/normalize.rs new file mode 100644 index 00000000..a39b5a3b --- /dev/null +++ b/packages/elf-service/src/core_blocks/validation/normalize.rs @@ -0,0 +1,49 @@ +use serde_json::Value; + +use crate::{Error, Result}; +use elf_config::Config; +use elf_domain::english_gate::{self, EnglishGateKind}; + +pub(super) fn normalize_required(raw: &str, field: &str) -> Result { + let trimmed = raw.trim(); + + if trimmed.is_empty() { + return Err(Error::InvalidRequest { message: format!("{field} is required.") }); + } + + Ok(trimmed.to_string()) +} + +pub(super) fn validate_write_scope(cfg: &Config, scope: &str) -> Result<()> { + if !cfg.scopes.allowed.iter().any(|allowed| allowed == scope) { + return Err(Error::ScopeDenied { message: "Scope is not allowed.".to_string() }); + } + + let write_allowed = match scope { + "agent_private" => cfg.scopes.write_allowed.agent_private, + "project_shared" => cfg.scopes.write_allowed.project_shared, + "org_shared" => cfg.scopes.write_allowed.org_shared, + _ => false, + }; + + if !write_allowed { + return Err(Error::ScopeDenied { message: "Scope is not allowed.".to_string() }); + } + + Ok(()) +} + +pub(super) fn validate_english(input: &str, kind: EnglishGateKind, field: &str) -> Result<()> { + english_gate::english_gate(input, kind) + 
.map_err(|_| Error::NonEnglishInput { field: field.to_string() }) +} + +pub(super) fn validate_source_ref(source_ref: &Value) -> Result<()> { + if !source_ref.is_object() { + return Err(Error::InvalidRequest { + message: "source_ref must be a JSON object.".to_string(), + }); + } + + Ok(()) +} diff --git a/packages/elf-service/src/core_blocks/validation/prepare.rs b/packages/elf-service/src/core_blocks/validation/prepare.rs new file mode 100644 index 00000000..79e491ec --- /dev/null +++ b/packages/elf-service/src/core_blocks/validation/prepare.rs @@ -0,0 +1,135 @@ +use crate::{ + Error, Result, + access::ORG_PROJECT_ID, + core_blocks::{ + types::{ + CoreBlockAttachRequest, CoreBlockDetachRequest, CoreBlockUpsertRequest, + CoreBlocksGetRequest, + constants::MAX_CORE_BLOCK_CONTENT_CHARS, + prepared::{ + PreparedAttachRequest, PreparedDetachRequest, PreparedGetRequest, + PreparedUpsertRequest, + }, + }, + validation::normalize, + }, + search, +}; +use elf_config::Config; +use elf_domain::english_gate::EnglishGateKind; + +pub(in crate::core_blocks) fn prepare_get_request( + cfg: &Config, + req: CoreBlocksGetRequest, +) -> Result { + let tenant_id = normalize::normalize_required(req.tenant_id.as_str(), "tenant_id")?; + let project_id = normalize::normalize_required(req.project_id.as_str(), "project_id")?; + let agent_id = normalize::normalize_required(req.agent_id.as_str(), "agent_id")?; + let read_profile = normalize::normalize_required(req.read_profile.as_str(), "read_profile")?; + let allowed_scopes = search::resolve_read_profile_scopes(cfg, read_profile.as_str())?; + + Ok(PreparedGetRequest { tenant_id, project_id, agent_id, read_profile, allowed_scopes }) +} + +pub(in crate::core_blocks) fn prepare_upsert_request( + cfg: &Config, + req: CoreBlockUpsertRequest, +) -> Result { + let tenant_id = normalize::normalize_required(req.tenant_id.as_str(), "tenant_id")?; + let requested_project_id = + normalize::normalize_required(req.project_id.as_str(), "project_id")?; + let 
agent_id = normalize::normalize_required(req.agent_id.as_str(), "agent_id")?; + let scope = normalize::normalize_required(req.scope.as_str(), "scope")?; + let key = normalize::normalize_required(req.key.as_str(), "key")?; + let title = normalize::normalize_required(req.title.as_str(), "title")?; + let content = normalize::normalize_required(req.content.as_str(), "content")?; + let reason = req + .reason + .as_deref() + .map(|value| normalize::normalize_required(value, "reason")) + .transpose()? + .unwrap_or_else(|| "core block upsert".to_string()); + let project_id = + if scope == "org_shared" { ORG_PROJECT_ID.to_string() } else { requested_project_id }; + + normalize::validate_write_scope(cfg, scope.as_str())?; + normalize::validate_english(key.as_str(), EnglishGateKind::Identifier, "$.key")?; + normalize::validate_english(title.as_str(), EnglishGateKind::NaturalLanguage, "$.title")?; + normalize::validate_english(content.as_str(), EnglishGateKind::NaturalLanguage, "$.content")?; + normalize::validate_source_ref(&req.source_ref)?; + + if content.chars().count() > MAX_CORE_BLOCK_CONTENT_CHARS { + return Err(Error::InvalidRequest { message: "content is too long.".to_string() }); + } + + Ok(PreparedUpsertRequest { + tenant_id, + project_id, + agent_id, + block_id: req.block_id, + scope, + key, + title, + content, + source_ref: req.source_ref, + reason, + }) +} + +pub(in crate::core_blocks) fn prepare_attach_request( + cfg: &Config, + req: CoreBlockAttachRequest, +) -> Result { + let tenant_id = normalize::normalize_required(req.tenant_id.as_str(), "tenant_id")?; + let project_id = normalize::normalize_required(req.project_id.as_str(), "project_id")?; + let agent_id = normalize::normalize_required(req.agent_id.as_str(), "agent_id")?; + let target_agent_id = + normalize::normalize_required(req.target_agent_id.as_str(), "target_agent_id")?; + let read_profile = normalize::normalize_required(req.read_profile.as_str(), "read_profile")?; + let allowed_scopes = 
search::resolve_read_profile_scopes(cfg, read_profile.as_str())?; + let reason = req + .reason + .as_deref() + .map(|value| normalize::normalize_required(value, "reason")) + .transpose()? + .unwrap_or_else(|| "core block attachment".to_string()); + + normalize::validate_english( + target_agent_id.as_str(), + EnglishGateKind::Identifier, + "$.target_agent_id", + )?; + + Ok(PreparedAttachRequest { + tenant_id, + project_id, + agent_id, + block_id: req.block_id, + target_agent_id, + read_profile, + allowed_scopes, + reason, + }) +} + +pub(in crate::core_blocks) fn prepare_detach_request( + req: CoreBlockDetachRequest, +) -> Result { + let tenant_id = normalize::normalize_required(req.tenant_id.as_str(), "tenant_id")?; + let project_id = normalize::normalize_required(req.project_id.as_str(), "project_id")?; + let agent_id = normalize::normalize_required(req.agent_id.as_str(), "agent_id")?; + let reason = req + .reason + .as_deref() + .map(|value| normalize::normalize_required(value, "reason")) + .transpose()? 
+ .unwrap_or_else(|| "core block detach".to_string()); + + Ok(PreparedDetachRequest { + tenant_id, + project_id, + agent_id, + attachment_id: req.attachment_id, + reason, + }) +} diff --git a/packages/elf-service/src/core_blocks/validation/snapshots.rs b/packages/elf-service/src/core_blocks/validation/snapshots.rs new file mode 100644 index 00000000..17f2553a --- /dev/null +++ b/packages/elf-service/src/core_blocks/validation/snapshots.rs @@ -0,0 +1,35 @@ +use serde_json::Value; + +use crate::core_blocks::rows::{CoreBlockAttachmentRow, CoreBlockRow}; + +pub(in crate::core_blocks) fn block_snapshot(block: &CoreBlockRow) -> Value { + serde_json::json!({ + "block_id": block.block_id, + "tenant_id": block.tenant_id, + "project_id": block.project_id, + "agent_id": block.agent_id, + "scope": block.scope, + "key": block.key, + "title": block.title, + "content": block.content, + "source_ref": block.source_ref, + "status": block.status, + "created_at": block.created_at, + "updated_at": block.updated_at, + }) +} + +pub(in crate::core_blocks) fn attachment_snapshot(attachment: &CoreBlockAttachmentRow) -> Value { + serde_json::json!({ + "attachment_id": attachment.attachment_id, + "block_id": attachment.block_id, + "tenant_id": attachment.tenant_id, + "project_id": attachment.project_id, + "agent_id": attachment.agent_id, + "read_profile": attachment.read_profile, + "attached_by_agent_id": attachment.attached_by_agent_id, + "attached_at": attachment.attached_at, + "detached_by_agent_id": attachment.detached_by_agent_id, + "detached_at": attachment.detached_at, + }) +} diff --git a/packages/elf-service/src/core_blocks/validation/visibility.rs b/packages/elf-service/src/core_blocks/validation/visibility.rs new file mode 100644 index 00000000..b78efd46 --- /dev/null +++ b/packages/elf-service/src/core_blocks/validation/visibility.rs @@ -0,0 +1,66 @@ +use std::collections::HashSet; + +use crate::{ + access, + core_blocks::rows::{CoreBlockJoinedRow, CoreBlockRow}, +}; + +pub(in 
crate::core_blocks) fn filter_visible_rows( + rows: Vec, + allowed_scopes: &[String], + shared_grants: &HashSet, +) -> Vec { + rows.into_iter() + .filter(|row| { + let block = CoreBlockRow { + block_id: row.block_id, + tenant_id: row.tenant_id.clone(), + project_id: row.project_id.clone(), + agent_id: row.agent_id.clone(), + scope: row.scope.clone(), + key: row.key.clone(), + title: row.title.clone(), + content: row.content.clone(), + source_ref: row.source_ref.clone(), + status: row.status.clone(), + created_at: row.created_at, + updated_at: row.updated_at, + }; + + block_read_allowed( + &block, + row.attachment_agent_id.as_str(), + allowed_scopes, + shared_grants, + ) + }) + .collect() +} + +pub(in crate::core_blocks) fn block_read_allowed( + block: &CoreBlockRow, + requester_agent_id: &str, + allowed_scopes: &[String], + shared_grants: &HashSet, +) -> bool { + if block.status != "active" { + return false; + } + if !allowed_scopes.iter().any(|scope| scope == &block.scope) { + return false; + } + if block.scope == "agent_private" { + return block.agent_id == requester_agent_id; + } + if !matches!(block.scope.as_str(), "project_shared" | "org_shared") { + return false; + } + if block.agent_id == requester_agent_id { + return true; + } + + shared_grants.contains(&access::SharedSpaceGrantKey { + scope: block.scope.clone(), + space_owner_agent_id: block.agent_id.clone(), + }) +} diff --git a/packages/elf-service/src/docs/source_capture/helpers.rs b/packages/elf-service/src/docs/source_capture/helpers.rs index e1d097be..00d1258c 100644 --- a/packages/elf-service/src/docs/source_capture/helpers.rs +++ b/packages/elf-service/src/docs/source_capture/helpers.rs @@ -1,175 +1,13 @@ -use crate::docs::{ - DOC_SOURCE_SPAN_SCHEMA_V1, DocType, DocsSourceSpanRef, Error, Map, OffsetDateTime, Result, - Rfc3339, Value, WritePolicyAudit, source_capture, +mod identity; +mod spans; +mod time; + +pub(super) use self::{ + identity::{source_identity_value, source_origin, source_type}, + 
spans::{source_policy_spans, source_spans_to_value}, + time::format_timestamp, }; -pub(super) fn source_policy_spans( - raw_content_hash: &str, - audit: Option<&WritePolicyAudit>, -) -> Vec { - let Some(audit) = audit else { - return Vec::new(); - }; - let mut spans = Vec::with_capacity(audit.exclusions.len() + audit.redactions.len()); - - for span in &audit.exclusions { - spans.push(policy_span_ref( - raw_content_hash, - span.start, - span.end, - "excluded", - "WRITE_POLICY_EXCLUSION", - )); - } - for redaction in &audit.redactions { - spans.push(policy_span_ref( - raw_content_hash, - redaction.span.start, - redaction.span.end, - "redacted", - "WRITE_POLICY_REDACTION", - )); - } - - spans -} - -pub(super) fn source_spans_to_value(spans: &[DocsSourceSpanRef]) -> Result { - serde_json::to_value(spans).map_err(|err| Error::InvalidRequest { - message: format!("failed to encode source span metadata: {err}"), - }) -} - -pub(super) fn source_type(source_ref: &Map, doc_type: DocType) -> String { - source_ref - .get("source_kind") - .and_then(Value::as_str) - .filter(|value| !value.trim().is_empty()) - .unwrap_or_else(|| doc_type.as_str()) - .to_string() -} - -pub(super) fn source_origin(source_ref: &Map, doc_type: DocType) -> String { - if let Some(origin) = source_ref_string(source_ref, "canonical_uri") - .or_else(|| source_ref_string(source_ref, "url")) - .or_else(|| source_ref_string(source_ref, "uri")) - { - return origin.to_string(); - } - - match doc_type { - DocType::Chat => source_ref_string(source_ref, "message_id") - .map(|message_id| { - format!( - "thread:{}#{}", - source_ref_string(source_ref, "thread_id").unwrap_or("unknown"), - message_id - ) - }) - .unwrap_or_else(|| { - format!( - "thread:{}", - source_ref_string(source_ref, "thread_id").unwrap_or("unknown") - ) - }), - DocType::Search => source_ref_string(source_ref, "domain") - .map(|domain| format!("search:{domain}")) - .unwrap_or_else(|| "search:unknown".to_string()), - DocType::Dev => 
dev_origin(source_ref), - DocType::Knowledge => source_ref_string(source_ref, "ts") - .map(|ts| format!("knowledge:{ts}")) - .unwrap_or_else(|| "knowledge:unknown".to_string()), - } -} - -pub(super) fn source_identity_value(source_ref: &Map, doc_type: DocType) -> Value { - if let Some(canonical_uri) = source_ref_string(source_ref, "canonical_uri") { - return serde_json::json!(["canonical_uri", canonical_uri]); - } - - match doc_type { - DocType::Chat => serde_json::json!([ - "chat", - source_ref_string(source_ref, "thread_id"), - source_ref_string(source_ref, "message_id"), - source_ref_string(source_ref, "role"), - source_ref_string(source_ref, "ts"), - ]), - DocType::Search => serde_json::json!([ - "search", - source_ref_string(source_ref, "url"), - source_ref_string(source_ref, "domain"), - source_ref_string(source_ref, "query"), - source_ref_string(source_ref, "ts"), - ]), - DocType::Dev => serde_json::json!([ - "dev", - source_ref_string(source_ref, "repo"), - source_ref_string(source_ref, "path"), - source_ref_string(source_ref, "commit_sha"), - source_ref_i64(source_ref, "pr_number"), - source_ref_i64(source_ref, "issue_number"), - ]), - DocType::Knowledge => serde_json::json!([ - "knowledge", - source_ref_string(source_ref, "uri"), - source_ref_string(source_ref, "ts"), - ]), - } -} - -pub(super) fn format_timestamp(ts: OffsetDateTime) -> Result { - ts.format(&Rfc3339).map_err(|err| Error::InvalidRequest { - message: format!("failed to format RFC3339 timestamp: {err}"), - }) -} - -fn policy_span_ref( - content_hash: &str, - start: usize, - end: usize, - status: &str, - reason_code: &str, -) -> DocsSourceSpanRef { - DocsSourceSpanRef { - schema: DOC_SOURCE_SPAN_SCHEMA_V1.to_string(), - span_id: source_capture::source_span_id(content_hash, start, end, reason_code), - chunk_id: None, - status: status.to_string(), - reason_code: Some(reason_code.to_string()), - start_offset: start, - end_offset: end, - content_hash: content_hash.to_string(), - chunk_hash: None, 
- } -} - -fn dev_origin(source_ref: &Map) -> String { - let repo = source_ref_string(source_ref, "repo").unwrap_or("unknown"); - let path = source_ref_string(source_ref, "path").unwrap_or(""); - let revision = source_ref_string(source_ref, "commit_sha") - .map(|commit| format!("@{commit}")) - .or_else(|| source_ref_i64(source_ref, "pr_number").map(|pr| format!("#pr-{pr}"))) - .or_else(|| { - source_ref_i64(source_ref, "issue_number").map(|issue| format!("#issue-{issue}")) - }) - .unwrap_or_default(); - - if path.is_empty() { - format!("repo:{repo}{revision}") - } else { - format!("repo:{repo}/{path}{revision}") - } -} - -fn source_ref_string<'a>(source_ref: &'a Map, key: &str) -> Option<&'a str> { - source_ref.get(key).and_then(Value::as_str).filter(|value| !value.trim().is_empty()) -} - -fn source_ref_i64(source_ref: &Map, key: &str) -> Option { - source_ref.get(key).and_then(Value::as_i64) -} - #[cfg(test)] mod tests { use crate::docs::{DocType, Map, Value, source_capture::helpers}; diff --git a/packages/elf-service/src/docs/source_capture/helpers/identity.rs b/packages/elf-service/src/docs/source_capture/helpers/identity.rs new file mode 100644 index 00000000..73f5233a --- /dev/null +++ b/packages/elf-service/src/docs/source_capture/helpers/identity.rs @@ -0,0 +1,114 @@ +use crate::docs::{DocType, Map, Value}; + +pub(in crate::docs::source_capture) fn source_type( + source_ref: &Map, + doc_type: DocType, +) -> String { + source_ref + .get("source_kind") + .and_then(Value::as_str) + .filter(|value| !value.trim().is_empty()) + .unwrap_or_else(|| doc_type.as_str()) + .to_string() +} + +pub(in crate::docs::source_capture) fn source_origin( + source_ref: &Map, + doc_type: DocType, +) -> String { + if let Some(origin) = source_ref_string(source_ref, "canonical_uri") + .or_else(|| source_ref_string(source_ref, "url")) + .or_else(|| source_ref_string(source_ref, "uri")) + { + return origin.to_string(); + } + + match doc_type { + DocType::Chat => 
source_ref_string(source_ref, "message_id") + .map(|message_id| { + format!( + "thread:{}#{}", + source_ref_string(source_ref, "thread_id").unwrap_or("unknown"), + message_id + ) + }) + .unwrap_or_else(|| { + format!( + "thread:{}", + source_ref_string(source_ref, "thread_id").unwrap_or("unknown") + ) + }), + DocType::Search => source_ref_string(source_ref, "domain") + .map(|domain| format!("search:{domain}")) + .unwrap_or_else(|| "search:unknown".to_string()), + DocType::Dev => dev_origin(source_ref), + DocType::Knowledge => source_ref_string(source_ref, "ts") + .map(|ts| format!("knowledge:{ts}")) + .unwrap_or_else(|| "knowledge:unknown".to_string()), + } +} + +pub(in crate::docs::source_capture) fn source_identity_value( + source_ref: &Map, + doc_type: DocType, +) -> Value { + if let Some(canonical_uri) = source_ref_string(source_ref, "canonical_uri") { + return serde_json::json!(["canonical_uri", canonical_uri]); + } + + match doc_type { + DocType::Chat => serde_json::json!([ + "chat", + source_ref_string(source_ref, "thread_id"), + source_ref_string(source_ref, "message_id"), + source_ref_string(source_ref, "role"), + source_ref_string(source_ref, "ts"), + ]), + DocType::Search => serde_json::json!([ + "search", + source_ref_string(source_ref, "url"), + source_ref_string(source_ref, "domain"), + source_ref_string(source_ref, "query"), + source_ref_string(source_ref, "ts"), + ]), + DocType::Dev => serde_json::json!([ + "dev", + source_ref_string(source_ref, "repo"), + source_ref_string(source_ref, "path"), + source_ref_string(source_ref, "commit_sha"), + source_ref_i64(source_ref, "pr_number"), + source_ref_i64(source_ref, "issue_number"), + ]), + DocType::Knowledge => serde_json::json!([ + "knowledge", + source_ref_string(source_ref, "uri"), + source_ref_string(source_ref, "ts"), + ]), + } +} + +fn dev_origin(source_ref: &Map) -> String { + let repo = source_ref_string(source_ref, "repo").unwrap_or("unknown"); + let path = source_ref_string(source_ref, 
"path").unwrap_or(""); + let revision = source_ref_string(source_ref, "commit_sha") + .map(|commit| format!("@{commit}")) + .or_else(|| source_ref_i64(source_ref, "pr_number").map(|pr| format!("#pr-{pr}"))) + .or_else(|| { + source_ref_i64(source_ref, "issue_number").map(|issue| format!("#issue-{issue}")) + }) + .unwrap_or_default(); + + if path.is_empty() { + format!("repo:{repo}{revision}") + } else { + format!("repo:{repo}/{path}{revision}") + } +} + +fn source_ref_string<'a>(source_ref: &'a Map, key: &str) -> Option<&'a str> { + source_ref.get(key).and_then(Value::as_str).filter(|value| !value.trim().is_empty()) +} + +fn source_ref_i64(source_ref: &Map, key: &str) -> Option { + source_ref.get(key).and_then(Value::as_i64) +} diff --git a/packages/elf-service/src/docs/source_capture/helpers/spans.rs b/packages/elf-service/src/docs/source_capture/helpers/spans.rs new file mode 100644 index 00000000..af3c2b93 --- /dev/null +++ b/packages/elf-service/src/docs/source_capture/helpers/spans.rs @@ -0,0 +1,63 @@ +use crate::docs::{ + DOC_SOURCE_SPAN_SCHEMA_V1, DocsSourceSpanRef, Error, Result, Value, WritePolicyAudit, + source_capture, +}; + +pub(in crate::docs::source_capture) fn source_policy_spans( + raw_content_hash: &str, + audit: Option<&WritePolicyAudit>, +) -> Vec { + let Some(audit) = audit else { + return Vec::new(); + }; + let mut spans = Vec::with_capacity(audit.exclusions.len() + audit.redactions.len()); + + for span in &audit.exclusions { + spans.push(policy_span_ref( + raw_content_hash, + span.start, + span.end, + "excluded", + "WRITE_POLICY_EXCLUSION", + )); + } + for redaction in &audit.redactions { + spans.push(policy_span_ref( + raw_content_hash, + redaction.span.start, + redaction.span.end, + "redacted", + "WRITE_POLICY_REDACTION", + )); + } + + spans +} + +pub(in crate::docs::source_capture) fn source_spans_to_value( + spans: &[DocsSourceSpanRef], +) -> Result { + serde_json::to_value(spans).map_err(|err| Error::InvalidRequest { + message: 
format!("failed to encode source span metadata: {err}"), + }) +} + +fn policy_span_ref( + content_hash: &str, + start: usize, + end: usize, + status: &str, + reason_code: &str, +) -> DocsSourceSpanRef { + DocsSourceSpanRef { + schema: DOC_SOURCE_SPAN_SCHEMA_V1.to_string(), + span_id: source_capture::source_span_id(content_hash, start, end, reason_code), + chunk_id: None, + status: status.to_string(), + reason_code: Some(reason_code.to_string()), + start_offset: start, + end_offset: end, + content_hash: content_hash.to_string(), + chunk_hash: None, + } +} diff --git a/packages/elf-service/src/docs/source_capture/helpers/time.rs b/packages/elf-service/src/docs/source_capture/helpers/time.rs new file mode 100644 index 00000000..6ab119a9 --- /dev/null +++ b/packages/elf-service/src/docs/source_capture/helpers/time.rs @@ -0,0 +1,7 @@ +use crate::docs::{Error, OffsetDateTime, Result, Rfc3339}; + +pub(in crate::docs::source_capture) fn format_timestamp(ts: OffsetDateTime) -> Result { + ts.format(&Rfc3339).map_err(|err| Error::InvalidRequest { + message: format!("failed to format RFC3339 timestamp: {err}"), + }) +} diff --git a/packages/elf-service/src/docs/validation/source_ref.rs b/packages/elf-service/src/docs/validation/source_ref.rs index 082e3e1c..65e0bc24 100644 --- a/packages/elf-service/src/docs/validation/source_ref.rs +++ b/packages/elf-service/src/docs/validation/source_ref.rs @@ -1,263 +1,9 @@ -use crate::docs::validation::{ - Error, Map, OffsetDateTime, Result, Rfc3339, SOURCE_LIBRARY_FIELD_KEYS, SOURCE_LIBRARY_KINDS, - SOURCE_LIBRARY_TRUST_LABELS, Value, +mod locators; +mod metadata; +mod requirements; +mod strings; + +pub(in crate::docs) use self::{ + metadata::validate_source_library_metadata, requirements::validate_doc_source_ref_requirements, + strings::extract_source_ref_string, }; - -pub(in crate::docs) fn extract_source_ref_string( - source_ref: &Map, - key: &str, - path: &str, -) -> Result { - source_ref - .get(key) - .and_then(Value::as_str) - 
.map(|text| text.trim().to_string()) - .filter(|text| !text.is_empty()) - .ok_or_else(|| Error::InvalidRequest { message: format!("{path} is required.") }) -} - -pub(in crate::docs) fn validate_doc_source_ref_requirements( - source_doc_type: &str, - source_ref: &Map, -) -> Result<()> { - match source_doc_type { - "chat" => { - extract_source_ref_string(source_ref, "thread_id", "$.source_ref[\"thread_id\"]")?; - extract_source_ref_string(source_ref, "role", "$.source_ref[\"role\"]")?; - }, - "search" => { - extract_source_ref_string(source_ref, "query", "$.source_ref[\"query\"]")?; - extract_source_ref_string(source_ref, "url", "$.source_ref[\"url\"]")?; - extract_source_ref_string(source_ref, "domain", "$.source_ref[\"domain\"]")?; - }, - "dev" => { - extract_source_ref_string(source_ref, "repo", "$.source_ref[\"repo\"]")?; - - let commit_sha_present = source_ref - .get("commit_sha") - .and_then(Value::as_str) - .is_some_and(|value| !value.trim().is_empty()); - let pr_number_present = source_ref - .get("pr_number") - .is_some_and(|value| value.as_i64().is_some() || value.as_u64().is_some()); - let issue_number_present = source_ref - .get("issue_number") - .is_some_and(|value| value.as_i64().is_some() || value.as_u64().is_some()); - let present_count = - commit_sha_present as u8 + pr_number_present as u8 + issue_number_present as u8; - - if present_count != 1 { - return Err(Error::InvalidRequest { - message: - "For doc_type=dev, exactly one of commit_sha, pr_number, or issue_number is required." 
- .to_string(), - }); - } - }, - "knowledge" => {}, - _ => unreachable!(), - } - - Ok(()) -} - -pub(in crate::docs) fn validate_source_library_metadata( - source_doc_type: &str, - source_ref: &Map, -) -> Result<()> { - if !source_library_metadata_present(source_ref) { - return Ok(()); - } - - let source_kind = - extract_source_ref_string(source_ref, "source_kind", "$.source_ref[\"source_kind\"]")?; - - if !SOURCE_LIBRARY_KINDS.contains(&source_kind.as_str()) { - return Err(Error::InvalidRequest { - message: format!( - "$.source_ref[\"source_kind\"] must be one of: {}.", - SOURCE_LIBRARY_KINDS.join("|") - ), - }); - } - - validate_source_kind_doc_type(source_kind.as_str(), source_doc_type)?; - extract_source_ref_string(source_ref, "canonical_uri", "$.source_ref[\"canonical_uri\"]")?; - validate_source_ref_rfc3339(source_ref, "captured_at")?; - - if source_ref.contains_key("source_created_at") { - validate_source_ref_rfc3339(source_ref, "source_created_at")?; - } - - let trust_label = - extract_source_ref_string(source_ref, "trust_label", "$.source_ref[\"trust_label\"]")?; - - if !SOURCE_LIBRARY_TRUST_LABELS.contains(&trust_label.as_str()) { - return Err(Error::InvalidRequest { - message: format!( - "$.source_ref[\"trust_label\"] must be one of: {}.", - SOURCE_LIBRARY_TRUST_LABELS.join("|") - ), - }); - } - - validate_optional_source_ref_string(source_ref, "author")?; - validate_optional_source_ref_string(source_ref, "handle")?; - validate_optional_source_ref_string(source_ref, "source_content_hash")?; - - if let Some(locator) = source_ref.get("excerpt_locator") { - validate_source_library_excerpt_locator(locator)?; - } - - Ok(()) -} - -pub(in crate::docs) fn source_library_metadata_present(source_ref: &Map) -> bool { - SOURCE_LIBRARY_FIELD_KEYS.iter().any(|key| source_ref.contains_key(*key)) -} - -pub(in crate::docs) fn validate_source_kind_doc_type( - source_kind: &str, - source_doc_type: &str, -) -> Result<()> { - let expected_doc_type = match source_kind { - 
"social_thread" | "chat_excerpt" => Some("chat"), - "repo_file" => Some("dev"), - _ => None, - }; - - if let Some(expected_doc_type) = expected_doc_type - && source_doc_type != expected_doc_type - { - return Err(Error::InvalidRequest { - message: format!( - "$.source_ref[\"source_kind\"]={source_kind} requires doc_type={expected_doc_type}." - ), - }); - } - - Ok(()) -} - -pub(in crate::docs) fn validate_source_ref_rfc3339( - source_ref: &Map, - key: &str, -) -> Result<()> { - let path = format!("$.source_ref[\"{key}\"]"); - let value = extract_source_ref_string(source_ref, key, path.as_str())?; - - OffsetDateTime::parse(value.as_str(), &Rfc3339).map_err(|_| Error::InvalidRequest { - message: format!("{path} must be an RFC3339 datetime string."), - })?; - - Ok(()) -} - -pub(in crate::docs) fn validate_optional_source_ref_string( - source_ref: &Map, - key: &str, -) -> Result<()> { - let path = format!("$.source_ref[\"{key}\"]"); - - validate_optional_source_ref_string_at(source_ref, key, path.as_str()) -} - -pub(in crate::docs) fn validate_optional_source_ref_string_at( - source_ref: &Map, - key: &str, - path: &str, -) -> Result<()> { - let Some(value) = source_ref.get(key) else { - return Ok(()); - }; - - value.as_str().map(str::trim).filter(|value| !value.is_empty()).ok_or_else(|| { - Error::InvalidRequest { message: format!("{path} must be a non-empty string.") } - })?; - - Ok(()) -} - -pub(in crate::docs) fn validate_source_library_excerpt_locator(locator: &Value) -> Result<()> { - let locator = locator.as_object().ok_or_else(|| Error::InvalidRequest { - message: "$.source_ref[\"excerpt_locator\"] must be a JSON object.".to_string(), - })?; - let has_quote = locator.contains_key("quote"); - let has_position = locator.contains_key("position"); - - if !has_quote && !has_position { - return Err(Error::InvalidRequest { - message: "$.source_ref[\"excerpt_locator\"] requires quote or position.".to_string(), - }); - } - - if let Some(quote) = locator.get("quote") { - 
validate_source_library_quote_locator(quote)?; - } - if let Some(position) = locator.get("position") { - validate_source_library_position_locator(position)?; - } - - Ok(()) -} - -pub(in crate::docs) fn validate_source_library_quote_locator(quote: &Value) -> Result<()> { - let quote = quote.as_object().ok_or_else(|| Error::InvalidRequest { - message: "$.source_ref[\"excerpt_locator\"][\"quote\"] must be a JSON object.".to_string(), - })?; - - extract_source_ref_string( - quote, - "exact", - "$.source_ref[\"excerpt_locator\"][\"quote\"][\"exact\"]", - )?; - validate_optional_source_ref_string_at( - quote, - "prefix", - "$.source_ref[\"excerpt_locator\"][\"quote\"][\"prefix\"]", - )?; - validate_optional_source_ref_string_at( - quote, - "suffix", - "$.source_ref[\"excerpt_locator\"][\"quote\"][\"suffix\"]", - )?; - - Ok(()) -} - -pub(in crate::docs) fn validate_source_library_position_locator(position: &Value) -> Result<()> { - let position = position.as_object().ok_or_else(|| Error::InvalidRequest { - message: "$.source_ref[\"excerpt_locator\"][\"position\"] must be a JSON object." - .to_string(), - })?; - let start = source_ref_u64( - position, - "start", - "$.source_ref[\"excerpt_locator\"][\"position\"][\"start\"]", - )?; - let end = source_ref_u64( - position, - "end", - "$.source_ref[\"excerpt_locator\"][\"position\"][\"end\"]", - )?; - - if start >= end { - return Err(Error::InvalidRequest { - message: "$.source_ref[\"excerpt_locator\"][\"position\"] start must be before end." 
- .to_string(), - }); - } - - Ok(()) -} - -pub(in crate::docs) fn source_ref_u64( - source_ref: &Map, - key: &str, - path: &str, -) -> Result { - source_ref - .get(key) - .and_then(Value::as_u64) - .ok_or_else(|| Error::InvalidRequest { message: format!("{path} must be an integer.") }) -} diff --git a/packages/elf-service/src/docs/validation/source_ref/locators.rs b/packages/elf-service/src/docs/validation/source_ref/locators.rs new file mode 100644 index 00000000..3eaa7dca --- /dev/null +++ b/packages/elf-service/src/docs/validation/source_ref/locators.rs @@ -0,0 +1,83 @@ +use crate::docs::validation::{Error, Map, Result, Value, source_ref::strings}; + +pub(in crate::docs::validation::source_ref) fn validate_source_library_excerpt_locator( + locator: &Value, +) -> Result<()> { + let locator = locator.as_object().ok_or_else(|| Error::InvalidRequest { + message: "$.source_ref[\"excerpt_locator\"] must be a JSON object.".to_string(), + })?; + let has_quote = locator.contains_key("quote"); + let has_position = locator.contains_key("position"); + + if !has_quote && !has_position { + return Err(Error::InvalidRequest { + message: "$.source_ref[\"excerpt_locator\"] requires quote or position.".to_string(), + }); + } + + if let Some(quote) = locator.get("quote") { + validate_source_library_quote_locator(quote)?; + } + if let Some(position) = locator.get("position") { + validate_source_library_position_locator(position)?; + } + + Ok(()) +} + +fn validate_source_library_quote_locator(quote: &Value) -> Result<()> { + let quote = quote.as_object().ok_or_else(|| Error::InvalidRequest { + message: "$.source_ref[\"excerpt_locator\"][\"quote\"] must be a JSON object.".to_string(), + })?; + + strings::extract_source_ref_string( + quote, + "exact", + "$.source_ref[\"excerpt_locator\"][\"quote\"][\"exact\"]", + )?; + strings::validate_optional_source_ref_string_at( + quote, + "prefix", + "$.source_ref[\"excerpt_locator\"][\"quote\"][\"prefix\"]", + )?; + 
strings::validate_optional_source_ref_string_at( + quote, + "suffix", + "$.source_ref[\"excerpt_locator\"][\"quote\"][\"suffix\"]", + )?; + + Ok(()) +} + +fn validate_source_library_position_locator(position: &Value) -> Result<()> { + let position = position.as_object().ok_or_else(|| Error::InvalidRequest { + message: "$.source_ref[\"excerpt_locator\"][\"position\"] must be a JSON object." + .to_string(), + })?; + let start = source_ref_u64( + position, + "start", + "$.source_ref[\"excerpt_locator\"][\"position\"][\"start\"]", + )?; + let end = source_ref_u64( + position, + "end", + "$.source_ref[\"excerpt_locator\"][\"position\"][\"end\"]", + )?; + + if start >= end { + return Err(Error::InvalidRequest { + message: "$.source_ref[\"excerpt_locator\"][\"position\"] start must be before end." + .to_string(), + }); + } + + Ok(()) +} + +fn source_ref_u64(source_ref: &Map, key: &str, path: &str) -> Result { + source_ref + .get(key) + .and_then(Value::as_u64) + .ok_or_else(|| Error::InvalidRequest { message: format!("{path} must be an integer.") }) +} diff --git a/packages/elf-service/src/docs/validation/source_ref/metadata.rs b/packages/elf-service/src/docs/validation/source_ref/metadata.rs new file mode 100644 index 00000000..e40f2088 --- /dev/null +++ b/packages/elf-service/src/docs/validation/source_ref/metadata.rs @@ -0,0 +1,103 @@ +use crate::docs::validation::{ + Error, Map, OffsetDateTime, Result, Rfc3339, SOURCE_LIBRARY_FIELD_KEYS, SOURCE_LIBRARY_KINDS, + SOURCE_LIBRARY_TRUST_LABELS, Value, + source_ref::{locators, strings}, +}; + +pub(in crate::docs) fn validate_source_library_metadata( + source_doc_type: &str, + source_ref: &Map, +) -> Result<()> { + if !source_library_metadata_present(source_ref) { + return Ok(()); + } + + let source_kind = strings::extract_source_ref_string( + source_ref, + "source_kind", + "$.source_ref[\"source_kind\"]", + )?; + + if !SOURCE_LIBRARY_KINDS.contains(&source_kind.as_str()) { + return Err(Error::InvalidRequest { + message: 
format!( + "$.source_ref[\"source_kind\"] must be one of: {}.", + SOURCE_LIBRARY_KINDS.join("|") + ), + }); + } + + validate_source_kind_doc_type(source_kind.as_str(), source_doc_type)?; + + strings::extract_source_ref_string( + source_ref, + "canonical_uri", + "$.source_ref[\"canonical_uri\"]", + )?; + + validate_source_ref_rfc3339(source_ref, "captured_at")?; + + if source_ref.contains_key("source_created_at") { + validate_source_ref_rfc3339(source_ref, "source_created_at")?; + } + + let trust_label = strings::extract_source_ref_string( + source_ref, + "trust_label", + "$.source_ref[\"trust_label\"]", + )?; + + if !SOURCE_LIBRARY_TRUST_LABELS.contains(&trust_label.as_str()) { + return Err(Error::InvalidRequest { + message: format!( + "$.source_ref[\"trust_label\"] must be one of: {}.", + SOURCE_LIBRARY_TRUST_LABELS.join("|") + ), + }); + } + + strings::validate_optional_source_ref_string(source_ref, "author")?; + strings::validate_optional_source_ref_string(source_ref, "handle")?; + strings::validate_optional_source_ref_string(source_ref, "source_content_hash")?; + + if let Some(locator) = source_ref.get("excerpt_locator") { + locators::validate_source_library_excerpt_locator(locator)?; + } + + Ok(()) +} + +fn source_library_metadata_present(source_ref: &Map) -> bool { + SOURCE_LIBRARY_FIELD_KEYS.iter().any(|key| source_ref.contains_key(*key)) +} + +fn validate_source_kind_doc_type(source_kind: &str, source_doc_type: &str) -> Result<()> { + let expected_doc_type = match source_kind { + "social_thread" | "chat_excerpt" => Some("chat"), + "repo_file" => Some("dev"), + _ => None, + }; + + if let Some(expected_doc_type) = expected_doc_type + && source_doc_type != expected_doc_type + { + return Err(Error::InvalidRequest { + message: format!( + "$.source_ref[\"source_kind\"]={source_kind} requires doc_type={expected_doc_type}." 
+ ), + }); + } + + Ok(()) +} + +fn validate_source_ref_rfc3339(source_ref: &Map, key: &str) -> Result<()> { + let path = format!("$.source_ref[\"{key}\"]"); + let value = strings::extract_source_ref_string(source_ref, key, path.as_str())?; + + OffsetDateTime::parse(value.as_str(), &Rfc3339).map_err(|_| Error::InvalidRequest { + message: format!("{path} must be an RFC3339 datetime string."), + })?; + + Ok(()) +} diff --git a/packages/elf-service/src/docs/validation/source_ref/requirements.rs b/packages/elf-service/src/docs/validation/source_ref/requirements.rs new file mode 100644 index 00000000..6f22ff7e --- /dev/null +++ b/packages/elf-service/src/docs/validation/source_ref/requirements.rs @@ -0,0 +1,56 @@ +use crate::docs::validation::{Error, Map, Result, Value, source_ref::strings}; + +pub(in crate::docs) fn validate_doc_source_ref_requirements( + source_doc_type: &str, + source_ref: &Map, +) -> Result<()> { + match source_doc_type { + "chat" => { + strings::extract_source_ref_string( + source_ref, + "thread_id", + "$.source_ref[\"thread_id\"]", + )?; + strings::extract_source_ref_string(source_ref, "role", "$.source_ref[\"role\"]")?; + }, + "search" => { + strings::extract_source_ref_string(source_ref, "query", "$.source_ref[\"query\"]")?; + strings::extract_source_ref_string(source_ref, "url", "$.source_ref[\"url\"]")?; + strings::extract_source_ref_string(source_ref, "domain", "$.source_ref[\"domain\"]")?; + }, + "dev" => { + strings::extract_source_ref_string(source_ref, "repo", "$.source_ref[\"repo\"]")?; + + validate_dev_revision_ref(source_ref)?; + }, + "knowledge" => {}, + _ => unreachable!(), + } + + Ok(()) +} + +fn validate_dev_revision_ref(source_ref: &Map) -> Result<()> { + let commit_sha_present = source_ref + .get("commit_sha") + .and_then(Value::as_str) + .is_some_and(|value| !value.trim().is_empty()); + let pr_number_present = source_ref + .get("pr_number") + .is_some_and(|value| value.as_i64().is_some() || value.as_u64().is_some()); + let 
issue_number_present = source_ref + .get("issue_number") + .is_some_and(|value| value.as_i64().is_some() || value.as_u64().is_some()); + let present_count = + commit_sha_present as u8 + pr_number_present as u8 + issue_number_present as u8; + + if present_count != 1 { + return Err(Error::InvalidRequest { + message: + "For doc_type=dev, exactly one of commit_sha, pr_number, or issue_number is required." + .to_string(), + }); + } + + Ok(()) +} diff --git a/packages/elf-service/src/docs/validation/source_ref/strings.rs b/packages/elf-service/src/docs/validation/source_ref/strings.rs new file mode 100644 index 00000000..33298f16 --- /dev/null +++ b/packages/elf-service/src/docs/validation/source_ref/strings.rs @@ -0,0 +1,39 @@ +use crate::docs::validation::{Error, Map, Result, Value}; + +pub(in crate::docs) fn extract_source_ref_string( + source_ref: &Map, + key: &str, + path: &str, +) -> Result { + source_ref + .get(key) + .and_then(Value::as_str) + .map(|text| text.trim().to_string()) + .filter(|text| !text.is_empty()) + .ok_or_else(|| Error::InvalidRequest { message: format!("{path} is required.") }) +} + +pub(in crate::docs::validation::source_ref) fn validate_optional_source_ref_string( + source_ref: &Map, + key: &str, +) -> Result<()> { + let path = format!("$.source_ref[\"{key}\"]"); + + validate_optional_source_ref_string_at(source_ref, key, path.as_str()) +} + +pub(in crate::docs::validation::source_ref) fn validate_optional_source_ref_string_at( + source_ref: &Map, + key: &str, + path: &str, +) -> Result<()> { + let Some(value) = source_ref.get(key) else { + return Ok(()); + }; + + value.as_str().map(str::trim).filter(|value| !value.is_empty()).ok_or_else(|| { + Error::InvalidRequest { message: format!("{path} must be a non-empty string.") } + })?; + + Ok(()) +} diff --git a/packages/elf-service/src/knowledge/watch.rs b/packages/elf-service/src/knowledge/watch.rs index 099cd071..78cdf618 100644 --- a/packages/elf-service/src/knowledge/watch.rs +++ 
b/packages/elf-service/src/knowledge/watch.rs @@ -21,14 +21,10 @@ pub(super) use self::{ }; use crate::knowledge::{ - BTreeMap, BTreeSet, ConsolidationApplyIntent, ConsolidationInputRef, ConsolidationLineage, - ConsolidationMarker, ConsolidationMarkerSeverity, ConsolidationMarkers, - ConsolidationProposalDiff, ConsolidationProposalInput, ConsolidationRunCreateResponse, - ConsolidationSourceKind, ConsolidationSourceSnapshot, Error, KnowledgeDeltaMemoryCandidate, - KnowledgePage, KnowledgePageChangedSource, KnowledgePageKind, KnowledgePageProposalRunSummary, - KnowledgePageRebuildOutput, KnowledgePageRebuildRequest, KnowledgePageResponse, - KnowledgePageSection, KnowledgePageSectionRebuildState, KnowledgePageSectionResponse, - KnowledgePageSourceRef, KnowledgePageSourceRefResponse, KnowledgePageWatchRebuildItem, + BTreeMap, BTreeSet, Error, KnowledgePage, KnowledgePageChangedSource, KnowledgePageKind, + KnowledgePageProposalRunSummary, KnowledgePageRebuildOutput, KnowledgePageRebuildRequest, + KnowledgePageResponse, KnowledgePageSection, KnowledgePageSectionRebuildState, + KnowledgePageSectionResponse, KnowledgePageSourceRef, KnowledgePageWatchRebuildItem, KnowledgePageWatchRebuildSummary, KnowledgeSourceKind, LintDraft, Result, SourceIds, Uuid, Value, WatchRebuildOutcome, empty_object, previous_version_diff_from_metadata, serde_json, truncate_chars, diff --git a/packages/elf-service/src/knowledge/watch/candidates.rs b/packages/elf-service/src/knowledge/watch/candidates.rs index f0c30179..6dadeadf 100644 --- a/packages/elf-service/src/knowledge/watch/candidates.rs +++ b/packages/elf-service/src/knowledge/watch/candidates.rs @@ -1,276 +1,10 @@ -use crate::knowledge::watch::{ - self, BTreeSet, ConsolidationApplyIntent, ConsolidationInputRef, ConsolidationLineage, - ConsolidationMarker, ConsolidationMarkerSeverity, ConsolidationMarkers, - ConsolidationProposalDiff, ConsolidationProposalInput, ConsolidationRunCreateResponse, - ConsolidationSourceKind, 
ConsolidationSourceSnapshot, KnowledgeDeltaMemoryCandidate, - KnowledgePageChangedSource, KnowledgePageProposalRunSummary, KnowledgePageRebuildOutput, - KnowledgePageResponse, KnowledgePageSectionResponse, KnowledgePageSourceRefResponse, - KnowledgeSourceKind, Value, serde_json, +mod build; +mod proposal; +mod refs; +mod run; + +pub(in crate::knowledge) use self::{ + build::memory_candidates_for_page, + proposal::candidate_proposal_input, + run::{candidate_run_input_refs, knowledge_delta_source_snapshot, proposal_run_summary}, }; - -pub(in crate::knowledge) fn memory_candidates_for_page( - page: &KnowledgePageResponse, - outputs: &[KnowledgePageRebuildOutput], -) -> Vec { - let reasons = watch::candidate_reasons_by_section(outputs); - - page.sections - .iter() - .filter_map(|section| { - let reason = reasons.get(section.section_key.as_str())?; - - memory_candidate_for_section(page, section, reason.as_str()) - }) - .collect() -} - -pub(in crate::knowledge) fn memory_candidate_for_section( - page: &KnowledgePageResponse, - section: &KnowledgePageSectionResponse, - reason: &str, -) -> Option { - let source_refs = page - .source_refs - .iter() - .filter(|source_ref| source_ref.section_id == Some(section.section_id)) - .filter_map(|source_ref| consolidation_input_ref(source_ref, page, section, reason)) - .collect::>(); - - if source_refs.is_empty() { - return None; - } - - let source_snapshot = candidate_source_snapshot(page, section, reason, &source_refs); - let diff = candidate_diff(page, section, reason); - let proposed_payload = candidate_proposed_payload(page, section, reason); - - Some(KnowledgeDeltaMemoryCandidate { - reason: reason.to_string(), - page_id: page.page.page_id, - section_id: section.section_id, - section_key: section.section_key.clone(), - source_refs, - source_snapshot, - diff, - proposed_payload, - }) -} - -pub(in crate::knowledge) fn consolidation_input_ref( - source_ref: &KnowledgePageSourceRefResponse, - page: &KnowledgePageResponse, - section: 
&KnowledgePageSectionResponse, - reason: &str, -) -> Option { - let kind = consolidation_source_kind(source_ref.source_kind.as_str())?; - - Some(ConsolidationInputRef { - kind, - id: source_ref.source_id, - snapshot: ConsolidationSourceSnapshot { - status: source_ref.source_status.clone(), - updated_at: source_ref.source_updated_at, - content_hash: source_ref.source_content_hash.clone(), - embedding_version: None, - trace_version: None, - source_ref: source_ref.source_snapshot.clone(), - metadata: serde_json::json!({ - "schema": "elf.knowledge_delta.source_ref/v1", - "reason": reason, - "page_id": page.page.page_id, - "page_kind": page.page.page_kind, - "page_key": page.page.page_key, - "section_id": section.section_id, - "section_key": section.section_key, - }), - }, - }) -} - -pub(in crate::knowledge) fn consolidation_source_kind( - source_kind: &str, -) -> Option { - match KnowledgeSourceKind::parse(source_kind)? { - KnowledgeSourceKind::Doc => Some(ConsolidationSourceKind::Doc), - KnowledgeSourceKind::DocChunk => Some(ConsolidationSourceKind::DocChunk), - KnowledgeSourceKind::Note => Some(ConsolidationSourceKind::Note), - KnowledgeSourceKind::Event => Some(ConsolidationSourceKind::Event), - KnowledgeSourceKind::Relation | KnowledgeSourceKind::Proposal => None, - } -} - -pub(in crate::knowledge) fn candidate_source_snapshot( - page: &KnowledgePageResponse, - section: &KnowledgePageSectionResponse, - reason: &str, - source_refs: &[ConsolidationInputRef], -) -> Value { - serde_json::json!({ - "schema": "elf.knowledge_delta.source_snapshot/v1", - "reason": reason, - "page": { - "page_id": page.page.page_id, - "page_kind": page.page.page_kind, - "page_key": page.page.page_key, - "content_hash": page.page.content_hash, - "rebuild_source_hash": page.page.rebuild_source_hash, - "previous_version_diff": page.page.previous_version_diff, - }, - "section": { - "section_id": section.section_id, - "section_key": section.section_key, - "heading": section.heading, - 
"content_hash": section.content_hash, - "citation_count": section.citation_count, - "source_ref_count": section.source_ref_count, - }, - "source_ref_count": source_refs.len(), - "source_mutation_allowed": false, - }) -} - -pub(in crate::knowledge) fn candidate_diff( - page: &KnowledgePageResponse, - section: &KnowledgePageSectionResponse, - reason: &str, -) -> ConsolidationProposalDiff { - ConsolidationProposalDiff { - summary: format!( - "Create a reviewable memory candidate for knowledge page '{}' section '{}' because {reason}.", - page.page.page_key, section.section_key - ), - before: serde_json::json!({ - "page_id": page.page.page_id, - "section_id": section.section_id, - "previous_version_diff": page.page.previous_version_diff, - }), - after: serde_json::json!({ - "target": "derived_note", - "reason": reason, - "page_id": page.page.page_id, - "section_id": section.section_id, - "section_key": section.section_key, - }), - } -} - -pub(in crate::knowledge) fn candidate_proposed_payload( - page: &KnowledgePageResponse, - section: &KnowledgePageSectionResponse, - reason: &str, -) -> Value { - let text = watch::truncate_chars( - format!( - "Plan: Review knowledge page {} section {} because source changes produced a {reason} delta.", - page.page.page_key, section.section_key - ) - .as_str(), - 220, - ); - - serde_json::json!({ - "type": "plan", - "key": format!( - "knowledge_delta_{}_{}", - page.page.page_key.replace('-', "_"), - section.section_key.replace('-', "_") - ), - "text": text, - "scope": "project_shared", - "importance": 0.65, - "confidence": 0.72, - "source_ref": { - "schema": "elf.knowledge_delta/v1", - "reason": reason, - "page_id": page.page.page_id, - "section_id": section.section_id, - "page_key": page.page.page_key, - "section_key": section.section_key, - "source_mutation_allowed": false, - } - }) -} - -pub(in crate::knowledge) fn candidate_proposal_input( - candidate: &KnowledgeDeltaMemoryCandidate, -) -> ConsolidationProposalInput { - 
ConsolidationProposalInput { - proposal_kind: "knowledge_delta_memory_candidate".to_string(), - apply_intent: ConsolidationApplyIntent::CreateDerivedNote, - source_refs: candidate.source_refs.clone(), - source_snapshot: candidate.source_snapshot.clone(), - lineage: ConsolidationLineage { - source_refs: candidate.source_refs.clone(), - parent_run_id: None, - parent_proposal_ids: Vec::new(), - }, - confidence: 0.72, - unsupported_claim_flags: Vec::new(), - markers: candidate_markers(candidate), - diff: candidate.diff.clone(), - target_ref: watch::empty_object(), - proposed_payload: candidate.proposed_payload.clone(), - } -} - -pub(in crate::knowledge) fn candidate_markers( - candidate: &KnowledgeDeltaMemoryCandidate, -) -> ConsolidationMarkers { - let marker = ConsolidationMarker { - severity: ConsolidationMarkerSeverity::Medium, - message: format!( - "Knowledge delta '{}' requires reviewer confirmation before memory promotion.", - candidate.reason - ), - source: candidate.source_refs.first().cloned(), - }; - - if candidate.reason == "conflict" { - ConsolidationMarkers { contradictions: vec![marker], staleness: Vec::new() } - } else { - ConsolidationMarkers { contradictions: Vec::new(), staleness: vec![marker] } - } -} - -pub(in crate::knowledge) fn candidate_run_input_refs( - candidates: &[KnowledgeDeltaMemoryCandidate], -) -> Vec { - let mut seen = BTreeSet::new(); - let mut out = Vec::new(); - - for source_ref in candidates.iter().flat_map(|candidate| &candidate.source_refs) { - if seen.insert((source_ref.kind.as_str().to_string(), source_ref.id)) { - out.push(source_ref.clone()); - } - } - - out -} - -pub(in crate::knowledge) fn knowledge_delta_source_snapshot( - changed_sources: &[KnowledgePageChangedSource], - candidates: &[KnowledgeDeltaMemoryCandidate], -) -> Value { - serde_json::json!({ - "schema": "elf.knowledge_delta.run_source_snapshot/v1", - "changed_sources": changed_sources, - "candidate_count": candidates.len(), - "candidate_reasons": candidates - 
.iter() - .map(|candidate| candidate.reason.clone()) - .collect::>(), - "source_mutation_allowed": false, - }) -} - -pub(in crate::knowledge) fn proposal_run_summary( - created: ConsolidationRunCreateResponse, - proposal_count: usize, -) -> KnowledgePageProposalRunSummary { - KnowledgePageProposalRunSummary { - run_id: created.run.run_id, - job_id: created.job_id, - proposal_count, - review_surface: "consolidation_proposals".to_string(), - } -} diff --git a/packages/elf-service/src/knowledge/watch/candidates/build.rs b/packages/elf-service/src/knowledge/watch/candidates/build.rs new file mode 100644 index 00000000..480fb6c5 --- /dev/null +++ b/packages/elf-service/src/knowledge/watch/candidates/build.rs @@ -0,0 +1,86 @@ +use crate::knowledge::{ + ConsolidationInputRef, KnowledgeDeltaMemoryCandidate, KnowledgePageRebuildOutput, + KnowledgePageResponse, KnowledgePageSectionResponse, Value, + watch::{ + self, + candidates::{proposal, refs}, + }, +}; + +pub(in crate::knowledge) fn memory_candidates_for_page( + page: &KnowledgePageResponse, + outputs: &[KnowledgePageRebuildOutput], +) -> Vec { + let reasons = watch::candidate_reasons_by_section(outputs); + + page.sections + .iter() + .filter_map(|section| { + let reason = reasons.get(section.section_key.as_str())?; + + memory_candidate_for_section(page, section, reason.as_str()) + }) + .collect() +} + +fn memory_candidate_for_section( + page: &KnowledgePageResponse, + section: &KnowledgePageSectionResponse, + reason: &str, +) -> Option { + let source_refs = page + .source_refs + .iter() + .filter(|source_ref| source_ref.section_id == Some(section.section_id)) + .filter_map(|source_ref| refs::consolidation_input_ref(source_ref, page, section, reason)) + .collect::>(); + + if source_refs.is_empty() { + return None; + } + + let source_snapshot = candidate_source_snapshot(page, section, reason, &source_refs); + let diff = proposal::candidate_diff(page, section, reason); + let proposed_payload = 
proposal::candidate_proposed_payload(page, section, reason); + + Some(KnowledgeDeltaMemoryCandidate { + reason: reason.to_string(), + page_id: page.page.page_id, + section_id: section.section_id, + section_key: section.section_key.clone(), + source_refs, + source_snapshot, + diff, + proposed_payload, + }) +} + +fn candidate_source_snapshot( + page: &KnowledgePageResponse, + section: &KnowledgePageSectionResponse, + reason: &str, + source_refs: &[ConsolidationInputRef], +) -> Value { + serde_json::json!({ + "schema": "elf.knowledge_delta.source_snapshot/v1", + "reason": reason, + "page": { + "page_id": page.page.page_id, + "page_kind": page.page.page_kind, + "page_key": page.page.page_key, + "content_hash": page.page.content_hash, + "rebuild_source_hash": page.page.rebuild_source_hash, + "previous_version_diff": page.page.previous_version_diff, + }, + "section": { + "section_id": section.section_id, + "section_key": section.section_key, + "heading": section.heading, + "content_hash": section.content_hash, + "citation_count": section.citation_count, + "source_ref_count": section.source_ref_count, + }, + "source_ref_count": source_refs.len(), + "source_mutation_allowed": false, + }) +} diff --git a/packages/elf-service/src/knowledge/watch/candidates/proposal.rs b/packages/elf-service/src/knowledge/watch/candidates/proposal.rs new file mode 100644 index 00000000..b0db3907 --- /dev/null +++ b/packages/elf-service/src/knowledge/watch/candidates/proposal.rs @@ -0,0 +1,107 @@ +use crate::knowledge::{ + ConsolidationApplyIntent, ConsolidationLineage, ConsolidationMarker, + ConsolidationMarkerSeverity, ConsolidationMarkers, ConsolidationProposalDiff, + ConsolidationProposalInput, KnowledgeDeltaMemoryCandidate, KnowledgePageResponse, + KnowledgePageSectionResponse, Value, watch, +}; + +pub(in crate::knowledge) fn candidate_proposal_input( + candidate: &KnowledgeDeltaMemoryCandidate, +) -> ConsolidationProposalInput { + ConsolidationProposalInput { + proposal_kind: 
"knowledge_delta_memory_candidate".to_string(), + apply_intent: ConsolidationApplyIntent::CreateDerivedNote, + source_refs: candidate.source_refs.clone(), + source_snapshot: candidate.source_snapshot.clone(), + lineage: ConsolidationLineage { + source_refs: candidate.source_refs.clone(), + parent_run_id: None, + parent_proposal_ids: Vec::new(), + }, + confidence: 0.72, + unsupported_claim_flags: Vec::new(), + markers: candidate_markers(candidate), + diff: candidate.diff.clone(), + target_ref: watch::empty_object(), + proposed_payload: candidate.proposed_payload.clone(), + } +} + +pub(in crate::knowledge::watch::candidates) fn candidate_diff( + page: &KnowledgePageResponse, + section: &KnowledgePageSectionResponse, + reason: &str, +) -> ConsolidationProposalDiff { + ConsolidationProposalDiff { + summary: format!( + "Create a reviewable memory candidate for knowledge page '{}' section '{}' because {reason}.", + page.page.page_key, section.section_key + ), + before: serde_json::json!({ + "page_id": page.page.page_id, + "section_id": section.section_id, + "previous_version_diff": page.page.previous_version_diff, + }), + after: serde_json::json!({ + "target": "derived_note", + "reason": reason, + "page_id": page.page.page_id, + "section_id": section.section_id, + "section_key": section.section_key, + }), + } +} + +pub(in crate::knowledge::watch::candidates) fn candidate_proposed_payload( + page: &KnowledgePageResponse, + section: &KnowledgePageSectionResponse, + reason: &str, +) -> Value { + let text = watch::truncate_chars( + format!( + "Plan: Review knowledge page {} section {} because source changes produced a {reason} delta.", + page.page.page_key, section.section_key + ) + .as_str(), + 220, + ); + + serde_json::json!({ + "type": "plan", + "key": format!( + "knowledge_delta_{}_{}", + page.page.page_key.replace('-', "_"), + section.section_key.replace('-', "_") + ), + "text": text, + "scope": "project_shared", + "importance": 0.65, + "confidence": 0.72, + 
"source_ref": { + "schema": "elf.knowledge_delta/v1", + "reason": reason, + "page_id": page.page.page_id, + "section_id": section.section_id, + "page_key": page.page.page_key, + "section_key": section.section_key, + "source_mutation_allowed": false, + } + }) +} + +fn candidate_markers(candidate: &KnowledgeDeltaMemoryCandidate) -> ConsolidationMarkers { + let marker = ConsolidationMarker { + severity: ConsolidationMarkerSeverity::Medium, + message: format!( + "Knowledge delta '{}' requires reviewer confirmation before memory promotion.", + candidate.reason + ), + source: candidate.source_refs.first().cloned(), + }; + + if candidate.reason == "conflict" { + ConsolidationMarkers { contradictions: vec![marker], staleness: Vec::new() } + } else { + ConsolidationMarkers { contradictions: Vec::new(), staleness: vec![marker] } + } +} diff --git a/packages/elf-service/src/knowledge/watch/candidates/refs.rs b/packages/elf-service/src/knowledge/watch/candidates/refs.rs new file mode 100644 index 00000000..bc24a268 --- /dev/null +++ b/packages/elf-service/src/knowledge/watch/candidates/refs.rs @@ -0,0 +1,46 @@ +use crate::knowledge::{ + ConsolidationInputRef, ConsolidationSourceKind, ConsolidationSourceSnapshot, + KnowledgePageResponse, KnowledgePageSectionResponse, KnowledgePageSourceRefResponse, + KnowledgeSourceKind, +}; + +pub(in crate::knowledge::watch::candidates) fn consolidation_input_ref( + source_ref: &KnowledgePageSourceRefResponse, + page: &KnowledgePageResponse, + section: &KnowledgePageSectionResponse, + reason: &str, +) -> Option { + let kind = consolidation_source_kind(source_ref.source_kind.as_str())?; + + Some(ConsolidationInputRef { + kind, + id: source_ref.source_id, + snapshot: ConsolidationSourceSnapshot { + status: source_ref.source_status.clone(), + updated_at: source_ref.source_updated_at, + content_hash: source_ref.source_content_hash.clone(), + embedding_version: None, + trace_version: None, + source_ref: source_ref.source_snapshot.clone(), + 
metadata: serde_json::json!({ + "schema": "elf.knowledge_delta.source_ref/v1", + "reason": reason, + "page_id": page.page.page_id, + "page_kind": page.page.page_kind, + "page_key": page.page.page_key, + "section_id": section.section_id, + "section_key": section.section_key, + }), + }, + }) +} + +fn consolidation_source_kind(source_kind: &str) -> Option { + match KnowledgeSourceKind::parse(source_kind)? { + KnowledgeSourceKind::Doc => Some(ConsolidationSourceKind::Doc), + KnowledgeSourceKind::DocChunk => Some(ConsolidationSourceKind::DocChunk), + KnowledgeSourceKind::Note => Some(ConsolidationSourceKind::Note), + KnowledgeSourceKind::Event => Some(ConsolidationSourceKind::Event), + KnowledgeSourceKind::Relation | KnowledgeSourceKind::Proposal => None, + } +} diff --git a/packages/elf-service/src/knowledge/watch/candidates/run.rs b/packages/elf-service/src/knowledge/watch/candidates/run.rs new file mode 100644 index 00000000..eb7f370b --- /dev/null +++ b/packages/elf-service/src/knowledge/watch/candidates/run.rs @@ -0,0 +1,47 @@ +use crate::knowledge::{ + BTreeSet, ConsolidationInputRef, ConsolidationRunCreateResponse, KnowledgeDeltaMemoryCandidate, + KnowledgePageChangedSource, KnowledgePageProposalRunSummary, Value, +}; + +pub(in crate::knowledge) fn candidate_run_input_refs( + candidates: &[KnowledgeDeltaMemoryCandidate], +) -> Vec { + let mut seen = BTreeSet::new(); + let mut out = Vec::new(); + + for source_ref in candidates.iter().flat_map(|candidate| &candidate.source_refs) { + if seen.insert((source_ref.kind.as_str().to_string(), source_ref.id)) { + out.push(source_ref.clone()); + } + } + + out +} + +pub(in crate::knowledge) fn knowledge_delta_source_snapshot( + changed_sources: &[KnowledgePageChangedSource], + candidates: &[KnowledgeDeltaMemoryCandidate], +) -> Value { + serde_json::json!({ + "schema": "elf.knowledge_delta.run_source_snapshot/v1", + "changed_sources": changed_sources, + "candidate_count": candidates.len(), + "candidate_reasons": candidates + 
.iter() + .map(|candidate| candidate.reason.clone()) + .collect::>(), + "source_mutation_allowed": false, + }) +} + +pub(in crate::knowledge) fn proposal_run_summary( + created: ConsolidationRunCreateResponse, + proposal_count: usize, +) -> KnowledgePageProposalRunSummary { + KnowledgePageProposalRunSummary { + run_id: created.run.run_id, + job_id: created.job_id, + proposal_count, + review_surface: "consolidation_proposals".to_string(), + } +} diff --git a/packages/elf-storage/src/docs.rs b/packages/elf-storage/src/docs.rs index 2bed675e..dff68ce3 100644 --- a/packages/elf-storage/src/docs.rs +++ b/packages/elf-storage/src/docs.rs @@ -1,264 +1,11 @@ //! Document persistence queries. -use serde_json::Value; -use sqlx::PgExecutor; -use time::OffsetDateTime; -use uuid::Uuid; - -use crate::{ - Result, - models::{DocChunk, DocDocument}, +mod chunks; +mod documents; +mod embeddings; + +pub use self::{ + chunks::{get_doc_chunk, insert_doc_chunk, list_doc_chunks}, + documents::{get_doc_document, insert_doc_document, mark_doc_deleted, normalize_source_ref}, + embeddings::insert_doc_chunk_embedding, }; - -/// Normalizes absent document source metadata to an empty JSON object. -pub fn normalize_source_ref(source_ref: Option) -> Value { - source_ref.unwrap_or(Value::Object(Default::default())) -} - -/// Inserts one document record into storage. 
-pub async fn insert_doc_document<'e, E>(executor: E, doc: &DocDocument) -> Result<()> -where - E: PgExecutor<'e>, -{ - sqlx::query( - "\ -INSERT INTO doc_documents ( - doc_id, - tenant_id, - project_id, - agent_id, - scope, - doc_type, - status, - title, - source_ref, - content, - content_bytes, - content_hash, - created_at, - updated_at -) -VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14) -ON CONFLICT (doc_id) DO UPDATE -SET - tenant_id = EXCLUDED.tenant_id, - project_id = EXCLUDED.project_id, - agent_id = EXCLUDED.agent_id, - scope = EXCLUDED.scope, - doc_type = EXCLUDED.doc_type, - status = EXCLUDED.status, - title = EXCLUDED.title, - source_ref = EXCLUDED.source_ref, - content = EXCLUDED.content, - content_bytes = EXCLUDED.content_bytes, - content_hash = EXCLUDED.content_hash, - updated_at = EXCLUDED.updated_at", - ) - .bind(doc.doc_id) - .bind(doc.tenant_id.as_str()) - .bind(doc.project_id.as_str()) - .bind(doc.agent_id.as_str()) - .bind(doc.scope.as_str()) - .bind(doc.doc_type.as_str()) - .bind(doc.status.as_str()) - .bind(doc.title.as_deref()) - .bind(&doc.source_ref) - .bind(doc.content.as_str()) - .bind(doc.content_bytes) - .bind(doc.content_hash.as_str()) - .bind(doc.created_at) - .bind(doc.updated_at) - .execute(executor) - .await?; - - Ok(()) -} - -/// Fetches one document record by tenant and document identifier. -pub async fn get_doc_document<'e, E>( - executor: E, - tenant_id: &str, - doc_id: Uuid, -) -> Result> -where - E: PgExecutor<'e>, -{ - let row = sqlx::query_as::<_, DocDocument>( - "\ - SELECT - doc_id, - tenant_id, - project_id, - agent_id, - scope, - doc_type, - status, - title, - COALESCE(source_ref, '{}'::jsonb) AS source_ref, - content, - content_bytes, - content_hash, - created_at, - updated_at -FROM doc_documents -WHERE tenant_id = $1 AND doc_id = $2 -LIMIT 1", - ) - .bind(tenant_id) - .bind(doc_id) - .fetch_optional(executor) - .await?; - - Ok(row) -} - -/// Inserts one document chunk row. 
-pub async fn insert_doc_chunk<'e, E>(executor: E, chunk: &DocChunk) -> Result<()> -where - E: PgExecutor<'e>, -{ - sqlx::query( - "\ -INSERT INTO doc_chunks ( - chunk_id, - doc_id, - chunk_index, - start_offset, - end_offset, - chunk_text, - chunk_hash, - created_at -) -VALUES ($1,$2,$3,$4,$5,$6,$7,$8) -ON CONFLICT (chunk_id) DO UPDATE -SET - doc_id = EXCLUDED.doc_id, - chunk_index = EXCLUDED.chunk_index, - start_offset = EXCLUDED.start_offset, - end_offset = EXCLUDED.end_offset, - chunk_text = EXCLUDED.chunk_text, - chunk_hash = EXCLUDED.chunk_hash", - ) - .bind(chunk.chunk_id) - .bind(chunk.doc_id) - .bind(chunk.chunk_index) - .bind(chunk.start_offset) - .bind(chunk.end_offset) - .bind(chunk.chunk_text.as_str()) - .bind(chunk.chunk_hash.as_str()) - .bind(chunk.created_at) - .execute(executor) - .await?; - - Ok(()) -} - -/// Lists all chunks for one document in chunk order. -pub async fn list_doc_chunks<'e, E>(executor: E, doc_id: Uuid) -> Result> -where - E: PgExecutor<'e>, -{ - let rows = sqlx::query_as::<_, DocChunk>( - "\ -SELECT - chunk_id, - doc_id, - chunk_index, - start_offset, - end_offset, - chunk_text, - chunk_hash, - created_at -FROM doc_chunks -WHERE doc_id = $1 -ORDER BY chunk_index ASC", - ) - .bind(doc_id) - .fetch_all(executor) - .await?; - - Ok(rows) -} - -/// Fetches one document chunk by chunk identifier. -pub async fn get_doc_chunk<'e, E>(executor: E, chunk_id: Uuid) -> Result> -where - E: PgExecutor<'e>, -{ - let row = sqlx::query_as::<_, DocChunk>( - "\ -SELECT - chunk_id, - doc_id, - chunk_index, - start_offset, - end_offset, - chunk_text, - chunk_hash, - created_at -FROM doc_chunks -WHERE chunk_id = $1 -LIMIT 1", - ) - .bind(chunk_id) - .fetch_optional(executor) - .await?; - - Ok(row) -} - -/// Upserts one dense or sparse embedding vector for a document chunk. 
-pub async fn insert_doc_chunk_embedding<'e, E>( - executor: E, - chunk_id: Uuid, - embedding_version: &str, - embedding_dim: i32, - vec: &str, -) -> Result<()> -where - E: PgExecutor<'e>, -{ - sqlx::query( - "\ -INSERT INTO doc_chunk_embeddings (chunk_id, embedding_version, embedding_dim, vec) -VALUES ($1, $2, $3, $4::text::vector) -ON CONFLICT (chunk_id, embedding_version) DO UPDATE -SET - embedding_dim = EXCLUDED.embedding_dim, - vec = EXCLUDED.vec, - created_at = now()", - ) - .bind(chunk_id) - .bind(embedding_version) - .bind(embedding_dim) - .bind(vec) - .execute(executor) - .await?; - - Ok(()) -} - -/// Marks one document record as deleted. -pub async fn mark_doc_deleted<'e, E>( - executor: E, - tenant_id: &str, - doc_id: Uuid, - now: OffsetDateTime, -) -> Result<()> -where - E: PgExecutor<'e>, -{ - sqlx::query( - "\ -UPDATE doc_documents -SET status = 'deleted', updated_at = $1 -WHERE tenant_id = $2 AND doc_id = $3", - ) - .bind(now) - .bind(tenant_id) - .bind(doc_id) - .execute(executor) - .await?; - - Ok(()) -} diff --git a/packages/elf-storage/src/docs/chunks.rs b/packages/elf-storage/src/docs/chunks.rs new file mode 100644 index 00000000..b02bc256 --- /dev/null +++ b/packages/elf-storage/src/docs/chunks.rs @@ -0,0 +1,99 @@ +use sqlx::PgExecutor; +use uuid::Uuid; + +use crate::{Result, models::DocChunk}; + +/// Inserts one document chunk row. 
+pub async fn insert_doc_chunk<'e, E>(executor: E, chunk: &DocChunk) -> Result<()> +where + E: PgExecutor<'e>, +{ + sqlx::query( + "\ +INSERT INTO doc_chunks ( + chunk_id, + doc_id, + chunk_index, + start_offset, + end_offset, + chunk_text, + chunk_hash, + created_at +) +VALUES ($1,$2,$3,$4,$5,$6,$7,$8) +ON CONFLICT (chunk_id) DO UPDATE +SET + doc_id = EXCLUDED.doc_id, + chunk_index = EXCLUDED.chunk_index, + start_offset = EXCLUDED.start_offset, + end_offset = EXCLUDED.end_offset, + chunk_text = EXCLUDED.chunk_text, + chunk_hash = EXCLUDED.chunk_hash", + ) + .bind(chunk.chunk_id) + .bind(chunk.doc_id) + .bind(chunk.chunk_index) + .bind(chunk.start_offset) + .bind(chunk.end_offset) + .bind(chunk.chunk_text.as_str()) + .bind(chunk.chunk_hash.as_str()) + .bind(chunk.created_at) + .execute(executor) + .await?; + + Ok(()) +} + +/// Lists all chunks for one document in chunk order. +pub async fn list_doc_chunks<'e, E>(executor: E, doc_id: Uuid) -> Result> +where + E: PgExecutor<'e>, +{ + let rows = sqlx::query_as::<_, DocChunk>( + "\ +SELECT + chunk_id, + doc_id, + chunk_index, + start_offset, + end_offset, + chunk_text, + chunk_hash, + created_at +FROM doc_chunks +WHERE doc_id = $1 +ORDER BY chunk_index ASC", + ) + .bind(doc_id) + .fetch_all(executor) + .await?; + + Ok(rows) +} + +/// Fetches one document chunk by chunk identifier. 
+pub async fn get_doc_chunk<'e, E>(executor: E, chunk_id: Uuid) -> Result> +where + E: PgExecutor<'e>, +{ + let row = sqlx::query_as::<_, DocChunk>( + "\ +SELECT + chunk_id, + doc_id, + chunk_index, + start_offset, + end_offset, + chunk_text, + chunk_hash, + created_at +FROM doc_chunks +WHERE chunk_id = $1 +LIMIT 1", + ) + .bind(chunk_id) + .fetch_optional(executor) + .await?; + + Ok(row) +} diff --git a/packages/elf-storage/src/docs/documents.rs b/packages/elf-storage/src/docs/documents.rs new file mode 100644 index 00000000..68049760 --- /dev/null +++ b/packages/elf-storage/src/docs/documents.rs @@ -0,0 +1,133 @@ +use serde_json::Value; +use sqlx::PgExecutor; +use time::OffsetDateTime; +use uuid::Uuid; + +use crate::{Result, models::DocDocument}; + +/// Normalizes absent document source metadata to an empty JSON object. +pub fn normalize_source_ref(source_ref: Option) -> Value { + source_ref.unwrap_or(Value::Object(Default::default())) +} + +/// Inserts one document record into storage. 
+pub async fn insert_doc_document<'e, E>(executor: E, doc: &DocDocument) -> Result<()> +where + E: PgExecutor<'e>, +{ + sqlx::query( + "\ +INSERT INTO doc_documents ( + doc_id, + tenant_id, + project_id, + agent_id, + scope, + doc_type, + status, + title, + source_ref, + content, + content_bytes, + content_hash, + created_at, + updated_at +) +VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14) +ON CONFLICT (doc_id) DO UPDATE +SET + tenant_id = EXCLUDED.tenant_id, + project_id = EXCLUDED.project_id, + agent_id = EXCLUDED.agent_id, + scope = EXCLUDED.scope, + doc_type = EXCLUDED.doc_type, + status = EXCLUDED.status, + title = EXCLUDED.title, + source_ref = EXCLUDED.source_ref, + content = EXCLUDED.content, + content_bytes = EXCLUDED.content_bytes, + content_hash = EXCLUDED.content_hash, + updated_at = EXCLUDED.updated_at", + ) + .bind(doc.doc_id) + .bind(doc.tenant_id.as_str()) + .bind(doc.project_id.as_str()) + .bind(doc.agent_id.as_str()) + .bind(doc.scope.as_str()) + .bind(doc.doc_type.as_str()) + .bind(doc.status.as_str()) + .bind(doc.title.as_deref()) + .bind(&doc.source_ref) + .bind(doc.content.as_str()) + .bind(doc.content_bytes) + .bind(doc.content_hash.as_str()) + .bind(doc.created_at) + .bind(doc.updated_at) + .execute(executor) + .await?; + + Ok(()) +} + +/// Fetches one document record by tenant and document identifier. +pub async fn get_doc_document<'e, E>( + executor: E, + tenant_id: &str, + doc_id: Uuid, +) -> Result<Option<DocDocument>> +where + E: PgExecutor<'e>, +{ + let row = sqlx::query_as::<_, DocDocument>( + "\ + SELECT + doc_id, + tenant_id, + project_id, + agent_id, + scope, + doc_type, + status, + title, + COALESCE(source_ref, '{}'::jsonb) AS source_ref, + content, + content_bytes, + content_hash, + created_at, + updated_at +FROM doc_documents +WHERE tenant_id = $1 AND doc_id = $2 +LIMIT 1", + ) + .bind(tenant_id) + .bind(doc_id) + .fetch_optional(executor) + .await?; + + Ok(row) +} + +/// Marks one document record as deleted.
+pub async fn mark_doc_deleted<'e, E>( + executor: E, + tenant_id: &str, + doc_id: Uuid, + now: OffsetDateTime, +) -> Result<()> +where + E: PgExecutor<'e>, +{ + sqlx::query( + "\ +UPDATE doc_documents +SET status = 'deleted', updated_at = $1 +WHERE tenant_id = $2 AND doc_id = $3", + ) + .bind(now) + .bind(tenant_id) + .bind(doc_id) + .execute(executor) + .await?; + + Ok(()) +} diff --git a/packages/elf-storage/src/docs/embeddings.rs b/packages/elf-storage/src/docs/embeddings.rs new file mode 100644 index 00000000..e5ffa22b --- /dev/null +++ b/packages/elf-storage/src/docs/embeddings.rs @@ -0,0 +1,35 @@ +use sqlx::PgExecutor; +use uuid::Uuid; + +use crate::Result; + +/// Upserts one dense or sparse embedding vector for a document chunk. +pub async fn insert_doc_chunk_embedding<'e, E>( + executor: E, + chunk_id: Uuid, + embedding_version: &str, + embedding_dim: i32, + vec: &str, +) -> Result<()> +where + E: PgExecutor<'e>, +{ + sqlx::query( + "\ +INSERT INTO doc_chunk_embeddings (chunk_id, embedding_version, embedding_dim, vec) +VALUES ($1, $2, $3, $4::text::vector) +ON CONFLICT (chunk_id, embedding_version) DO UPDATE +SET + embedding_dim = EXCLUDED.embedding_dim, + vec = EXCLUDED.vec, + created_at = now()", + ) + .bind(chunk_id) + .bind(embedding_version) + .bind(embedding_dim) + .bind(vec) + .execute(executor) + .await?; + + Ok(()) +}