Skip to content

Commit 6876187

Browse files
Aditya SharmaAditya Sharma
authored andcommitted
chainmonitor: Add persistence logic for StubChannelMonitor and appropriate helpers to reload it.
1 parent 4954c62 commit 6876187

File tree

8 files changed

+453
-31
lines changed

8 files changed

+453
-31
lines changed

fuzz/src/chanmon_consistency.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ use lightning::ln::channel::FEE_SPIKE_BUFFER_FEE_INCREASE_MULTIPLE;
4848
use lightning::ln::channel_state::ChannelDetails;
4949
use lightning::ln::channelmanager::{
5050
ChainParameters, ChannelManager, ChannelManagerReadArgs, PaymentId, PaymentSendFailure,
51-
RecipientOnionFields,
51+
RecipientOnionFields, StubChannel,
5252
};
5353
use lightning::ln::functional_test_utils::*;
5454
use lightning::ln::msgs::{
@@ -287,6 +287,22 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
287287
) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, Option<PublicKey>)> {
288288
return self.chain_monitor.release_pending_monitor_events();
289289
}
290+
291+
fn watch_dummy(
292+
&self, funding_outpoint: OutPoint, stub_monitor: StubChannelMonitor<ChannelSigner>,
293+
) -> Result<(), ()> {
294+
return self.chain_monitor.watch_dummy(funding_outpoint, stub_monitor);
295+
}
296+
297+
fn stale_or_missing_channel_monitor(&self, stub_chan: &StubChannel) -> bool {
298+
return self.chain_monitor.stale_or_missing_channel_monitor(stub_chan);
299+
}
300+
301+
fn panic_and_persist_stub_channel(
302+
&self, funding_outpoint: OutPoint, stub_monitor: StubChannelMonitor<ChannelSigner>,
303+
) -> Result<(), ()> {
304+
return self.chain_monitor.panic_and_persist_stub_channel(funding_outpoint, stub_monitor);
305+
}
290306
}
291307

292308
struct KeyProvider {

lightning/src/chain/chainmonitor.rs

Lines changed: 177 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use bitcoin::hash_types::{Txid, BlockHash};
2929
use crate::chain;
3030
use crate::chain::{ChannelMonitorUpdateStatus, Filter, WatchedOutput};
3131
use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
32-
use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, WithChannelMonitor};
32+
use crate::chain::channelmonitor::{ChannelMonitor, StubChannel, StubChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, WithChannelMonitor};
3333
use crate::chain::transaction::{OutPoint, TransactionData};
3434
use crate::ln::types::ChannelId;
3535
use crate::sign::ecdsa::EcdsaChannelSigner;
@@ -163,6 +163,9 @@ pub trait Persist<ChannelSigner: EcdsaChannelSigner> {
163163
/// Archiving the data in a backup location (rather than deleting it fully) is useful for
164164
/// hedging against data loss in case of unexpected failure.
165165
fn archive_persisted_channel(&self, channel_funding_outpoint: OutPoint);
166+
/// Persist a new channel's data in response to a [`chain::Watch::watch_dummy`] call. This is
167+
/// called by [`ChannelManager`] for new stub channels received from peer storage backup,
168+
fn persist_new_stub_channel(&self, funding_txo: OutPoint, stub_monitor: &StubChannelMonitor<ChannelSigner>) -> Result<(), bitcoin::io::Error>;
166169
}
167170

168171
struct MonitorHolder<ChannelSigner: EcdsaChannelSigner> {
@@ -199,6 +202,19 @@ pub struct LockedChannelMonitor<'a, ChannelSigner: EcdsaChannelSigner> {
199202
funding_txo: OutPoint,
200203
}
201204

205+
/// A read-only reference to a current StubChannelMonitors similar to [LockedChannelMonitor]
206+
pub struct LockedStubChannelMonitor<'a, ChannelSigner: EcdsaChannelSigner> {
207+
lock: RwLockReadGuard<'a, HashMap<OutPoint, StubChannelMonitor<ChannelSigner>>>,
208+
funding_txo: OutPoint,
209+
}
210+
211+
impl<ChannelSigner: EcdsaChannelSigner> Deref for LockedStubChannelMonitor<'_, ChannelSigner> {
212+
type Target = StubChannelMonitor<ChannelSigner>;
213+
fn deref(&self) -> &StubChannelMonitor<ChannelSigner> {
214+
&self.lock.get(&self.funding_txo).expect("Checked at construction")
215+
}
216+
}
217+
202218
impl<ChannelSigner: EcdsaChannelSigner> Deref for LockedChannelMonitor<'_, ChannelSigner> {
203219
type Target = ChannelMonitor<ChannelSigner>;
204220
fn deref(&self) -> &ChannelMonitor<ChannelSigner> {
@@ -230,6 +246,8 @@ pub struct ChainMonitor<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F
230246
P::Target: Persist<ChannelSigner>,
231247
{
232248
monitors: RwLock<HashMap<OutPoint, MonitorHolder<ChannelSigner>>>,
249+
stub_monitors: RwLock<HashMap<OutPoint, StubChannelMonitor<ChannelSigner>>>,
250+
233251
chain_source: Option<C>,
234252
broadcaster: T,
235253
logger: L,
@@ -264,9 +282,10 @@ where C::Target: chain::Filter,
264282
/// updated `txdata`.
265283
///
266284
/// Calls which represent a new blockchain tip height should set `best_height`.
267-
fn process_chain_data<FN>(&self, header: &Header, best_height: Option<u32>, txdata: &TransactionData, process: FN)
285+
fn process_chain_data<FN, SN>(&self, header: &Header, best_height: Option<u32>, txdata: &TransactionData, process: FN, stub_process: SN)
268286
where
269-
FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>
287+
FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>,
288+
SN: Fn(&StubChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>
270289
{
271290
let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
272291
let funding_outpoints = hash_set_from_iter(self.monitors.read().unwrap().keys().cloned());
@@ -296,6 +315,14 @@ where C::Target: chain::Filter,
296315
}
297316
}
298317

318+
let stub_monitors = self.stub_monitors.write().unwrap();
319+
for (funding_outpoint, stub_monitor) in stub_monitors.iter() {
320+
if self.update_stub_with_chain_data(header, best_height, txdata, &stub_process, funding_outpoint, stub_monitor).is_err() {
321+
log_error!(self.logger, "{}", err_str);
322+
panic!("{}", err_str);
323+
};
324+
}
325+
299326
if let Some(height) = best_height {
300327
// If the best block height is being updated, update highest_chain_height under the
301328
// monitors write lock.
@@ -307,6 +334,34 @@ where C::Target: chain::Filter,
307334
}
308335
}
309336

337+
fn update_stub_with_chain_data<SN>(&self, header: &Header, _best_height: Option<u32>, txdata: &TransactionData, stub_process: SN,
338+
_funding_outpoint: &OutPoint, stub_monitor: &StubChannelMonitor<ChannelSigner>) -> Result<(), ()>
339+
where SN: Fn(&StubChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs> {
340+
let logger = WithChannelMonitor::from_stub(&self.logger, stub_monitor);
341+
let mut txn_outputs;
342+
{
343+
txn_outputs = stub_process(stub_monitor, txdata);
344+
}
345+
346+
if let Some(ref chain_source) = self.chain_source {
347+
let block_hash = header.block_hash();
348+
for (txid, mut outputs) in txn_outputs.drain(..) {
349+
for (idx, output) in outputs.drain(..) {
350+
// Register any new outputs with the chain source for filtering
351+
let output = WatchedOutput {
352+
block_hash: Some(block_hash),
353+
outpoint: OutPoint { txid, index: idx as u16 },
354+
script_pubkey: output.script_pubkey,
355+
};
356+
log_trace!(logger, "Adding monitoring for spends of outpoint from stub {} to the filter", output.outpoint);
357+
chain_source.register_output(output);
358+
}
359+
}
360+
}
361+
362+
Ok(())
363+
}
364+
310365
fn update_monitor_with_chain_data<FN>(
311366
&self, header: &Header, best_height: Option<u32>, txdata: &TransactionData, process: FN, funding_outpoint: &OutPoint,
312367
monitor_state: &MonitorHolder<ChannelSigner>, channel_count: usize,
@@ -381,6 +436,7 @@ where C::Target: chain::Filter,
381436
pub fn new(chain_source: Option<C>, broadcaster: T, logger: L, feeest: F, persister: P) -> Self {
382437
Self {
383438
monitors: RwLock::new(new_hash_map()),
439+
stub_monitors: RwLock::new(new_hash_map()),
384440
chain_source,
385441
broadcaster,
386442
logger,
@@ -430,6 +486,20 @@ where C::Target: chain::Filter,
430486
}
431487
}
432488

489+
/// Gets the [`LockedStubChannelMonitor`] for a given funding outpoint, returning an `Err` if no
490+
/// such [`StubChannelMonitor`] is currently being monitored for.
491+
///
492+
/// Note that the result holds a mutex over our monitor set, and should not be held
493+
/// indefinitely.
494+
pub fn get_stub_monitor(&self, funding_txo: OutPoint) -> Result<LockedStubChannelMonitor<'_, ChannelSigner>, ()> {
495+
let lock = self.stub_monitors.read().unwrap();
496+
if lock.get(&funding_txo).is_some() {
497+
Ok(LockedStubChannelMonitor { lock, funding_txo })
498+
} else {
499+
Err(())
500+
}
501+
}
502+
433503
/// Lists the funding outpoint and channel ID of each [`ChannelMonitor`] being monitored.
434504
///
435505
/// Note that [`ChannelMonitor`]s are not removed when a channel is closed as they are always
@@ -441,6 +511,17 @@ where C::Target: chain::Filter,
441511
}).collect()
442512
}
443513

514+
/// Lists the funding outpoint and channel ID of each [`StubChannelMonitor`] being monitored.
515+
///
516+
/// Note that [`StubChannelMonitor`]s are not removed when a channel is closed as they are always
517+
/// monitoring for on-chain state resolutions.
518+
pub fn list_stub_monitors(&self) -> Vec<(OutPoint, ChannelId)> {
519+
self.stub_monitors.read().unwrap().iter().map(|(outpoint, stub_monitor)| {
520+
let channel_id = stub_monitor.channel_id();
521+
(*outpoint, channel_id)
522+
}).collect()
523+
}
524+
444525
#[cfg(not(c_bindings))]
445526
/// Lists the pending updates for each [`ChannelMonitor`] (by `OutPoint` being monitored).
446527
/// Each `Vec<u64>` contains `update_id`s from [`ChannelMonitor::get_latest_update_id`] for updates
@@ -672,6 +753,9 @@ where
672753
self.process_chain_data(header, Some(height), &txdata, |monitor, txdata| {
673754
monitor.block_connected(
674755
header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &self.logger)
756+
}, |stub_monitor, txdata| {
757+
stub_monitor.block_connected(
758+
header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &self.logger)
675759
});
676760
// Assume we may have some new events and wake the event processor
677761
self.event_notifier.notify();
@@ -701,6 +785,9 @@ where
701785
self.process_chain_data(header, None, txdata, |monitor, txdata| {
702786
monitor.transactions_confirmed(
703787
header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &self.logger)
788+
}, |stub_monitor, txdata| {
789+
stub_monitor.transactions_confirmed(
790+
header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &self.logger)
704791
});
705792
// Assume we may have some new events and wake the event processor
706793
self.event_notifier.notify();
@@ -723,6 +810,10 @@ where
723810
monitor.best_block_updated(
724811
header, height, &*self.broadcaster, &*self.fee_estimator, &self.logger
725812
)
813+
}, |stub_monitor, txdata| {
814+
debug_assert!(txdata.is_empty());
815+
stub_monitor.best_block_updated(
816+
header, height, &*self.broadcaster, &*self.fee_estimator, &self.logger)
726817
});
727818
// Assume we may have some new events and wake the event processor
728819
self.event_notifier.notify();
@@ -749,6 +840,89 @@ where C::Target: chain::Filter,
749840
L::Target: Logger,
750841
P::Target: Persist<ChannelSigner>,
751842
{
843+
fn watch_dummy(&self, funding_outpoint: OutPoint, stub_monitor: StubChannelMonitor<ChannelSigner>) -> Result<(), ()> {
844+
let logger = WithChannelMonitor::from_stub(&self.logger, &stub_monitor);
845+
let mut stub_monitors = self.stub_monitors.write().unwrap();
846+
let entry = match stub_monitors.entry(funding_outpoint) {
847+
hash_map::Entry::Occupied(_) => {
848+
log_error!(logger, "Failed to add new channel data: channel monitor for given outpoint is already present");
849+
return Err(());
850+
},
851+
hash_map::Entry::Vacant(e) => e,
852+
};
853+
log_trace!(logger, "Got new StubChannelMonitor for channel {}", stub_monitor.channel_id());
854+
855+
if let Some(ref chain_source) = self.chain_source {
856+
stub_monitor.load_outputs_to_watch(chain_source , &self.logger);
857+
}
858+
entry.insert(stub_monitor);
859+
860+
Ok(())
861+
}
862+
863+
fn stale_or_missing_channel_monitor(&self, stub_chan: &StubChannel) -> bool {
864+
let monitors = self.monitors.read().unwrap();
865+
866+
if let Some(mon) = monitors.get(&stub_chan.funding_outpoint) {
867+
return stub_chan.get_min_seen_secret() < mon.monitor.get_min_seen_secret();
868+
}
869+
return true;
870+
}
871+
872+
fn panic_and_persist_stub_channel(&self, funding_outpoint: OutPoint, stub_monitor: StubChannelMonitor<ChannelSigner>) -> Result<(), ()> {
873+
let logger = WithChannelMonitor::from_stub(&self.logger, &stub_monitor);
874+
let mut monitors = self.monitors.write().unwrap();
875+
{
876+
let mut stub_monitors = self.stub_monitors.write().unwrap();
877+
match stub_monitors.entry(funding_outpoint) {
878+
hash_map::Entry::Occupied(mut stub) => {
879+
let stored_stub = stub.get_mut();
880+
if stub_monitor.get_min_seen_secret() < stored_stub.get_min_seen_secret() {
881+
// This would be useful if we received a more recent StubMonitor than stored.
882+
log_error!(logger, "Updating StubChannelMonitor with the latest data.");
883+
stored_stub.update_from_new_stub_monitor(&stub_monitor);
884+
}
885+
log_debug!(logger, "StubChannelMonitor already exists.");
886+
return Ok(());
887+
},
888+
hash_map::Entry::Vacant(_) => {},
889+
};
890+
}
891+
892+
match monitors.entry(funding_outpoint) {
893+
hash_map::Entry::Occupied(p) => {
894+
if p.get().monitor.get_min_seen_secret() > stub_monitor.get_min_seen_secret() {
895+
let persist_res = self.persister.persist_new_stub_channel(funding_outpoint, &stub_monitor);
896+
log_debug!(logger, "Persisting new StubChannel for ChannelID: {}", stub_monitor.channel_id());
897+
if persist_res.is_err() {
898+
log_error!(logger, "Failed to add new dummy channel data");
899+
return Err(());
900+
}
901+
#[cfg(not(test))]
902+
panic!("We've lost state for channel {}. Persisting the StubChannelMonitor", stub_monitor.channel_id());
903+
904+
#[cfg(test)]
905+
return self.watch_dummy(funding_outpoint, stub_monitor);
906+
}
907+
log_debug!(logger, "Skip StubChannelMonitor for {}, an up-to-date ChannelMonitor already exists.", stub_monitor.channel_id());
908+
return Ok(());
909+
},
910+
hash_map::Entry::Vacant(_) => {},
911+
};
912+
913+
log_debug!(logger, "Persisting new StubChannel for ChannelID: {}", stub_monitor.channel_id());
914+
let persist_res = self.persister.persist_new_stub_channel(funding_outpoint, &stub_monitor);
915+
916+
if persist_res.is_err() {
917+
log_error!(logger, "Failed to add new dummy channel data");
918+
return Err(());
919+
}
920+
#[cfg(not(test))]
921+
panic!("Found a missing channel {}", stub_monitor.channel_id());
922+
#[cfg(test)]
923+
return self.watch_dummy(funding_outpoint, stub_monitor);
924+
}
925+
752926
fn watch_channel(&self, funding_outpoint: OutPoint, monitor: ChannelMonitor<ChannelSigner>) -> Result<ChannelMonitorUpdateStatus, ()> {
753927
let logger = WithChannelMonitor::from(&self.logger, &monitor, None);
754928
let mut monitors = self.monitors.write().unwrap();

0 commit comments

Comments
 (0)