Skip to content

Commit fd0b9cf

Browse files
committed
Decouple spending from chain notifications
To prepare for asynchronous processing of the sweep, we need to decouple the spending from the chain notifications. These notifications run in a sync context and wouldn't allow calls into an async trait. Instead we now periodically call into the sweeper, to open up the possibility to do so from an async context if desired.
1 parent f507778 commit fd0b9cf

File tree

2 files changed

+133
-86
lines changed

2 files changed

+133
-86
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 87 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,10 @@ use lightning::onion_message::messenger::AOnionMessenger;
3636
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
3737
use lightning::routing::scoring::{ScoreUpdate, WriteableScore};
3838
use lightning::routing::utxo::UtxoLookup;
39+
use lightning::sign::{ChangeDestinationSource, OutputSpender};
3940
use lightning::util::logger::Logger;
40-
use lightning::util::persist::Persister;
41+
use lightning::util::persist::{KVStore, Persister};
42+
use lightning::util::sweep::OutputSweeper;
4143
#[cfg(feature = "std")]
4244
use lightning::util::wakers::Sleeper;
4345
use lightning_rapid_gossip_sync::RapidGossipSync;
@@ -132,6 +134,11 @@ const REBROADCAST_TIMER: u64 = 30;
132134
#[cfg(test)]
133135
const REBROADCAST_TIMER: u64 = 1;
134136

137+
#[cfg(not(test))]
138+
const SWEEPER_TIMER: u64 = 30;
139+
#[cfg(test)]
140+
const SWEEPER_TIMER: u64 = 1;
141+
135142
#[cfg(feature = "futures")]
136143
/// core::cmp::min is not currently const, so we define a trivial (and equivalent) replacement
137144
const fn min_u64(a: u64, b: u64) -> u64 {
@@ -308,6 +315,7 @@ macro_rules! define_run_body {
308315
$channel_manager: ident, $process_channel_manager_events: expr,
309316
$onion_messenger: ident, $process_onion_message_handler_events: expr,
310317
$peer_manager: ident, $gossip_sync: ident,
318+
$process_sweeper: expr,
311319
$logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr,
312320
$timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr,
313321
) => { {
@@ -322,6 +330,7 @@ macro_rules! define_run_body {
322330
let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
323331
let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
324332
let mut last_rebroadcast_call = $get_timer(REBROADCAST_TIMER);
333+
let mut last_sweeper_call = $get_timer(SWEEPER_TIMER);
325334
let mut have_pruned = false;
326335
let mut have_decayed_scorer = false;
327336

@@ -465,6 +474,12 @@ macro_rules! define_run_body {
465474
$chain_monitor.rebroadcast_pending_claims();
466475
last_rebroadcast_call = $get_timer(REBROADCAST_TIMER);
467476
}
477+
478+
if $timer_elapsed(&mut last_sweeper_call, SWEEPER_TIMER) {
479+
log_trace!($logger, "Regenerating sweeper spends if necessary");
480+
$process_sweeper;
481+
last_sweeper_call = $get_timer(SWEEPER_TIMER);
482+
}
468483
}
469484

470485
// After we exit, ensure we persist the ChannelManager one final time - this avoids
@@ -627,6 +642,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
627642
/// ```
628643
/// # use lightning::io;
629644
/// # use lightning::events::ReplayEvent;
645+
/// # use lightning::util::sweep::OutputSweeper;
630646
/// # use std::sync::{Arc, RwLock};
631647
/// # use std::sync::atomic::{AtomicBool, Ordering};
632648
/// # use std::time::SystemTime;
@@ -666,6 +682,9 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
666682
/// # F: lightning::chain::Filter + Send + Sync + 'static,
667683
/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
668684
/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
685+
/// # D: lightning::sign::ChangeDestinationSource + Send + Sync + 'static,
686+
/// # K: lightning::util::persist::KVStore + Send + Sync + 'static,
687+
/// # O: lightning::sign::OutputSpender + Send + Sync + 'static,
669688
/// # > {
670689
/// # peer_manager: Arc<PeerManager<B, F, FE, UL>>,
671690
/// # event_handler: Arc<EventHandler>,
@@ -677,14 +696,18 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
677696
/// # persister: Arc<Store>,
678697
/// # logger: Arc<Logger>,
679698
/// # scorer: Arc<Scorer>,
699+
/// # sweeper: Arc<OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<K>, Arc<Logger>, Arc<O>>>,
680700
/// # }
681701
/// #
682702
/// # async fn setup_background_processing<
683703
/// # B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static,
684704
/// # F: lightning::chain::Filter + Send + Sync + 'static,
685705
/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
686706
/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
687-
/// # >(node: Node<B, F, FE, UL>) {
707+
/// # D: lightning::sign::ChangeDestinationSource + Send + Sync + 'static,
708+
/// # K: lightning::util::persist::KVStore + Send + Sync + 'static,
709+
/// # O: lightning::sign::OutputSpender + Send + Sync + 'static,
710+
/// # >(node: Node<B, F, FE, UL, D, K, O>) {
688711
/// let background_persister = Arc::clone(&node.persister);
689712
/// let background_event_handler = Arc::clone(&node.event_handler);
690713
/// let background_chain_mon = Arc::clone(&node.chain_monitor);
@@ -695,7 +718,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
695718
/// let background_liquidity_manager = Arc::clone(&node.liquidity_manager);
696719
/// let background_logger = Arc::clone(&node.logger);
697720
/// let background_scorer = Arc::clone(&node.scorer);
698-
///
721+
/// let background_sweeper = Arc::clone(&node.sweeper);
699722
/// // Setup the sleeper.
700723
#[cfg_attr(
701724
feature = "std",
@@ -729,6 +752,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
729752
/// background_gossip_sync,
730753
/// background_peer_man,
731754
/// Some(background_liquidity_manager),
755+
/// Some(background_sweeper),
732756
/// background_logger,
733757
/// Some(background_scorer),
734758
/// sleeper,
@@ -767,6 +791,10 @@ pub async fn process_events_async<
767791
RGS: 'static + Deref<Target = RapidGossipSync<G, L>>,
768792
PM: 'static + Deref,
769793
LM: 'static + Deref,
794+
D: 'static + Deref,
795+
O: 'static + Deref,
796+
K: 'static + Deref,
797+
OS: 'static + Deref<Target = OutputSweeper<T, D, F, CF, K, L, O>>,
770798
S: 'static + Deref<Target = SC> + Send + Sync,
771799
SC: for<'b> WriteableScore<'b>,
772800
SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
@@ -775,12 +803,12 @@ pub async fn process_events_async<
775803
>(
776804
persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
777805
onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
778-
liquidity_manager: Option<LM>, logger: L, scorer: Option<S>, sleeper: Sleeper,
779-
mobile_interruptable_platform: bool, fetch_time: FetchTime,
806+
liquidity_manager: Option<LM>, sweeper: Option<OS>, logger: L, scorer: Option<S>,
807+
sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
780808
) -> Result<(), lightning::io::Error>
781809
where
782810
UL::Target: 'static + UtxoLookup,
783-
CF::Target: 'static + chain::Filter,
811+
CF::Target: 'static + chain::Filter + Sync + Send,
784812
T::Target: 'static + BroadcasterInterface,
785813
F::Target: 'static + FeeEstimator,
786814
L::Target: 'static + Logger,
@@ -790,6 +818,9 @@ where
790818
OM::Target: AOnionMessenger,
791819
PM::Target: APeerManager,
792820
LM::Target: ALiquidityManager,
821+
O::Target: 'static + OutputSpender,
822+
D::Target: 'static + ChangeDestinationSource,
823+
K::Target: 'static + KVStore,
793824
{
794825
let mut should_break = false;
795826
let async_event_handler = |event| {
@@ -833,6 +864,11 @@ where
833864
},
834865
peer_manager,
835866
gossip_sync,
867+
{
868+
if let Some(ref sweeper) = sweeper {
869+
let _ = sweeper.regenerate_and_broadcast_spend_if_necessary();
870+
}
871+
},
836872
logger,
837873
scorer,
838874
should_break,
@@ -953,14 +989,18 @@ impl BackgroundProcessor {
953989
LM: 'static + Deref + Send,
954990
S: 'static + Deref<Target = SC> + Send + Sync,
955991
SC: for<'b> WriteableScore<'b>,
992+
D: 'static + Deref,
993+
O: 'static + Deref,
994+
K: 'static + Deref,
995+
OS: 'static + Deref<Target = OutputSweeper<T, D, F, CF, K, L, O>> + Send + Sync,
956996
>(
957997
persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
958998
onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
959-
liquidity_manager: Option<LM>, logger: L, scorer: Option<S>,
999+
liquidity_manager: Option<LM>, sweeper: Option<OS>, logger: L, scorer: Option<S>,
9601000
) -> Self
9611001
where
9621002
UL::Target: 'static + UtxoLookup,
963-
CF::Target: 'static + chain::Filter,
1003+
CF::Target: 'static + chain::Filter + Sync + Send,
9641004
T::Target: 'static + BroadcasterInterface,
9651005
F::Target: 'static + FeeEstimator,
9661006
L::Target: 'static + Logger,
@@ -970,6 +1010,9 @@ impl BackgroundProcessor {
9701010
OM::Target: AOnionMessenger,
9711011
PM::Target: APeerManager,
9721012
LM::Target: ALiquidityManager,
1013+
O::Target: 'static + OutputSpender,
1014+
D::Target: 'static + ChangeDestinationSource,
1015+
K::Target: 'static + KVStore,
9731016
{
9741017
let stop_thread = Arc::new(AtomicBool::new(false));
9751018
let stop_thread_clone = stop_thread.clone();
@@ -1005,6 +1048,11 @@ impl BackgroundProcessor {
10051048
},
10061049
peer_manager,
10071050
gossip_sync,
1051+
{
1052+
if let Some(ref sweeper) = sweeper {
1053+
let _ = sweeper.regenerate_and_broadcast_spend_if_necessary();
1054+
}
1055+
},
10081056
logger,
10091057
scorer,
10101058
stop_thread.load(Ordering::Acquire),
@@ -1269,7 +1317,7 @@ mod tests {
12691317
Arc<test_utils::TestBroadcaster>,
12701318
Arc<TestWallet>,
12711319
Arc<test_utils::TestFeeEstimator>,
1272-
Arc<dyn Filter + Sync + Send>,
1320+
Arc<test_utils::TestChainSource>,
12731321
Arc<FilesystemStore>,
12741322
Arc<test_utils::TestLogger>,
12751323
Arc<KeysManager>,
@@ -1648,7 +1696,7 @@ mod tests {
16481696
best_block,
16491697
Arc::clone(&tx_broadcaster),
16501698
Arc::clone(&fee_estimator),
1651-
None::<Arc<dyn Filter + Sync + Send>>,
1699+
None::<Arc<test_utils::TestChainSource>>,
16521700
Arc::clone(&keys_manager),
16531701
wallet,
16541702
Arc::clone(&kv_store),
@@ -1888,6 +1936,7 @@ mod tests {
18881936
nodes[0].p2p_gossip_sync(),
18891937
nodes[0].peer_manager.clone(),
18901938
Some(Arc::clone(&nodes[0].liquidity_manager)),
1939+
Some(nodes[0].sweeper.clone()),
18911940
nodes[0].logger.clone(),
18921941
Some(nodes[0].scorer.clone()),
18931942
);
@@ -1982,6 +2031,7 @@ mod tests {
19822031
nodes[0].no_gossip_sync(),
19832032
nodes[0].peer_manager.clone(),
19842033
Some(Arc::clone(&nodes[0].liquidity_manager)),
2034+
Some(nodes[0].sweeper.clone()),
19852035
nodes[0].logger.clone(),
19862036
Some(nodes[0].scorer.clone()),
19872037
);
@@ -2025,6 +2075,7 @@ mod tests {
20252075
nodes[0].no_gossip_sync(),
20262076
nodes[0].peer_manager.clone(),
20272077
Some(Arc::clone(&nodes[0].liquidity_manager)),
2078+
Some(nodes[0].sweeper.clone()),
20282079
nodes[0].logger.clone(),
20292080
Some(nodes[0].scorer.clone()),
20302081
);
@@ -2058,6 +2109,7 @@ mod tests {
20582109
nodes[0].rapid_gossip_sync(),
20592110
nodes[0].peer_manager.clone(),
20602111
Some(Arc::clone(&nodes[0].liquidity_manager)),
2112+
Some(nodes[0].sweeper.clone()),
20612113
nodes[0].logger.clone(),
20622114
Some(nodes[0].scorer.clone()),
20632115
move |dur: Duration| {
@@ -2095,6 +2147,7 @@ mod tests {
20952147
nodes[0].p2p_gossip_sync(),
20962148
nodes[0].peer_manager.clone(),
20972149
Some(Arc::clone(&nodes[0].liquidity_manager)),
2150+
Some(nodes[0].sweeper.clone()),
20982151
nodes[0].logger.clone(),
20992152
Some(nodes[0].scorer.clone()),
21002153
);
@@ -2125,6 +2178,7 @@ mod tests {
21252178
nodes[0].no_gossip_sync(),
21262179
nodes[0].peer_manager.clone(),
21272180
Some(Arc::clone(&nodes[0].liquidity_manager)),
2181+
Some(nodes[0].sweeper.clone()),
21282182
nodes[0].logger.clone(),
21292183
Some(nodes[0].scorer.clone()),
21302184
);
@@ -2172,6 +2226,7 @@ mod tests {
21722226
nodes[0].no_gossip_sync(),
21732227
nodes[0].peer_manager.clone(),
21742228
Some(Arc::clone(&nodes[0].liquidity_manager)),
2229+
Some(nodes[0].sweeper.clone()),
21752230
nodes[0].logger.clone(),
21762231
Some(nodes[0].scorer.clone()),
21772232
);
@@ -2235,6 +2290,7 @@ mod tests {
22352290
nodes[0].no_gossip_sync(),
22362291
nodes[0].peer_manager.clone(),
22372292
Some(Arc::clone(&nodes[0].liquidity_manager)),
2293+
Some(nodes[0].sweeper.clone()),
22382294
nodes[0].logger.clone(),
22392295
Some(nodes[0].scorer.clone()),
22402296
);
@@ -2280,10 +2336,22 @@ mod tests {
22802336

22812337
advance_chain(&mut nodes[0], 3);
22822338

2339+
let tx_broadcaster = nodes[0].tx_broadcaster.clone();
2340+
let wait_for_sweep_tx = || -> Transaction {
2341+
loop {
2342+
let sweep_tx = tx_broadcaster.txn_broadcasted.lock().unwrap().pop();
2343+
if let Some(sweep_tx) = sweep_tx {
2344+
return sweep_tx;
2345+
}
2346+
2347+
std::thread::sleep(Duration::from_millis(10));
2348+
}
2349+
};
2350+
22832351
// Check we generate an initial sweeping tx.
22842352
assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
2353+
let sweep_tx_0 = wait_for_sweep_tx();
22852354
let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
2286-
let sweep_tx_0 = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
22872355
match tracked_output.status {
22882356
OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
22892357
assert_eq!(sweep_tx_0.compute_txid(), latest_spending_tx.compute_txid());
@@ -2294,8 +2362,8 @@ mod tests {
22942362
// Check we regenerate and rebroadcast the sweeping tx each block.
22952363
advance_chain(&mut nodes[0], 1);
22962364
assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
2365+
let sweep_tx_1 = wait_for_sweep_tx();
22972366
let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
2298-
let sweep_tx_1 = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
22992367
match tracked_output.status {
23002368
OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
23012369
assert_eq!(sweep_tx_1.compute_txid(), latest_spending_tx.compute_txid());
@@ -2306,8 +2374,8 @@ mod tests {
23062374

23072375
advance_chain(&mut nodes[0], 1);
23082376
assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
2377+
let sweep_tx_2 = wait_for_sweep_tx();
23092378
let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
2310-
let sweep_tx_2 = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
23112379
match tracked_output.status {
23122380
OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
23132381
assert_eq!(sweep_tx_2.compute_txid(), latest_spending_tx.compute_txid());
@@ -2387,6 +2455,7 @@ mod tests {
23872455
nodes[0].no_gossip_sync(),
23882456
nodes[0].peer_manager.clone(),
23892457
Some(Arc::clone(&nodes[0].liquidity_manager)),
2458+
Some(nodes[0].sweeper.clone()),
23902459
nodes[0].logger.clone(),
23912460
Some(nodes[0].scorer.clone()),
23922461
);
@@ -2417,6 +2486,7 @@ mod tests {
24172486
nodes[0].no_gossip_sync(),
24182487
nodes[0].peer_manager.clone(),
24192488
Some(Arc::clone(&nodes[0].liquidity_manager)),
2489+
Some(nodes[0].sweeper.clone()),
24202490
nodes[0].logger.clone(),
24212491
Some(nodes[0].scorer.clone()),
24222492
);
@@ -2513,6 +2583,7 @@ mod tests {
25132583
nodes[0].rapid_gossip_sync(),
25142584
nodes[0].peer_manager.clone(),
25152585
Some(Arc::clone(&nodes[0].liquidity_manager)),
2586+
Some(nodes[0].sweeper.clone()),
25162587
nodes[0].logger.clone(),
25172588
Some(nodes[0].scorer.clone()),
25182589
);
@@ -2546,6 +2617,7 @@ mod tests {
25462617
nodes[0].rapid_gossip_sync(),
25472618
nodes[0].peer_manager.clone(),
25482619
Some(Arc::clone(&nodes[0].liquidity_manager)),
2620+
Some(nodes[0].sweeper.clone()),
25492621
nodes[0].logger.clone(),
25502622
Some(nodes[0].scorer.clone()),
25512623
move |dur: Duration| {
@@ -2709,6 +2781,7 @@ mod tests {
27092781
nodes[0].no_gossip_sync(),
27102782
nodes[0].peer_manager.clone(),
27112783
Some(Arc::clone(&nodes[0].liquidity_manager)),
2784+
Some(nodes[0].sweeper.clone()),
27122785
nodes[0].logger.clone(),
27132786
Some(nodes[0].scorer.clone()),
27142787
);
@@ -2760,6 +2833,7 @@ mod tests {
27602833
nodes[0].no_gossip_sync(),
27612834
nodes[0].peer_manager.clone(),
27622835
Some(Arc::clone(&nodes[0].liquidity_manager)),
2836+
Some(nodes[0].sweeper.clone()),
27632837
nodes[0].logger.clone(),
27642838
Some(nodes[0].scorer.clone()),
27652839
move |dur: Duration| {

0 commit comments

Comments
 (0)