diff --git a/orange-sdk/src/trusted_wallet/spark/mod.rs b/orange-sdk/src/trusted_wallet/spark/mod.rs index b0314cc..5330435 100644 --- a/orange-sdk/src/trusted_wallet/spark/mod.rs +++ b/orange-sdk/src/trusted_wallet/spark/mod.rs @@ -333,7 +333,7 @@ impl Spark { }, }; - let spark_store = Arc::new(spark_store::SparkStore(store)); + let spark_store = Arc::new(spark_store::SparkStore::new(store)); let builder = SdkBuilder::new(spark_config, seed).with_storage(spark_store); let spark_wallet = Arc::new(builder.build().await.map_err(|e| { diff --git a/orange-sdk/src/trusted_wallet/spark/spark_store.rs b/orange-sdk/src/trusted_wallet/spark/spark_store.rs index 5ccbd98..a9ea482 100644 --- a/orange-sdk/src/trusted_wallet/spark/spark_store.rs +++ b/orange-sdk/src/trusted_wallet/spark/spark_store.rs @@ -18,6 +18,7 @@ use ldk_node::bitcoin::hashes::sha256::Hash as Sha256; use ldk_node::bitcoin::hex::DisplayHex; use ldk_node::lightning::util::persist::KVSTORE_NAMESPACE_KEY_MAX_LEN; use ldk_node::lightning::util::persist::KVStore; +use tokio::sync::Mutex; const SPARK_PRIMARY_NAMESPACE: &str = "spark"; const SPARK_CACHE_NAMESPACE: &str = "cache"; @@ -35,7 +36,10 @@ const REVISION_KEY: &str = "revision"; const LOCAL_REVISION_KEY: &str = "local_revision"; #[derive(Clone)] -pub(crate) struct SparkStore(pub(crate) Arc); +pub(crate) struct SparkStore { + pub(crate) store: Arc, + local_revision_lock: Arc>, +} /// The Spark sdk can produce keys that are too long, we just truncate them here fn sanitize_key(key: String) -> String { @@ -99,11 +103,15 @@ fn bytes_to_record(bytes: &[u8]) -> Result { } impl SparkStore { + pub(crate) fn new(store: Arc) -> Self { + Self { store, local_revision_lock: Arc::new(Mutex::new(())) } + } + /// Read the sync state for a given RecordId, if it exists. async fn read_sync_state(&self, id: &RecordId) -> Result, StorageError> { let key = record_id_key(id); match KVStore::read( - self.0.as_ref(), + self.store.as_ref(), SPARK_PRIMARY_NAMESPACE, SPARK_SYNC_STATE_NAMESPACE, &key, @@ -126,7 +134,7 @@ impl SparkStore { let key = record_id_key(&record.id); let data = record_to_bytes(record)?; KVStore::write( - self.0.as_ref(), + self.store.as_ref(), SPARK_PRIMARY_NAMESPACE, SPARK_SYNC_STATE_NAMESPACE, &key, @@ -139,8 +147,13 @@ impl SparkStore { /// Read a u64 value from the sync_rev namespace. async fn read_revision(&self, key: &str) -> Result { - match KVStore::read(self.0.as_ref(), SPARK_PRIMARY_NAMESPACE, SPARK_SYNC_REV_NAMESPACE, key) - .await + match KVStore::read( + self.store.as_ref(), + SPARK_PRIMARY_NAMESPACE, + SPARK_SYNC_REV_NAMESPACE, + key, + ) + .await { Ok(bytes) => { let s = String::from_utf8(bytes) @@ -160,7 +173,7 @@ impl SparkStore { /// Write a u64 value to the sync_rev namespace. async fn write_revision(&self, key: &str, value: u64) -> Result<(), StorageError> { KVStore::write( - self.0.as_ref(), + self.store.as_ref(), SPARK_PRIMARY_NAMESPACE, SPARK_SYNC_REV_NAMESPACE, key, @@ -182,6 +195,7 @@ impl SparkStore { /// Allocate the next local outgoing revision counter. async fn next_local_revision(&self) -> Result { + let _guard = self.local_revision_lock.lock().await; let current = self.read_revision(LOCAL_REVISION_KEY).await?; let next = current + 1; self.write_revision(LOCAL_REVISION_KEY, next).await?; @@ -194,7 +208,7 @@ impl breez_sdk_spark::Storage for SparkStore { async fn delete_cached_item(&self, key: String) -> Result<(), StorageError> { let key = sanitize_key(key); KVStore::remove( - self.0.as_ref(), + self.store.as_ref(), SPARK_PRIMARY_NAMESPACE, SPARK_CACHE_NAMESPACE, &key, @@ -207,8 +221,13 @@ impl breez_sdk_spark::Storage for SparkStore { async fn get_cached_item(&self, key: String) -> Result, StorageError> { let key = sanitize_key(key); - match KVStore::read(self.0.as_ref(), SPARK_PRIMARY_NAMESPACE, SPARK_CACHE_NAMESPACE, &key) - .await + match KVStore::read( + self.store.as_ref(), + SPARK_PRIMARY_NAMESPACE, + SPARK_CACHE_NAMESPACE, + &key, + ) + .await { Ok(bytes) => Ok(Some(String::from_utf8(bytes).map_err(|e| { StorageError::Serialization(format!("Invalid UTF-8 in cached item: {e:?}")) @@ -226,7 +245,7 @@ impl breez_sdk_spark::Storage for SparkStore { async fn set_cached_item(&self, key: String, value: String) -> Result<(), StorageError> { let key = sanitize_key(key); KVStore::write( - self.0.as_ref(), + self.store.as_ref(), SPARK_PRIMARY_NAMESPACE, SPARK_CACHE_NAMESPACE, &key, @@ -241,14 +260,14 @@ impl breez_sdk_spark::Storage for SparkStore { &self, request: StorageListPaymentsRequest, ) -> Result, StorageError> { let keys = - KVStore::list(self.0.as_ref(), SPARK_PRIMARY_NAMESPACE, SPARK_PAYMENTS_NAMESPACE) + KVStore::list(self.store.as_ref(), SPARK_PRIMARY_NAMESPACE, SPARK_PAYMENTS_NAMESPACE) .await .map_err(|e| StorageError::Implementation(format!("{e:?}")))?; let mut payments = Vec::with_capacity(keys.len()); for key in keys { let data = KVStore::read( - self.0.as_ref(), + self.store.as_ref(), SPARK_PRIMARY_NAMESPACE, SPARK_PAYMENTS_NAMESPACE, &key, @@ -286,7 +305,7 @@ impl breez_sdk_spark::Storage for SparkStore { .map_err(|e| StorageError::Serialization(format!("{e:?}")))?; KVStore::write( - self.0.as_ref(), + self.store.as_ref(), SPARK_PRIMARY_NAMESPACE, SPARK_PAYMENTS_NAMESPACE, &payment.id, @@ -302,7 +321,7 @@ impl breez_sdk_spark::Storage for SparkStore { ) -> Result<(), StorageError> { // Read existing metadata to merge (COALESCE behavior) let existing = match KVStore::read( - self.0.as_ref(), + self.store.as_ref(), SPARK_PRIMARY_NAMESPACE, SPARK_METADATA_NAMESPACE, &payment_id, @@ -330,7 +349,7 @@ impl breez_sdk_spark::Storage for SparkStore { .map_err(|e| StorageError::Serialization(format!("{e:?}")))?; KVStore::write( - self.0.as_ref(), + self.store.as_ref(), SPARK_PRIMARY_NAMESPACE, SPARK_METADATA_NAMESPACE, &payment_id, @@ -344,10 +363,14 @@ impl breez_sdk_spark::Storage for SparkStore { async fn get_payment_by_id( &self, id: String, ) -> Result { - let data = - KVStore::read(self.0.as_ref(), SPARK_PRIMARY_NAMESPACE, SPARK_PAYMENTS_NAMESPACE, &id) - .await - .map_err(|e| StorageError::Implementation(format!("{e:?}")))?; + let data = KVStore::read( + self.store.as_ref(), + SPARK_PRIMARY_NAMESPACE, + SPARK_PAYMENTS_NAMESPACE, + &id, + ) + .await + .map_err(|e| StorageError::Implementation(format!("{e:?}")))?; let payment: breez_sdk_spark::Payment = serde_json::from_slice(&data) .map_err(|e| StorageError::Serialization(format!("{e:?}")))?; @@ -391,7 +414,7 @@ impl breez_sdk_spark::Storage for SparkStore { } let keys = - KVStore::list(self.0.as_ref(), SPARK_PRIMARY_NAMESPACE, SPARK_METADATA_NAMESPACE) + KVStore::list(self.store.as_ref(), SPARK_PRIMARY_NAMESPACE, SPARK_METADATA_NAMESPACE) .await .map_err(|e| StorageError::Implementation(format!("{e:?}")))?; @@ -401,7 +424,7 @@ impl breez_sdk_spark::Storage for SparkStore { for key in keys { let data = KVStore::read( - self.0.as_ref(), + self.store.as_ref(), SPARK_PRIMARY_NAMESPACE, SPARK_METADATA_NAMESPACE, &key, @@ -447,7 +470,7 @@ impl breez_sdk_spark::Storage for SparkStore { serde_json::to_vec(&info).map_err(|e| StorageError::Serialization(format!("{e:?}")))?; KVStore::write( - self.0.as_ref(), + self.store.as_ref(), SPARK_PRIMARY_NAMESPACE, SPARK_DEPOSITS_NAMESPACE, &id, @@ -462,7 +485,7 @@ impl breez_sdk_spark::Storage for SparkStore { async fn delete_deposit(&self, txid: String, vout: u32) -> Result<(), StorageError> { let id = format!("{txid}:{vout}"); KVStore::remove( - self.0.as_ref(), + self.store.as_ref(), SPARK_PRIMARY_NAMESPACE, SPARK_DEPOSITS_NAMESPACE, &id, @@ -475,14 +498,14 @@ impl breez_sdk_spark::Storage for SparkStore { async fn list_deposits(&self) -> Result, StorageError> { let keys = - KVStore::list(self.0.as_ref(), SPARK_PRIMARY_NAMESPACE, SPARK_DEPOSITS_NAMESPACE) + KVStore::list(self.store.as_ref(), SPARK_PRIMARY_NAMESPACE, SPARK_DEPOSITS_NAMESPACE) .await .map_err(|e| StorageError::Implementation(format!("{e:?}")))?; let mut deposits = Vec::with_capacity(keys.len()); for key in keys { let data = KVStore::read( - self.0.as_ref(), + self.store.as_ref(), SPARK_PRIMARY_NAMESPACE, SPARK_DEPOSITS_NAMESPACE, &key, @@ -504,7 +527,7 @@ impl breez_sdk_spark::Storage for SparkStore { let id = format!("{txid}:{vout}"); let data = match KVStore::read( - self.0.as_ref(), + self.store.as_ref(), SPARK_PRIMARY_NAMESPACE, SPARK_DEPOSITS_NAMESPACE, &id, @@ -539,7 +562,7 @@ impl breez_sdk_spark::Storage for SparkStore { .map_err(|e| StorageError::Serialization(format!("{e:?}")))?; KVStore::write( - self.0.as_ref(), + self.store.as_ref(), SPARK_PRIMARY_NAMESPACE, SPARK_DEPOSITS_NAMESPACE, &id, @@ -599,7 +622,7 @@ impl breez_sdk_spark::Storage for SparkStore { let key = format!("{local_revision:020}"); KVStore::write( - self.0.as_ref(), + self.store.as_ref(), SPARK_PRIMARY_NAMESPACE, SPARK_SYNC_OUT_NAMESPACE, &key, @@ -617,7 +640,7 @@ impl breez_sdk_spark::Storage for SparkStore { // Remove the pending outgoing record let key = format!("{local_revision:020}"); let _ = KVStore::remove( - self.0.as_ref(), + self.store.as_ref(), SPARK_PRIMARY_NAMESPACE, SPARK_SYNC_OUT_NAMESPACE, &key, @@ -638,7 +661,7 @@ impl breez_sdk_spark::Storage for SparkStore { &self, limit: u32, ) -> Result, StorageError> { let mut keys = - KVStore::list(self.0.as_ref(), SPARK_PRIMARY_NAMESPACE, SPARK_SYNC_OUT_NAMESPACE) + KVStore::list(self.store.as_ref(), SPARK_PRIMARY_NAMESPACE, SPARK_SYNC_OUT_NAMESPACE) .await .map_err(|e| StorageError::Implementation(format!("{e:?}")))?; @@ -649,7 +672,7 @@ impl breez_sdk_spark::Storage for SparkStore { let mut results = Vec::new(); for key in keys { let data = KVStore::read( - self.0.as_ref(), + self.store.as_ref(), SPARK_PRIMARY_NAMESPACE, SPARK_SYNC_OUT_NAMESPACE, &key, @@ -690,7 +713,7 @@ impl breez_sdk_spark::Storage for SparkStore { let data = record_to_bytes(&record)?; KVStore::write( - self.0.as_ref(), + self.store.as_ref(), SPARK_PRIMARY_NAMESPACE, SPARK_SYNC_IN_NAMESPACE, &key, @@ -705,7 +728,7 @@ impl breez_sdk_spark::Storage for SparkStore { async fn delete_incoming_record(&self, record: Record) -> Result<(), StorageError> { let key = incoming_record_key(&record); let _ = KVStore::remove( - self.0.as_ref(), + self.store.as_ref(), SPARK_PRIMARY_NAMESPACE, SPARK_SYNC_IN_NAMESPACE, &key, @@ -716,15 +739,16 @@ impl breez_sdk_spark::Storage for SparkStore { } async fn get_incoming_records(&self, limit: u32) -> Result, StorageError> { - let keys = KVStore::list(self.0.as_ref(), SPARK_PRIMARY_NAMESPACE, SPARK_SYNC_IN_NAMESPACE) - .await - .map_err(|e| StorageError::Implementation(format!("{e:?}")))?; + let keys = + KVStore::list(self.store.as_ref(), SPARK_PRIMARY_NAMESPACE, SPARK_SYNC_IN_NAMESPACE) + .await + .map_err(|e| StorageError::Implementation(format!("{e:?}")))?; // Read all incoming records let mut incoming: Vec = Vec::new(); for key in keys { let data = KVStore::read( - self.0.as_ref(), + self.store.as_ref(), SPARK_PRIMARY_NAMESPACE, SPARK_SYNC_IN_NAMESPACE, &key, @@ -751,7 +775,7 @@ impl breez_sdk_spark::Storage for SparkStore { async fn get_latest_outgoing_change(&self) -> Result, StorageError> { let mut keys = - KVStore::list(self.0.as_ref(), SPARK_PRIMARY_NAMESPACE, SPARK_SYNC_OUT_NAMESPACE) + KVStore::list(self.store.as_ref(), SPARK_PRIMARY_NAMESPACE, SPARK_SYNC_OUT_NAMESPACE) .await .map_err(|e| StorageError::Implementation(format!("{e:?}")))?; @@ -763,10 +787,14 @@ impl breez_sdk_spark::Storage for SparkStore { keys.sort(); let key = keys.last().unwrap(); - let data = - KVStore::read(self.0.as_ref(), SPARK_PRIMARY_NAMESPACE, SPARK_SYNC_OUT_NAMESPACE, key) - .await - .map_err(|e| StorageError::Implementation(format!("{e:?}")))?; + let data = KVStore::read( + self.store.as_ref(), + SPARK_PRIMARY_NAMESPACE, + SPARK_SYNC_OUT_NAMESPACE, + key, + ) + .await + .map_err(|e| StorageError::Implementation(format!("{e:?}")))?; let v: serde_json::Value = serde_json::from_slice(&data) .map_err(|e| StorageError::Serialization(format!("{e:?}")))?; @@ -801,6 +829,20 @@ impl breez_sdk_spark::Storage for SparkStore { #[cfg(test)] mod tests { use super::*; + use ldk_node::io::sqlite_store::SqliteStore; + use std::collections::HashSet; + use std::path::PathBuf; + use std::time::{SystemTime, UNIX_EPOCH}; + + fn test_store() -> (SparkStore, PathBuf) { + let path = std::env::temp_dir().join(format!( + "orange-sdk-spark-store-test-{}", + SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos() + )); + let store = SqliteStore::new(path.clone(), Some("orange.sqlite".to_string()), None) + .expect("sqlite store"); + (SparkStore::new(Arc::new(store)), path) + } fn record(record_type: &str, data_id: &str, revision: u64) -> Record { Record { @@ -827,6 +869,31 @@ mod tests { assert_ne!(incoming_record_key(&a), incoming_record_key(&b)); } + #[tokio::test(flavor = "multi_thread")] + async fn next_local_revision_is_unique_under_concurrency() { + let (store, path) = test_store(); + let mut handles = Vec::new(); + + for _ in 0..32 { + let store = store.clone(); + handles.push(tokio::spawn(async move { + store.next_local_revision().await.expect("next local revision") + })); + } + + let mut revisions = Vec::new(); + for handle in handles { + revisions.push(handle.await.expect("revision task")); + } + + let unique: HashSet<_> = revisions.iter().copied().collect(); + assert_eq!(unique.len(), revisions.len(), "duplicate revisions: {revisions:?}"); + assert_eq!(unique.len(), 32); + + drop(store); + let _ = std::fs::remove_dir_all(path); + } + #[test] fn oversized_keys_use_bounded_hash_key() { let id = RecordId::new("a".repeat(KVSTORE_NAMESPACE_KEY_MAX_LEN), "b".to_string());