diff --git a/.github/workflows/lint-pr-title.yml b/.github/workflows/lint-pr-title.yml index fa177ea..f14235f 100644 --- a/.github/workflows/lint-pr-title.yml +++ b/.github/workflows/lint-pr-title.yml @@ -10,7 +10,7 @@ on: jobs: main: name: Validate PR title - runs-on: ubuntu-slim + runs-on: ubuntu-latest permissions: pull-requests: read steps: diff --git a/crates/hotfix/src/store/error.rs b/crates/hotfix/src/store/error.rs index 0f90898..894172f 100644 --- a/crates/hotfix/src/store/error.rs +++ b/crates/hotfix/src/store/error.rs @@ -32,6 +32,10 @@ pub enum StoreError { /// Failed to reset the store. #[error("failed to reset store")] Reset(#[source] BoxError), + + /// Failed to cleanup old sequences. + #[error("failed to cleanup old sequences")] + Cleanup(#[source] BoxError), } /// A specialized Result type for store operations. diff --git a/crates/hotfix/src/store/mongodb.rs b/crates/hotfix/src/store/mongodb.rs index 22e4ee6..ad89093 100644 --- a/crates/hotfix/src/store/mongodb.rs +++ b/crates/hotfix/src/store/mongodb.rs @@ -1,10 +1,10 @@ use async_trait::async_trait; -use chrono::{DateTime, Utc}; +use chrono::{DateTime, Duration, TimeZone, Utc}; use futures::TryStreamExt; -use mongodb::bson::Binary; use mongodb::bson::doc; use mongodb::bson::oid::ObjectId; use mongodb::bson::spec::BinarySubtype; +use mongodb::bson::{Binary, DateTime as BsonDateTime}; use mongodb::options::{FindOneOptions, IndexOptions, ReplaceOptions}; use mongodb::{Collection, Database, IndexModel}; use serde::{Deserialize, Serialize}; @@ -18,7 +18,7 @@ struct SequenceMeta { #[serde(rename = "_id")] object_id: ObjectId, meta: bool, - creation_time: DateTime, + creation_time: BsonDateTime, sender_seq_number: u64, target_seq_number: u64, } @@ -92,7 +92,7 @@ impl MongoDbMessageStore { let initial_meta = SequenceMeta { object_id: sequence_id, meta: true, - creation_time: Utc::now(), + creation_time: BsonDateTime::now(), sender_seq_number: 0, target_seq_number: 0, }; @@ -100,6 +100,53 @@ impl MongoDbMessageStore { Ok(initial_meta) } + + /// Deletes sequences older than the specified age, along with their associated messages. + /// + /// Returns the number of deleted sequences. + pub async fn cleanup_older_than(&self, age: Duration) -> Result { + let cutoff = BsonDateTime::from_millis((Utc::now() - age).timestamp_millis()); + + // Find old sequence IDs (excluding current sequence) + let filter = doc! { + "meta": true, + "creation_time": { "$lt": cutoff }, + "_id": { "$ne": self.current_sequence.object_id } + }; + let mut cursor = self + .meta_collection + .find(filter) + .await + .map_err(|e| StoreError::Cleanup(e.into()))?; + + let mut old_sequence_ids = Vec::new(); + while let Some(meta) = cursor + .try_next() + .await + .map_err(|e| StoreError::Cleanup(e.into()))? + { + old_sequence_ids.push(meta.object_id); + } + + if old_sequence_ids.is_empty() { + return Ok(0); + } + + // Delete messages first to avoid orphaned meta documents + self.message_collection + .delete_many(doc! { "sequence_id": { "$in": &old_sequence_ids } }) + .await + .map_err(|e| StoreError::Cleanup(e.into()))?; + + // Delete sequence metas + let result = self + .meta_collection + .delete_many(doc! { "_id": { "$in": &old_sequence_ids } }) + .await + .map_err(|e| StoreError::Cleanup(e.into()))?; + + Ok(result.deleted_count) + } } #[async_trait] @@ -215,6 +262,9 @@ impl MessageStore for MongoDbMessageStore { } fn creation_time(&self) -> DateTime { - self.current_sequence.creation_time + #[allow(clippy::expect_used)] + Utc.timestamp_millis_opt(self.current_sequence.creation_time.timestamp_millis()) + .single() + .expect("BsonDateTime is guaranteed to store valid timestamp") } } diff --git a/crates/hotfix/tests/common_store_tests.rs b/crates/hotfix/tests/common_store_tests.rs index 50e3a37..086401e 100644 --- a/crates/hotfix/tests/common_store_tests.rs +++ b/crates/hotfix/tests/common_store_tests.rs @@ -312,12 +312,13 @@ async fn test_creation_time_gets_reset_correctly() { for factory in create_test_store_factories().await { let mut store = factory.create_store().await; - tokio::time::sleep(std::time::Duration::from_millis(10)).await; + tokio::time::sleep(std::time::Duration::from_millis(2)).await; let after_sleep = Utc::now(); + tokio::time::sleep(std::time::Duration::from_millis(2)).await; store.reset().await.expect("failed to reset store"); let reset_creation_time = store.creation_time(); - assert!(reset_creation_time >= after_sleep); + assert!(reset_creation_time > after_sleep); if !factory.is_persistent() { continue; diff --git a/crates/hotfix/tests/mongodb_store_tests.rs b/crates/hotfix/tests/mongodb_store_tests.rs index 9e688df..c031d08 100644 --- a/crates/hotfix/tests/mongodb_store_tests.rs +++ b/crates/hotfix/tests/mongodb_store_tests.rs @@ -1,5 +1,6 @@ #![cfg(feature = "mongodb")] +use chrono::Duration; use hotfix::store::mongodb::{Client, MongoDbMessageStore}; use hotfix::store::{MessageStore, StoreError}; use testcontainers::runners::AsyncRunner; @@ -114,3 +115,77 @@ async fn test_state_preserved_after_failed_set_target() { // State should be unchanged assert_eq!(store.next_target_seq_number(), initial_target_seq); } + +#[tokio::test] +async fn test_cleanup_removes_old_sequences() { + let (container, mut store) = create_dedicated_container_and_store().await; + + // Add a message to the initial sequence + store.add(1, b"message in sequence 1").await.unwrap(); + + // Reset creates a new sequence, making the first one "old" + store.reset().await.unwrap(); + store.add(1, b"message in sequence 2").await.unwrap(); + + // Reset again to have two old sequences + store.reset().await.unwrap(); + store.add(1, b"message in sequence 3").await.unwrap(); + + // Small delay to ensure old sequences have earlier timestamps than the cutoff + tokio::time::sleep(std::time::Duration::from_millis(1)).await; + + // Cleanup with zero duration should delete all old sequences + let deleted = store.cleanup_older_than(Duration::zero()).await.unwrap(); + + assert_eq!(deleted, 2); + + drop(container); +} + +#[tokio::test] +async fn test_cleanup_preserves_current_sequence() { + let (container, mut store) = create_dedicated_container_and_store().await; + + // Add messages to current sequence + store.add(1, b"message 1").await.unwrap(); + store.add(2, b"message 2").await.unwrap(); + + // Cleanup with zero duration - current sequence should be preserved + let deleted = store.cleanup_older_than(Duration::zero()).await.unwrap(); + + assert_eq!(deleted, 0); + + // Verify messages are still accessible + let messages = store.get_slice(1, 2).await.unwrap(); + assert_eq!(messages.len(), 2); + + drop(container); +} + +#[tokio::test] +async fn test_cleanup_respects_age_threshold() { + let (container, mut store) = create_dedicated_container_and_store().await; + + // Create an old sequence + store.reset().await.unwrap(); + + // Cleanup with a large duration should not delete anything + let deleted = store.cleanup_older_than(Duration::days(365)).await.unwrap(); + + assert_eq!(deleted, 0); + + drop(container); +} + +#[tokio::test] +async fn test_cleanup_after_connection_drop() { + let (container, store) = create_dedicated_container_and_store().await; + + // Stop the container + container.stop().await.unwrap(); + + // Attempt cleanup - should fail + let result = store.cleanup_older_than(Duration::zero()).await; + + assert!(matches!(result, Err(StoreError::Cleanup(_)))); +}