feat: Make MonitorUpdatingPersister change persist type based on size #3834

Open · wants to merge 3 commits into main
158 changes: 155 additions & 3 deletions lightning/src/util/persist.rs
@@ -446,6 +446,13 @@ where
/// If you have many stale updates stored (such as after a crash with pending lazy deletes), and
/// would like to get rid of them, consider using the
/// [`MonitorUpdatingPersister::cleanup_stale_updates`] function.
///
/// # Size-based persistence optimization
///
/// For small channel monitors (below `minimum_monitor_size_for_updates` bytes when serialized),
/// this persister will always write the full monitor instead of individual updates. This avoids
/// the overhead of managing update files and later compaction for tiny monitors that don't benefit
/// from differential updates.
pub struct MonitorUpdatingPersister<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
where
K::Target: KVStore,
@@ -458,6 +465,7 @@ where
kv_store: K,
logger: L,
maximum_pending_updates: u64,
minimum_monitor_size_for_updates: usize,
entropy_source: ES,
signer_provider: SP,
broadcaster: BI,
@@ -491,21 +499,52 @@
/// less frequent "waves."
/// - [`MonitorUpdatingPersister`] will potentially have more listing to do if you need to run
/// [`MonitorUpdatingPersister::cleanup_stale_updates`].
///
/// The `minimum_monitor_size_for_updates` parameter sets the minimum serialized size (in bytes)
/// for a [`ChannelMonitor`] to use update-based persistence. Monitors smaller than this threshold
/// will always be persisted in full, avoiding the overhead of managing update files for tiny
/// monitors. A reasonable default is 4096 bytes (4 KiB). Set to 0 to always use update-based
/// persistence regardless of size.
pub fn new(
kv_store: K, logger: L, maximum_pending_updates: u64, entropy_source: ES,
signer_provider: SP, broadcaster: BI, fee_estimator: FE,
kv_store: K, logger: L, maximum_pending_updates: u64,
minimum_monitor_size_for_updates: usize, entropy_source: ES, signer_provider: SP,
broadcaster: BI, fee_estimator: FE,
) -> Self {
MonitorUpdatingPersister {
kv_store,
logger,
maximum_pending_updates,
minimum_monitor_size_for_updates,
entropy_source,
signer_provider,
broadcaster,
fee_estimator,
}
}

/// Constructs a new [`MonitorUpdatingPersister`] with a default minimum monitor size threshold.
///
/// This is a convenience method that sets `minimum_monitor_size_for_updates` to 4096 bytes (4 KiB),
/// which is a reasonable default for most use cases. Monitors smaller than this will be persisted
/// in full rather than using update-based persistence.
///
/// For other parameters, see [`MonitorUpdatingPersister::new`].
pub fn new_with_default_threshold(
kv_store: K, logger: L, maximum_pending_updates: u64, entropy_source: ES,
signer_provider: SP, broadcaster: BI, fee_estimator: FE,
) -> Self {
Self::new(
kv_store,
logger,
maximum_pending_updates,
4096,
entropy_source,
signer_provider,
broadcaster,
fee_estimator,
)
}
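
As a caller-side sketch (for illustration only, not part of this PR's changes; the generic bounds are assumed to mirror the struct's, and the argument names stand in for whatever the node already has in scope), the two constructors differ only in whether the caller picks the size threshold:

fn build_persister<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>(
	kv_store: K, logger: L, entropy_source: ES, signer_provider: SP, broadcaster: BI,
	fee_estimator: FE,
) -> MonitorUpdatingPersister<K, L, ES, SP, BI, FE>
where
	K::Target: KVStore,
	L::Target: Logger,
	ES::Target: EntropySource + Sized,
	SP::Target: SignerProvider + Sized,
	BI::Target: BroadcasterInterface,
	FE::Target: FeeEstimator,
{
	// Full-monitor writes for anything that serializes to under 2 KiB; otherwise allow up
	// to 100 pending updates between full writes.
	MonitorUpdatingPersister::new(
		kv_store, logger, 100, 2048, entropy_source, signer_provider, broadcaster,
		fee_estimator,
	)
	// Equivalent, but with the 4 KiB default threshold:
	// MonitorUpdatingPersister::new_with_default_threshold(
	// 	kv_store, logger, 100, entropy_source, signer_provider, broadcaster, fee_estimator,
	// )
}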

/// Reads all stored channel monitors, along with any stored updates for them.
///
/// It is extremely important that your [`KVStore::read`] implementation uses the
@@ -752,7 +791,12 @@ where
) -> chain::ChannelMonitorUpdateStatus {
const LEGACY_CLOSED_CHANNEL_UPDATE_ID: u64 = u64::MAX;
if let Some(update) = update {
let persist_update = update.update_id != LEGACY_CLOSED_CHANNEL_UPDATE_ID
// Check if monitor is too small for update-based persistence
let monitor_size = monitor.serialized_length();
let use_full_persistence = monitor_size < self.minimum_monitor_size_for_updates;

let persist_update = !use_full_persistence
&& update.update_id != LEGACY_CLOSED_CHANNEL_UPDATE_ID
&& update.update_id % self.maximum_pending_updates != 0;
if persist_update {
let monitor_key = monitor_name.to_string();
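
For illustration, a standalone sketch (not part of this PR's changes; the function and argument names are made up, only the constant mirrors the code above) of the decision the new branch makes, with the three cases it produces:

// Editor's sketch of the persist decision; all names besides the constant are illustrative.
const LEGACY_CLOSED_CHANNEL_UPDATE_ID: u64 = u64::MAX;

/// True: persist only the ChannelMonitorUpdate. False: persist the full ChannelMonitor.
fn persist_as_update(
	monitor_size: usize, minimum_monitor_size_for_updates: usize, update_id: u64,
	maximum_pending_updates: u64,
) -> bool {
	let use_full_persistence = monitor_size < minimum_monitor_size_for_updates;
	!use_full_persistence
		&& update_id != LEGACY_CLOSED_CHANNEL_UPDATE_ID
		&& update_id % maximum_pending_updates != 0
}

fn main() {
	// A 2 KiB monitor under a 4 KiB threshold is always written in full...
	assert!(!persist_as_update(2_048, 4_096, 7, 100));
	// ...a larger monitor falls back to update-based persistence...
	assert!(persist_as_update(8_192, 4_096, 7, 100));
	// ...except on every maximum_pending_updates-th update, which still writes the full monitor.
	assert!(!persist_as_update(8_192, 4_096, 200, 100));
}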
@@ -1156,6 +1200,7 @@ mod tests {
kv_store: &TestStore::new(false),
logger: &TestLogger::new(),
maximum_pending_updates: persister_0_max_pending_updates,
minimum_monitor_size_for_updates: 0,
entropy_source: &chanmon_cfgs[0].keys_manager,
signer_provider: &chanmon_cfgs[0].keys_manager,
broadcaster: &chanmon_cfgs[0].tx_broadcaster,
@@ -1165,6 +1210,7 @@
kv_store: &TestStore::new(false),
logger: &TestLogger::new(),
maximum_pending_updates: persister_1_max_pending_updates,
minimum_monitor_size_for_updates: 0,
entropy_source: &chanmon_cfgs[1].keys_manager,
signer_provider: &chanmon_cfgs[1].keys_manager,
broadcaster: &chanmon_cfgs[1].tx_broadcaster,
@@ -1330,6 +1376,7 @@ mod tests {
kv_store: &TestStore::new(true),
logger: &TestLogger::new(),
maximum_pending_updates: 11,
minimum_monitor_size_for_updates: 0,
entropy_source: node_cfgs[0].keys_manager,
signer_provider: node_cfgs[0].keys_manager,
broadcaster: node_cfgs[0].tx_broadcaster,
@@ -1376,6 +1423,7 @@ mod tests {
kv_store: &TestStore::new(false),
logger: &TestLogger::new(),
maximum_pending_updates: test_max_pending_updates,
minimum_monitor_size_for_updates: 0,
entropy_source: &chanmon_cfgs[0].keys_manager,
signer_provider: &chanmon_cfgs[0].keys_manager,
broadcaster: &chanmon_cfgs[0].tx_broadcaster,
@@ -1385,6 +1433,7 @@
kv_store: &TestStore::new(false),
logger: &TestLogger::new(),
maximum_pending_updates: test_max_pending_updates,
minimum_monitor_size_for_updates: 0,
entropy_source: &chanmon_cfgs[1].keys_manager,
signer_provider: &chanmon_cfgs[1].keys_manager,
broadcaster: &chanmon_cfgs[1].tx_broadcaster,
@@ -1452,6 +1501,109 @@ mod tests {
.is_err());
}

// Test that the size-based optimization skips update-based persistence for small monitors
#[test]
fn test_size_based_optimization() {
let chanmon_cfgs = create_chanmon_cfgs(2);

// Create a persister with a huge minimum size threshold (100KB)
// This should force all monitors to use full persistence instead of updates
let large_threshold_persister = MonitorUpdatingPersister {
kv_store: &TestStore::new(false),
logger: &TestLogger::new(),
maximum_pending_updates: 5,
minimum_monitor_size_for_updates: 100_000, // 100KB threshold - way larger than any monitor
entropy_source: &chanmon_cfgs[0].keys_manager,
signer_provider: &chanmon_cfgs[0].keys_manager,
broadcaster: &chanmon_cfgs[0].tx_broadcaster,
fee_estimator: &chanmon_cfgs[0].fee_estimator,
};

// Create a persister with zero minimum size threshold
// This should allow all monitors to use update-based persistence
let small_threshold_persister = MonitorUpdatingPersister {
kv_store: &TestStore::new(false),
logger: &TestLogger::new(),
maximum_pending_updates: 5,
minimum_monitor_size_for_updates: 0, // 0 byte threshold - allows all monitors to use updates
entropy_source: &chanmon_cfgs[1].keys_manager,
signer_provider: &chanmon_cfgs[1].keys_manager,
broadcaster: &chanmon_cfgs[1].tx_broadcaster,
fee_estimator: &chanmon_cfgs[1].fee_estimator,
};

let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let chain_mon_0 = test_utils::TestChainMonitor::new(
Some(&chanmon_cfgs[0].chain_source),
&chanmon_cfgs[0].tx_broadcaster,
&chanmon_cfgs[0].logger,
&chanmon_cfgs[0].fee_estimator,
&large_threshold_persister,
&chanmon_cfgs[0].keys_manager,
);
let chain_mon_1 = test_utils::TestChainMonitor::new(
Some(&chanmon_cfgs[1].chain_source),
&chanmon_cfgs[1].tx_broadcaster,
&chanmon_cfgs[1].logger,
&chanmon_cfgs[1].fee_estimator,
&small_threshold_persister,
&chanmon_cfgs[1].keys_manager,
);
node_cfgs[0].chain_monitor = chain_mon_0;
node_cfgs[1].chain_monitor = chain_mon_1;
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);

// Create a channel and make a payment to trigger monitor updates
let _ = create_announced_chan_between_nodes(&nodes, 0, 1);
send_payment(&nodes[0], &vec![&nodes[1]][..], 1_000_000);

// The test passes if channels can be created and payments sent without issues.
// We don't inspect the stored data directly here: the two thresholds force the
// full-persistence and update-based code paths respectively, and a failure in either
// path would surface as a panic or a failed read below.

// Verify that monitors were created
let persisted_data_0 =
large_threshold_persister.read_all_channel_monitors_with_updates().unwrap();
let persisted_data_1 =
small_threshold_persister.read_all_channel_monitors_with_updates().unwrap();

assert_eq!(persisted_data_0.len(), 1);
assert_eq!(persisted_data_1.len(), 1);

// Verify the monitors exist and are of reasonable size
for (_, monitor) in persisted_data_0.iter() {
let monitor_size = monitor.serialized_length();
// Just verify the monitor is not empty and reasonably sized
assert!(
monitor_size > 1000,
"Monitor should be at least 1KB in size, got {} bytes",
monitor_size
);
assert!(
monitor_size < 100_000,
"Monitor should be smaller than 100KB threshold, got {} bytes",
monitor_size
);
}

for (_, monitor) in persisted_data_1.iter() {
let monitor_size = monitor.serialized_length();
// Just verify the monitor is not empty and reasonably sized
assert!(
monitor_size > 1000,
"Monitor should be at least 1KB in size, got {} bytes",
monitor_size
);
// Since threshold is 0, this monitor should be large enough to use updates
assert!(
monitor_size > 0,
"Monitor should be larger than 0 byte threshold, got {} bytes",
monitor_size
);
}
}
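
One way the test could assert the observable difference directly (a hedged sketch, not part of this PR: it assumes the TestStore handed to large_threshold_persister is bound to a variable, called `store` here, and uses the namespace constants and KVStore::list already defined in this module): with the 100KB threshold every write is a full monitor, so the update namespace should stay empty.

// Editor's sketch: `store` is a hypothetical binding for large_threshold_persister's TestStore.
let monitor_keys = store
	.list(CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE)
	.unwrap();
for key in monitor_keys {
	let updates =
		store.list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, &key).unwrap();
	assert!(updates.is_empty(), "expected only full-monitor writes below the size threshold");
}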

fn persist_fn<P: Deref, ChannelSigner: EcdsaChannelSigner>(_persist: P) -> bool
where
P::Target: Persist<ChannelSigner>,