
Optimize ChannelMonitor persistence on block connections. #2966


Merged: 2 commits, Jun 20, 2024
120 changes: 108 additions & 12 deletions lightning/src/chain/chainmonitor.rs
@@ -44,6 +44,7 @@ use crate::prelude::*;
use crate::sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard};
use core::ops::Deref;
use core::sync::atomic::{AtomicUsize, Ordering};
use bitcoin::hashes::Hash;
use bitcoin::secp256k1::PublicKey;

/// `Persist` defines behavior for persisting channel monitors: this could mean
@@ -260,10 +261,11 @@ where C::Target: chain::Filter,
{
let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
let funding_outpoints = hash_set_from_iter(self.monitors.read().unwrap().keys().cloned());
let channel_count = funding_outpoints.len();
for funding_outpoint in funding_outpoints.iter() {
let monitor_lock = self.monitors.read().unwrap();
if let Some(monitor_state) = monitor_lock.get(funding_outpoint) {
if self.update_monitor_with_chain_data(header, txdata, &process, funding_outpoint, &monitor_state).is_err() {
if self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state, channel_count).is_err() {
// Take the monitors lock for writing so that we poison it and any future
// operations going forward fail immediately.
core::mem::drop(monitor_lock);
@@ -278,7 +280,7 @@
let monitor_states = self.monitors.write().unwrap();
for (funding_outpoint, monitor_state) in monitor_states.iter() {
if !funding_outpoints.contains(funding_outpoint) {
if self.update_monitor_with_chain_data(header, txdata, &process, funding_outpoint, &monitor_state).is_err() {
if self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state, channel_count).is_err() {
log_error!(self.logger, "{}", err_str);
panic!("{}", err_str);
}
@@ -297,14 +299,29 @@
}

fn update_monitor_with_chain_data<FN>(
&self, header: &Header, txdata: &TransactionData, process: FN, funding_outpoint: &OutPoint,
monitor_state: &MonitorHolder<ChannelSigner>
&self, header: &Header, best_height: Option<u32>, txdata: &TransactionData, process: FN, funding_outpoint: &OutPoint,
Review comment (Contributor): what's the reason best_height is used to offset the modulus?

Reply (G8XSU, Contributor, Author, Jun 20, 2024): We use it to distribute monitor persistence across time.
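For illustration only (not code from this PR; `should_persist` and the channel keys below are hypothetical): a minimal sketch of why adding the block height to the per-funding-txid key before taking the modulus spreads persistence over time. As the height increments, each monitor satisfies `% partition_factor == 0` exactly once per cycle of `partition_factor` blocks, and its key determines at which block within that cycle, so different channels are written at different heights rather than all on the same block.

```rust
// Hypothetical, self-contained sketch of the partitioning idea (not LDK's API).
fn should_persist(channel_key: u32, block_height: u32, partition_factor: u32) -> bool {
    // Mirrors the PR's condition: (key + height) % partition_factor == 0.
    channel_key.wrapping_add(block_height) % partition_factor == 0
}

fn main() {
    let partition_factor = 5u32;
    // Hypothetical per-channel keys; in the PR these come from the first four bytes
    // of the funding txid.
    let channel_keys = [7u32, 13, 21, 34, 55];
    for height in 100u32..110 {
        let due: Vec<u32> = channel_keys
            .iter()
            .copied()
            .filter(|&key| should_persist(key, height, partition_factor))
            .collect();
        // Each channel shows up once every `partition_factor` heights, at an offset
        // determined by its key.
        println!("height {height}: persisting channels with keys {due:?}");
    }
}
```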

monitor_state: &MonitorHolder<ChannelSigner>, channel_count: usize,
) -> Result<(), ()> where FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs> {
let monitor = &monitor_state.monitor;
let logger = WithChannelMonitor::from(&self.logger, &monitor, None);
let mut txn_outputs;
{
txn_outputs = process(monitor, txdata);

let mut txn_outputs = process(monitor, txdata);

let get_partition_key = |funding_outpoint: &OutPoint| {
let funding_txid_hash = funding_outpoint.txid.to_raw_hash();
let funding_txid_hash_bytes = funding_txid_hash.as_byte_array();
let funding_txid_u32 = u32::from_be_bytes([funding_txid_hash_bytes[0], funding_txid_hash_bytes[1], funding_txid_hash_bytes[2], funding_txid_hash_bytes[3]]);
funding_txid_u32.wrapping_add(best_height.unwrap_or_default())
};

let partition_factor = if channel_count < 15 {
5
} else {
50 // ~8 hours
};

let has_pending_claims = monitor_state.monitor.has_pending_claims();
if has_pending_claims || get_partition_key(funding_outpoint) % partition_factor == 0 {
log_trace!(logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
match self.persister.update_persisted_channel(*funding_outpoint, None, monitor) {
ChannelMonitorUpdateStatus::Completed =>
@@ -313,10 +330,10 @@
),
ChannelMonitorUpdateStatus::InProgress => {
log_trace!(logger, "Channel Monitor sync for channel {} in progress.", log_funding_info!(monitor));
},
}
ChannelMonitorUpdateStatus::UnrecoverableError => {
return Err(());
},
}
}
}
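As a rough sanity check on the timing comment above (an estimate, not stated in the PR): with an average block interval of ~10 minutes, a partition factor of 50 means a monitor with no pending claims is persisted on chain sync about once every 50 * 10 = 500 minutes, i.e. roughly 8.3 hours, while the factor of 5 used for nodes with fewer than 15 channels keeps that interval to about 50 minutes.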

@@ -870,14 +887,17 @@ impl<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref,

#[cfg(test)]
mod tests {
use crate::check_added_monitors;
use crate::{check_added_monitors, check_closed_event};
use crate::{expect_payment_path_successful, get_event_msg};
use crate::{get_htlc_update_msgs, get_revoke_commit_msgs};
use crate::chain::{ChannelMonitorUpdateStatus, Watch};
use crate::events::{Event, MessageSendEvent, MessageSendEventsProvider};
use crate::chain::channelmonitor::ANTI_REORG_DELAY;
use crate::events::{ClosureReason, Event, MessageSendEvent, MessageSendEventsProvider};
use crate::ln::functional_test_utils::*;
use crate::ln::msgs::ChannelMessageHandler;

const CHAINSYNC_MONITOR_PARTITION_FACTOR: u32 = 5;

#[test]
fn test_async_ooo_offchain_updates() {
// Test that if we have multiple offchain updates being persisted and they complete
@@ -983,6 +1003,79 @@ mod tests {
check_added_monitors!(nodes[0], 1);
}

#[test]
fn test_chainsync_triggers_distributed_monitor_persistence() {
let chanmon_cfgs = create_chanmon_cfgs(3);
let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
let nodes = create_network(3, &node_cfgs, &node_chanmgrs);

// Use FullBlockViaListen to avoid duplicate calls to process_chain_data and skips_blocks() in
// case of other connect_styles.
*nodes[0].connect_style.borrow_mut() = ConnectStyle::FullBlockViaListen;
*nodes[1].connect_style.borrow_mut() = ConnectStyle::FullBlockViaListen;
*nodes[2].connect_style.borrow_mut() = ConnectStyle::FullBlockViaListen;

let _channel_1 = create_announced_chan_between_nodes(&nodes, 0, 1).2;
let channel_2 = create_announced_chan_between_nodes_with_value(&nodes, 0, 2, 1_000_000, 0).2;

chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
chanmon_cfgs[1].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
chanmon_cfgs[2].persister.chain_sync_monitor_persistences.lock().unwrap().clear();

connect_blocks(&nodes[0], CHAINSYNC_MONITOR_PARTITION_FACTOR * 2);
connect_blocks(&nodes[1], CHAINSYNC_MONITOR_PARTITION_FACTOR * 2);
connect_blocks(&nodes[2], CHAINSYNC_MONITOR_PARTITION_FACTOR * 2);

// Connecting [`CHAINSYNC_MONITOR_PARTITION_FACTOR`] * 2 blocks should trigger only 2 writes
// per monitor/channel.
assert_eq!(2 * 2, chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().len());
assert_eq!(2, chanmon_cfgs[1].persister.chain_sync_monitor_persistences.lock().unwrap().len());
assert_eq!(2, chanmon_cfgs[2].persister.chain_sync_monitor_persistences.lock().unwrap().len());

// Test that monitors with pending_claims are persisted on every block.
// Now, close channel_2 (between nodes[0] and nodes[2]) to create a pending claim in nodes[0].
nodes[0].node.force_close_broadcasting_latest_txn(&channel_2, &nodes[2].node.get_our_node_id(), "Channel force-closed".to_string()).unwrap();
check_closed_event!(&nodes[0], 1, ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(true) }, false,
[nodes[2].node.get_our_node_id()], 1000000);
check_closed_broadcast(&nodes[0], 1, true);
let close_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0);
assert_eq!(close_tx.len(), 1);

mine_transaction(&nodes[2], &close_tx[0]);
check_added_monitors(&nodes[2], 1);
check_closed_broadcast(&nodes[2], 1, true);
check_closed_event!(&nodes[2], 1, ClosureReason::CommitmentTxConfirmed, false,
[nodes[0].node.get_our_node_id()], 1000000);

chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
chanmon_cfgs[2].persister.chain_sync_monitor_persistences.lock().unwrap().clear();

// For channel_2, there should be a monitor write for every block connection.
// We connect [`CHAINSYNC_MONITOR_PARTITION_FACTOR`] blocks since we don't know exactly when
// channel_1's monitor persistence will occur; over a full cycle of
// [`CHAINSYNC_MONITOR_PARTITION_FACTOR`] blocks it will be persisted exactly once.
connect_blocks(&nodes[0], CHAINSYNC_MONITOR_PARTITION_FACTOR);
connect_blocks(&nodes[2], CHAINSYNC_MONITOR_PARTITION_FACTOR);

// CHAINSYNC_MONITOR_PARTITION_FACTOR writes for channel_2 due to the pending claim, plus 1 for
// channel_1.
assert_eq!((CHAINSYNC_MONITOR_PARTITION_FACTOR + 1) as usize, chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().len());
// For node[2], there is no pending_claim
assert_eq!(1, chanmon_cfgs[2].persister.chain_sync_monitor_persistences.lock().unwrap().len());

// Confirm claim for node[0] with ANTI_REORG_DELAY and reset monitor write counter.
mine_transaction(&nodes[0], &close_tx[0]);
connect_blocks(&nodes[0], ANTI_REORG_DELAY - 1);
check_added_monitors(&nodes[0], 1);
chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clear();

// Again connect one full cycle of CHAINSYNC_MONITOR_PARTITION_FACTOR blocks; it should
// result in only 1 write per monitor/channel.
connect_blocks(&nodes[0], CHAINSYNC_MONITOR_PARTITION_FACTOR);
assert_eq!(2, chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().len());
}

#[test]
#[cfg(feature = "std")]
fn update_during_chainsync_poisons_channel() {
@@ -991,12 +1084,15 @@
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
create_announced_chan_between_nodes(&nodes, 0, 1);
*nodes[0].connect_style.borrow_mut() = ConnectStyle::FullBlockViaListen;

chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::UnrecoverableError);

assert!(std::panic::catch_unwind(|| {
// Returning an UnrecoverableError should always panic immediately
connect_blocks(&nodes[0], 1);
// Connect [`CHAINSYNC_MONITOR_PARTITION_FACTOR`] blocks so that we trigger some persistence
// after accounting for block-height-based partitioning/distribution.
connect_blocks(&nodes[0], CHAINSYNC_MONITOR_PARTITION_FACTOR);
}).is_err());
assert!(std::panic::catch_unwind(|| {
// ...and also poison our locks causing later use to panic as well
6 changes: 6 additions & 0 deletions lightning/src/chain/channelmonitor.rs
@@ -1812,6 +1812,12 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitor<Signer> {
);
}

/// Returns true if the monitor has pending claim requests that are not fully confirmed yet.
pub fn has_pending_claims(&self) -> bool
{
self.inner.lock().unwrap().onchain_tx_handler.has_pending_claims()
}

/// Triggers rebroadcasts of pending claims from a force-closed channel after a transaction
/// signature generation failure.
pub fn signer_unblocked<B: Deref, F: Deref, L: Deref>(
7 changes: 7 additions & 0 deletions lightning/src/chain/onchaintx.rs
@@ -535,6 +535,13 @@ impl<ChannelSigner: EcdsaChannelSigner> OnchainTxHandler<ChannelSigner> {
}
}

/// Returns true if we are currently tracking any pending claim requests that are not fully
/// confirmed yet.
pub(super) fn has_pending_claims(&self) -> bool
{
self.pending_claim_requests.len() != 0
}

/// Lightning security model (i.e being able to redeem/timeout HTLC or penalize counterparty
/// onchain) lays on the assumption of claim transactions getting confirmed before timelock
/// expiration (CSV or CLTV following cases). In case of high-fee spikes, claim tx may get stuck
Expand Down
11 changes: 9 additions & 2 deletions lightning/src/util/test_utils.rs
@@ -545,12 +545,16 @@ pub struct TestPersister {
///
/// [`ChannelMonitor`]: channelmonitor::ChannelMonitor
pub offchain_monitor_updates: Mutex<HashMap<OutPoint, HashSet<u64>>>,
/// When we get an update_persisted_channel call with no ChannelMonitorUpdate, we insert the
/// monitor's funding outpoint here.
pub chain_sync_monitor_persistences: Mutex<VecDeque<OutPoint>>
}
impl TestPersister {
pub fn new() -> Self {
Self {
update_rets: Mutex::new(VecDeque::new()),
offchain_monitor_updates: Mutex::new(new_hash_map()),
chain_sync_monitor_persistences: Mutex::new(VecDeque::new())
}
}

@@ -573,15 +577,18 @@ impl<Signer: sign::ecdsa::EcdsaChannelSigner> chainmonitor::Persist<Signer> for
ret = update_ret;
}

if let Some(update) = update {
if let Some(update) = update {
self.offchain_monitor_updates.lock().unwrap().entry(funding_txo).or_insert(new_hash_set()).insert(update.update_id);
} else {
self.chain_sync_monitor_persistences.lock().unwrap().push_back(funding_txo);
}
ret
}

fn archive_persisted_channel(&self, funding_txo: OutPoint) {
// remove the channel from the offchain_monitor_updates map
// remove the channel from the offchain_monitor_updates and chain_sync_monitor_persistences.
self.offchain_monitor_updates.lock().unwrap().remove(&funding_txo);
self.chain_sync_monitor_persistences.lock().unwrap().retain(|x| x != &funding_txo);
}
}
