From 927b6647b6dafa1e1e5d11fc88f696946af54ea6 Mon Sep 17 00:00:00 2001 From: Yvette Carlisle Date: Mon, 29 Jun 2026 22:10:16 +0800 Subject: [PATCH] {"schema":"decodex/commit/1","summary":"Split ingestion profile default persistence into a storage child module after strict validation.","authority":"manual"} --- .../src/ingestion_profiles/storage.rs | 147 +----------------- .../ingestion_profiles/storage/defaults.rs | 146 +++++++++++++++++ 2 files changed, 153 insertions(+), 140 deletions(-) create mode 100644 packages/elf-service/src/ingestion_profiles/storage/defaults.rs diff --git a/packages/elf-service/src/ingestion_profiles/storage.rs b/packages/elf-service/src/ingestion_profiles/storage.rs index c2c4edfc..00995e8b 100644 --- a/packages/elf-service/src/ingestion_profiles/storage.rs +++ b/packages/elf-service/src/ingestion_profiles/storage.rs @@ -1,13 +1,16 @@ +mod defaults; + +pub(super) use self::defaults::{ + seed_default_profile, select_default_row, select_default_selector, upsert_default_row, +}; + use serde_json::Value; use sqlx::{FromRow, PgPool}; use time::OffsetDateTime; use crate::{ Error, Result, - ingestion_profiles::{ - ADD_EVENT_PIPELINE, DEFAULT_PROFILE_ID, DEFAULT_PROFILE_VERSION, profile, - types::IngestionProfileSelector, - }, + ingestion_profiles::{ADD_EVENT_PIPELINE, types::IngestionProfileSelector}, }; #[derive(FromRow)] @@ -34,13 +37,6 @@ pub(super) struct ProfileSummaryRow { pub(super) created_by: String, } -#[derive(FromRow)] -pub(super) struct ProfileDefaultRow { - pub(super) profile_id: String, - pub(super) version: Option, - pub(super) updated_at: OffsetDateTime, -} - pub(super) async fn next_profile_version( pool: &PgPool, tenant_id: &str, @@ -186,59 +182,6 @@ ORDER BY version DESC", Ok(rows) } -pub(super) async fn select_default_row( - pool: &PgPool, - tenant_id: &str, - project_id: &str, -) -> Result> { - let row = sqlx::query_as::<_, ProfileDefaultRow>( - "\ -SELECT profile_id, version, updated_at -FROM memory_ingestion_profile_defaults -WHERE tenant_id=$1 AND project_id=$2 AND pipeline=$3", - ) - .bind(tenant_id) - .bind(project_id) - .bind(ADD_EVENT_PIPELINE) - .fetch_optional(pool) - .await?; - - Ok(row) -} - -pub(super) async fn upsert_default_row( - pool: &PgPool, - tenant_id: &str, - project_id: &str, - profile_id: String, - version: i32, -) -> Result { - let row = sqlx::query_as::<_, ProfileDefaultRow>( - "\ -INSERT INTO memory_ingestion_profile_defaults ( - tenant_id, - project_id, - pipeline, - profile_id, - version -) VALUES ($1,$2,$3,$4,$5) -ON CONFLICT (tenant_id, project_id, pipeline) DO UPDATE -SET profile_id = EXCLUDED.profile_id, - version = EXCLUDED.version, - updated_at = now() -RETURNING profile_id, version, updated_at", - ) - .bind(tenant_id) - .bind(project_id) - .bind(ADD_EVENT_PIPELINE) - .bind(profile_id) - .bind(version) - .fetch_one(pool) - .await?; - - Ok(row) -} - pub(super) async fn select_profile( pool: &PgPool, tenant_id: &str, @@ -283,79 +226,3 @@ LIMIT 1", ), }) } - -pub(super) async fn select_default_selector( - pool: &PgPool, - tenant_id: &str, - project_id: &str, -) -> Result { - let row = sqlx::query_as::<_, (String, Option)>( - "SELECT profile_id, version FROM memory_ingestion_profile_defaults WHERE tenant_id=$1 AND project_id=$2 AND pipeline=$3", - ) - .bind(tenant_id) - .bind(project_id) - .bind(ADD_EVENT_PIPELINE) - .fetch_optional(pool) - .await?; - let row = match row { - Some((profile_id, version)) => IngestionProfileSelector { id: profile_id, version }, - None => IngestionProfileSelector { - id: DEFAULT_PROFILE_ID.to_string(), - version: Some(DEFAULT_PROFILE_VERSION), - }, - }; - - Ok(row) -} - -pub(super) async fn seed_default_profile( - pool: &PgPool, - tenant_id: &str, - project_id: &str, -) -> Result<()> { - let profile = - serde_json::to_value(profile::builtin_profile_v1()).map_err(|_| Error::InvalidRequest { - message: "Failed to serialize default ingestion profile.".to_string(), - })?; - - sqlx::query( - "\ -INSERT INTO memory_ingestion_profiles ( - tenant_id, - project_id, - pipeline, - profile_id, - version, - profile -) VALUES ($1,$2,$3,$4,$5,$6::jsonb) -ON CONFLICT DO NOTHING", - ) - .bind(tenant_id) - .bind(project_id) - .bind(ADD_EVENT_PIPELINE) - .bind(DEFAULT_PROFILE_ID) - .bind(DEFAULT_PROFILE_VERSION) - .bind(profile) - .execute(pool) - .await?; - sqlx::query( - "\ -INSERT INTO memory_ingestion_profile_defaults ( - tenant_id, - project_id, - pipeline, - profile_id, - version -) VALUES ($1,$2,$3,$4,$5) -ON CONFLICT DO NOTHING", - ) - .bind(tenant_id) - .bind(project_id) - .bind(ADD_EVENT_PIPELINE) - .bind(DEFAULT_PROFILE_ID) - .bind(DEFAULT_PROFILE_VERSION) - .execute(pool) - .await?; - - Ok(()) -} diff --git a/packages/elf-service/src/ingestion_profiles/storage/defaults.rs b/packages/elf-service/src/ingestion_profiles/storage/defaults.rs new file mode 100644 index 00000000..face86dc --- /dev/null +++ b/packages/elf-service/src/ingestion_profiles/storage/defaults.rs @@ -0,0 +1,146 @@ +use sqlx::{FromRow, PgPool}; +use time::OffsetDateTime; + +use crate::{ + Error, Result, + ingestion_profiles::{ + ADD_EVENT_PIPELINE, DEFAULT_PROFILE_ID, DEFAULT_PROFILE_VERSION, profile, + types::IngestionProfileSelector, + }, +}; + +#[derive(FromRow)] +pub(in crate::ingestion_profiles) struct ProfileDefaultRow { + pub(in crate::ingestion_profiles) profile_id: String, + pub(in crate::ingestion_profiles) version: Option, + pub(in crate::ingestion_profiles) updated_at: OffsetDateTime, +} + +pub(in crate::ingestion_profiles) async fn select_default_row( + pool: &PgPool, + tenant_id: &str, + project_id: &str, +) -> Result> { + let row = sqlx::query_as::<_, ProfileDefaultRow>( + "\ +SELECT profile_id, version, updated_at +FROM memory_ingestion_profile_defaults +WHERE tenant_id=$1 AND project_id=$2 AND pipeline=$3", + ) + .bind(tenant_id) + .bind(project_id) + .bind(ADD_EVENT_PIPELINE) + .fetch_optional(pool) + .await?; + + Ok(row) +} + +pub(in crate::ingestion_profiles) async fn upsert_default_row( + pool: &PgPool, + tenant_id: &str, + project_id: &str, + profile_id: String, + version: i32, +) -> Result { + let row = sqlx::query_as::<_, ProfileDefaultRow>( + "\ +INSERT INTO memory_ingestion_profile_defaults ( + tenant_id, + project_id, + pipeline, + profile_id, + version +) VALUES ($1,$2,$3,$4,$5) +ON CONFLICT (tenant_id, project_id, pipeline) DO UPDATE +SET profile_id = EXCLUDED.profile_id, + version = EXCLUDED.version, + updated_at = now() +RETURNING profile_id, version, updated_at", + ) + .bind(tenant_id) + .bind(project_id) + .bind(ADD_EVENT_PIPELINE) + .bind(profile_id) + .bind(version) + .fetch_one(pool) + .await?; + + Ok(row) +} + +pub(in crate::ingestion_profiles) async fn select_default_selector( + pool: &PgPool, + tenant_id: &str, + project_id: &str, +) -> Result { + let row = sqlx::query_as::<_, (String, Option)>( + "SELECT profile_id, version FROM memory_ingestion_profile_defaults WHERE tenant_id=$1 AND project_id=$2 AND pipeline=$3", + ) + .bind(tenant_id) + .bind(project_id) + .bind(ADD_EVENT_PIPELINE) + .fetch_optional(pool) + .await?; + let row = match row { + Some((profile_id, version)) => IngestionProfileSelector { id: profile_id, version }, + None => IngestionProfileSelector { + id: DEFAULT_PROFILE_ID.to_string(), + version: Some(DEFAULT_PROFILE_VERSION), + }, + }; + + Ok(row) +} + +pub(in crate::ingestion_profiles) async fn seed_default_profile( + pool: &PgPool, + tenant_id: &str, + project_id: &str, +) -> Result<()> { + let profile = + serde_json::to_value(profile::builtin_profile_v1()).map_err(|_| Error::InvalidRequest { + message: "Failed to serialize default ingestion profile.".to_string(), + })?; + + sqlx::query( + "\ +INSERT INTO memory_ingestion_profiles ( + tenant_id, + project_id, + pipeline, + profile_id, + version, + profile +) VALUES ($1,$2,$3,$4,$5,$6::jsonb) +ON CONFLICT DO NOTHING", + ) + .bind(tenant_id) + .bind(project_id) + .bind(ADD_EVENT_PIPELINE) + .bind(DEFAULT_PROFILE_ID) + .bind(DEFAULT_PROFILE_VERSION) + .bind(profile) + .execute(pool) + .await?; + sqlx::query( + "\ +INSERT INTO memory_ingestion_profile_defaults ( + tenant_id, + project_id, + pipeline, + profile_id, + version +) VALUES ($1,$2,$3,$4,$5) +ON CONFLICT DO NOTHING", + ) + .bind(tenant_id) + .bind(project_id) + .bind(ADD_EVENT_PIPELINE) + .bind(DEFAULT_PROFILE_ID) + .bind(DEFAULT_PROFILE_VERSION) + .execute(pool) + .await?; + + Ok(()) +}