From c3021c468d55aa090231b36ed89444d05dcb1662 Mon Sep 17 00:00:00 2001 From: Yvette Carlisle Date: Tue, 30 Jun 2026 00:03:57 +0800 Subject: [PATCH] {"schema":"decodex/commit/1","summary":"Split worker trace expiry cleanup into a child module after strict validation.","authority":"manual"} --- apps/elf-worker/src/worker/trace_jobs.rs | 61 +++---------------- .../src/worker/trace_jobs/cleanup.rs | 59 ++++++++++++++++++ 2 files changed, 67 insertions(+), 53 deletions(-) create mode 100644 apps/elf-worker/src/worker/trace_jobs/cleanup.rs diff --git a/apps/elf-worker/src/worker/trace_jobs.rs b/apps/elf-worker/src/worker/trace_jobs.rs index 3131e90e..5e3e912e 100644 --- a/apps/elf-worker/src/worker/trace_jobs.rs +++ b/apps/elf-worker/src/worker/trace_jobs.rs @@ -1,5 +1,12 @@ +mod cleanup; + +pub(super) use cleanup::{ + purge_expired_cache, purge_expired_search_sessions, purge_expired_trace_candidates, + purge_expired_traces, +}; + use crate::worker::{ - self, Db, OffsetDateTime, PgConnection, PgExecutor, QueryBuilder, Result, TraceCandidateInsert, + self, Db, PgConnection, PgExecutor, QueryBuilder, Result, TraceCandidateInsert, TraceCandidateRecord, TraceItemInsert, TraceItemRecord, TraceOutboxJob, TracePayload, TraceRecord, TraceStageInsert, TraceStageItemInsert, TraceTrajectoryStageRecord, Uuid, Value, }; @@ -308,55 +315,3 @@ INSERT INTO search_trace_candidates ( Ok(()) } - -pub(super) async fn purge_expired_trace_candidates(db: &Db, now: OffsetDateTime) -> Result<()> { - let result = sqlx::query("DELETE FROM search_trace_candidates WHERE expires_at <= $1") - .bind(now) - .execute(&db.pool) - .await?; - - if result.rows_affected() > 0 { - tracing::info!(count = result.rows_affected(), "Purged expired search trace candidates."); - } - - Ok(()) -} - -pub(super) async fn purge_expired_traces(db: &Db, now: OffsetDateTime) -> Result<()> { - let result = sqlx::query("DELETE FROM search_traces WHERE expires_at <= $1") - .bind(now) - .execute(&db.pool) - .await?; - - if result.rows_affected() > 0 { - tracing::info!(count = result.rows_affected(), "Purged expired search traces."); - } - - Ok(()) -} - -pub(super) async fn purge_expired_cache(db: &Db, now: OffsetDateTime) -> Result<()> { - let result = sqlx::query("DELETE FROM llm_cache WHERE expires_at <= $1") - .bind(now) - .execute(&db.pool) - .await?; - - if result.rows_affected() > 0 { - tracing::info!(count = result.rows_affected(), "Purged expired LLM cache entries."); - } - - Ok(()) -} - -pub(super) async fn purge_expired_search_sessions(db: &Db, now: OffsetDateTime) -> Result<()> { - let result = sqlx::query("DELETE FROM search_sessions WHERE expires_at <= $1") - .bind(now) - .execute(&db.pool) - .await?; - - if result.rows_affected() > 0 { - tracing::info!(count = result.rows_affected(), "Purged expired search sessions."); - } - - Ok(()) -} diff --git a/apps/elf-worker/src/worker/trace_jobs/cleanup.rs b/apps/elf-worker/src/worker/trace_jobs/cleanup.rs new file mode 100644 index 00000000..0e9e4b18 --- /dev/null +++ b/apps/elf-worker/src/worker/trace_jobs/cleanup.rs @@ -0,0 +1,59 @@ +use crate::worker::{Db, OffsetDateTime, Result}; + +pub(in crate::worker) async fn purge_expired_trace_candidates( + db: &Db, + now: OffsetDateTime, +) -> Result<()> { + let result = sqlx::query("DELETE FROM search_trace_candidates WHERE expires_at <= $1") + .bind(now) + .execute(&db.pool) + .await?; + + if result.rows_affected() > 0 { + tracing::info!(count = result.rows_affected(), "Purged expired search trace candidates."); + } + + Ok(()) +} + +pub(in crate::worker) async fn purge_expired_traces(db: &Db, now: OffsetDateTime) -> Result<()> { + let result = sqlx::query("DELETE FROM search_traces WHERE expires_at <= $1") + .bind(now) + .execute(&db.pool) + .await?; + + if result.rows_affected() > 0 { + tracing::info!(count = result.rows_affected(), "Purged expired search traces."); + } + + Ok(()) +} + +pub(in crate::worker) async fn purge_expired_cache(db: &Db, now: OffsetDateTime) -> Result<()> { + let result = sqlx::query("DELETE FROM llm_cache WHERE expires_at <= $1") + .bind(now) + .execute(&db.pool) + .await?; + + if result.rows_affected() > 0 { + tracing::info!(count = result.rows_affected(), "Purged expired LLM cache entries."); + } + + Ok(()) +} + +pub(in crate::worker) async fn purge_expired_search_sessions( + db: &Db, + now: OffsetDateTime, +) -> Result<()> { + let result = sqlx::query("DELETE FROM search_sessions WHERE expires_at <= $1") + .bind(now) + .execute(&db.pool) + .await?; + + if result.rows_affected() > 0 { + tracing::info!(count = result.rows_affected(), "Purged expired search sessions."); + } + + Ok(()) +}