Commit 2c4f824

Merge pull request #2528 from arik-so/arik/2023-08-2470-shorter-term-monitor-locks
Release monitor write lock in between update iterations
2 parents 61d896d + f80284c commit 2c4f824

File tree

2 files changed (+69, -48 lines)

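The substance of the change is in ChainMonitor's chain-sync path (process_chain_data): instead of taking the monitors write lock once and holding it while every ChannelMonitor is processed and persisted, the new code snapshots the set of funding outpoints up front, updates each monitor under a freshly acquired read lock that is dropped between iterations, and only afterwards takes the write lock for a catch-up pass over any monitors added while iterating (plus the highest_chain_height bookkeeping). Below is a minimal, self-contained sketch of that locking pattern using plain std types; Watcher, Key, Monitor, update_one and process_all are hypothetical stand-ins for illustration, not LDK's actual ChainMonitor API.

use std::collections::{HashMap, HashSet};
use std::sync::RwLock;

// Hypothetical stand-ins for LDK's funding outpoints and monitor holders.
type Key = u64;
struct Monitor;

struct Watcher {
	monitors: RwLock<HashMap<Key, Monitor>>,
}

impl Watcher {
	fn update_one(&self, _key: &Key, _monitor: &Monitor) {
		// Per-monitor work (persisting, registering outputs, ...) happens here,
		// potentially slowly, so we do not want to hold a write lock around it.
	}

	fn process_all(&self) {
		// Snapshot the keys under a short-lived read lock.
		let keys: HashSet<Key> = self.monitors.read().unwrap().keys().cloned().collect();

		// Update each monitor under its own read lock, released between iterations,
		// so writers (e.g. a new channel being watched) are never blocked for long.
		for key in keys.iter() {
			let monitors = self.monitors.read().unwrap();
			if let Some(monitor) = monitors.get(key) {
				self.update_one(key, monitor);
			}
			// The read guard is dropped here, before the next iteration.
		}

		// Catch-up pass for anything inserted while we were iterating. The actual
		// commit takes the write lock here because it also updates height
		// bookkeeping under it; a read lock would suffice for this sketch.
		let monitors = self.monitors.write().unwrap();
		for (key, monitor) in monitors.iter() {
			if !keys.contains(key) {
				self.update_one(key, monitor);
			}
		}
	}
}

fn main() {
	let watcher = Watcher {
		monitors: RwLock::new(HashMap::from([(1, Monitor), (2, Monitor)])),
	};
	watcher.process_all();
}

The upshot is that slow per-monitor work no longer blocks writers (such as a new channel being added to the monitor set) for the duration of the whole loop, while the final pass ensures a monitor inserted mid-iteration is still processed for this block.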
lightning/src/chain/chainmonitor.rs

Lines changed: 60 additions & 44 deletions
@@ -42,6 +42,7 @@ use crate::ln::channelmanager::ChannelDetails;
 
 use crate::prelude::*;
 use crate::sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard};
+use core::iter::FromIterator;
 use core::ops::Deref;
 use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use bitcoin::secp256k1::PublicKey;
@@ -285,7 +286,22 @@ where C::Target: chain::Filter,
 	where
 		FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>
 	{
+		let funding_outpoints: HashSet<OutPoint> = HashSet::from_iter(self.monitors.read().unwrap().keys().cloned());
+		for funding_outpoint in funding_outpoints.iter() {
+			let monitor_lock = self.monitors.read().unwrap();
+			if let Some(monitor_state) = monitor_lock.get(funding_outpoint) {
+				self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state);
+			}
+		}
+
+		// do some followup cleanup if any funding outpoints were added in between iterations
 		let monitor_states = self.monitors.write().unwrap();
+		for (funding_outpoint, monitor_state) in monitor_states.iter() {
+			if !funding_outpoints.contains(funding_outpoint) {
+				self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state);
+			}
+		}
+
 		if let Some(height) = best_height {
 			// If the best block height is being updated, update highest_chain_height under the
 			// monitors write lock.
@@ -295,55 +311,55 @@ where C::Target: chain::Filter,
 				self.highest_chain_height.store(new_height, Ordering::Release);
 			}
 		}
+	}
 
-		for (funding_outpoint, monitor_state) in monitor_states.iter() {
-			let monitor = &monitor_state.monitor;
-			let mut txn_outputs;
-			{
-				txn_outputs = process(monitor, txdata);
-				let update_id = MonitorUpdateId {
-					contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()),
-				};
-				let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
-				if let Some(height) = best_height {
-					if !monitor_state.has_pending_chainsync_updates(&pending_monitor_updates) {
-						// If there are not ChainSync persists awaiting completion, go ahead and
-						// set last_chain_persist_height here - we wouldn't want the first
-						// InProgress to always immediately be considered "overly delayed".
-						monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release);
-					}
+	fn update_monitor_with_chain_data<FN>(&self, header: &BlockHeader, best_height: Option<u32>, txdata: &TransactionData, process: FN, funding_outpoint: &OutPoint, monitor_state: &MonitorHolder<ChannelSigner>) where FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs> {
+		let monitor = &monitor_state.monitor;
+		let mut txn_outputs;
+		{
+			txn_outputs = process(monitor, txdata);
+			let update_id = MonitorUpdateId {
+				contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()),
+			};
+			let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
+			if let Some(height) = best_height {
+				if !monitor_state.has_pending_chainsync_updates(&pending_monitor_updates) {
+					// If there are not ChainSync persists awaiting completion, go ahead and
+					// set last_chain_persist_height here - we wouldn't want the first
+					// InProgress to always immediately be considered "overly delayed".
+					monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release);
 				}
+			}
 
-				log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
-				match self.persister.update_persisted_channel(*funding_outpoint, None, monitor, update_id) {
-					ChannelMonitorUpdateStatus::Completed =>
-						log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)),
-					ChannelMonitorUpdateStatus::PermanentFailure => {
-						monitor_state.channel_perm_failed.store(true, Ordering::Release);
-						self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)], monitor.get_counterparty_node_id()));
-						self.event_notifier.notify();
-					},
-					ChannelMonitorUpdateStatus::InProgress => {
-						log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
-						pending_monitor_updates.push(update_id);
-					},
+			log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
+			match self.persister.update_persisted_channel(*funding_outpoint, None, monitor, update_id) {
+				ChannelMonitorUpdateStatus::Completed =>
+					log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)),
+				ChannelMonitorUpdateStatus::PermanentFailure => {
+					monitor_state.channel_perm_failed.store(true, Ordering::Release);
+					self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)], monitor.get_counterparty_node_id()));
+					self.event_notifier.notify();
+				}
+				ChannelMonitorUpdateStatus::InProgress => {
+					log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
+					pending_monitor_updates.push(update_id);
 				}
 			}
+		}
 
-			// Register any new outputs with the chain source for filtering, storing any dependent
-			// transactions from within the block that previously had not been included in txdata.
-			if let Some(ref chain_source) = self.chain_source {
-				let block_hash = header.block_hash();
-				for (txid, mut outputs) in txn_outputs.drain(..) {
-					for (idx, output) in outputs.drain(..) {
-						// Register any new outputs with the chain source for filtering
-						let output = WatchedOutput {
-							block_hash: Some(block_hash),
-							outpoint: OutPoint { txid, index: idx as u16 },
-							script_pubkey: output.script_pubkey,
-						};
-						chain_source.register_output(output)
-					}
+		// Register any new outputs with the chain source for filtering, storing any dependent
+		// transactions from within the block that previously had not been included in txdata.
+		if let Some(ref chain_source) = self.chain_source {
+			let block_hash = header.block_hash();
+			for (txid, mut outputs) in txn_outputs.drain(..) {
+				for (idx, output) in outputs.drain(..) {
+					// Register any new outputs with the chain source for filtering
+					let output = WatchedOutput {
+						block_hash: Some(block_hash),
+						outpoint: OutPoint { txid, index: idx as u16 },
+						script_pubkey: output.script_pubkey,
+					};
+					chain_source.register_output(output)
 				}
 			}
 		}
@@ -976,7 +992,7 @@ mod tests {
 			assert!(err.contains("ChannelMonitor storage failure")));
 		check_added_monitors!(nodes[0], 2); // After the failure we generate a close-channel monitor update
 		check_closed_broadcast!(nodes[0], true);
-		check_closed_event!(nodes[0], 1, ClosureReason::ProcessingError { err: "ChannelMonitor storage failure".to_string() },
+		check_closed_event!(nodes[0], 1, ClosureReason::ProcessingError { err: "ChannelMonitor storage failure".to_string() },
 			[nodes[1].node.get_our_node_id()], 100000);
 
 		// However, as the ChainMonitor is still waiting for the original persistence to complete,

lightning/src/ln/monitor_tests.rs

Lines changed: 9 additions & 4 deletions
@@ -2191,7 +2191,7 @@ fn test_anchors_aggregated_revoked_htlc_tx() {
 
 	// Alice should see that Bob is trying to claim to HTLCs, so she should now try to claim them at
 	// the second level instead.
-	let revoked_claims = {
+	let revoked_claim_transactions = {
 		let txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0);
 		assert_eq!(txn.len(), 2);
 
@@ -2205,10 +2205,14 @@ fn test_anchors_aggregated_revoked_htlc_tx() {
 			check_spends!(revoked_htlc_claim, htlc_tx);
 		}
 
-		txn
+		let mut revoked_claim_transaction_map = HashMap::new();
+		for current_tx in txn.into_iter() {
+			revoked_claim_transaction_map.insert(current_tx.txid(), current_tx);
+		}
+		revoked_claim_transaction_map
 	};
 	for node in &nodes {
-		mine_transactions(node, &revoked_claims.iter().collect::<Vec<_>>());
+		mine_transactions(node, &revoked_claim_transactions.values().collect::<Vec<_>>());
 	}
 
 
@@ -2234,7 +2238,8 @@ fn test_anchors_aggregated_revoked_htlc_tx() {
 			let spend_tx = nodes[0].keys_manager.backing.spend_spendable_outputs(
 				&[&outputs[0]], Vec::new(), Script::new_op_return(&[]), 253, None, &Secp256k1::new(),
 			).unwrap();
-			check_spends!(spend_tx, revoked_claims[idx]);
+
+			check_spends!(spend_tx, revoked_claim_transactions.get(&spend_tx.input[0].previous_output.txid).unwrap());
 		} else {
 			panic!("unexpected event");
 		}
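A note on the test change above: with the monitors now visited in two passes keyed by a HashSet of funding outpoints, the order in which Alice's revoked claim transactions are broadcast is presumably no longer guaranteed to line up with the order of the later spendable-output events, so the test keys the claims by txid and resolves each spend's parent through its input's previous_output rather than by index. A tiny sketch of that lookup pattern, using a hypothetical Tx type in place of bitcoin::Transaction (whose txid() and input[0].previous_output.txid play the same roles):

use std::collections::HashMap;

// Hypothetical minimal transaction type: `txid` identifies the transaction and
// `spends` lists the txids referenced by its inputs.
struct Tx {
	txid: u32,
	spends: Vec<u32>,
}

fn main() {
	// Claims may be broadcast in any order, so key them by txid rather than position.
	let broadcast = vec![
		Tx { txid: 7, spends: vec![1] },
		Tx { txid: 9, spends: vec![2] },
	];
	let by_txid: HashMap<u32, Tx> =
		broadcast.into_iter().map(|tx| (tx.txid, tx)).collect();

	// A later spend names its parent via the txid in its input, so the lookup
	// does not depend on broadcast order.
	let spend = Tx { txid: 11, spends: vec![9] };
	let parent = by_txid.get(&spend.spends[0]).expect("parent claim was broadcast");
	assert_eq!(parent.txid, 9);
}

This mirrors what revoked_claim_transactions does in the diff: mine_transactions takes .values() since only the indexing changed, and check_spends! looks the parent up via spend_tx.input[0].previous_output.txid.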
