
Commit d1c36bd

Aditya Sharma authored and committed

channelmanager: Create a FundRecoverer that puts our node into offline mode so that we can send a BogusChannelReestablish, close all the StubChannelMonitors, and sweep the funds from the resulting events.

1 parent b3e1128, commit d1c36bd

File tree: 7 files changed (+951, -105 lines)


lightning/src/chain/chainmonitor.rs (90 additions, 83 deletions)

@@ -165,8 +165,8 @@ pub trait Persist<ChannelSigner: EcdsaChannelSigner> {
 	fn archive_persisted_channel(&self, channel_funding_outpoint: OutPoint);
 }
 
-struct MonitorHolder<ChannelSigner: EcdsaChannelSigner> {
-	monitor: ChannelMonitor<ChannelSigner>,
+pub(crate) struct MonitorHolder<ChannelSigner: EcdsaChannelSigner> {
+	pub(crate) monitor: ChannelMonitor<ChannelSigner>,
 	/// The full set of pending monitor updates for this Channel.
 	///
 	/// Note that this lock must be held from [`ChannelMonitor::update_monitor`] through to
@@ -181,7 +181,7 @@ struct MonitorHolder<ChannelSigner: EcdsaChannelSigner> {
 	/// could cause users to have a full [`ChannelMonitor`] on disk as well as a
 	/// [`ChannelMonitorUpdate`] which was already applied. While this isn't an issue for the
 	/// LDK-provided update-based [`Persist`], it is somewhat surprising for users so we avoid it.
-	pending_monitor_updates: Mutex<Vec<u64>>,
+	pub(crate) pending_monitor_updates: Mutex<Vec<u64>>,
 }
 
 impl<ChannelSigner: EcdsaChannelSigner> MonitorHolder<ChannelSigner> {
@@ -195,8 +195,8 @@ impl<ChannelSigner: EcdsaChannelSigner> MonitorHolder<ChannelSigner> {
 /// Note that this holds a mutex in [`ChainMonitor`] and may block other events until it is
 /// released.
 pub struct LockedChannelMonitor<'a, ChannelSigner: EcdsaChannelSigner> {
-	lock: RwLockReadGuard<'a, HashMap<OutPoint, MonitorHolder<ChannelSigner>>>,
-	funding_txo: OutPoint,
+	pub(crate) lock: RwLockReadGuard<'a, HashMap<OutPoint, MonitorHolder<ChannelSigner>>>,
+	pub(crate) funding_txo: OutPoint,
 }
 
 impl<ChannelSigner: EcdsaChannelSigner> Deref for LockedChannelMonitor<'_, ChannelSigner> {
@@ -247,73 +247,19 @@ pub struct ChainMonitor<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
 	event_notifier: Notifier,
 }
 
-impl<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> ChainMonitor<ChannelSigner, C, T, F, L, P>
-where C::Target: chain::Filter,
-	T::Target: BroadcasterInterface,
-	F::Target: FeeEstimator,
-	L::Target: Logger,
-	P::Target: Persist<ChannelSigner>,
-{
-	/// Dispatches to per-channel monitors, which are responsible for updating their on-chain view
-	/// of a channel and reacting accordingly based on transactions in the given chain data. See
-	/// [`ChannelMonitor::block_connected`] for details. Any HTLCs that were resolved on chain will
-	/// be returned by [`chain::Watch::release_pending_monitor_events`].
-	///
-	/// Calls back to [`chain::Filter`] if any monitor indicated new outputs to watch. Subsequent
-	/// calls must not exclude any transactions matching the new outputs nor any in-block
-	/// descendants of such transactions. It is not necessary to re-fetch the block to obtain
-	/// updated `txdata`.
-	///
-	/// Calls which represent a new blockchain tip height should set `best_height`.
-	fn process_chain_data<FN>(&self, header: &Header, best_height: Option<u32>, txdata: &TransactionData, process: FN)
+pub(crate) fn update_monitor_with_chain_data_util<FN, P: Deref, ChannelSigner, C: Deref, L: Deref>(
+	persister: &P, chain_source: &Option<C>, logger: &L, header: &Header, best_height: Option<u32>, txdata: &TransactionData, process: FN, funding_outpoint: &OutPoint,
+	monitor_state: &MonitorHolder<ChannelSigner>, channel_count: usize,
+) -> Result<(), ()>
 where
-	FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>,
+	C::Target: chain::Filter,
+	FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>,
+	P::Target: Persist<ChannelSigner>,
+	L::Target: Logger,
+	ChannelSigner: EcdsaChannelSigner,
 {
-	let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
-	let funding_outpoints = hash_set_from_iter(self.monitors.read().unwrap().keys().cloned());
-	let channel_count = funding_outpoints.len();
-	for funding_outpoint in funding_outpoints.iter() {
-		let monitor_lock = self.monitors.read().unwrap();
-		if let Some(monitor_state) = monitor_lock.get(funding_outpoint) {
-			if self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state, channel_count).is_err() {
-				// Take the monitors lock for writing so that we poison it and any future
-				// operations going forward fail immediately.
-				core::mem::drop(monitor_lock);
-				let _poison = self.monitors.write().unwrap();
-				log_error!(self.logger, "{}", err_str);
-				panic!("{}", err_str);
-			}
-		}
-	}
-
-	// do some followup cleanup if any funding outpoints were added in between iterations
-	let monitor_states = self.monitors.write().unwrap();
-	for (funding_outpoint, monitor_state) in monitor_states.iter() {
-		if !funding_outpoints.contains(funding_outpoint) {
-			if self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state, channel_count).is_err() {
-				log_error!(self.logger, "{}", err_str);
-				panic!("{}", err_str);
-			}
-		}
-	}
-
-	if let Some(height) = best_height {
-		// If the best block height is being updated, update highest_chain_height under the
-		// monitors write lock.
-		let old_height = self.highest_chain_height.load(Ordering::Acquire);
-		let new_height = height as usize;
-		if new_height > old_height {
-			self.highest_chain_height.store(new_height, Ordering::Release);
-		}
-	}
-}
-
-fn update_monitor_with_chain_data<FN>(
-	&self, header: &Header, best_height: Option<u32>, txdata: &TransactionData, process: FN, funding_outpoint: &OutPoint,
-	monitor_state: &MonitorHolder<ChannelSigner>, channel_count: usize,
-) -> Result<(), ()> where FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs> {
 	let monitor = &monitor_state.monitor;
-	let logger = WithChannelMonitor::from(&self.logger, &monitor, None);
+	let logger = WithChannelMonitor::from(logger, &monitor, None);
 
 	let mut txn_outputs = process(monitor, txdata);
 
@@ -338,7 +284,7 @@ where C::Target: chain::Filter,
 		// `ChannelMonitorUpdate` after a channel persist for a channel with the same
 		// `latest_update_id`.
 		let _pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
-		match self.persister.update_persisted_channel(*funding_outpoint, None, monitor) {
+		match persister.update_persisted_channel(*funding_outpoint, None, monitor) {
 			ChannelMonitorUpdateStatus::Completed =>
 				log_trace!(logger, "Finished syncing Channel Monitor for channel {} for block-data",
 					log_funding_info!(monitor)
@@ -354,7 +300,7 @@ where C::Target: chain::Filter,
 
 	// Register any new outputs with the chain source for filtering, storing any dependent
 	// transactions from within the block that previously had not been included in txdata.
-	if let Some(ref chain_source) = self.chain_source {
+	if let Some(ref chain_source_ref) = chain_source {
 		let block_hash = header.block_hash();
 		for (txid, mut outputs) in txn_outputs.drain(..) {
 			for (idx, output) in outputs.drain(..) {
@@ -365,13 +311,85 @@ where C::Target: chain::Filter,
 					script_pubkey: output.script_pubkey,
 				};
 				log_trace!(logger, "Adding monitoring for spends of outpoint {} to the filter", output.outpoint);
-				chain_source.register_output(output);
+				chain_source_ref.register_output(output);
 			}
 		}
 	}
 	Ok(())
 }
 
+pub(crate) fn process_chain_data_util<FN, ChannelSigner: EcdsaChannelSigner, L: Deref, P: Deref, C: Deref>(persister: &P, chain_source: &Option<C>,
+	logger: &L, monitors: &RwLock<HashMap<OutPoint, MonitorHolder<ChannelSigner>>>, highest_chain_height: &AtomicUsize,
+	header: &Header, best_height: Option<u32>, txdata: &TransactionData, process: FN)
+where
+	FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>,
+	L::Target: Logger,
+	P::Target: Persist<ChannelSigner>,
+	C::Target: chain::Filter,
+{
+	let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
+	let funding_outpoints = hash_set_from_iter(monitors.read().unwrap().keys().cloned());
+	let channel_count = funding_outpoints.len();
+	for funding_outpoint in funding_outpoints.iter() {
+		let monitor_lock = monitors.read().unwrap();
+		if let Some(monitor_state) = monitor_lock.get(funding_outpoint) {
+			if update_monitor_with_chain_data_util(persister, chain_source, logger, header, best_height, txdata, &process, funding_outpoint, &monitor_state, channel_count).is_err() {
+				// Take the monitors lock for writing so that we poison it and any future
+				// operations going forward fail immediately.
+				core::mem::drop(monitor_lock);
+				let _poison = monitors.write().unwrap();
+				log_error!(logger, "{}", err_str);
+				panic!("{}", err_str);
+			}
+		}
+	}
+
+	// do some followup cleanup if any funding outpoints were added in between iterations
+	let monitor_states = monitors.write().unwrap();
+	for (funding_outpoint, monitor_state) in monitor_states.iter() {
+		if !funding_outpoints.contains(funding_outpoint) {
+			if update_monitor_with_chain_data_util(persister, chain_source, logger, header, best_height, txdata, &process, funding_outpoint, &monitor_state, channel_count).is_err() {
+				log_error!(logger, "{}", err_str);
+				panic!("{}", err_str);
+			}
+		}
+	}
+
+	if let Some(height) = best_height {
+		// If the best block height is being updated, update highest_chain_height under the
+		// monitors write lock.
+		let old_height = highest_chain_height.load(Ordering::Acquire);
+		let new_height = height as usize;
+		if new_height > old_height {
+			highest_chain_height.store(new_height, Ordering::Release);
+		}
+	}
+}
+
+impl<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> ChainMonitor<ChannelSigner, C, T, F, L, P>
+where C::Target: chain::Filter,
+	T::Target: BroadcasterInterface,
+	F::Target: FeeEstimator,
+	L::Target: Logger,
+	P::Target: Persist<ChannelSigner>,
+{
+	/// Dispatches to per-channel monitors, which are responsible for updating their on-chain view
+	/// of a channel and reacting accordingly based on transactions in the given chain data. See
+	/// [`ChannelMonitor::block_connected`] for details. Any HTLCs that were resolved on chain will
+	/// be returned by [`chain::Watch::release_pending_monitor_events`].
+	///
+	/// Calls back to [`chain::Filter`] if any monitor indicated new outputs to watch. Subsequent
+	/// calls must not exclude any transactions matching the new outputs nor any in-block
+	/// descendants of such transactions. It is not necessary to re-fetch the block to obtain
+	/// updated `txdata`.
+	///
+	/// Calls which represent a new blockchain tip height should set `best_height`.
+	fn process_chain_data<FN>(&self, header: &Header, best_height: Option<u32>, txdata: &TransactionData, process: FN)
+	where
+		FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>,
+	{
+		process_chain_data_util(&self.persister, &self.chain_source, &self.logger, &self.monitors, &self.highest_chain_height, header, best_height, txdata, process);
+	}
+
 	/// Creates a new `ChainMonitor` used to watch on-chain activity pertaining to channels.
 	///
 	/// When an optional chain source implementing [`chain::Filter`] is provided, the chain monitor
@@ -750,17 +768,6 @@ where C::Target: chain::Filter,
 	L::Target: Logger,
 	P::Target: Persist<ChannelSigner>,
 {
-	fn get_stub_cids_with_counterparty(&self, counterparty_node_id: PublicKey) -> Vec<ChannelId> {
-		let stub_monitors = self.stub_monitors.read().unwrap();
-		let mut stubs = vec![];
-		for (_, mon) in stub_monitors.iter() {
-			if mon.get_counterparty_node_id() == Some(counterparty_node_id) {
-				stubs.push(mon.channel_id());
-			}
-		}
-		stubs
-	}
-
 	fn watch_channel(&self, funding_outpoint: OutPoint, monitor: ChannelMonitor<ChannelSigner>) -> Result<ChannelMonitorUpdateStatus, ()> {
 		let logger = WithChannelMonitor::from(&self.logger, &monitor, None);
 		let mut monitors = self.monitors.write().unwrap();
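
The change above hoists the bodies of `process_chain_data` and `update_monitor_with_chain_data` out of the `ChainMonitor` impl into free `*_util` functions that borrow the persister, chain source, logger, monitor map, and chain height individually, so a component that does not own a full `ChainMonitor` (such as the `FundRecoverer` this commit introduces) can reuse the same chain-processing code. Below is a minimal, self-contained sketch of that pattern; every name in it (`Logger`, `Monitor`, `process_data_util`) is an illustrative stand-in, not LDK's real API.

```rust
use std::ops::Deref;

pub trait Logger {
    fn log(&self, msg: &str);
}

pub struct ConsoleLogger;

impl Logger for ConsoleLogger {
    fn log(&self, msg: &str) {
        println!("{msg}");
    }
}

// Free function generic over a `Deref` handle, mirroring how
// `process_chain_data_util` takes `&L` with `L::Target: Logger`
// instead of reaching through `&self`.
fn process_data_util<L: Deref>(logger: &L, data: &[u8])
where
    L::Target: Logger,
{
    logger.log(&format!("processing {} bytes", data.len()));
}

pub struct Monitor<L: Deref>
where
    L::Target: Logger,
{
    logger: L,
}

impl<L: Deref> Monitor<L>
where
    L::Target: Logger,
{
    // The method becomes a thin wrapper, like `process_chain_data`
    // delegating to `process_chain_data_util` in the diff above.
    pub fn process_data(&self, data: &[u8]) {
        process_data_util(&self.logger, data);
    }
}

fn main() {
    let monitor = Monitor { logger: &ConsoleLogger };
    monitor.process_data(b"block data");
    // A second component (e.g. a recoverer) reuses the logic directly,
    // without constructing a `Monitor` at all:
    process_data_util(&&ConsoleLogger, b"replayed block data");
}
```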

lightning/src/chain/channelmonitor.rs (19 additions, 7 deletions)

@@ -1440,6 +1440,13 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitor<Signer> {
 		})
 	}
 
+	pub(crate) fn merge_commitment_secret(&mut self, monitor: ChannelMonitor<Signer>) {
+		if self.get_min_seen_secret() > monitor.get_min_seen_secret() {
+			let inner = monitor.inner.lock().unwrap();
+			self.inner.lock().unwrap().commitment_secrets = inner.commitment_secrets.clone();
+		}
+	}
+
 	/// Returns a [`ChannelMonitor`] using [`StubChannelMonitor`] and other
 	/// important information to sweep funds and create penalty transactions.
 	pub(crate) fn new_stub(secp_ctx: Secp256k1<secp256k1::All>, stub_channel: &StubChannelMonitor, keys: Signer, channel_parameters: ChannelTransactionParameters, funding_info_scriptbuf: ScriptBuf, destination_script: ScriptBuf) -> ChannelMonitor<Signer> {
@@ -1689,6 +1696,10 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitor<Signer> {
 		}
 	}
 
+	pub fn update_latest_state_from_new_stubmonitor(&self, stub: &StubChannelMonitor) {
+		self.inner.lock().unwrap().update_latest_state_from_new_stubmonitor(stub);
+	}
+
 	/// Get the list of HTLCs who's status has been updated on chain. This should be called by
 	/// ChannelManager via [`chain::Watch::release_pending_monitor_events`].
 	pub fn get_and_clear_pending_monitor_events(&self) -> Vec<MonitorEvent> {
@@ -3532,13 +3543,14 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitorImpl<Signer> {
 		self.current_holder_commitment_number
 	}
 
-	/// Updates the [`StubChannelMonitor`] when we receive a new more recent
-	/// peer storage from our peer. This souldn't be called through [`ChannelMonitor`].
-	fn update_latest_state_from_new_stubchannelmonitor(&mut self, stub: &StubChannelMonitor<Signer>) {
-		let inner = stub.inner.lock().unwrap();
-		self.commitment_secrets = inner.commitment_secrets.clone();
-		self.counterparty_claimable_outpoints = inner.counterparty_claimable_outpoints.clone();
-		self.their_cur_per_commitment_points = inner.their_cur_per_commitment_points.clone();
+	/// Updates the [`ChannelMonitor`] when we receive a new more recent
+	/// peer storage from our peer.
+	fn update_latest_state_from_new_stubmonitor(&mut self, stub: &StubChannelMonitor) {
+		let mut latest_state = new_hash_map();
+		latest_state.insert(stub.latest_state.unwrap(), Vec::new());
+		self.commitment_secrets = stub.commitment_secrets.clone();
+		self.counterparty_claimable_outpoints = latest_state;
+		self.their_cur_per_commitment_points = stub.their_cur_per_commitment_points.clone();
 	}
 
 	/// Attempts to claim a counterparty commitment transaction's outputs using the revocation key and
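
The new `merge_commitment_secret` adopts the other monitor's counterparty commitment secrets only when that monitor reports a strictly lower `get_min_seen_secret`. Per-commitment secret indices count down from 2^48 - 1 as states are revoked, so a lower minimum index means more revocations have been seen. A simplified sketch of that rule follows; `SecretStore` and `Monitor` are stand-ins for LDK's `CounterpartyCommitmentSecrets` and `ChannelMonitor`, for illustration only.

```rust
// Simplified model of the merge rule: secret indices count *down*, so the
// store with the lower minimum index is the more up-to-date one.

#[derive(Clone, Debug, PartialEq)]
struct SecretStore {
    // (index, secret) pairs; indices start at 2^48 - 1 and decrease.
    secrets: Vec<(u64, [u8; 32])>,
}

impl SecretStore {
    fn min_seen_secret(&self) -> u64 {
        self.secrets.iter().map(|(idx, _)| *idx).min().unwrap_or(1 << 48)
    }
}

struct Monitor {
    commitment_secrets: SecretStore,
}

impl Monitor {
    // Mirrors `ChannelMonitor::merge_commitment_secret`: only take the other
    // monitor's secrets if it has seen a strictly lower (newer) secret index.
    fn merge_commitment_secret(&mut self, other: &Monitor) {
        if self.commitment_secrets.min_seen_secret() > other.commitment_secrets.min_seen_secret() {
            self.commitment_secrets = other.commitment_secrets.clone();
        }
    }
}

fn main() {
    let mut ours = Monitor {
        commitment_secrets: SecretStore { secrets: vec![((1 << 48) - 1, [0; 32])] },
    };
    let theirs = Monitor {
        commitment_secrets: SecretStore {
            secrets: vec![((1 << 48) - 1, [0; 32]), ((1 << 48) - 2, [1; 32])],
        },
    };
    // `theirs` has seen one more revocation, so we adopt its store.
    ours.merge_commitment_secret(&theirs);
    assert_eq!(ours.commitment_secrets.min_seen_secret(), (1 << 48) - 2);
}
```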

lightning/src/chain/mod.rs (0 additions, 9 deletions)

@@ -305,15 +305,6 @@ pub trait Watch<ChannelSigner: EcdsaChannelSigner> {
 	/// For details on asynchronous [`ChannelMonitor`] updating and returning
 	/// [`MonitorEvent::Completed`] here, see [`ChannelMonitorUpdateStatus::InProgress`].
 	fn release_pending_monitor_events(&self) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, Option<PublicKey>)>;
-
-	/// Retrieves a list of channel IDs for [`StubChannelMonitor`] associated with a specific counterparty node ID.
-	///
-	/// This function searches through the collection of [`StubChannelMonitor`] and collects the channel IDs
-	/// of those monitors that have the specified counterparty node ID.
-	///
-	/// This is used by [`FundRecoverer`] to fetch all the [`ChannelId`] with a peer that needs recovery so that we can send them
-	/// `BogusChannelReestablish`.
-	fn get_stub_cids_with_counterparty(&self, counterparty_node_id: PublicKey) -> Vec<ChannelId>;
 }
 
 /// The `Filter` trait defines behavior for indicating chain activity of interest pertaining to

lightning/src/events/mod.rs (28 additions, 1 deletion)

@@ -20,7 +20,7 @@ pub use bump_transaction::BumpTransactionEvent;
 
 use crate::blinded_path::message::OffersContext;
 use crate::blinded_path::payment::{Bolt12OfferContext, Bolt12RefundContext, PaymentContext, PaymentContextRef};
-use crate::chain::transaction;
+use crate::chain::{transaction, BestBlock};
 use crate::ln::channelmanager::{InterceptId, PaymentId, RecipientOnionFields};
 use crate::ln::channel::FUNDING_CONF_DEADLINE_BLOCKS;
 use crate::ln::features::ChannelTypeFeatures;
@@ -2281,6 +2281,13 @@ impl MaybeReadable for Event {
 	}
 }
 
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub enum RecoveryEvent {
+	RescanBlock {
+		rescan_from: BestBlock
+	},
+}
+
 /// An event generated by ChannelManager which indicates a message should be sent to a peer (or
 /// broadcast to most peers).
 /// These events are handled by PeerManager::process_events if you are using a PeerManager.
@@ -2630,6 +2637,10 @@
 	fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler;
 }
 
+pub trait RecoverEventsProvider: EventsProvider {
+	fn process_pending_recovery_events<RH: Deref>(&self, handler: RH) where RH::Target: RecoveryHandler;
+}
+
 /// An error type that may be returned to LDK in order to safely abort event handling if it can't
 /// currently succeed (e.g., due to a persistence failure).
 ///
@@ -2661,3 +2672,19 @@ impl<T: EventHandler> EventHandler for Arc<T> {
 		self.deref().handle_event(event)
 	}
 }
+
+pub trait RecoveryHandler {
+	fn handle_recovery_event(&self, event: RecoveryEvent) -> Result<(), ReplayEvent>;
+}
+
+impl<F> RecoveryHandler for F where F: Fn(RecoveryEvent) -> Result<(), ReplayEvent> {
+	fn handle_recovery_event(&self, event: RecoveryEvent) -> Result<(), ReplayEvent> {
+		self(event)
+	}
+}
+
+impl<T: RecoveryHandler> RecoveryHandler for Arc<T> {
+	fn handle_recovery_event(&self, event: RecoveryEvent) -> Result<(), ReplayEvent> {
+		self.deref().handle_recovery_event(event)
+	}
+}
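
The `RecoveryHandler` plumbing added here mirrors the existing `EventHandler` pattern: a blanket impl makes any `Fn(RecoveryEvent) -> Result<(), ReplayEvent>` closure a handler, and an `Arc<T>` forwarding impl lets handlers be shared. The sketch below reproduces that pattern in a self-contained form and shows a closure consuming a `RescanBlock` event; `RecoveryEvent`, `BestBlock`, and `ReplayEvent` are simplified stand-ins for the real definitions.

```rust
use std::ops::Deref;
use std::sync::Arc;

#[derive(Clone, Debug)]
pub struct BestBlock {
    pub height: u32,
}

#[derive(Clone, Debug)]
pub enum RecoveryEvent {
    RescanBlock { rescan_from: BestBlock },
}

#[derive(Debug)]
pub struct ReplayEvent;

pub trait RecoveryHandler {
    fn handle_recovery_event(&self, event: RecoveryEvent) -> Result<(), ReplayEvent>;
}

// Blanket impl: any matching closure is a handler, as in the diff above.
impl<F> RecoveryHandler for F where F: Fn(RecoveryEvent) -> Result<(), ReplayEvent> {
    fn handle_recovery_event(&self, event: RecoveryEvent) -> Result<(), ReplayEvent> {
        self(event)
    }
}

// Forwarding impl so shared handlers work too.
impl<T: RecoveryHandler> RecoveryHandler for Arc<T> {
    fn handle_recovery_event(&self, event: RecoveryEvent) -> Result<(), ReplayEvent> {
        self.deref().handle_recovery_event(event)
    }
}

fn main() {
    // A plain closure is enough to consume recovery events, e.g. to kick
    // off a chain rescan from the point the stub monitor knew about.
    let handler = |event: RecoveryEvent| -> Result<(), ReplayEvent> {
        match event {
            RecoveryEvent::RescanBlock { rescan_from } => {
                println!("rescanning from height {}", rescan_from.height);
                Ok(())
            }
        }
    };
    handler
        .handle_recovery_event(RecoveryEvent::RescanBlock {
            rescan_from: BestBlock { height: 800_000 },
        })
        .unwrap();
}
```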
