Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 135 additions & 3 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ use std::collections::HashMap;
use std::convert::TryInto;
use std::default::Default;
use std::path::PathBuf;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, Mutex, Once, RwLock};
use std::time::SystemTime;
use std::time::{Duration, SystemTime};
use std::{fmt, fs};

use bdk_wallet::template::Bip84;
Expand All @@ -29,7 +30,6 @@ use lightning::routing::gossip::NodeAlias;
use lightning::routing::router::DefaultRouter;
use lightning::routing::scoring::{
CombinedScorer, ProbabilisticScorer, ProbabilisticScoringDecayParameters,
ProbabilisticScoringFeeParameters,
};
use lightning::sign::{EntropySource, NodeSigner};
use lightning::util::config::HTLCInterceptionFlags;
Expand All @@ -47,6 +47,8 @@ use crate::config::{
default_user_config, may_announce_channel, AnnounceError, AsyncPaymentsRole,
BitcoindRestClientConfig, Config, ElectrumSyncConfig, EsploraSyncConfig,
DEFAULT_ESPLORA_SERVER_URL, DEFAULT_LOG_FILENAME, DEFAULT_LOG_LEVEL,
DEFAULT_MAX_PROBE_AMOUNT_MSAT, DEFAULT_MAX_PROBE_LOCKED_MSAT, DEFAULT_PROBING_INTERVAL_SECS,
MIN_PROBE_AMOUNT_MSAT,
};
use crate::connection::ConnectionManager;
use crate::entropy::NodeEntropy;
Expand All @@ -72,6 +74,7 @@ use crate::logger::{log_error, LdkLogger, LogLevel, LogWriter, Logger};
use crate::message_handler::NodeCustomMessageHandler;
use crate::payment::asynchronous::om_mailbox::OnionMessageMailbox;
use crate::peer_store::PeerStore;
use crate::probing;
use crate::runtime::{Runtime, RuntimeSpawner};
use crate::tx_broadcaster::TransactionBroadcaster;
use crate::types::{
Expand Down Expand Up @@ -150,6 +153,37 @@ impl std::fmt::Debug for LogWriterConfig {
}
}

enum ProbingStrategyKind {
HighDegree { top_n: usize },
Random { max_hops: usize },
Custom(Arc<dyn probing::ProbingStrategy>),
}

struct ProbingStrategyConfig {
kind: ProbingStrategyKind,
interval: Duration,
max_locked_msat: u64,
}

impl fmt::Debug for ProbingStrategyConfig {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let kind_str = match &self.kind {
ProbingStrategyKind::HighDegree { top_n } => {
format!("HighDegree {{ top_n: {} }}", top_n)
},
ProbingStrategyKind::Random { max_hops } => {
format!("Random {{ max_hops: {} }}", max_hops)
},
ProbingStrategyKind::Custom(_) => "Custom(<probing strategy>)".to_string(),
};
f.debug_struct("ProbingStrategyConfig")
.field("kind", &kind_str)
.field("interval", &self.interval)
.field("max_locked_msat", &self.max_locked_msat)
.finish()
}
}

/// An error encountered during building a [`Node`].
///
/// [`Node`]: crate::Node
Expand Down Expand Up @@ -245,6 +279,7 @@ pub struct NodeBuilder {
runtime_handle: Option<tokio::runtime::Handle>,
pathfinding_scores_sync_config: Option<PathfindingScoresSyncConfig>,
recovery_mode: bool,
probing_strategy: Option<ProbingStrategyConfig>,
}

impl NodeBuilder {
Expand Down Expand Up @@ -273,6 +308,7 @@ impl NodeBuilder {
async_payments_role: None,
pathfinding_scores_sync_config,
recovery_mode,
probing_strategy: None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Most optional fields here are initialized as local variables before the struct literal (e.g. let chain_data_source_config = None;). Could probing_strategy follow the same pattern for consistency? While at it, async_payments_role has the same inconsistency. Would be nice to align both.

}
}

Expand Down Expand Up @@ -557,6 +593,64 @@ impl NodeBuilder {
self
}

/// Configures background probing toward the highest-degree nodes in the network graph.
///
/// `top_n` controls how many of the most-connected nodes are cycled through.
pub fn set_high_degree_probing_strategy(&mut self, top_n: usize) -> &mut Self {
let kind = ProbingStrategyKind::HighDegree { top_n };
self.probing_strategy = Some(self.make_probing_config(kind));
self
}

/// Configures background probing via random graph walks of up to `max_hops` hops.
pub fn set_random_probing_strategy(&mut self, max_hops: usize) -> &mut Self {
let kind = ProbingStrategyKind::Random { max_hops };
self.probing_strategy = Some(self.make_probing_config(kind));
self
}

/// Configures a custom probing strategy for background channel probing.
///
/// When set, the node will periodically call [`ProbingStrategy::next_probe`] and dispatch the
/// returned probe via the channel manager.
pub fn set_probing_strategy(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe set_custom_probing_strategy?

&mut self, strategy: Arc<dyn probing::ProbingStrategy>,
) -> &mut Self {
let kind = ProbingStrategyKind::Custom(strategy);
self.probing_strategy = Some(self.make_probing_config(kind));
self
}

/// Overrides the interval between probe attempts. Only has effect if a probing strategy is set.
pub fn set_probing_interval(&mut self, interval: Duration) -> &mut Self {
if let Some(cfg) = &mut self.probing_strategy {
cfg.interval = interval;
}
self
}

/// Overrides the maximum millisatoshis that may be locked in in-flight probes at any time.
/// Only has effect if a probing strategy is set.
pub fn set_max_probe_locked_msat(&mut self, max_msat: u64) -> &mut Self {
if let Some(cfg) = &mut self.probing_strategy {
cfg.max_locked_msat = max_msat;
}
self
}

fn make_probing_config(&self, kind: ProbingStrategyKind) -> ProbingStrategyConfig {
let existing = self.probing_strategy.as_ref();
ProbingStrategyConfig {
kind,
interval: existing
.map(|c| c.interval)
.unwrap_or(Duration::from_secs(DEFAULT_PROBING_INTERVAL_SECS)),
max_locked_msat: existing
.map(|c| c.max_locked_msat)
.unwrap_or(DEFAULT_MAX_PROBE_LOCKED_MSAT),
}
}

/// Builds a [`Node`] instance with a [`SqliteStore`] backend and according to the options
/// previously configured.
pub fn build(&self, node_entropy: NodeEntropy) -> Result<Node, BuildError> {
Expand Down Expand Up @@ -697,6 +791,7 @@ impl NodeBuilder {
runtime,
logger,
Arc::new(DynStoreWrapper(kv_store)),
self.probing_strategy.as_ref(),
)
}
}
Expand Down Expand Up @@ -942,6 +1037,11 @@ impl ArcedNodeBuilder {
self.inner.write().unwrap().set_wallet_recovery_mode();
}

/// Configures a probing strategy for background channel probing.
pub fn set_probing_strategy(&self, strategy: Arc<dyn probing::ProbingStrategy>) {
self.inner.write().unwrap().set_probing_strategy(strategy);
}

/// Builds a [`Node`] instance with a [`SqliteStore`] backend and according to the options
/// previously configured.
pub fn build(&self, node_entropy: Arc<NodeEntropy>) -> Result<Arc<Node>, BuildError> {
Expand Down Expand Up @@ -1058,6 +1158,7 @@ fn build_with_store_internal(
pathfinding_scores_sync_config: Option<&PathfindingScoresSyncConfig>,
async_payments_role: Option<AsyncPaymentsRole>, recovery_mode: bool, seed_bytes: [u8; 64],
runtime: Arc<Runtime>, logger: Arc<Logger>, kv_store: Arc<DynStore>,
probing_config: Option<&ProbingStrategyConfig>,
) -> Result<Node, BuildError> {
optionally_install_rustls_cryptoprovider();

Expand Down Expand Up @@ -1449,7 +1550,7 @@ fn build_with_store_internal(
},
}

let scoring_fee_params = ProbabilisticScoringFeeParameters::default();
let scoring_fee_params = config.scoring_fee_params.clone();
let router = Arc::new(DefaultRouter::new(
Arc::clone(&network_graph),
Arc::clone(&logger),
Expand Down Expand Up @@ -1783,6 +1884,36 @@ fn build_with_store_internal(
_leak_checker.0.push(Arc::downgrade(&wallet) as Weak<dyn Any + Send + Sync>);
}

let prober = probing_config.map(|probing_cfg| {
let strategy: Arc<dyn probing::ProbingStrategy> = match &probing_cfg.kind {
ProbingStrategyKind::HighDegree { top_n } => {
Arc::new(probing::HighDegreeStrategy::new(
network_graph.clone(),
*top_n,
MIN_PROBE_AMOUNT_MSAT,
DEFAULT_MAX_PROBE_AMOUNT_MSAT,
))
},
ProbingStrategyKind::Random { max_hops } => Arc::new(probing::RandomStrategy::new(
network_graph.clone(),
channel_manager.clone(),
*max_hops,
MIN_PROBE_AMOUNT_MSAT,
DEFAULT_MAX_PROBE_AMOUNT_MSAT,
)),
ProbingStrategyKind::Custom(s) => s.clone(),
};
Arc::new(probing::Prober {
channel_manager: channel_manager.clone(),
logger: logger.clone(),
strategy,
interval: probing_cfg.interval,
liquidity_limit_multiplier: Some(config.probing_liquidity_limit_multiplier),
max_locked_msat: probing_cfg.max_locked_msat,
locked_msat: Arc::new(AtomicU64::new(0)),
})
});

Ok(Node {
runtime,
stop_sender,
Expand Down Expand Up @@ -1815,6 +1946,7 @@ fn build_with_store_internal(
om_mailbox,
async_payments_role,
hrn_resolver,
prober,
#[cfg(cycle_tests)]
_leak_checker,
})
Expand Down
18 changes: 16 additions & 2 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use bitcoin::Network;
use lightning::ln::msgs::SocketAddress;
use lightning::routing::gossip::NodeAlias;
use lightning::routing::router::RouteParametersConfig;
use lightning::routing::scoring::ProbabilisticScoringFeeParameters;
use lightning::util::config::{
ChannelConfig as LdkChannelConfig, MaxDustHTLCExposure as LdkMaxDustHTLCExposure, UserConfig,
};
Expand All @@ -27,6 +28,10 @@ const DEFAULT_BDK_WALLET_SYNC_INTERVAL_SECS: u64 = 80;
const DEFAULT_LDK_WALLET_SYNC_INTERVAL_SECS: u64 = 30;
const DEFAULT_FEE_RATE_CACHE_UPDATE_INTERVAL_SECS: u64 = 60 * 10;
const DEFAULT_PROBING_LIQUIDITY_LIMIT_MULTIPLIER: u64 = 3;
pub(crate) const DEFAULT_PROBING_INTERVAL_SECS: u64 = 10;
pub(crate) const DEFAULT_MAX_PROBE_LOCKED_MSAT: u64 = 100_000_000; // 100k sats
pub(crate) const MIN_PROBE_AMOUNT_MSAT: u64 = 1_000_000; // 1k sats
pub(crate) const DEFAULT_MAX_PROBE_AMOUNT_MSAT: u64 = 10_000_000; // 10k sats
const DEFAULT_ANCHOR_PER_CHANNEL_RESERVE_SATS: u64 = 25_000;

// The default timeout after which we abort a wallet syncing operation.
Expand Down Expand Up @@ -128,9 +133,11 @@ pub(crate) const HRN_RESOLUTION_TIMEOUT_SECS: u64 = 5;
/// | `log_level` | Debug |
/// | `anchor_channels_config` | Some(..) |
/// | `route_parameters` | None |
/// | `scoring_fee_params` | See [`ProbabilisticScoringFeeParameters`] |
///
/// See [`AnchorChannelsConfig`] and [`RouteParametersConfig`] for more information regarding their
/// respective default values.
/// See [`AnchorChannelsConfig`], [`RouteParametersConfig`], and
/// [`ProbabilisticScoringFeeParameters`] for more information regarding their respective default
/// values.
///
/// [`Node`]: crate::Node
pub struct Config {
Expand Down Expand Up @@ -192,6 +199,12 @@ pub struct Config {
/// **Note:** If unset, default parameters will be used, and you will be able to override the
/// parameters on a per-payment basis in the corresponding method calls.
pub route_parameters: Option<RouteParametersConfig>,
/// Parameters for the probabilistic scorer used when computing payment routes.
///
/// These correspond to [`ProbabilisticScoringFeeParameters`] in LDK. If unset, LDK defaults
/// are used. Notably, [`ProbabilisticScoringFeeParameters::probing_diversity_penalty_msat`]
/// should be set to a non-zero value for some of the probing strategies.
pub scoring_fee_params: ProbabilisticScoringFeeParameters,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think I agree with this addition of ProbabilisticScoringFeeParameters to the node's config. Node operators who don't use probing have no reason to interact with this field, yet it's non-optional. Probing itself is off by default and requires an explicit set_*_probing_strategy() call in the builder, so surfacing scorer parameters unconditionally in Config seems inconsistent, even if there's a default to fall back to. More broadly, do we expect nodes to ever probe by default? If not, could this live on a probing-specific config instead, or at minimum be optional?

}

impl Default for Config {
Expand All @@ -206,6 +219,7 @@ impl Default for Config {
anchor_channels_config: Some(AnchorChannelsConfig::default()),
route_parameters: None,
node_alias: None,
scoring_fee_params: ProbabilisticScoringFeeParameters::default(),
}
}
}
Expand Down
24 changes: 20 additions & 4 deletions src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use core::future::Future;
use core::task::{Poll, Waker};
use std::collections::VecDeque;
use std::ops::Deref;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

use bitcoin::blockdata::locktime::absolute::LockTime;
Expand Down Expand Up @@ -494,6 +495,7 @@ where
static_invoice_store: Option<StaticInvoiceStore>,
onion_messenger: Arc<OnionMessenger>,
om_mailbox: Option<Arc<OnionMessageMailbox>>,
probe_locked_msat: Option<Arc<AtomicU64>>,
}

impl<L: Deref + Clone + Sync + Send + 'static> EventHandler<L>
Expand All @@ -509,7 +511,7 @@ where
payment_store: Arc<PaymentStore>, peer_store: Arc<PeerStore<L>>,
static_invoice_store: Option<StaticInvoiceStore>, onion_messenger: Arc<OnionMessenger>,
om_mailbox: Option<Arc<OnionMessageMailbox>>, runtime: Arc<Runtime>, logger: L,
config: Arc<Config>,
config: Arc<Config>, probe_locked_msat: Option<Arc<AtomicU64>>,
) -> Self {
Self {
event_queue,
Expand All @@ -528,6 +530,7 @@ where
static_invoice_store,
onion_messenger,
om_mailbox,
probe_locked_msat,
}
}

Expand Down Expand Up @@ -1111,8 +1114,22 @@ where

LdkEvent::PaymentPathSuccessful { .. } => {},
LdkEvent::PaymentPathFailed { .. } => {},
LdkEvent::ProbeSuccessful { .. } => {},
LdkEvent::ProbeFailed { .. } => {},
LdkEvent::ProbeSuccessful { path, .. } => {
if let Some(counter) = &self.probe_locked_msat {
let amount: u64 = path.hops.iter().map(|h| h.fee_msat).sum();
let _ = counter.fetch_update(Ordering::AcqRel, Ordering::Acquire, |v| {
Some(v.saturating_sub(amount))
});
}
},
LdkEvent::ProbeFailed { path, .. } => {
if let Some(counter) = &self.probe_locked_msat {
let amount: u64 = path.hops.iter().map(|h| h.fee_msat).sum();
let _ = counter.fetch_update(Ordering::AcqRel, Ordering::Acquire, |v| {
Some(v.saturating_sub(amount))
});
}
},
LdkEvent::HTLCHandlingFailed { failure_type, .. } => {
if let Some(liquidity_source) = self.liquidity_source.as_ref() {
liquidity_source.handle_htlc_handling_failed(failure_type).await;
Expand Down Expand Up @@ -1356,7 +1373,6 @@ where
);
}
}

if let Some(liquidity_source) = self.liquidity_source.as_ref() {
let skimmed_fee_msat = skimmed_fee_msat.unwrap_or(0);
liquidity_source
Expand Down
Loading