Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lightning/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ libm = { version = "0.2", default-features = false }
inventory = { version = "0.3", optional = true }

# RGB and related
bincode = "1.3"
futures = "0.3"
rgb-lib = { version = "0.3.0-beta.5", features = [
"electrum",
Expand All @@ -59,7 +60,6 @@ rgb-lib = { version = "0.3.0-beta.5", features = [
serde = { version = "^1.0", features = [
"derive",
] }
serde_json = "^1.0"
tokio = { version = "1.14.1", features = [
"macros",
"rt-multi-thread",
Expand Down
4 changes: 3 additions & 1 deletion lightning/src/ln/chan_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use crate::ln::msgs::DecodeError;
use crate::rgb_utils::{color_htlc, is_tx_colored};
use crate::sign::EntropySource;
use crate::types::payment::{PaymentHash, PaymentPreimage};
use crate::util::persist::KVStoreSync;
use crate::util::ser::{Readable, ReadableArgs, RequiredWrapper, Writeable, Writer};
use crate::util::transaction_utils;

Expand Down Expand Up @@ -2156,6 +2157,7 @@ impl<'a> TrustedCommitmentTransaction<'a> {
pub fn get_htlc_sigs<T: secp256k1::Signing, ES: Deref>(
&self, htlc_base_key: &SecretKey, channel_parameters: &DirectedChannelTransactionParameters,
entropy_source: &ES, secp_ctx: &Secp256k1<T>, ldk_data_dir: &PathBuf,
rgb_kv_store: &dyn KVStoreSync,
) -> Result<Vec<Signature>, ()> where ES::Target: EntropySource {
let inner = self.inner;
let keys = &inner.keys;
Expand All @@ -2167,7 +2169,7 @@ impl<'a> TrustedCommitmentTransaction<'a> {
assert!(this_htlc.transaction_output_index.is_some());
let mut htlc_tx = build_htlc_transaction(&txid, inner.feerate_per_kw, channel_parameters.contest_delay(), &this_htlc, &self.channel_type_features, &keys.broadcaster_delayed_payment_key, &keys.revocation_key);
if inner.is_colored() {
if let Err(_e) = color_htlc(&mut htlc_tx, this_htlc, ldk_data_dir) {
if let Err(_e) = color_htlc(&mut htlc_tx, this_htlc, ldk_data_dir, rgb_kv_store) {
return Err(());
}
}
Expand Down
64 changes: 35 additions & 29 deletions lightning/src/ln/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,8 @@ use crate::ln::types::ChannelId;
use crate::ln::LN_MAX_MSG_LEN;
use crate::offers::static_invoice::StaticInvoice;
use crate::rgb_utils::{
color_closing, color_commitment, color_htlc, get_rgb_channel_info_path,
get_rgb_channel_info_pending, parse_rgb_channel_info, rename_rgb_files,
update_rgb_channel_amount_pending,
color_closing, color_commitment, color_htlc, get_rgb_channel_info_pending, update_rgb_channel_id,
update_rgb_channel_amount_pending, RgbKvStoreExt,
};
use crate::routing::gossip::NodeId;
use crate::sign::ecdsa::EcdsaChannelSigner;
Expand All @@ -104,6 +103,8 @@ use crate::prelude::*;
use crate::sign::type_resolver::ChannelSignerType;
#[cfg(any(test, fuzzing, debug_assertions))]
use crate::sync::Mutex;
use crate::sync::Arc;
use crate::util::persist::KVStoreSync;
use core::ops::Deref;
use core::time::Duration;
use core::{cmp, fmt, mem};
Expand Down Expand Up @@ -3147,6 +3148,9 @@ where
pub(super) is_colored: bool,

pub(crate) ldk_data_dir: PathBuf,

/// KVStore for RGB data persistence
pub(crate) rgb_kv_store: Arc<dyn KVStoreSync + Send + Sync>,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you choose to use KVStoreSync instead of KVStore (its async version)? I would use the latter if possible, since RLN runs in an async environment

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used KVStoreSync because all the call sites are in synchronous code paths: color_commitment, color_htlc, color_closing, update_rgb_channel_id, handle_funding, etc. are all sync functions inside channel.rs and channelmanager.rs, which are entirely sync.

Using the async KVStore would require making all of these async, which would mean changing a big chunk of LDK's internals. do you think it's worth doing that, or is KVStoreSync ok for now?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about doing like ldk-node does? Which is implementing an inner store and then implementing for the wrapper store both KVStore and KVStoreSync. That way we could use the KVStoreSync in LDK (where we don't have an async environment) and KVStore in RLN (where we do have an async environment)

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright, sounds like a plan.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking into it, KVStoreSyncWrapper already exists and it wraps any KVStoreSync into KVStore by boxing the sync result into a future. So the rust-lightning side can stay as-is with KVStoreSync and on the RLN side we just wrap the sea-orm store with KVStoreSyncWrapper to expose the async KVStore trait where needed.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But wouldn't this mean we are actually doing sync calls to the DB? What I would like to achieve is to actually do async calls. To me what SqliteStore and SqliteStoreInner in ldk-node are doing is a bit different, but I could be wrong.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, KVStoreSyncWrapper wraps a sync result in Box::pin(async move { res }), so it is still sync under the hood.

Since the rgb_kv_store calls are all sync, I'll keep the bound as Arc<dyn KVStoreSync + Send + Sync> in this PR then on the RLN side I could implement both KVStoreSync and KVStore on SeaOrmKvStore directly (like ldk-node), drop KVStoreSyncWrapper, and .await sea-orm futures directly in the async impl.

Let me know if I'm missing something, this is quite a topic!

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems the way to go!

There are some partially related implementation details that I think should be discussed though:

  1. dyn KVStoreSync forces dynamic dispatch on every read/write/remove/list at runtime. I would instead add a KV: KVStoreSync + Send + Sync + 'static generic parameter and store Arc<KV>, this would give monomorphization (static dispatch at compile time) and be consistent with the existing pattern.
  2. currently each KVStoreSync call does: detect runtime -> thread::scope -> spawn OS thread -> DB_RUNTIME.block_on(future) -> join. That adds a thread lifecycle overhead for each DB call. Moreover, the DB_RUNTIME is a current thread runtime, which serializes every DB access through a single thread. With the current design, under high load, concurrent calls to the DB would block each other. So even if you raise max_connections, the dedicated runtime would process futures one at a time. You should instead use tokio::task::block_in_place + Handle::current().block_on() and eliminate the dedicated DB_RUNTIME.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed on both points 🙏

Conclusions:

  • for this PR: I'll replace Arc<dyn KVStoreSync + Send + Sync> with a generic KV: KVStoreSync + Send + Sync + 'static on ChannelContext, ChannelManager, OutboundPayments, and all constructors/deserialization. static dispatch + consistent with existing LDK patterns.

  • on the RLN side: I'll implement both KVStoreSync and KVStore directly on SeaOrmKvStore like ldk-node's SqliteStore. the KVStore impl will .await sea-orm futures natively (real async, no wrapper). the KVStoreSync impl will use block_in_place + Handle::current().block_on(), dropping the dedicated DB_RUNTIME and the per-call thread::scope overhead. KVStoreSyncWrapper goes away.

Let me know if you have any correction before I can jump to the implementation, thanks!

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can proceed. Just a couple of notes:

on ChannelContext, ChannelManager, OutboundPayments, and all constructors/deserialization

I haven't checked that you added it only where actually necessary, but generally speaking let's add the generic instead of using dyn

I'll implement both KVStoreSync and KVStore directly on SeaOrmKvStore like ldk-node's SqliteStore

Yes. I'm not sure why we also need RlnDatabase though, seems we could keep only that or SeaOrmKvStore.

}

/// A channel struct implementing this trait can receive an initial counterparty commitment
Expand Down Expand Up @@ -3247,7 +3251,7 @@ where
let temporary_channel_id = context.channel_id;
context.channel_id = channel_id;
if context.is_colored() {
rename_rgb_files(&context.channel_id, &temporary_channel_id, &context.ldk_data_dir);
update_rgb_channel_id(&context.channel_id, &temporary_channel_id, context.rgb_kv_store.as_ref());
}

assert!(!context.channel_state.is_monitor_update_in_progress()); // We have not had any monitor(s) yet to fail update!
Expand Down Expand Up @@ -3417,6 +3421,7 @@ where
open_channel_fields: msgs::CommonOpenChannelFields,
push_asset_amount: Option<u64>,
ldk_data_dir: PathBuf,
rgb_kv_store: Arc<dyn KVStoreSync + Send + Sync>,
) -> Result<(FundingScope, ChannelContext<SP>), ChannelError>
where
ES::Target: EntropySource,
Expand Down Expand Up @@ -3741,6 +3746,7 @@ where

is_colored: funding.consignment_endpoint.is_some(),
ldk_data_dir,
rgb_kv_store,
};

Ok((funding, channel_context))
Expand All @@ -3767,6 +3773,7 @@ where
consignment_endpoint: Option<RgbTransport>,
ldk_data_dir: PathBuf,
push_asset_amount: Option<u64>,
rgb_kv_store: Arc<dyn KVStoreSync + Send + Sync>,
) -> Result<(FundingScope, ChannelContext<SP>), APIError>
where
ES::Target: EntropySource,
Expand Down Expand Up @@ -3987,6 +3994,7 @@ where

is_colored: funding.consignment_endpoint.is_some(),
ldk_data_dir,
rgb_kv_store,
};

Ok((funding, channel_context))
Expand Down Expand Up @@ -4365,13 +4373,8 @@ where

/// Get the channel local RGB amount
pub fn get_local_rgb_amount(&self) -> u64 {
let info_file_path = get_rgb_channel_info_path(
&self.channel_id.0.as_hex().to_string(),
&self.ldk_data_dir,
false,
);
if info_file_path.exists() {
let rgb_info = parse_rgb_channel_info(&info_file_path);
let channel_id_str = self.channel_id.0.as_hex().to_string();
if let Ok(rgb_info) = self.rgb_kv_store.read_rgb_channel_info(&channel_id_str, false) {
rgb_info.local_rgb_amount
} else {
0
Expand All @@ -4380,13 +4383,8 @@ where

/// Get the channel remote RGB amount
pub fn get_remote_rgb_amount(&self) -> u64 {
let info_file_path = get_rgb_channel_info_path(
&self.channel_id.0.as_hex().to_string(),
&self.ldk_data_dir,
false,
);
if info_file_path.exists() {
let rgb_info = parse_rgb_channel_info(&info_file_path);
let channel_id_str = self.channel_id.0.as_hex().to_string();
if let Ok(rgb_info) = self.rgb_kv_store.read_rgb_channel_info(&channel_id_str, false) {
rgb_info.remote_rgb_amount
} else {
0
Expand Down Expand Up @@ -5101,7 +5099,7 @@ where
&holder_keys.revocation_key,
);
if self.is_colored() {
color_htlc(&mut htlc_tx, htlc, &self.ldk_data_dir)
color_htlc(&mut htlc_tx, htlc, &self.ldk_data_dir, self.rgb_kv_store.as_ref())
.expect("successful htlc coloring");
}

Expand Down Expand Up @@ -7345,6 +7343,7 @@ where
&self.context.channel_id,
&mut closing_transaction,
&self.context.ldk_data_dir,
self.context.rgb_kv_store.as_ref(),
)
.expect("successful closing TX coloring");
}
Expand Down Expand Up @@ -8935,7 +8934,7 @@ where
&self.context.channel_id,
rgb_offered_htlc,
rgb_received_htlc,
&self.context.ldk_data_dir,
self.context.rgb_kv_store.as_ref(),
);
}

Expand Down Expand Up @@ -11749,7 +11748,7 @@ where
let were_node_one = node_id.as_slice() < counterparty_node_id.as_slice();

let contract_id = if self.context.is_colored() {
let (rgb_info, _) = get_rgb_channel_info_pending(&self.context.channel_id, &self.context.ldk_data_dir);
let rgb_info = get_rgb_channel_info_pending(&self.context.channel_id, self.context.rgb_kv_store.as_ref());
Some(rgb_info.contract_id)
} else {
None
Expand Down Expand Up @@ -12903,7 +12902,7 @@ where
}
}
if self.context.is_colored() && rgb_received_htlc > 0 {
update_rgb_channel_amount_pending(&self.context.channel_id, 0, rgb_received_htlc, &self.context.ldk_data_dir);
update_rgb_channel_amount_pending(&self.context.channel_id, 0, rgb_received_htlc, self.context.rgb_kv_store.as_ref());
}
if let Some((feerate, update_state)) = self.context.pending_update_fee {
if update_state == FeeUpdateState::AwaitingRemoteRevokeToAnnounce {
Expand Down Expand Up @@ -13570,6 +13569,7 @@ where
fee_estimator: &LowerBoundedFeeEstimator<F>, entropy_source: &ES, signer_provider: &SP, counterparty_node_id: PublicKey, their_features: &InitFeatures,
channel_value_satoshis: u64, push_msat: u64, user_id: u128, config: &UserConfig, current_chain_height: u32,
outbound_scid_alias: u64, temporary_channel_id: Option<ChannelId>, logger: L, consignment_endpoint: Option<RgbTransport>, ldk_data_dir: PathBuf, push_asset_amount: Option<u64>,
rgb_kv_store: Arc<dyn KVStoreSync + Send + Sync>,
) -> Result<OutboundV1Channel<SP>, APIError>
where ES::Target: EntropySource,
F::Target: FeeEstimator,
Expand Down Expand Up @@ -13610,6 +13610,7 @@ where
consignment_endpoint,
ldk_data_dir,
push_asset_amount,
rgb_kv_store,
)?;
let unfunded_context = UnfundedChannelContext {
unfunded_channel_age_ticks: 0,
Expand Down Expand Up @@ -13693,7 +13694,7 @@ where
let temporary_channel_id = self.context.channel_id;
self.context.channel_id = ChannelId::v1_from_funding_outpoint(funding_txo);
if self.context.is_colored() {
rename_rgb_files(&self.context.channel_id, &temporary_channel_id, &self.context.ldk_data_dir);
update_rgb_channel_id(&self.context.channel_id, &temporary_channel_id, self.context.rgb_kv_store.as_ref());
}

// If the funding transaction is a coinbase transaction, we need to set the minimum depth to 100.
Expand Down Expand Up @@ -13947,7 +13948,8 @@ where
fee_estimator: &LowerBoundedFeeEstimator<F>, entropy_source: &ES, signer_provider: &SP,
counterparty_node_id: PublicKey, our_supported_features: &ChannelTypeFeatures,
their_features: &InitFeatures, msg: &msgs::OpenChannel, user_id: u128, config: &UserConfig,
current_chain_height: u32, logger: &L, is_0conf: bool, ldk_data_dir: PathBuf
current_chain_height: u32, logger: &L, is_0conf: bool, ldk_data_dir: PathBuf,
rgb_kv_store: Arc<dyn KVStoreSync + Send + Sync>,
) -> Result<InboundV1Channel<SP>, ChannelError>
where ES::Target: EntropySource,
F::Target: FeeEstimator,
Expand Down Expand Up @@ -13989,6 +13991,7 @@ where
msg.common_fields.clone(),
msg.push_asset_amount,
ldk_data_dir,
rgb_kv_store,
)?;
let unfunded_context = UnfundedChannelContext {
unfunded_channel_age_ticks: 0,
Expand Down Expand Up @@ -14189,7 +14192,7 @@ where
counterparty_node_id: PublicKey, their_features: &InitFeatures, funding_satoshis: u64,
funding_inputs: Vec<FundingTxInput>, user_id: u128, config: &UserConfig,
current_chain_height: u32, outbound_scid_alias: u64, funding_confirmation_target: ConfirmationTarget,
logger: L, ldk_data_dir: PathBuf,
logger: L, ldk_data_dir: PathBuf, rgb_kv_store: Arc<dyn KVStoreSync + Send + Sync>,
) -> Result<Self, APIError>
where ES::Target: EntropySource,
F::Target: FeeEstimator,
Expand Down Expand Up @@ -14233,6 +14236,7 @@ where
None,
ldk_data_dir,
None,
rgb_kv_store,
)?;
let unfunded_context = UnfundedChannelContext {
unfunded_channel_age_ticks: 0,
Expand Down Expand Up @@ -14343,7 +14347,7 @@ where
holder_node_id: PublicKey, counterparty_node_id: PublicKey, our_supported_features: &ChannelTypeFeatures,
their_features: &InitFeatures, msg: &msgs::OpenChannelV2,
user_id: u128, config: &UserConfig, current_chain_height: u32, logger: &L,
ldk_data_dir: PathBuf,
ldk_data_dir: PathBuf, rgb_kv_store: Arc<dyn KVStoreSync + Send + Sync>,
) -> Result<Self, ChannelError>
where ES::Target: EntropySource,
F::Target: FeeEstimator,
Expand Down Expand Up @@ -14390,6 +14394,7 @@ where
msg.common_fields.clone(),
None,
ldk_data_dir,
rgb_kv_store,
)?;
let channel_id = ChannelId::v2_from_revocation_basepoints(
&funding.get_holder_pubkeys().revocation_basepoint,
Expand Down Expand Up @@ -15080,16 +15085,16 @@ where
}
}

impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, &'c ChannelTypeFeatures, PathBuf)>
impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, &'c ChannelTypeFeatures, PathBuf, Arc<dyn KVStoreSync + Send + Sync>)>
for FundedChannel<SP>
where
ES::Target: EntropySource,
SP::Target: SignerProvider,
{
fn read<R: io::Read>(
reader: &mut R, args: (&'a ES, &'b SP, &'c ChannelTypeFeatures, PathBuf),
reader: &mut R, args: (&'a ES, &'b SP, &'c ChannelTypeFeatures, PathBuf, Arc<dyn KVStoreSync + Send + Sync>),
) -> Result<Self, DecodeError> {
let (entropy_source, signer_provider, our_supported_features, ldk_data_dir) = args;
let (entropy_source, signer_provider, our_supported_features, ldk_data_dir, rgb_kv_store) = args;
let ver = read_ver_prefix!(reader, SERIALIZATION_VERSION);
if ver <= 2 {
return Err(DecodeError::UnknownVersion);
Expand Down Expand Up @@ -15888,6 +15893,7 @@ where
interactive_tx_signing_session,
is_colored: consignment_endpoint.is_some(),
ldk_data_dir,
rgb_kv_store,
},
holder_commitment_point,
pending_splice,
Expand Down
Loading