Skip to content

Commit c1c4679

Browse files
committed
split mutex
1 parent 6c5dbbe commit c1c4679

File tree

2 files changed

+82
-46
lines changed

2 files changed

+82
-46
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1117,7 +1117,6 @@ mod tests {
11171117
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
11181118
use lightning::routing::router::{CandidateRouteHop, DefaultRouter, Path, RouteHop};
11191119
use lightning::routing::scoring::{ChannelUsage, LockableScore, ScoreLookUp, ScoreUpdate};
1120-
use lightning::routing::utxo::UtxoLookup;
11211120
use lightning::sign::{AsyncGetChangeDestinationScriptResult, ChangeDestinationSource, InMemorySigner, KeysManager};
11221121
use lightning::types::features::{ChannelFeatures, NodeFeatures};
11231122
use lightning::types::payment::PaymentHash;

lightning/src/util/sweep.rs

Lines changed: 82 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use crate::{impl_writeable_tlv_based, log_debug, log_error};
2828
use bitcoin::block::Header;
2929
use bitcoin::locktime::absolute::LockTime;
3030
use bitcoin::secp256k1::Secp256k1;
31-
use bitcoin::{BlockHash, Transaction, Txid};
31+
use bitcoin::{BlockHash, ScriptBuf, Transaction, Txid};
3232

3333
use core::ops::Deref;
3434

@@ -372,7 +372,7 @@ where
372372
output_spender: O, change_destination_source: D, kv_store: K, logger: L,
373373
) -> Self {
374374
let outputs = Vec::new();
375-
let sweeper_state = Mutex::new(SweeperState { outputs, best_block });
375+
let sweeper_state = Mutex::new(SweeperState { outputs, best_block, sweep_pending: false });
376376
Self {
377377
sweeper_state,
378378
broadcaster,
@@ -451,14 +451,71 @@ where
451451

452452
/// Regenerates and broadcasts the spending transaction for any outputs that are pending
453453
pub async fn regenerate_and_broadcast_spend_if_necessary_locked(&self) -> Result<(), ()> {
454-
let mut sweeper_state = self.sweeper_state.lock().unwrap();
455-
self.regenerate_and_broadcast_spend_if_necessary(&mut *sweeper_state).await
454+
// Collect spendable output descriptors.
455+
let respend_descriptors_clones: Vec<SpendableOutputDescriptor>;
456+
let respend_descriptors: Vec<&SpendableOutputDescriptor>;
457+
{
458+
let mut sweeper_state = self.sweeper_state.lock().unwrap();
459+
460+
// Prevent concurrent sweeping.
461+
if sweeper_state.sweep_pending {
462+
return Ok(());
463+
}
464+
465+
let cur_height = sweeper_state.best_block.height;
466+
let filter_fn = |o: &TrackedSpendableOutput| {
467+
if o.status.is_confirmed() {
468+
// Don't rebroadcast confirmed txs.
469+
return false;
470+
}
471+
472+
if o.status.is_delayed(cur_height) {
473+
// Don't generate and broadcast if still delayed
474+
return false;
475+
}
476+
477+
if o.status.latest_broadcast_height() >= Some(cur_height) {
478+
// Only broadcast once per block height.
479+
return false;
480+
}
481+
482+
true
483+
};
484+
485+
// Clone first, otherwise we can't take references that outlive the lock.
486+
respend_descriptors_clones =
487+
sweeper_state.outputs.iter().filter(|o| filter_fn(*o)).map(|o| o.descriptor.clone()).collect();
488+
489+
respend_descriptors = respend_descriptors_clones.iter().collect();
490+
491+
if respend_descriptors.is_empty() {
492+
// Nothing to do.
493+
return Ok(());
494+
}
495+
496+
// There is something to sweep. Block concurrent sweeps.
497+
sweeper_state.sweep_pending = true;
498+
}
499+
500+
// Request a new change address outside of the mutex to avoid the mutex crossing await.
501+
let change_destination_script_result = self.change_destination_source.get_change_destination_script().await;
502+
503+
// Sweep the outputs.
504+
{
505+
let mut sweeper_state = self.sweeper_state.lock().unwrap();
506+
507+
// Always allow a new sweep after this spend.
508+
sweeper_state.sweep_pending = false;
509+
510+
let change_destination_script = change_destination_script_result?;
511+
self.regenerate_and_broadcast_spend_if_necessary(&mut *sweeper_state, respend_descriptors, change_destination_script)
512+
}
456513
}
457514

458-
async fn regenerate_and_broadcast_spend_if_necessary(
459-
&self, sweeper_state: &mut SweeperState,
515+
fn regenerate_and_broadcast_spend_if_necessary(
516+
&self, sweeper_state: &mut SweeperState, respend_descriptors: Vec<&SpendableOutputDescriptor>, change_destination_script: ScriptBuf,
460517
) -> Result<(), ()> {
461-
let spending_tx_opt = self.regenerate_spend_if_necessary(sweeper_state).await;
518+
let spending_tx_opt = self.regenerate_spend_if_necessary(sweeper_state, respend_descriptors, change_destination_script);
462519
if let Some(spending_tx) = spending_tx_opt {
463520
self.persist_state(&*sweeper_state).map_err(|e| {
464521
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
@@ -470,39 +527,13 @@ where
470527
Ok(())
471528
}
472529

473-
async fn regenerate_spend_if_necessary(
474-
&self, sweeper_state: &mut SweeperState,
530+
fn regenerate_spend_if_necessary(
531+
&self, sweeper_state: &mut SweeperState, respend_descriptors: Vec<&SpendableOutputDescriptor>, change_destination_script: ScriptBuf,
475532
) -> Option<Transaction> {
476533
let cur_height = sweeper_state.best_block.height;
477534
let cur_hash = sweeper_state.best_block.block_hash;
478-
let filter_fn = |o: &TrackedSpendableOutput| {
479-
if o.status.is_confirmed() {
480-
// Don't rebroadcast confirmed txs.
481-
return false;
482-
}
483535

484-
if o.status.is_delayed(cur_height) {
485-
// Don't generate and broadcast if still delayed
486-
return false;
487-
}
488-
489-
if o.status.latest_broadcast_height() >= Some(cur_height) {
490-
// Only broadcast once per block height.
491-
return false;
492-
}
493-
494-
true
495-
};
496-
497-
let respend_descriptors: Vec<&SpendableOutputDescriptor> =
498-
sweeper_state.outputs.iter().filter(|o| filter_fn(*o)).map(|o| &o.descriptor).collect();
499-
500-
if respend_descriptors.is_empty() {
501-
// Nothing to do.
502-
return None;
503-
}
504-
505-
let spending_tx = match self.spend_outputs(&*sweeper_state, respend_descriptors).await {
536+
let spending_tx = match self.spend_outputs(&*sweeper_state, &respend_descriptors, change_destination_script) {
506537
Ok(spending_tx) => {
507538
log_debug!(
508539
self.logger,
@@ -517,10 +548,14 @@ where
517548
},
518549
};
519550

520-
// As we didn't modify the state so far, the same filter_fn yields the same elements as
521-
// above.
522-
let respend_outputs = sweeper_state.outputs.iter_mut().filter(|o| filter_fn(&**o));
523-
for output_info in respend_outputs {
551+
// Watch outputs and update status.
552+
for output_info in respend_descriptors {
553+
let output_info = sweeper_state
554+
.outputs
555+
.iter_mut()
556+
.find(|o| o.descriptor == *output_info)
557+
.expect("We should have found the output info");
558+
524559
if let Some(filter) = self.chain_data_source.as_ref() {
525560
let watched_output = output_info.to_watched_output(cur_hash);
526561
filter.register_output(watched_output);
@@ -573,17 +608,15 @@ where
573608
})
574609
}
575610

576-
async fn spend_outputs(
577-
&self, sweeper_state: &SweeperState, descriptors: Vec<&SpendableOutputDescriptor>,
611+
fn spend_outputs(
612+
&self, sweeper_state: &SweeperState, descriptors: &Vec<&SpendableOutputDescriptor>, change_destination_script: ScriptBuf,
578613
) -> Result<Transaction, ()> {
579614
let tx_feerate =
580615
self.fee_estimator.get_est_sat_per_1000_weight(ConfirmationTarget::OutputSpendingFee);
581-
let change_destination_script =
582-
self.change_destination_source.get_change_destination_script().await?;
583616
let cur_height = sweeper_state.best_block.height;
584617
let locktime = Some(LockTime::from_height(cur_height).unwrap_or(LockTime::ZERO));
585618
self.output_spender.spend_spendable_outputs(
586-
&descriptors,
619+
descriptors,
587620
Vec::new(),
588621
change_destination_script,
589622
tx_feerate,
@@ -746,11 +779,15 @@ where
746779
struct SweeperState {
747780
outputs: Vec<TrackedSpendableOutput>,
748781
best_block: BestBlock,
782+
sweep_pending: bool,
749783
}
750784

751785
impl_writeable_tlv_based!(SweeperState, {
752786
(0, outputs, required_vec),
753787
(2, best_block, required),
788+
789+
// TODO: Do not persist this field.
790+
(4, sweep_pending, required),
754791
});
755792

756793
/// A `enum` signalling to the [`OutputSweeper`] that it should delay spending an output until a

0 commit comments

Comments
 (0)