diff --git a/Cargo.lock b/Cargo.lock index 18aa607..4cc33a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1288,7 +1288,8 @@ dependencies = [ "chrono-tz", "futures", "hotfix-message", - "mongodb", + "hotfix-store", + "hotfix-store-mongodb", "rcgen", "rustls", "rustls-native-certs", @@ -1296,7 +1297,6 @@ dependencies = [ "rustls-pki-types", "serde", "tempfile", - "testcontainers", "thiserror", "tokio", "tokio-rustls", @@ -1373,6 +1373,33 @@ dependencies = [ "thiserror", ] +[[package]] +name = "hotfix-store" +version = "0.1.0" +dependencies = [ + "async-trait", + "chrono", + "tempfile", + "thiserror", + "tokio", + "uuid", +] + +[[package]] +name = "hotfix-store-mongodb" +version = "0.1.0" +dependencies = [ + "async-trait", + "chrono", + "futures", + "hotfix-store", + "mongodb", + "serde", + "testcontainers", + "tokio", + "uuid", +] + [[package]] name = "hotfix-web" version = "0.1.8" diff --git a/crates/hotfix-store-mongodb/Cargo.toml b/crates/hotfix-store-mongodb/Cargo.toml new file mode 100644 index 0000000..9fe08da --- /dev/null +++ b/crates/hotfix-store-mongodb/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "hotfix-store-mongodb" +description = "MongoDB message store implementation for the hotfix FIX engine" +version = "0.1.0" +authors.workspace = true +edition.workspace = true +license.workspace = true +readme = "README.md" +homepage.workspace = true +repository.workspace = true +keywords.workspace = true +categories.workspace = true + +[lints] +workspace = true + +[dependencies] +hotfix-store = { version = "0.1.0", path = "../hotfix-store" } +async-trait = { workspace = true } +chrono = { workspace = true } +futures = { workspace = true } +mongodb = { workspace = true } +serde = { workspace = true, features = ["derive"] } + +[dev-dependencies] +hotfix-store = { version = "0.1.0", path = "../hotfix-store", features = ["test-utils"] } +testcontainers = { workspace = true } +tokio = { workspace = true, features = ["test-util", "rt-multi-thread", "macros"] } +uuid = { workspace = true, features = ["v4"] } diff --git a/crates/hotfix-store-mongodb/README.md b/crates/hotfix-store-mongodb/README.md new file mode 100644 index 0000000..b5b1700 --- /dev/null +++ b/crates/hotfix-store-mongodb/README.md @@ -0,0 +1,38 @@ +# hotfix-store-mongodb + +MongoDB message store implementation for the [HotFIX](https://github.com/Validus-Risk-Management/hotfix) FIX engine. + +## Overview + +This crate provides `MongoDbMessageStore`, a persistent message store backed by MongoDB. It implements the +`MessageStore` trait from [hotfix-store](https://crates.io/crates/hotfix-store). + +## Usage + +```rust +use hotfix_store_mongodb::{Client, MongoDbMessageStore}; + +// Connect to MongoDB +let client = Client::with_uri_str("mongodb://localhost:27017").await?; +let db = client.database("myapp"); + +// Create the store +let store = MongoDbMessageStore::new(db, Some("fix_messages")).await?; +``` + +## Features + +- Persistent storage of FIX messages and sequence numbers +- Automatic index creation for efficient queries +- Session cleanup with `cleanup_older_than()` method + +## Cleanup + +The store provides a method to clean up old session data: + +```rust +use chrono::Duration; + +// Delete sequences older than 30 days +let deleted_count = store.cleanup_older_than(Duration::days(30)).await?; +``` diff --git a/crates/hotfix/src/store/mongodb.rs b/crates/hotfix-store-mongodb/src/lib.rs similarity index 80% rename from crates/hotfix/src/store/mongodb.rs rename to crates/hotfix-store-mongodb/src/lib.rs index ad89093..50c2990 100644 --- a/crates/hotfix/src/store/mongodb.rs +++ b/crates/hotfix-store-mongodb/src/lib.rs @@ -1,3 +1,18 @@ +//! MongoDB message store implementation for the hotfix FIX engine. +//! +//! This crate provides [`MongoDbMessageStore`], a persistent message store +//! backed by MongoDB. +//! +//! # Example +//! +//! ```ignore +//! use hotfix_store_mongodb::{Client, MongoDbMessageStore}; +//! +//! let client = Client::with_uri_str("mongodb://localhost:27017").await?; +//! let db = client.database("myapp"); +//! let store = MongoDbMessageStore::new(db, Some("fix_messages")).await?; +//! ``` + use async_trait::async_trait; use chrono::{DateTime, Duration, TimeZone, Utc}; use futures::TryStreamExt; @@ -9,9 +24,10 @@ use mongodb::options::{FindOneOptions, IndexOptions, ReplaceOptions}; use mongodb::{Collection, Database, IndexModel}; use serde::{Deserialize, Serialize}; -pub use mongodb::Client; +use hotfix_store::MessageStore; +use hotfix_store::error::{Result, StoreError}; -use crate::store::{MessageStore, Result, StoreError}; +pub use mongodb::Client; #[derive(Debug, Deserialize, Serialize)] struct SequenceMeta { @@ -30,6 +46,10 @@ struct Message { data: Binary, } +/// A MongoDB-backed message store implementation. +/// +/// This store persists messages and sequence numbers to MongoDB, +/// allowing session state to survive application restarts. pub struct MongoDbMessageStore { meta_collection: Collection, message_collection: Collection, @@ -37,7 +57,26 @@ pub struct MongoDbMessageStore { } impl MongoDbMessageStore { - pub async fn new(db: Database, collection_name: Option<&str>) -> anyhow::Result { + /// Creates a new MongoDB message store. + /// + /// # Arguments + /// + /// * `db` - The MongoDB database to use + /// * `collection_name` - Optional collection name (defaults to "messages") + /// + /// # Errors + /// + /// Returns `StoreError::Initialization` if the store cannot be initialized. + pub async fn new(db: Database, collection_name: Option<&str>) -> Result { + Self::new_inner(db, collection_name) + .await + .map_err(|e| StoreError::Initialization(e.into())) + } + + async fn new_inner( + db: Database, + collection_name: Option<&str>, + ) -> mongodb::error::Result { let collection_name = collection_name.unwrap_or("messages"); let meta_collection = db.collection(collection_name); let message_collection = db.collection(collection_name); @@ -53,7 +92,9 @@ impl MongoDbMessageStore { Ok(store) } - async fn ensure_indexes(meta_collection: &Collection) -> anyhow::Result<()> { + async fn ensure_indexes( + meta_collection: &Collection, + ) -> mongodb::error::Result<()> { let meta_index = IndexModel::builder() .keys(doc! { "meta": 1, "_id": -1 }) .build(); @@ -71,7 +112,7 @@ impl MongoDbMessageStore { async fn get_or_default_sequence( meta_collection: &Collection, - ) -> anyhow::Result { + ) -> mongodb::error::Result { let options = FindOneOptions::builder().sort(doc! { "_id": -1 }).build(); let res = meta_collection .find_one(doc! { "meta": true }) @@ -87,7 +128,7 @@ impl MongoDbMessageStore { async fn new_sequence( meta_collection: &Collection, - ) -> anyhow::Result { + ) -> mongodb::error::Result { let sequence_id = ObjectId::new(); let initial_meta = SequenceMeta { object_id: sequence_id, @@ -103,7 +144,20 @@ impl MongoDbMessageStore { /// Deletes sequences older than the specified age, along with their associated messages. /// - /// Returns the number of deleted sequences. + /// This method is useful for cleaning up old session data from MongoDB. + /// The current active sequence is never deleted, even if it matches the age criteria. + /// + /// # Arguments + /// + /// * `age` - The minimum age of sequences to delete + /// + /// # Returns + /// + /// The number of deleted sequences. + /// + /// # Errors + /// + /// Returns `StoreError::Cleanup` if the cleanup operation fails. pub async fn cleanup_older_than(&self, age: Duration) -> Result { let cutoff = BsonDateTime::from_millis((Utc::now() - age).timestamp_millis()); diff --git a/crates/hotfix-store-mongodb/tests/conformance.rs b/crates/hotfix-store-mongodb/tests/conformance.rs new file mode 100644 index 0000000..310a1c9 --- /dev/null +++ b/crates/hotfix-store-mongodb/tests/conformance.rs @@ -0,0 +1,48 @@ +//! Conformance tests for MongoDbMessageStore using the test harness from hotfix-store. + +use hotfix_store::MessageStore; +use hotfix_store::test_utils::TestStoreFactory; +use hotfix_store_mongodb::{Client, MongoDbMessageStore}; +use testcontainers::runners::AsyncRunner; +use testcontainers::{ContainerAsync, GenericImage}; +use tokio::sync::OnceCell; + +static MONGO_CONTAINER: OnceCell> = OnceCell::const_new(); +const MONGO_PORT: u16 = 27017; + +struct MongodbTestStoreFactory { + client: Client, + collection_name: String, +} + +impl MongodbTestStoreFactory { + async fn new() -> Self { + let container = MONGO_CONTAINER.get_or_init(Self::init_container).await; + let host = container.get_host().await.unwrap(); + let port = container.get_host_port_ipv4(MONGO_PORT).await.unwrap(); + let uri = format!("mongodb://{host}:{port}"); + let client = Client::with_uri_str(&uri).await.unwrap(); + + Self { + client, + collection_name: uuid::Uuid::new_v4().to_string(), + } + } + + async fn init_container() -> ContainerAsync { + GenericImage::new("mongo", "8.0").start().await.unwrap() + } +} + +#[async_trait::async_trait] +impl TestStoreFactory for MongodbTestStoreFactory { + async fn create_store(&self) -> Box { + let db = self.client.database("hotfixConformanceTests"); + let store = MongoDbMessageStore::new(db, Some(&self.collection_name)) + .await + .unwrap(); + Box::new(store) + } +} + +hotfix_store::conformance_tests!(mongodb, MongodbTestStoreFactory::new().await); diff --git a/crates/hotfix/tests/mongodb_store_tests.rs b/crates/hotfix-store-mongodb/tests/mongodb_tests.rs similarity index 94% rename from crates/hotfix/tests/mongodb_store_tests.rs rename to crates/hotfix-store-mongodb/tests/mongodb_tests.rs index c031d08..0629b73 100644 --- a/crates/hotfix/tests/mongodb_store_tests.rs +++ b/crates/hotfix-store-mongodb/tests/mongodb_tests.rs @@ -1,8 +1,12 @@ -#![cfg(feature = "mongodb")] +//! MongoDB-specific tests for MongoDbMessageStore. +//! +//! These tests cover MongoDB-specific functionality such as connection failure handling +//! and the cleanup_older_than method. use chrono::Duration; -use hotfix::store::mongodb::{Client, MongoDbMessageStore}; -use hotfix::store::{MessageStore, StoreError}; +use hotfix_store::MessageStore; +use hotfix_store::error::StoreError; +use hotfix_store_mongodb::{Client, MongoDbMessageStore}; use testcontainers::runners::AsyncRunner; use testcontainers::{ContainerAsync, GenericImage}; diff --git a/crates/hotfix-store/Cargo.toml b/crates/hotfix-store/Cargo.toml new file mode 100644 index 0000000..fb6fada --- /dev/null +++ b/crates/hotfix-store/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "hotfix-store" +description = "Message store traits and implementations for the HotFIX engine" +version = "0.1.0" +authors.workspace = true +edition.workspace = true +license.workspace = true +readme = "README.md" +homepage.workspace = true +repository.workspace = true +keywords.workspace = true +categories.workspace = true + +[features] +default = ["test-utils"] +test-utils = ["dep:tokio"] + +[lints] +workspace = true + +[dependencies] +async-trait = { workspace = true } +chrono = { workspace = true } +thiserror = { workspace = true } +tokio = { workspace = true, features = ["time"], optional = true } + +[dev-dependencies] +tokio = { workspace = true, features = ["test-util", "rt-multi-thread", "macros", "time"] } +uuid = { workspace = true, features = ["v4"] } +tempfile = "3" diff --git a/crates/hotfix-store/README.md b/crates/hotfix-store/README.md new file mode 100644 index 0000000..2f72b7c --- /dev/null +++ b/crates/hotfix-store/README.md @@ -0,0 +1,50 @@ +# hotfix-store + +Message store traits and implementations for the [HotFIX](https://github.com/Validus-Risk-Management/hotfix) FIX engine. + +## Overview + +This crate provides the `MessageStore` trait and core implementations for persisting FIX session state, including +messages and sequence numbers. + +## Implementations + +- **InMemoryMessageStore**: A non-persistent store for testing +- **FileStore**: A file-based store for simple persistence + +Additional implementations are available in separate crates: + +- [hotfix-store-mongodb](https://crates.io/crates/hotfix-store-mongodb): MongoDB-backed store + +## Usage + +```rust +use hotfix_store::{MessageStore, InMemoryMessageStore, FileStore}; + +// In-memory store (for testing) +let store = InMemoryMessageStore::default (); + +// File-based store (for persistence) +let store = FileStore::new("/path/to/store", "session_name") ?; +``` + +## Test Utilities + +The `test-utils` feature provides a test harness for verifying custom `MessageStore` implementations: + +```rust +use hotfix_store::test_utils::TestStoreFactory; +use hotfix_store::conformance_tests; + +struct MyStoreFactory; + +#[async_trait::async_trait] +impl TestStoreFactory for MyStoreFactory { + async fn create_store(&self) -> Box { + Box::new(MyStore::new()) + } +} + +// Generates all conformance tests for your implementation +conformance_tests!(my_store, MyStoreFactory); +``` diff --git a/crates/hotfix/src/store/error.rs b/crates/hotfix-store/src/error.rs similarity index 90% rename from crates/hotfix/src/store/error.rs rename to crates/hotfix-store/src/error.rs index 894172f..677e647 100644 --- a/crates/hotfix/src/store/error.rs +++ b/crates/hotfix-store/src/error.rs @@ -8,6 +8,10 @@ pub type BoxError = Box; /// Errors that can occur during message store operations. #[derive(Debug, Error)] pub enum StoreError { + /// Failed to initialize the store. + #[error("failed to initialize store: {0}")] + Initialization(#[source] BoxError), + /// Failed to persist a message to the store. #[error("failed to persist message (seq_num: {sequence_number})")] PersistMessage { diff --git a/crates/hotfix/src/store/file.rs b/crates/hotfix-store/src/file.rs similarity index 81% rename from crates/hotfix/src/store/file.rs rename to crates/hotfix-store/src/file.rs index 60bd1f5..21436f0 100644 --- a/crates/hotfix/src/store/file.rs +++ b/crates/hotfix-store/src/file.rs @@ -1,5 +1,7 @@ -use crate::store::{MessageStore, Result, StoreError}; -use anyhow::Context; +//! A file-based message store for persistence. + +use crate::MessageStore; +use crate::error::{Result, StoreError}; use chrono::{DateTime, Utc}; use std::collections::HashMap; use std::fs::{File, OpenOptions}; @@ -33,8 +35,17 @@ pub struct FileStore { } impl FileStore { - pub fn new(directory: impl AsRef, name: &str) -> anyhow::Result { - let base_path = directory.as_ref().join(name); + /// Creates a new file store in the specified directory with the given name. + /// + /// # Errors + /// + /// Returns `StoreError::Initialization` if the store files cannot be created or read. + pub fn new(directory: impl AsRef, name: &str) -> Result { + Self::new_inner(directory.as_ref(), name).map_err(|e| StoreError::Initialization(e.into())) + } + + fn new_inner(directory: &Path, name: &str) -> std::io::Result { + let base_path = directory.join(name); std::fs::create_dir_all(directory)?; let body_path = base_path.with_extension("body"); @@ -84,11 +95,16 @@ impl FileStore { /// Retrieves the session creation time from the session file. /// /// It initialises the session file if it doesn't exist. - fn get_or_create_session_time(base_path: &Path) -> anyhow::Result> { + fn get_or_create_session_time(base_path: &Path) -> std::io::Result> { let session_path = base_path.with_extension("session"); let session_time = if session_path.exists() { let content = std::fs::read_to_string(&session_path)?; - content.trim().parse::>()? + content.trim().parse::>().map_err(|e| { + std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!("failed to parse session time: {e}"), + ) + })? } else { let now = Utc::now(); std::fs::write(&session_path, now.to_rfc3339())?; @@ -101,11 +117,15 @@ impl FileStore { /// Retrieves the sequence numbers from the seqnums file. /// /// It defaults to `(0, 0)` if the file doesn't exist or if it's empty. - fn read_initial_seqnums(base_path: &Path) -> anyhow::Result<(u64, u64)> { + fn read_initial_seqnums(base_path: &Path) -> std::io::Result<(u64, u64)> { let seqnums_path = base_path.with_extension("seqnums"); let (sender_seq_number, target_seq_number) = if seqnums_path.exists() { - let content = - std::fs::read_to_string(&seqnums_path).context("failed to read seqnums file")?; + let content = std::fs::read_to_string(&seqnums_path).map_err(|e| { + std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!("failed to read seqnums file: {e}"), + ) + })?; if content.trim().is_empty() { (0u64, 0u64) } else { @@ -118,32 +138,41 @@ impl FileStore { Ok((sender_seq_number, target_seq_number)) } - fn parse_seqnums(content: &str) -> anyhow::Result<(u64, u64)> { + fn parse_seqnums(content: &str) -> std::io::Result<(u64, u64)> { let parts: Vec<&str> = content.trim().split(':').map(|s| s.trim()).collect(); if parts.len() != 2 { - anyhow::bail!("invalid seqnums format"); + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "invalid seqnums format", + )); } - let sender = parts[0] - .parse::() - .context("failed to parse sender sequence number")?; - let target = parts[1] - .parse::() - .context("failed to parse target sequence number")?; + let sender = parts[0].parse::().map_err(|e| { + std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!("failed to parse sender sequence number: {e}"), + ) + })?; + let target = parts[1].parse::().map_err(|e| { + std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!("failed to parse target sequence number: {e}"), + ) + })?; Ok((sender, target)) } - fn load_message_index(header_path: &Path) -> anyhow::Result> { + fn load_message_index(header_path: &Path) -> std::io::Result> { let mut index = HashMap::new(); if !header_path.exists() { return Ok(index); } - let file = File::open(header_path).context("failed to open header file for reading")?; + let file = File::open(header_path)?; let reader = BufReader::new(file); for line in reader.lines() { - let line = line.context("failed to read header line")?; + let line = line?; let parts: Vec<&str> = line.trim().split(',').collect(); if parts.len() != 3 { continue; diff --git a/crates/hotfix/src/store/in_memory.rs b/crates/hotfix-store/src/in_memory.rs similarity index 86% rename from crates/hotfix/src/store/in_memory.rs rename to crates/hotfix-store/src/in_memory.rs index 5468fe3..39c8629 100644 --- a/crates/hotfix/src/store/in_memory.rs +++ b/crates/hotfix-store/src/in_memory.rs @@ -1,7 +1,13 @@ -use crate::store::{MessageStore, Result}; +//! An in-memory message store that loses its state on restart. + +use crate::{MessageStore, Result}; use chrono::{DateTime, Utc}; use std::collections::HashMap; +/// An in-memory message store implementation. +/// +/// This store keeps all messages in memory and does not persist them. +/// Use this only for testing or when persistence is not required. #[derive(Debug)] pub struct InMemoryMessageStore { sender_seq_number: u64, diff --git a/crates/hotfix-store/src/lib.rs b/crates/hotfix-store/src/lib.rs new file mode 100644 index 0000000..23b6fdf --- /dev/null +++ b/crates/hotfix-store/src/lib.rs @@ -0,0 +1,67 @@ +//! Message store traits and implementations for the hotfix FIX engine. +//! +//! This crate provides the [`MessageStore`] trait and several implementations: +//! +//! - [`InMemoryMessageStore`]: An in-memory store for testing (loses state on restart) +//! - [`FileStore`]: A file-based store for persistence +//! +//! # Features +//! +//! - `test-utils`: Enables the [`test_utils`] module with test harness for store implementations + +/// Error types for store operations. +pub mod error; + +/// File-based message store for persistence. +pub mod file; + +/// In-memory message store (non-persistent). +pub mod in_memory; + +/// Test utilities for message store implementations. +#[cfg(feature = "test-utils")] +pub mod test_utils; + +pub use error::{BoxError, Result, StoreError}; +pub use file::FileStore; +pub use in_memory::InMemoryMessageStore; + +use chrono::DateTime; + +/// A trait for storing and retrieving FIX messages and sequence numbers. +/// +/// Message stores are responsible for: +/// - Persisting outgoing messages for potential resend +/// - Tracking sender and target sequence numbers +/// - Storing session creation time +/// +/// Implementations should be async-safe and handle concurrent access appropriately. +#[async_trait::async_trait] +pub trait MessageStore: Send + Sync { + /// Adds a message to the store with the given sequence number. + async fn add(&mut self, sequence_number: u64, message: &[u8]) -> Result<()>; + + /// Retrieves messages in the given sequence number range (inclusive). + async fn get_slice(&self, begin: usize, end: usize) -> Result>>; + + /// Returns the next sender sequence number (current + 1). + fn next_sender_seq_number(&self) -> u64; + + /// Returns the next target sequence number (current + 1). + fn next_target_seq_number(&self) -> u64; + + /// Increments the sender sequence number by 1. + async fn increment_sender_seq_number(&mut self) -> Result<()>; + + /// Increments the target sequence number by 1. + async fn increment_target_seq_number(&mut self) -> Result<()>; + + /// Sets the target sequence number to a specific value. + async fn set_target_seq_number(&mut self, seq_number: u64) -> Result<()>; + + /// Resets the store, clearing all messages and resetting sequence numbers. + async fn reset(&mut self) -> Result<()>; + + /// Returns the creation time of the current session. + fn creation_time(&self) -> DateTime; +} diff --git a/crates/hotfix-store/src/test_utils.rs b/crates/hotfix-store/src/test_utils.rs new file mode 100644 index 0000000..3e6d259 --- /dev/null +++ b/crates/hotfix-store/src/test_utils.rs @@ -0,0 +1,449 @@ +//! Test utilities for message store implementations. +//! +//! This module provides a test harness for verifying that message store +//! implementations conform to the expected behavior of the [`MessageStore`] trait. + +use crate::MessageStore; + +/// A factory trait for creating message store instances during testing. +/// +/// Implement this trait for each message store implementation you want to test. +/// The test functions in this module will use this factory to create fresh +/// store instances for each test. +#[async_trait::async_trait] +pub trait TestStoreFactory: Send + Sync { + /// Creates a new message store instance. + /// + /// For persistent stores, this should return a store that can be + /// reopened with the same data (by calling `create_store` again). + async fn create_store(&self) -> Box; + + /// Returns `true` if the store persists data across restarts. + /// + /// This is used to skip persistence-related tests for in-memory stores. + fn is_persistent(&self) -> bool { + true + } +} + +/// Tests that a new store starts with sequence numbers at 1. +pub async fn test_new_store_initialization(factory: &dyn TestStoreFactory) { + let store = factory.create_store().await; + + assert_eq!(store.next_sender_seq_number(), 1); + assert_eq!(store.next_target_seq_number(), 1); +} + +/// Tests adding and retrieving messages. +pub async fn test_add_and_get_messages(factory: &dyn TestStoreFactory) { + let mut store = factory.create_store().await; + + let message1 = b"test message 1"; + let message2 = b"test message 2"; + let message3 = b"test message 3"; + + store + .add(1, message1) + .await + .expect("Failed to add message 1"); + store + .add(2, message2) + .await + .expect("Failed to add message 2"); + store + .add(3, message3) + .await + .expect("Failed to add message 3"); + + let messages = store.get_slice(1, 3).await.expect("Failed to get messages"); + assert_eq!(messages.len(), 3); + assert_eq!(messages[0], message1); + assert_eq!(messages[1], message2); + assert_eq!(messages[2], message3); +} + +/// Tests getting a partial range of messages. +pub async fn test_get_slice_partial_range(factory: &dyn TestStoreFactory) { + let mut store = factory.create_store().await; + + let message1 = b"message 1"; + let message2 = b"message 2"; + let message3 = b"message 3"; + let message4 = b"message 4"; + + store + .add(1, message1) + .await + .expect("Failed to add message 1"); + store + .add(2, message2) + .await + .expect("Failed to add message 2"); + store + .add(3, message3) + .await + .expect("Failed to add message 3"); + store + .add(4, message4) + .await + .expect("Failed to add message 4"); + + let messages = store.get_slice(2, 3).await.expect("Failed to get messages"); + assert_eq!(messages.len(), 2); + assert_eq!(messages[0], message2); + assert_eq!(messages[1], message3); +} + +/// Tests getting a slice from an empty store. +pub async fn test_get_slice_empty_range(factory: &dyn TestStoreFactory) { + let store = factory.create_store().await; + + let messages = store.get_slice(1, 3).await.expect("Failed to get messages"); + assert_eq!(messages.len(), 0); +} + +/// Tests incrementing the sender sequence number. +pub async fn test_increment_sender_seq_number(factory: &dyn TestStoreFactory) { + let mut store = factory.create_store().await; + + assert_eq!(store.next_sender_seq_number(), 1); + + store + .increment_sender_seq_number() + .await + .expect("Failed to increment sender seq number"); + assert_eq!(store.next_sender_seq_number(), 2); + + store + .increment_sender_seq_number() + .await + .expect("Failed to increment sender seq number"); + assert_eq!(store.next_sender_seq_number(), 3); +} + +/// Tests incrementing the target sequence number. +pub async fn test_increment_target_seq_number(factory: &dyn TestStoreFactory) { + let mut store = factory.create_store().await; + + assert_eq!(store.next_target_seq_number(), 1); + + store + .increment_target_seq_number() + .await + .expect("Failed to increment target seq number"); + assert_eq!(store.next_target_seq_number(), 2); + + store + .increment_target_seq_number() + .await + .expect("Failed to increment target seq number"); + assert_eq!(store.next_target_seq_number(), 3); +} + +/// Tests setting the target sequence number directly. +pub async fn test_set_target_seq_number(factory: &dyn TestStoreFactory) { + let mut store = factory.create_store().await; + + assert_eq!(store.next_target_seq_number(), 1); + + store + .set_target_seq_number(10) + .await + .expect("Failed to set target seq number"); + assert_eq!(store.next_target_seq_number(), 11); + + store + .set_target_seq_number(5) + .await + .expect("Failed to set target seq number"); + assert_eq!(store.next_target_seq_number(), 6); +} + +/// Tests resetting the store. +pub async fn test_reset_store(factory: &dyn TestStoreFactory) { + let mut store = factory.create_store().await; + + // Add some messages and increment sequence numbers + store + .add(1, b"message 1") + .await + .expect("Failed to add message"); + store + .add(2, b"message 2") + .await + .expect("Failed to add message"); + store + .increment_sender_seq_number() + .await + .expect("Failed to increment sender seq number"); + store + .increment_target_seq_number() + .await + .expect("Failed to increment target seq number"); + + assert_eq!(store.next_sender_seq_number(), 2); + assert_eq!(store.next_target_seq_number(), 2); + + let messages_before_reset = store.get_slice(1, 2).await.expect("Failed to get messages"); + assert_eq!(messages_before_reset.len(), 2); + + // Reset the store + store.reset().await.expect("Failed to reset store"); + + // Verify sequence numbers are reset + assert_eq!(store.next_sender_seq_number(), 1); + assert_eq!(store.next_target_seq_number(), 1); + + // Verify messages are cleared + let messages_after_reset = store.get_slice(1, 2).await.expect("Failed to get messages"); + assert_eq!(messages_after_reset.len(), 0); +} + +/// Tests that data persists across store instances (for persistent stores only). +pub async fn test_persistence_across_store_instances(factory: &dyn TestStoreFactory) { + if !factory.is_persistent() { + return; + } + + // Create first store instance and add data + { + let mut store1 = factory.create_store().await; + store1 + .add(1, b"persistent message") + .await + .expect("Failed to add message"); + store1 + .increment_sender_seq_number() + .await + .expect("Failed to increment sender seq number"); + store1 + .set_target_seq_number(5) + .await + .expect("Failed to set target seq number"); + } + + // Create second store instance and verify data persists + { + let store2 = factory.create_store().await; + + assert_eq!(store2.next_sender_seq_number(), 2); + assert_eq!(store2.next_target_seq_number(), 6); + + let messages = store2 + .get_slice(1, 1) + .await + .expect("Failed to get messages"); + assert_eq!(messages.len(), 1); + assert_eq!(messages[0], b"persistent message"); + } +} + +/// Tests getting a slice beyond available messages. +pub async fn test_get_slice_beyond_available_messages(factory: &dyn TestStoreFactory) { + let mut store = factory.create_store().await; + + store + .add(1, b"only message") + .await + .expect("Failed to add message"); + + let messages = store + .get_slice(1, 10) + .await + .expect("Failed to get messages"); + assert_eq!(messages.len(), 1); + assert_eq!(messages[0], b"only message"); +} + +/// Tests overwriting an existing message. +pub async fn test_overwrite_existing_message(factory: &dyn TestStoreFactory) { + let mut store = factory.create_store().await; + + store + .add(1, b"original message") + .await + .expect("Failed to add original message"); + store + .add(1, b"updated message") + .await + .expect("Failed to add updated message"); + + let messages = store.get_slice(1, 1).await.expect("Failed to get messages"); + assert_eq!(messages.len(), 1); + assert_eq!(messages[0], b"updated message"); +} + +/// Tests that creation time is set on new stores. +pub async fn test_creation_time_is_set(factory: &dyn TestStoreFactory) { + use chrono::Utc; + + let before = Utc::now(); + let store = factory.create_store().await; + let after = Utc::now(); + + assert!(before <= store.creation_time()); + assert!(store.creation_time() <= after); +} + +/// Tests that creation time is preserved across store restarts (for persistent stores only). +pub async fn test_creation_time_is_preserved(factory: &dyn TestStoreFactory) { + if !factory.is_persistent() { + return; + } + + let store = factory.create_store().await; + let creation_time1 = store.creation_time(); + drop(store); + + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + + let store = factory.create_store().await; + let creation_time2 = store.creation_time(); + + assert_eq!(creation_time1, creation_time2); +} + +/// Tests that creation time is updated on reset. +pub async fn test_creation_time_gets_reset_correctly(factory: &dyn TestStoreFactory) { + use chrono::Utc; + + let mut store = factory.create_store().await; + + tokio::time::sleep(std::time::Duration::from_millis(2)).await; + let after_sleep = Utc::now(); + tokio::time::sleep(std::time::Duration::from_millis(2)).await; + + store.reset().await.expect("failed to reset store"); + let reset_creation_time = store.creation_time(); + assert!(reset_creation_time > after_sleep); + + if !factory.is_persistent() { + return; + } + + drop(store); + let store = factory.create_store().await; + assert_eq!(reset_creation_time, store.creation_time()); +} + +/// Generates conformance tests for a message store implementation. +/// +/// This macro creates a module containing all the standard conformance tests +/// for a [`MessageStore`] implementation. Each test gets its own test function, +/// allowing for parallel execution and clear test reporting. +/// +/// # Arguments +/// +/// * `$mod_name` - The name of the module to create (e.g., `in_memory`, `file`, `mongodb`) +/// * `$factory` - An expression that creates a [`TestStoreFactory`] instance. +/// This can be a sync expression (e.g., `MyFactory::new()`) or an async +/// expression (e.g., `MyFactory::new().await`). +/// +/// # Example +/// +/// ```ignore +/// use hotfix_store::conformance_tests; +/// +/// struct MyStoreFactory; +/// +/// #[async_trait::async_trait] +/// impl TestStoreFactory for MyStoreFactory { +/// async fn create_store(&self) -> Box { +/// Box::new(MyStore::new()) +/// } +/// } +/// +/// conformance_tests!(my_store, MyStoreFactory); +/// ``` +#[macro_export] +macro_rules! conformance_tests { + ($mod_name:ident, $factory:expr) => { + mod $mod_name { + use super::*; + + #[tokio::test] + async fn test_new_store_initialization() { + let factory = $factory; + $crate::test_utils::test_new_store_initialization(&factory).await; + } + + #[tokio::test] + async fn test_add_and_get_messages() { + let factory = $factory; + $crate::test_utils::test_add_and_get_messages(&factory).await; + } + + #[tokio::test] + async fn test_get_slice_partial_range() { + let factory = $factory; + $crate::test_utils::test_get_slice_partial_range(&factory).await; + } + + #[tokio::test] + async fn test_get_slice_empty_range() { + let factory = $factory; + $crate::test_utils::test_get_slice_empty_range(&factory).await; + } + + #[tokio::test] + async fn test_increment_sender_seq_number() { + let factory = $factory; + $crate::test_utils::test_increment_sender_seq_number(&factory).await; + } + + #[tokio::test] + async fn test_increment_target_seq_number() { + let factory = $factory; + $crate::test_utils::test_increment_target_seq_number(&factory).await; + } + + #[tokio::test] + async fn test_set_target_seq_number() { + let factory = $factory; + $crate::test_utils::test_set_target_seq_number(&factory).await; + } + + #[tokio::test] + async fn test_reset_store() { + let factory = $factory; + $crate::test_utils::test_reset_store(&factory).await; + } + + #[tokio::test] + async fn test_persistence_across_store_instances() { + let factory = $factory; + $crate::test_utils::test_persistence_across_store_instances(&factory).await; + } + + #[tokio::test] + async fn test_get_slice_beyond_available_messages() { + let factory = $factory; + $crate::test_utils::test_get_slice_beyond_available_messages(&factory).await; + } + + #[tokio::test] + async fn test_overwrite_existing_message() { + let factory = $factory; + $crate::test_utils::test_overwrite_existing_message(&factory).await; + } + + #[tokio::test] + async fn test_creation_time_is_set() { + let factory = $factory; + $crate::test_utils::test_creation_time_is_set(&factory).await; + } + + #[tokio::test] + async fn test_creation_time_is_preserved() { + let factory = $factory; + $crate::test_utils::test_creation_time_is_preserved(&factory).await; + } + + #[tokio::test] + async fn test_creation_time_gets_reset_correctly() { + let factory = $factory; + $crate::test_utils::test_creation_time_gets_reset_correctly(&factory).await; + } + } + }; +} diff --git a/crates/hotfix-store/tests/conformance.rs b/crates/hotfix-store/tests/conformance.rs new file mode 100644 index 0000000..c7f7e20 --- /dev/null +++ b/crates/hotfix-store/tests/conformance.rs @@ -0,0 +1,53 @@ +//! Conformance tests for InMemoryMessageStore and FileStore implementations. + +use std::path::PathBuf; +use std::{env, fs}; + +use hotfix_store::test_utils::TestStoreFactory; +use hotfix_store::{FileStore, InMemoryMessageStore, MessageStore}; + +struct InMemoryMessageStoreTestFactory; + +#[async_trait::async_trait] +impl TestStoreFactory for InMemoryMessageStoreTestFactory { + async fn create_store(&self) -> Box { + Box::new(InMemoryMessageStore::default()) + } + + fn is_persistent(&self) -> bool { + false + } +} + +struct FileStoreTestFactory { + directory: PathBuf, + name: String, +} + +impl FileStoreTestFactory { + fn new() -> Self { + Self { + directory: env::temp_dir(), + name: format!("file_store_test_{}", uuid::Uuid::new_v4()), + } + } +} + +#[async_trait::async_trait] +impl TestStoreFactory for FileStoreTestFactory { + async fn create_store(&self) -> Box { + Box::new(FileStore::new(&self.directory, &self.name).expect("Failed to create file store")) + } +} + +impl Drop for FileStoreTestFactory { + fn drop(&mut self) { + let base_path = self.directory.join(&self.name); + for ext in ["header", "body", "seqnums", "session"] { + let _ = fs::remove_file(base_path.with_extension(ext)); + } + } +} + +hotfix_store::conformance_tests!(in_memory, InMemoryMessageStoreTestFactory); +hotfix_store::conformance_tests!(file, FileStoreTestFactory::new()); diff --git a/crates/hotfix/tests/file_store_tests.rs b/crates/hotfix-store/tests/file_store_tests.rs similarity index 98% rename from crates/hotfix/tests/file_store_tests.rs rename to crates/hotfix-store/tests/file_store_tests.rs index cc74978..9a151d7 100644 --- a/crates/hotfix/tests/file_store_tests.rs +++ b/crates/hotfix-store/tests/file_store_tests.rs @@ -1,5 +1,6 @@ -use hotfix::store::file::FileStore; -use hotfix::store::{MessageStore, StoreError}; +//! FileStore-specific tests for error handling and edge cases. + +use hotfix_store::{FileStore, MessageStore, StoreError}; use std::fs; use tempfile::TempDir; diff --git a/crates/hotfix/Cargo.toml b/crates/hotfix/Cargo.toml index 79dff7f..e333ade 100644 --- a/crates/hotfix/Cargo.toml +++ b/crates/hotfix/Cargo.toml @@ -14,21 +14,22 @@ categories.workspace = true [features] default = ["test-utils"] fix44 = ["hotfix-message/fix44"] -mongodb = ["dep:mongodb"] -test-utils = [] +mongodb = ["dep:hotfix-store-mongodb"] +test-utils = ["hotfix-store/test-utils"] [lints] workspace = true [dependencies] hotfix-message = { version = "0.3.0", path = "../hotfix-message", features = ["utils-chrono"] } +hotfix-store = { version = "0.1.0", path = "../hotfix-store" } +hotfix-store-mongodb = { version = "0.1.0", path = "../hotfix-store-mongodb", optional = true } anyhow = { workspace = true } async-trait = { workspace = true } chrono = { workspace = true, features = ["serde"] } chrono-tz = { workspace = true, features = ["serde"] } futures = { workspace = true } -mongodb = { workspace = true, optional = true } rustls-pki-types = { workspace = true } rustls = { workspace = true } rustls-native-certs = { workspace = true } @@ -48,5 +49,4 @@ hotfix-message = { version = "0.3.0", path = "../hotfix-message", features = ["f rcgen = "0.13" rustls = { workspace = true, features = ["ring"] } tempfile = "3" -testcontainers = { workspace = true } tokio = { workspace = true, features = ["test-util"] } diff --git a/crates/hotfix/src/initiator.rs b/crates/hotfix/src/initiator.rs index 8f0e1c2..146ad90 100644 --- a/crates/hotfix/src/initiator.rs +++ b/crates/hotfix/src/initiator.rs @@ -30,7 +30,7 @@ impl Initiator { pub async fn start( config: SessionConfig, application: impl Application, - store: impl MessageStore + Send + Sync + 'static, + store: impl MessageStore + 'static, ) -> Result { let session_ref = InternalSessionRef::new(config.clone(), application, store)?; let (completion_tx, completion_rx) = watch::channel(false); diff --git a/crates/hotfix/src/session/session_ref.rs b/crates/hotfix/src/session/session_ref.rs index 4bff916..78c938e 100644 --- a/crates/hotfix/src/session/session_ref.rs +++ b/crates/hotfix/src/session/session_ref.rs @@ -23,7 +23,7 @@ impl InternalSessionRef { pub fn new( config: SessionConfig, application: impl Application, - store: impl MessageStore + Send + Sync + 'static, + store: impl MessageStore + 'static, ) -> Result { let (event_sender, event_receiver) = mpsc::channel::(100); let (outbound_message_sender, outbound_message_receiver) = mpsc::channel::(10); diff --git a/crates/hotfix/src/store.rs b/crates/hotfix/src/store.rs index cba3438..9450240 100644 --- a/crates/hotfix/src/store.rs +++ b/crates/hotfix/src/store.rs @@ -1,34 +1,24 @@ -//! Implementations of in-memory and persistent message stores holding session state. +//! Message store implementations (re-exported from hotfix-store). //! //! By default, only the [in_memory] store is included. Further message store implementations, -//! such as `mongodb` and `redb` can be enabled through feature flags. +//! such as `mongodb` can be enabled through feature flags. /// Error types for store operations. -pub mod error; +pub use hotfix_store::error; /// An in-memory message store that loses its state on restart. Only use this for testing. -pub mod in_memory; - -#[cfg(feature = "mongodb")] -/// A message store using MongoDB for persistence. -pub mod mongodb; +pub use hotfix_store::in_memory; /// A file-based message store for persistence. -pub mod file; +pub use hotfix_store::file; -pub use error::*; +#[cfg(feature = "mongodb")] +/// A message store using MongoDB for persistence. +pub use hotfix_store_mongodb as mongodb; -use chrono::DateTime; +#[cfg(feature = "test-utils")] +/// Test utilities for message store implementations. +pub use hotfix_store::test_utils; -#[async_trait::async_trait] -pub trait MessageStore { - async fn add(&mut self, sequence_number: u64, message: &[u8]) -> Result<()>; - async fn get_slice(&self, begin: usize, end: usize) -> Result>>; - fn next_sender_seq_number(&self) -> u64; - fn next_target_seq_number(&self) -> u64; - async fn increment_sender_seq_number(&mut self) -> Result<()>; - async fn increment_target_seq_number(&mut self) -> Result<()>; - async fn set_target_seq_number(&mut self, seq_number: u64) -> Result<()>; - async fn reset(&mut self) -> Result<()>; - fn creation_time(&self) -> DateTime; -} +pub use hotfix_store::error::*; +pub use hotfix_store::{FileStore, InMemoryMessageStore, MessageStore}; diff --git a/crates/hotfix/tests/common_store_tests.rs b/crates/hotfix/tests/common_store_tests.rs deleted file mode 100644 index 086401e..0000000 --- a/crates/hotfix/tests/common_store_tests.rs +++ /dev/null @@ -1,451 +0,0 @@ -use std::path::PathBuf; -use std::{env, fs}; - -use chrono::Utc; -use hotfix::store::MessageStore; -use hotfix::store::file::FileStore; -use hotfix::store::in_memory::InMemoryMessageStore; - -#[tokio::test] -async fn test_new_store_initialization() { - for factory in create_test_store_factories().await { - let store = factory.create_store().await; - - assert_eq!(store.next_sender_seq_number(), 1); - assert_eq!(store.next_target_seq_number(), 1); - } -} - -#[tokio::test] -async fn test_add_and_get_messages() { - for factory in create_test_store_factories().await { - let mut store = factory.create_store().await; - - let message1 = b"test message 1"; - let message2 = b"test message 2"; - let message3 = b"test message 3"; - - store - .add(1, message1) - .await - .expect("Failed to add message 1"); - store - .add(2, message2) - .await - .expect("Failed to add message 2"); - store - .add(3, message3) - .await - .expect("Failed to add message 3"); - - let messages = store.get_slice(1, 3).await.expect("Failed to get messages"); - assert_eq!(messages.len(), 3); - assert_eq!(messages[0], message1); - assert_eq!(messages[1], message2); - assert_eq!(messages[2], message3); - } -} - -#[tokio::test] -async fn test_get_slice_partial_range() { - for factory in create_test_store_factories().await { - let mut store = factory.create_store().await; - - let message1 = b"message 1"; - let message2 = b"message 2"; - let message3 = b"message 3"; - let message4 = b"message 4"; - - store - .add(1, message1) - .await - .expect("Failed to add message 1"); - store - .add(2, message2) - .await - .expect("Failed to add message 2"); - store - .add(3, message3) - .await - .expect("Failed to add message 3"); - store - .add(4, message4) - .await - .expect("Failed to add message 4"); - - let messages = store.get_slice(2, 3).await.expect("Failed to get messages"); - assert_eq!(messages.len(), 2); - assert_eq!(messages[0], message2); - assert_eq!(messages[1], message3); - } -} - -#[tokio::test] -async fn test_get_slice_empty_range() { - for factory in create_test_store_factories().await { - let store = factory.create_store().await; - - let messages = store.get_slice(1, 3).await.expect("Failed to get messages"); - assert_eq!(messages.len(), 0); - } -} - -#[tokio::test] -async fn test_increment_sender_seq_number() { - for factory in create_test_store_factories().await { - let mut store = factory.create_store().await; - - assert_eq!(store.next_sender_seq_number(), 1); - - store - .increment_sender_seq_number() - .await - .expect("Failed to increment sender seq number"); - assert_eq!(store.next_sender_seq_number(), 2); - - store - .increment_sender_seq_number() - .await - .expect("Failed to increment sender seq number"); - assert_eq!(store.next_sender_seq_number(), 3); - } -} - -#[tokio::test] -async fn test_increment_target_seq_number() { - for factory in create_test_store_factories().await { - let mut store = factory.create_store().await; - - assert_eq!(store.next_target_seq_number(), 1); - - store - .increment_target_seq_number() - .await - .expect("Failed to increment target seq number"); - assert_eq!(store.next_target_seq_number(), 2); - - store - .increment_target_seq_number() - .await - .expect("Failed to increment target seq number"); - assert_eq!(store.next_target_seq_number(), 3); - } -} - -#[tokio::test] -async fn test_set_target_seq_number() { - for factory in create_test_store_factories().await { - let mut store = factory.create_store().await; - - assert_eq!(store.next_target_seq_number(), 1); - - store - .set_target_seq_number(10) - .await - .expect("Failed to set target seq number"); - assert_eq!(store.next_target_seq_number(), 11); - - store - .set_target_seq_number(5) - .await - .expect("Failed to set target seq number"); - assert_eq!(store.next_target_seq_number(), 6); - } -} - -#[tokio::test] -async fn test_reset_store() { - for factory in create_test_store_factories().await { - let mut store = factory.create_store().await; - - // Add some messages and increment sequence numbers - store - .add(1, b"message 1") - .await - .expect("Failed to add message"); - store - .add(2, b"message 2") - .await - .expect("Failed to add message"); - store - .increment_sender_seq_number() - .await - .expect("Failed to increment sender seq number"); - store - .increment_target_seq_number() - .await - .expect("Failed to increment target seq number"); - - assert_eq!(store.next_sender_seq_number(), 2); - assert_eq!(store.next_target_seq_number(), 2); - - let messages_before_reset = store.get_slice(1, 2).await.expect("Failed to get messages"); - assert_eq!(messages_before_reset.len(), 2); - - // Reset the store - store.reset().await.expect("Failed to reset store"); - - // Verify sequence numbers are reset - assert_eq!(store.next_sender_seq_number(), 1); - assert_eq!(store.next_target_seq_number(), 1); - - // Verify messages are cleared - let messages_after_reset = store.get_slice(1, 2).await.expect("Failed to get messages"); - assert_eq!(messages_after_reset.len(), 0); - } -} - -#[tokio::test] -async fn test_persistence_across_store_instances() { - for factory in create_test_store_factories().await { - if !factory.is_persistent() { - continue; - } - - // Create first store instance and add data - { - let mut store1 = factory.create_store().await; - store1 - .add(1, b"persistent message") - .await - .expect("Failed to add message"); - store1 - .increment_sender_seq_number() - .await - .expect("Failed to increment sender seq number"); - store1 - .set_target_seq_number(5) - .await - .expect("Failed to set target seq number"); - } - - // Create second store instance and verify data persists - { - let store2 = factory.create_store().await; - - assert_eq!(store2.next_sender_seq_number(), 2); - assert_eq!(store2.next_target_seq_number(), 6); - - let messages = store2 - .get_slice(1, 1) - .await - .expect("Failed to get messages"); - assert_eq!(messages.len(), 1); - assert_eq!(messages[0], b"persistent message"); - } - } -} - -#[tokio::test] -async fn test_get_slice_beyond_available_messages() { - for factory in create_test_store_factories().await { - let mut store = factory.create_store().await; - - store - .add(1, b"only message") - .await - .expect("Failed to add message"); - - let messages = store - .get_slice(1, 10) - .await - .expect("Failed to get messages"); - assert_eq!(messages.len(), 1); - assert_eq!(messages[0], b"only message"); - } -} - -#[tokio::test] -async fn test_overwrite_existing_message() { - for factory in create_test_store_factories().await { - let mut store = factory.create_store().await; - - store - .add(1, b"original message") - .await - .expect("Failed to add original message"); - store - .add(1, b"updated message") - .await - .expect("Failed to add updated message"); - - let messages = store.get_slice(1, 1).await.expect("Failed to get messages"); - assert_eq!(messages.len(), 1); - assert_eq!(messages[0], b"updated message"); - } -} - -#[tokio::test] -async fn test_creation_time_is_set() { - for factory in create_test_store_factories().await { - let before = Utc::now(); - let store = factory.create_store().await; - let after = Utc::now(); - - assert!(before <= store.creation_time()); - assert!(store.creation_time() <= after); - } -} - -#[tokio::test] -async fn test_creation_time_is_preserved() { - for factory in create_test_store_factories().await { - if !factory.is_persistent() { - continue; - } - - let store = factory.create_store().await; - let creation_time1 = store.creation_time(); - drop(store); - - tokio::time::sleep(std::time::Duration::from_millis(10)).await; - - let store = factory.create_store().await; - let creation_time2 = store.creation_time(); - - assert_eq!(creation_time1, creation_time2); - } -} - -#[tokio::test] -async fn test_creation_time_gets_reset_correctly() { - for factory in create_test_store_factories().await { - let mut store = factory.create_store().await; - - tokio::time::sleep(std::time::Duration::from_millis(2)).await; - let after_sleep = Utc::now(); - tokio::time::sleep(std::time::Duration::from_millis(2)).await; - - store.reset().await.expect("failed to reset store"); - let reset_creation_time = store.creation_time(); - assert!(reset_creation_time > after_sleep); - - if !factory.is_persistent() { - continue; - } - - drop(store); - let store = factory.create_store().await; - assert_eq!(reset_creation_time, store.creation_time()); - } -} - -#[async_trait::async_trait] -pub trait TestStoreFactory { - async fn create_store(&self) -> Box; - fn is_persistent(&self) -> bool { - true - } -} - -async fn create_test_store_factories() -> Vec> { - #[allow(unused_mut)] - let mut stores: Vec> = vec![ - // Add in-memory store factory - Box::new(InMemoryMessageStoreTestFactory {}) as Box, - // Add file store factory - Box::new(FileStoreTestFactory::new()) as Box, - ]; - - #[cfg(feature = "mongodb")] - { - stores.push( - Box::new(mongodb_test_utils::MongodbTestStoreFactory::new().await) - as Box, - ); - } - - stores -} - -struct InMemoryMessageStoreTestFactory; - -#[async_trait::async_trait] -impl TestStoreFactory for InMemoryMessageStoreTestFactory { - async fn create_store(&self) -> Box { - Box::new(InMemoryMessageStore::default()) - } - - fn is_persistent(&self) -> bool { - false - } -} - -pub(crate) struct FileStoreTestFactory { - directory: PathBuf, - name: String, -} - -impl FileStoreTestFactory { - pub(crate) fn new() -> Self { - Self { - directory: env::temp_dir(), - name: format!("file_store_test_{}", uuid::Uuid::new_v4()), - } - } -} - -#[async_trait::async_trait] -impl TestStoreFactory for FileStoreTestFactory { - async fn create_store(&self) -> Box { - Box::new(FileStore::new(&self.directory, &self.name).expect("Failed to create file store")) - } -} - -impl Drop for FileStoreTestFactory { - fn drop(&mut self) { - let base_path = self.directory.join(&self.name); - for ext in ["header", "body", "seqnums", "session"] { - let _ = fs::remove_file(base_path.with_extension(ext)); - } - } -} - -#[cfg(feature = "mongodb")] -mod mongodb_test_utils { - use crate::TestStoreFactory; - use hotfix::store::MessageStore; - use hotfix::store::mongodb::MongoDbMessageStore; - use mongodb::Client; - use testcontainers::runners::AsyncRunner; - use testcontainers::{ContainerAsync, GenericImage}; - use tokio::sync::OnceCell; - - static MONGO_CONTAINER: OnceCell> = OnceCell::const_new(); - const MONGO_PORT: u16 = 27017; - - pub(crate) struct MongodbTestStoreFactory { - client: Client, - collection_name: String, - } - - impl MongodbTestStoreFactory { - pub(crate) async fn new() -> Self { - let container = MONGO_CONTAINER.get_or_init(Self::init_container).await; - let host = container.get_host().await.unwrap(); - let port = container.get_host_port_ipv4(MONGO_PORT).await.unwrap(); - let uri = format!("mongodb://{host}:{port}"); - let client = Client::with_uri_str(&uri).await.unwrap(); - - Self { - client, - collection_name: uuid::Uuid::new_v4().to_string(), - } - } - - async fn init_container() -> ContainerAsync { - GenericImage::new("mongo", "8.0").start().await.unwrap() - } - } - - #[async_trait::async_trait] - impl TestStoreFactory for MongodbTestStoreFactory { - async fn create_store(&self) -> Box { - let db = self.client.database("hotfixIntegrationTests"); - let store = MongoDbMessageStore::new(db, Some(&self.collection_name)) - .await - .unwrap(); - Box::new(store) - } - } -}