Skip to content

Commit e8d6a88

Browse files
committed
Process updates before archiving monitors.
1 parent 0d7ae86 commit e8d6a88

File tree

1 file changed

+72
-10
lines changed

1 file changed

+72
-10
lines changed

lightning/src/util/persist.rs

Lines changed: 72 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -400,28 +400,34 @@ where
400400
/// If you have many stale updates stored (such as after a crash with pending lazy deletes), and
401401
/// would like to get rid of them, consider using the
402402
/// [`MonitorUpdatingPersister::cleanup_stale_updates`] function.
403-
pub struct MonitorUpdatingPersister<K: Deref, L: Deref, ES: Deref, SP: Deref>
403+
pub struct MonitorUpdatingPersister<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
404404
where
405405
K::Target: KVStore,
406406
L::Target: Logger,
407407
ES::Target: EntropySource + Sized,
408408
SP::Target: SignerProvider + Sized,
409+
BI::Target: BroadcasterInterface,
410+
FE::Target: FeeEstimator
409411
{
410412
kv_store: K,
411413
logger: L,
412414
maximum_pending_updates: u64,
413415
entropy_source: ES,
414416
signer_provider: SP,
417+
broadcaster: BI,
418+
fee_estimator: FE
415419
}
416420

417421
#[allow(dead_code)]
418-
impl<K: Deref, L: Deref, ES: Deref, SP: Deref>
419-
MonitorUpdatingPersister<K, L, ES, SP>
422+
impl<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
423+
MonitorUpdatingPersister<K, L, ES, SP, BI, FE>
420424
where
421425
K::Target: KVStore,
422426
L::Target: Logger,
423427
ES::Target: EntropySource + Sized,
424428
SP::Target: SignerProvider + Sized,
429+
BI::Target: BroadcasterInterface,
430+
FE::Target: FeeEstimator
425431
{
426432
/// Constructs a new [`MonitorUpdatingPersister`].
427433
///
@@ -441,14 +447,16 @@ where
441447
/// [`MonitorUpdatingPersister::cleanup_stale_updates`].
442448
pub fn new(
443449
kv_store: K, logger: L, maximum_pending_updates: u64, entropy_source: ES,
444-
signer_provider: SP,
450+
signer_provider: SP, broadcaster: BI, fee_estimator: FE
445451
) -> Self {
446452
MonitorUpdatingPersister {
447453
kv_store,
448454
logger,
449455
maximum_pending_updates,
450456
entropy_source,
451457
signer_provider,
458+
broadcaster,
459+
fee_estimator
452460
}
453461
}
454462

@@ -639,13 +647,15 @@ where
639647
}
640648
}
641649

642-
impl<ChannelSigner: EcdsaChannelSigner, K: Deref, L: Deref, ES: Deref, SP: Deref>
643-
Persist<ChannelSigner> for MonitorUpdatingPersister<K, L, ES, SP>
650+
impl<ChannelSigner: EcdsaChannelSigner, K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
651+
Persist<ChannelSigner> for MonitorUpdatingPersister<K, L, ES, SP, BI, FE>
644652
where
645653
K::Target: KVStore,
646654
L::Target: Logger,
647655
ES::Target: EntropySource + Sized,
648656
SP::Target: SignerProvider + Sized,
657+
BI::Target: BroadcasterInterface,
658+
FE::Target: FeeEstimator
649659
{
650660
/// Persists a new channel. This means writing the entire monitor to the
651661
/// parametrized [`KVStore`].
@@ -770,13 +780,53 @@ where
770780
Ok((_block_hash, monitor)) => monitor,
771781
Err(_) => return
772782
};
783+
784+
let mut current_update_id = monitor.get_latest_update_id();
785+
loop {
786+
current_update_id = match current_update_id.checked_add(1) {
787+
Some(next_update_id) => next_update_id,
788+
None => break,
789+
};
790+
let update_name = UpdateName::from(current_update_id);
791+
let update = match self.read_monitor_update(&monitor_name, &update_name) {
792+
Ok(update) => update,
793+
Err(err) if err.kind() == io::ErrorKind::NotFound => {
794+
// We can't find any more updates, so we are done.
795+
break;
796+
}
797+
Err(err) => {
798+
log_error!(
799+
self.logger,
800+
"Monitor update read failed. monitor: {} update: {} reason: {:?}",
801+
monitor_name.as_str(),
802+
update_name.as_str(),
803+
err
804+
);
805+
panic!()
806+
},
807+
};
808+
809+
monitor.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger)
810+
.map_err(|e| {
811+
log_error!(
812+
self.logger,
813+
"Monitor update failed. monitor: {} update: {} reason: {:?}",
814+
monitor_name.as_str(),
815+
update_name.as_str(),
816+
e
817+
);
818+
io::Error::new(io::ErrorKind::Other, "Monitor update failed")
819+
})
820+
.expect("Could not apply monitor update during archiving");
821+
}
822+
773823
match self.kv_store.write(
774824
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
775825
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
776826
monitor_name.as_str(),
777-
&monitor.encode()
827+
&monitor.encode(),
778828
) {
779-
Ok(()) => {},
829+
Ok(()) => {}
780830
Err(_e) => return,
781831
};
782832
let _ = self.kv_store.remove(
@@ -788,12 +838,14 @@ where
788838
}
789839
}
790840

791-
impl<K: Deref, L: Deref, ES: Deref, SP: Deref> MonitorUpdatingPersister<K, L, ES, SP>
841+
impl<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref> MonitorUpdatingPersister<K, L, ES, SP, BI, FE>
792842
where
793843
ES::Target: EntropySource + Sized,
794844
K::Target: KVStore,
795845
L::Target: Logger,
796-
SP::Target: SignerProvider + Sized
846+
SP::Target: SignerProvider + Sized,
847+
BI::Target: BroadcasterInterface,
848+
FE::Target: FeeEstimator
797849
{
798850
// Cleans up monitor updates for given monitor in range `start..=end`.
799851
fn cleanup_in_range(&self, monitor_name: MonitorName, start: u64, end: u64) {
@@ -962,13 +1014,17 @@ mod tests {
9621014
maximum_pending_updates: persister_0_max_pending_updates,
9631015
entropy_source: &chanmon_cfgs[0].keys_manager,
9641016
signer_provider: &chanmon_cfgs[0].keys_manager,
1017+
broadcaster: &chanmon_cfgs[0].tx_broadcaster,
1018+
fee_estimator: &chanmon_cfgs[0].fee_estimator,
9651019
};
9661020
let persister_1 = MonitorUpdatingPersister {
9671021
kv_store: &TestStore::new(false),
9681022
logger: &TestLogger::new(),
9691023
maximum_pending_updates: persister_1_max_pending_updates,
9701024
entropy_source: &chanmon_cfgs[1].keys_manager,
9711025
signer_provider: &chanmon_cfgs[1].keys_manager,
1026+
broadcaster: &chanmon_cfgs[0].tx_broadcaster,
1027+
fee_estimator: &chanmon_cfgs[0].fee_estimator,
9721028
};
9731029
let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
9741030
let chain_mon_0 = test_utils::TestChainMonitor::new(
@@ -1129,6 +1185,8 @@ mod tests {
11291185
maximum_pending_updates: 11,
11301186
entropy_source: node_cfgs[0].keys_manager,
11311187
signer_provider: node_cfgs[0].keys_manager,
1188+
broadcaster: node_cfgs[0].tx_broadcaster,
1189+
fee_estimator: node_cfgs[0].fee_estimator,
11321190
};
11331191
match ro_persister.persist_new_channel(test_txo, &added_monitors[0].1) {
11341192
ChannelMonitorUpdateStatus::UnrecoverableError => {
@@ -1168,13 +1226,17 @@ mod tests {
11681226
maximum_pending_updates: test_max_pending_updates,
11691227
entropy_source: &chanmon_cfgs[0].keys_manager,
11701228
signer_provider: &chanmon_cfgs[0].keys_manager,
1229+
broadcaster: &chanmon_cfgs[0].tx_broadcaster,
1230+
fee_estimator: &chanmon_cfgs[0].fee_estimator,
11711231
};
11721232
let persister_1 = MonitorUpdatingPersister {
11731233
kv_store: &TestStore::new(false),
11741234
logger: &TestLogger::new(),
11751235
maximum_pending_updates: test_max_pending_updates,
11761236
entropy_source: &chanmon_cfgs[1].keys_manager,
11771237
signer_provider: &chanmon_cfgs[1].keys_manager,
1238+
broadcaster: &chanmon_cfgs[0].tx_broadcaster,
1239+
fee_estimator: &chanmon_cfgs[0].fee_estimator,
11781240
};
11791241
let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
11801242
let chain_mon_0 = test_utils::TestChainMonitor::new(

0 commit comments

Comments
 (0)