From 98aae517592cfe6e284147baf1064c1ee615bcec Mon Sep 17 00:00:00 2001 From: David Steiner Date: Mon, 26 Jan 2026 16:02:27 +0100 Subject: [PATCH 1/4] Add helper function to clean up sequences older than a given age in MongoDB store --- crates/hotfix/src/store/error.rs | 4 +++ crates/hotfix/src/store/mongodb.rs | 51 ++++++++++++++++++++++++++++-- 2 files changed, 53 insertions(+), 2 deletions(-) diff --git a/crates/hotfix/src/store/error.rs b/crates/hotfix/src/store/error.rs index 0f90898..894172f 100644 --- a/crates/hotfix/src/store/error.rs +++ b/crates/hotfix/src/store/error.rs @@ -32,6 +32,10 @@ pub enum StoreError { /// Failed to reset the store. #[error("failed to reset store")] Reset(#[source] BoxError), + + /// Failed to cleanup old sequences. + #[error("failed to cleanup old sequences")] + Cleanup(#[source] BoxError), } /// A specialized Result type for store operations. diff --git a/crates/hotfix/src/store/mongodb.rs b/crates/hotfix/src/store/mongodb.rs index 22e4ee6..c173a52 100644 --- a/crates/hotfix/src/store/mongodb.rs +++ b/crates/hotfix/src/store/mongodb.rs @@ -1,10 +1,10 @@ use async_trait::async_trait; -use chrono::{DateTime, Utc}; +use chrono::{DateTime, Duration, Utc}; use futures::TryStreamExt; -use mongodb::bson::Binary; use mongodb::bson::doc; use mongodb::bson::oid::ObjectId; use mongodb::bson::spec::BinarySubtype; +use mongodb::bson::{Binary, DateTime as BsonDateTime}; use mongodb::options::{FindOneOptions, IndexOptions, ReplaceOptions}; use mongodb::{Collection, Database, IndexModel}; use serde::{Deserialize, Serialize}; @@ -100,6 +100,53 @@ impl MongoDbMessageStore { Ok(initial_meta) } + + /// Deletes sequences older than the specified age, along with their associated messages. + /// + /// Returns the number of deleted sequences. + pub async fn cleanup_older_than(&self, age: Duration) -> Result { + let cutoff = BsonDateTime::from_millis((Utc::now() - age).timestamp_millis()); + + // Find old sequence IDs (excluding current sequence) + let filter = doc! { + "meta": true, + "creation_time": { "$lt": cutoff }, + "_id": { "$ne": self.current_sequence.object_id } + }; + let mut cursor = self + .meta_collection + .find(filter) + .await + .map_err(|e| StoreError::Cleanup(e.into()))?; + + let mut old_sequence_ids = Vec::new(); + while let Some(meta) = cursor + .try_next() + .await + .map_err(|e| StoreError::Cleanup(e.into()))? + { + old_sequence_ids.push(meta.object_id); + } + + if old_sequence_ids.is_empty() { + return Ok(0); + } + + // Delete messages first to avoid orphaned meta documents + self.message_collection + .delete_many(doc! { "sequence_id": { "$in": &old_sequence_ids } }) + .await + .map_err(|e| StoreError::Cleanup(e.into()))?; + + // Delete sequence metas + let result = self + .meta_collection + .delete_many(doc! { "_id": { "$in": &old_sequence_ids } }) + .await + .map_err(|e| StoreError::Cleanup(e.into()))?; + + Ok(result.deleted_count) + } } #[async_trait] From 78a3d7a35b4e43d76deaeb4e39736c583103de02 Mon Sep 17 00:00:00 2001 From: David Steiner Date: Mon, 26 Jan 2026 16:07:56 +0100 Subject: [PATCH 2/4] Add test cases for MongoDB store cleanups --- crates/hotfix/tests/mongodb_store_tests.rs | 75 ++++++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/crates/hotfix/tests/mongodb_store_tests.rs b/crates/hotfix/tests/mongodb_store_tests.rs index 9e688df..98b2608 100644 --- a/crates/hotfix/tests/mongodb_store_tests.rs +++ b/crates/hotfix/tests/mongodb_store_tests.rs @@ -1,5 +1,6 @@ #![cfg(feature = "mongodb")] +use chrono::Duration; use hotfix::store::mongodb::{Client, MongoDbMessageStore}; use hotfix::store::{MessageStore, StoreError}; use testcontainers::runners::AsyncRunner; @@ -114,3 +115,77 @@ async fn test_state_preserved_after_failed_set_target() { // State should be unchanged assert_eq!(store.next_target_seq_number(), initial_target_seq); } + +#[tokio::test] +async fn test_cleanup_removes_old_sequences() { + let (container, mut store) = create_dedicated_container_and_store().await; + + // Add a message to the initial sequence + store.add(1, b"message in sequence 1").await.unwrap(); + + // Reset creates a new sequence, making the first one "old" + store.reset().await.unwrap(); + store.add(1, b"message in sequence 2").await.unwrap(); + + // Reset again to have two old sequences + store.reset().await.unwrap(); + store.add(1, b"message in sequence 3").await.unwrap(); + + // Small delay to ensure old sequences have earlier timestamps than the cutoff + tokio::time::sleep(std::time::Duration::from_millis(2)).await; + + // Cleanup with zero duration should delete all old sequences + let deleted = store.cleanup_older_than(Duration::zero()).await.unwrap(); + + assert_eq!(deleted, 2); + + drop(container); +} + +#[tokio::test] +async fn test_cleanup_preserves_current_sequence() { + let (container, mut store) = create_dedicated_container_and_store().await; + + // Add messages to current sequence + store.add(1, b"message 1").await.unwrap(); + store.add(2, b"message 2").await.unwrap(); + + // Cleanup with zero duration - current sequence should be preserved + let deleted = store.cleanup_older_than(Duration::zero()).await.unwrap(); + + assert_eq!(deleted, 0); + + // Verify messages are still accessible + let messages = store.get_slice(1, 2).await.unwrap(); + assert_eq!(messages.len(), 2); + + drop(container); +} + +#[tokio::test] +async fn test_cleanup_respects_age_threshold() { + let (container, mut store) = create_dedicated_container_and_store().await; + + // Create an old sequence + store.reset().await.unwrap(); + + // Cleanup with a large duration should not delete anything + let deleted = store.cleanup_older_than(Duration::days(365)).await.unwrap(); + + assert_eq!(deleted, 0); + + drop(container); +} + +#[tokio::test] +async fn test_cleanup_after_connection_drop() { + let (container, store) = create_dedicated_container_and_store().await; + + // Stop the container + container.stop().await.unwrap(); + + // Attempt cleanup - should fail + let result = store.cleanup_older_than(Duration::zero()).await; + + assert!(matches!(result, Err(StoreError::Cleanup(_)))); +} From a204f8df632b07c9f4d26dcc14ca2fd044a91647 Mon Sep 17 00:00:00 2001 From: David Steiner Date: Tue, 27 Jan 2026 09:21:49 +0100 Subject: [PATCH 3/4] Fix timestamp comparisons by moving the BsonDateTime for timestamps --- crates/hotfix/src/store/mongodb.rs | 11 +++++++---- crates/hotfix/tests/common_store_tests.rs | 5 +++-- crates/hotfix/tests/mongodb_store_tests.rs | 2 +- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/crates/hotfix/src/store/mongodb.rs b/crates/hotfix/src/store/mongodb.rs index c173a52..ad89093 100644 --- a/crates/hotfix/src/store/mongodb.rs +++ b/crates/hotfix/src/store/mongodb.rs @@ -1,5 +1,5 @@ use async_trait::async_trait; -use chrono::{DateTime, Duration, Utc}; +use chrono::{DateTime, Duration, TimeZone, Utc}; use futures::TryStreamExt; use mongodb::bson::doc; use mongodb::bson::oid::ObjectId; @@ -18,7 +18,7 @@ struct SequenceMeta { #[serde(rename = "_id")] object_id: ObjectId, meta: bool, - creation_time: DateTime, + creation_time: BsonDateTime, sender_seq_number: u64, target_seq_number: u64, } @@ -92,7 +92,7 @@ impl MongoDbMessageStore { let initial_meta = SequenceMeta { object_id: sequence_id, meta: true, - creation_time: Utc::now(), + creation_time: BsonDateTime::now(), sender_seq_number: 0, target_seq_number: 0, }; @@ -262,6 +262,9 @@ impl MessageStore for MongoDbMessageStore { } fn creation_time(&self) -> DateTime { - self.current_sequence.creation_time + #[allow(clippy::expect_used)] + Utc.timestamp_millis_opt(self.current_sequence.creation_time.timestamp_millis()) + .single() + .expect("BsonDateTime is guaranteed to store valid timestamp") } } diff --git a/crates/hotfix/tests/common_store_tests.rs b/crates/hotfix/tests/common_store_tests.rs index 50e3a37..086401e 100644 --- a/crates/hotfix/tests/common_store_tests.rs +++ b/crates/hotfix/tests/common_store_tests.rs @@ -312,12 +312,13 @@ async fn test_creation_time_gets_reset_correctly() { for factory in create_test_store_factories().await { let mut store = factory.create_store().await; - tokio::time::sleep(std::time::Duration::from_millis(10)).await; + tokio::time::sleep(std::time::Duration::from_millis(2)).await; let after_sleep = Utc::now(); + tokio::time::sleep(std::time::Duration::from_millis(2)).await; store.reset().await.expect("failed to reset store"); let reset_creation_time = store.creation_time(); - assert!(reset_creation_time >= after_sleep); + assert!(reset_creation_time > after_sleep); if !factory.is_persistent() { continue; diff --git a/crates/hotfix/tests/mongodb_store_tests.rs b/crates/hotfix/tests/mongodb_store_tests.rs index 98b2608..c031d08 100644 --- a/crates/hotfix/tests/mongodb_store_tests.rs +++ b/crates/hotfix/tests/mongodb_store_tests.rs @@ -132,7 +132,7 @@ async fn test_cleanup_removes_old_sequences() { store.add(1, b"message in sequence 3").await.unwrap(); // Small delay to ensure old sequences have earlier timestamps than the cutoff - tokio::time::sleep(std::time::Duration::from_millis(2)).await; + tokio::time::sleep(std::time::Duration::from_millis(1)).await; // Cleanup with zero duration should delete all old sequences let deleted = store.cleanup_older_than(Duration::zero()).await.unwrap(); From e40e2acd5a37c306bfadd2d68ba74b1570505c92 Mon Sep 17 00:00:00 2001 From: David Steiner Date: Tue, 27 Jan 2026 10:04:43 +0100 Subject: [PATCH 4/4] Switch from experimental ubuntu-slim to ubuntu-latest --- .github/workflows/lint-pr-title.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/lint-pr-title.yml b/.github/workflows/lint-pr-title.yml index fa177ea..f14235f 100644 --- a/.github/workflows/lint-pr-title.yml +++ b/.github/workflows/lint-pr-title.yml @@ -10,7 +10,7 @@ on: jobs: main: name: Validate PR title - runs-on: ubuntu-slim + runs-on: ubuntu-latest permissions: pull-requests: read steps: