Skip to content

Commit ebd6e06

Browse files
Aditya SharmaAditya Sharma
authored andcommitted
channelmanager: Create FundRecoverer to take our node in offline mode so that we can just send a BogusChannelReestablish and close all the StubChannelMonitors and sweep the funds from the events.
1 parent fb53c43 commit ebd6e06

File tree

5 files changed

+218
-1
lines changed

5 files changed

+218
-1
lines changed

fuzz/src/chanmon_consistency.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,10 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
303303
) -> Result<(), ()> {
304304
return self.chain_monitor.panic_and_persist_stub_channel(funding_outpoint, stub_monitor);
305305
}
306+
307+
fn get_stub_cids_with_counterparty(&self, counterparty_node_id: PublicKey) -> Vec<ChannelId> {
308+
return self.chain_monitor.get_stub_cids_with_counterparty(counterparty_node_id);
309+
}
306310
}
307311

308312
struct KeyProvider {

lightning/src/chain/chainmonitor.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -860,6 +860,17 @@ where C::Target: chain::Filter,
860860
Ok(())
861861
}
862862

863+
fn get_stub_cids_with_counterparty(&self, counterparty_node_id: PublicKey) -> Vec<ChannelId> {
864+
let stub_monitors = self.stub_monitors.read().unwrap();
865+
let mut stubs = vec![];
866+
for (_, mon) in stub_monitors.iter() {
867+
if mon.get_counterparty_node_id() == Some(counterparty_node_id) {
868+
stubs.push(mon.channel_id());
869+
}
870+
}
871+
stubs
872+
}
873+
863874
fn stale_or_missing_channel_monitor(&self, stub_chan: &StubChannel) -> bool {
864875
let monitors = self.monitors.read().unwrap();
865876

@@ -1061,7 +1072,7 @@ impl<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref,
10611072
L::Target: Logger,
10621073
P::Target: Persist<ChannelSigner>,
10631074
{
1064-
/// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity.
1075+
/// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] & [`StubChannelMonitor`] upon maturity.
10651076
///
10661077
/// For channels featuring anchor outputs, this method will also process [`BumpTransaction`]
10671078
/// events produced from each [`ChannelMonitor`] while there is a balance to claim onchain
@@ -1083,6 +1094,14 @@ impl<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref,
10831094
}
10841095
}
10851096
}
1097+
for stub_monitor_state in self.stub_monitors.read().unwrap().values() {
1098+
match stub_monitor_state.process_pending_events(&handler) {
1099+
Ok(()) => {},
1100+
Err(ReplayEvent ()) => {
1101+
self.event_notifier.notify();
1102+
}
1103+
}
1104+
}
10861105
}
10871106
}
10881107

lightning/src/chain/mod.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,15 @@ pub trait Watch<ChannelSigner: EcdsaChannelSigner> {
345345
/// - In test builds, instead of panicking, it delegates the handling of a missing channel to the
346346
/// `watch_dummy` function.
347347
fn panic_and_persist_stub_channel(&self, funding_outpoint: OutPoint, stub_monitor: StubChannelMonitor<ChannelSigner>) -> Result<(), ()>;
348+
349+
/// Retrieves a list of channel IDs for [`StubChannelMonitor`] associated with a specific counterparty node ID.
350+
///
351+
/// This function searches through the collection of [`StubChannelMonitor`] and collects the channel IDs
352+
/// of those monitors that have the specified counterparty node ID.
353+
///
354+
/// This is used by [`FundRecoverer`] to fetch all the [`ChannelId`] with a peer that needs recovery so that we can send them
355+
/// `BogusChannelReestablish`.
356+
fn get_stub_cids_with_counterparty(&self, counterparty_node_id: PublicKey) -> Vec<ChannelId>;
348357
}
349358

350359
/// The `Filter` trait defines behavior for indicating chain activity of interest pertaining to

lightning/src/ln/channelmanager.rs

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2313,6 +2313,188 @@ where
23132313
logger: L,
23142314
}
23152315

2316+
2317+
/// This works as a mock [`ChannelMessageHandler`] it is used mainly when a user wants to run their node in
2318+
/// offline mode i.e. This node won't communicate with any peer except sending a BogusChannelReestablish
2319+
/// for all the [`StubChannelMonitors`] being tracked by the [`ChainMonitor`].
2320+
///
2321+
/// [`FundRecoverer`] is parameterized by a number of components to achieve this.
2322+
/// - [`chain::Watch`] (typically [`ChainMonitor`]) for on-chain monitoring and enforcement of each
2323+
/// channel
2324+
/// - [`SignerProvider`] for providing signers whose operations are scoped to individual channels
2325+
/// - [`Logger`] for logging operational information of varying degrees
2326+
///
2327+
/// Additionally, it implements the following traits:
2328+
/// - [`ChannelMessageHandler`] to handle off-chain channel activity from peers
2329+
/// - [`MessageSendEventsProvider`] to similarly send such messages to peers
2330+
///
2331+
pub struct FundRecoverer<SP: Deref, L:Deref, M: Deref>
2332+
where
2333+
M::Target: chain::Watch<<SP::Target as SignerProvider>::EcdsaSigner>,
2334+
SP::Target: SignerProvider,
2335+
L::Target: Logger
2336+
{
2337+
default_configuration: UserConfig,
2338+
chain_monitor: M,
2339+
chain_hash: ChainHash,
2340+
per_peer_state: FairRwLock<HashMap<PublicKey, Mutex<PeerState<SP>>>>,
2341+
logger: L,
2342+
}
2343+
2344+
impl<SP:Deref, L:Deref, M:Deref> MessageSendEventsProvider for FundRecoverer<SP, L, M>
2345+
where
2346+
M::Target: chain::Watch<<SP::Target as SignerProvider>::EcdsaSigner>,
2347+
SP::Target: SignerProvider,
2348+
L::Target: Logger
2349+
{
2350+
fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
2351+
let mut pending_events = Vec::new();
2352+
let events = RefCell::new(Vec::new());
2353+
let per_peer_state = self.per_peer_state.read().unwrap();
2354+
for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
2355+
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
2356+
let peer_state = &mut *peer_state_lock;
2357+
if peer_state.pending_msg_events.len() > 0 {
2358+
pending_events.append(&mut peer_state.pending_msg_events);
2359+
}
2360+
}
2361+
if !pending_events.is_empty() {
2362+
events.replace(pending_events);
2363+
}
2364+
events.into_inner()
2365+
}
2366+
}
2367+
2368+
impl<SP:Deref, L: Deref, M:Deref> FundRecoverer<SP, L, M>
2369+
where
2370+
M::Target: chain::Watch<<SP::Target as SignerProvider>::EcdsaSigner>,
2371+
SP::Target: SignerProvider,
2372+
L::Target: Logger
2373+
{
2374+
/// Creates a new instance of `FundRecoverer`.
2375+
///
2376+
/// This function initializes a `FundRecoverer` with the provided `chain_monitor`,
2377+
/// `logger`, configuration, and chain parameters. The `FundRecoverer` is set up with
2378+
/// the default configuration and a chain hash derived from the genesis block of the
2379+
/// specified network.
2380+
pub fn new(chain_monitor: M, logger: L, config: UserConfig, params: ChainParameters) -> Self {
2381+
return Self { default_configuration: config.clone(),
2382+
chain_monitor,
2383+
chain_hash: ChainHash::using_genesis_block(params.network),
2384+
per_peer_state: FairRwLock::new(new_hash_map()),
2385+
logger
2386+
}
2387+
}
2388+
}
2389+
2390+
impl<SP:Deref, L:Deref, M:Deref> ChannelMessageHandler for FundRecoverer<SP, L, M>
2391+
where
2392+
M::Target: chain::Watch<<SP::Target as SignerProvider>::EcdsaSigner>,
2393+
SP::Target: SignerProvider,
2394+
L::Target: Logger
2395+
{
2396+
fn handle_open_channel(&self, _their_node_id: &PublicKey, _msg: &msgs::OpenChannel) {}
2397+
fn handle_accept_channel(&self, _their_node_id: &PublicKey, _msg: &msgs::AcceptChannel) {}
2398+
fn handle_funding_created(&self, _their_node_id: &PublicKey, _msg: &msgs::FundingCreated) {}
2399+
fn handle_funding_signed(&self, _their_node_id: &PublicKey, _msg: &msgs::FundingSigned) {}
2400+
fn handle_channel_ready(&self, _their_node_id: &PublicKey, _msg: &msgs::ChannelReady) {}
2401+
fn handle_shutdown(&self, _their_node_id: &PublicKey, _msg: &msgs::Shutdown) {}
2402+
fn handle_closing_signed(&self, _their_node_id: &PublicKey, _msg: &msgs::ClosingSigned) {}
2403+
fn handle_update_add_htlc(&self, _their_node_id: &PublicKey, _msg: &msgs::UpdateAddHTLC) {}
2404+
fn handle_update_fulfill_htlc(&self, _their_node_id: &PublicKey, _msg: &msgs::UpdateFulfillHTLC) {}
2405+
fn handle_update_fail_htlc(&self, _their_node_id: &PublicKey, _msg: &msgs::UpdateFailHTLC) {}
2406+
fn handle_update_fail_malformed_htlc(&self, _their_node_id: &PublicKey, _msg: &msgs::UpdateFailMalformedHTLC) {}
2407+
fn handle_commitment_signed(&self, _their_node_id: &PublicKey, _msg: &msgs::CommitmentSigned) {}
2408+
fn handle_revoke_and_ack(&self, _their_node_id: &PublicKey, _msg: &msgs::RevokeAndACK) {}
2409+
fn handle_update_fee(&self, _their_node_id: &PublicKey, _msg: &msgs::UpdateFee) {}
2410+
fn handle_announcement_signatures(&self, _their_node_id: &PublicKey, _msg: &msgs::AnnouncementSignatures) {}
2411+
fn handle_channel_update(&self, _their_node_id: &PublicKey, _msg: &msgs::ChannelUpdate) {}
2412+
fn handle_open_channel_v2(&self, _their_node_id: &PublicKey, _msg: &msgs::OpenChannelV2) {}
2413+
fn handle_accept_channel_v2(&self, _their_node_id: &PublicKey, _msg: &msgs::AcceptChannelV2) {}
2414+
fn handle_stfu(&self, _their_node_id: &PublicKey, _msg: &msgs::Stfu) {}
2415+
#[cfg(splicing)]
2416+
fn handle_splice_init(&self, _their_node_id: &PublicKey, _msg: &msgs::SpliceInit) {}
2417+
#[cfg(splicing)]
2418+
fn handle_splice_ack(&self, _their_node_id: &PublicKey, _msg: &msgs::SpliceAck) {}
2419+
#[cfg(splicing)]
2420+
fn handle_splice_locked(&self, _their_node_id: &PublicKey, _msg: &msgs::SpliceLocked) {}
2421+
fn handle_tx_add_input(&self, _their_node_id: &PublicKey, _msg: &msgs::TxAddInput) {}
2422+
fn handle_tx_add_output(&self, _their_node_id: &PublicKey, _msg: &msgs::TxAddOutput) {}
2423+
fn handle_tx_remove_input(&self, _their_node_id: &PublicKey, _msg: &msgs::TxRemoveInput) {}
2424+
fn handle_tx_remove_output(&self, _their_node_id: &PublicKey, _msg: &msgs::TxRemoveOutput) {}
2425+
fn handle_tx_complete(&self, _their_node_id: &PublicKey, _msg: &msgs::TxComplete) {}
2426+
fn handle_tx_signatures(&self, _their_node_id: &PublicKey, _msg: &msgs::TxSignatures) {}
2427+
fn handle_tx_init_rbf(&self, _their_node_id: &PublicKey, _msg: &msgs::TxInitRbf) {}
2428+
fn handle_tx_ack_rbf(&self, _their_node_id: &PublicKey, _msg: &msgs::TxAckRbf) {}
2429+
fn handle_tx_abort(&self, _their_node_id: &PublicKey, _msg: &msgs::TxAbort) {}
2430+
fn handle_peer_storage(&self, _their_node_id: &PublicKey, _msg: &msgs::PeerStorageMessage) {}
2431+
fn handle_your_peer_storage(&self, _their_node_id: &PublicKey, _msg: &msgs::YourPeerStorageMessage) {}
2432+
fn peer_disconnected(&self, _their_node_id: &PublicKey) {}
2433+
2434+
fn peer_connected(&self, counterparty_node_id: &PublicKey, init_msg: &msgs::Init, _inbound: bool) -> Result<(), ()> {
2435+
let logger = WithContext::from(&self.logger, Some(*counterparty_node_id), None, None);
2436+
2437+
{
2438+
let mut peer_state_lock = self.per_peer_state.write().unwrap();
2439+
match peer_state_lock.entry(counterparty_node_id.clone()) {
2440+
hash_map::Entry::Vacant(e) => {
2441+
e.insert(Mutex::new(PeerState {
2442+
channel_by_id: new_hash_map(),
2443+
inbound_channel_request_by_id: new_hash_map(),
2444+
latest_features: init_msg.features.clone(),
2445+
pending_msg_events: Vec::new(),
2446+
in_flight_monitor_updates: BTreeMap::new(),
2447+
monitor_update_blocked_actions: BTreeMap::new(),
2448+
actions_blocking_raa_monitor_updates: BTreeMap::new(),
2449+
is_connected: true,
2450+
}));
2451+
},
2452+
hash_map::Entry::Occupied(e) => {
2453+
let mut peer_state = e.get().lock().unwrap();
2454+
peer_state.latest_features = init_msg.features.clone();
2455+
2456+
debug_assert!(!peer_state.is_connected, "A peer shouldn't be connected twice");
2457+
peer_state.is_connected = true;
2458+
},
2459+
}
2460+
}
2461+
log_debug!(logger, "Generating Bogus channel_reestablish events for all the stub channels with peer {}", log_pubkey!(counterparty_node_id));
2462+
2463+
let per_peer_state = self.per_peer_state.read().unwrap();
2464+
if let Some(peer_state_mutex) = per_peer_state.get(counterparty_node_id) {
2465+
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
2466+
let peer_state = &mut *peer_state_lock;
2467+
let pending_msg_events = &mut peer_state.pending_msg_events;
2468+
for cid in self.chain_monitor.get_stub_cids_with_counterparty(*counterparty_node_id) {
2469+
pending_msg_events.push(MessageSendEvent::SendChannelReestablish {
2470+
node_id: *counterparty_node_id,
2471+
msg: msgs::ChannelReestablish {
2472+
channel_id: cid,
2473+
next_local_commitment_number: 0,
2474+
next_remote_commitment_number: 0,
2475+
your_last_per_commitment_secret: [1u8; 32],
2476+
my_current_per_commitment_point: PublicKey::from_slice(&[2u8; 33]).unwrap(),
2477+
next_funding_txid: None,
2478+
},
2479+
})
2480+
}
2481+
}
2482+
Ok(())
2483+
}
2484+
2485+
fn handle_channel_reestablish(&self, _their_node_id: &PublicKey, _msg: &msgs::ChannelReestablish) {}
2486+
fn handle_error(&self, _their_node_id: &PublicKey, _msg: &msgs::ErrorMessage) {}
2487+
fn provided_node_features(&self) -> NodeFeatures {
2488+
provided_node_features(&self.default_configuration)
2489+
}
2490+
fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures {
2491+
provided_init_features(&self.default_configuration)
2492+
}
2493+
fn get_chain_hashes(&self) -> Option<Vec<ChainHash>> {
2494+
Some(vec![self.chain_hash])
2495+
}
2496+
}
2497+
23162498
/// Chain-related parameters used to construct a new `ChannelManager`.
23172499
///
23182500
/// Typically, the block-specific parameters are derived from the best block hash for the network,

lightning/src/util/test_utils.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -462,6 +462,9 @@ impl<'a> chain::Watch<TestChannelSigner> for TestChainMonitor<'a> {
462462
return self.chain_monitor.panic_and_persist_stub_channel(funding_outpoint, stub_monitor);
463463
}
464464

465+
fn get_stub_cids_with_counterparty(&self, counterparty_node_id: PublicKey) -> Vec<ChannelId> {
466+
return self.chain_monitor.get_stub_cids_with_counterparty(counterparty_node_id);
467+
}
465468
}
466469

467470
#[cfg(test)]

0 commit comments

Comments
 (0)