Skip to content

Commit ff18bf3

Browse files
Always remove disconnected peers with no channels
When a peer disconnects but still has channels, the peer's `peer_state` entry in the `per_peer_state` is not removed by the `peer_disconnected` function. If the channels to that peer are later closed while it is still disconnected (i.e. force closed), we therefore need to remove the peer from `peer_state` separately. To remove such peers separately, we push them to a separate HashSet that holds peers awaiting removal, and remove the peers on a timer to limit the negative effects on parallelism as much as possible.
1 parent 5d2c0a6 commit ff18bf3

File tree

1 file changed

+112
-16
lines changed

1 file changed

+112
-16
lines changed

lightning/src/ln/channelmanager.rs

Lines changed: 112 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -620,6 +620,8 @@ pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L> = C
620620
// | |
621621
// | |__`best_block`
622622
// | |
623+
// | |__`pending_peers_awaiting_removal`
624+
// | |
623625
// | |__`pending_events`
624626
// | |
625627
// | |__`pending_background_events`
@@ -787,6 +789,16 @@ where
787789

788790
/// See `ChannelManager` struct-level documentation for lock order requirements.
789791
pending_events: Mutex<Vec<events::Event>>,
792+
/// When a peer disconnects but still has channels, the peer's `peer_state` entry in the
793+
/// `per_peer_state` is not removed by the `peer_disconnected` function. If the channels to
794+
/// that peer are later closed while it is still disconnected (i.e. force closed), we
795+
/// therefore need to remove the peer from `peer_state` separately.
796+
/// To avoid having to take the `per_peer_state` `write` lock once the channels are closed, we
797+
/// instead store such peers awaiting removal in this field, and remove them on a timer to
798+
/// limit the negative effects on parallelism as much as possible.
799+
///
800+
/// See `ChannelManager` struct-level documentation for lock order requirements.
801+
pending_peers_awaiting_removal: Mutex<HashSet<PublicKey>>,
790802
/// See `ChannelManager` struct-level documentation for lock order requirements.
791803
pending_background_events: Mutex<Vec<BackgroundEvent>>,
792804
/// Used when we have to take a BIG lock to make sure everything is self-consistent.
@@ -1317,10 +1329,11 @@ macro_rules! try_chan_entry {
13171329
}
13181330

13191331
macro_rules! remove_channel {
1320-
($self: expr, $entry: expr) => {
1332+
($self: expr, $entry: expr, $peer_state: expr) => {
13211333
{
13221334
let channel = $entry.remove_entry().1;
13231335
update_maps_on_chan_removal!($self, channel);
1336+
$self.add_pending_peer_to_be_removed(channel.get_counterparty_node_id(), $peer_state);
13241337
channel
13251338
}
13261339
}
@@ -1493,6 +1506,7 @@ where
14931506
per_peer_state: FairRwLock::new(HashMap::new()),
14941507

14951508
pending_events: Mutex::new(Vec::new()),
1509+
pending_peers_awaiting_removal: Mutex::new(HashSet::new()),
14961510
pending_background_events: Mutex::new(Vec::new()),
14971511
total_consistency_lock: RwLock::new(()),
14981512
persistence_notifier: Notifier::new(),
@@ -1731,7 +1745,7 @@ where
17311745
let (result, is_permanent) =
17321746
handle_monitor_update_res!(self, update_res, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, chan_entry.key(), NO_UPDATE);
17331747
if is_permanent {
1734-
remove_channel!(self, chan_entry);
1748+
remove_channel!(self, chan_entry, peer_state);
17351749
break result;
17361750
}
17371751
}
@@ -1742,7 +1756,7 @@ where
17421756
});
17431757

17441758
if chan_entry.get().is_shutdown() {
1745-
let channel = remove_channel!(self, chan_entry);
1759+
let channel = remove_channel!(self, chan_entry, peer_state);
17461760
if let Ok(channel_update) = self.get_channel_update_for_broadcast(&channel) {
17471761
peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
17481762
msg: channel_update
@@ -1845,7 +1859,7 @@ where
18451859
} else {
18461860
self.issue_channel_close_events(chan.get(),ClosureReason::HolderForceClosed);
18471861
}
1848-
remove_channel!(self, chan)
1862+
remove_channel!(self, chan, peer_state)
18491863
} else {
18501864
return Err(APIError::ChannelUnavailable{ err: format!("Channel with id {} not found for the passed counterparty node_id {}", log_bytes!(*channel_id), peer_node_id) });
18511865
}
@@ -1884,6 +1898,13 @@ where
18841898
}
18851899
}
18861900

1901+
fn add_pending_peer_to_be_removed(&self, counterparty_node_id: PublicKey, peer_state: &mut PeerState<<SP::Target as SignerProvider>::Signer>) {
1902+
let peer_should_be_removed = !peer_state.is_connected && peer_state.channel_by_id.len() == 0;
1903+
if peer_should_be_removed {
1904+
self.pending_peers_awaiting_removal.lock().unwrap().insert(counterparty_node_id);
1905+
}
1906+
}
1907+
18871908
/// Force closes a channel, immediately broadcasting the latest local transaction(s) and
18881909
/// rejecting new HTLCs on the given channel. Fails if `channel_id` is unknown to
18891910
/// the manager, or if the `counterparty_node_id` isn't the counterparty of the corresponding
@@ -3337,6 +3358,34 @@ where
33373358
true
33383359
}
33393360

3361+
/// Removes peers which have been added to `pending_peers_awaiting_removal` which are
3362+
/// still disconnected and we have no channels to.
3363+
///
3364+
/// Must be called without the `per_peer_state` lock acquired.
3365+
fn remove_peers_awaiting_removal(&self) {
3366+
let mut pending_peers_awaiting_removal = HashSet::new();
3367+
mem::swap(&mut *self.pending_peers_awaiting_removal.lock().unwrap(), &mut pending_peers_awaiting_removal);
3368+
if pending_peers_awaiting_removal.len() > 0 {
3369+
let mut per_peer_state = self.per_peer_state.write().unwrap();
3370+
for counterparty_node_id in pending_peers_awaiting_removal.drain() {
3371+
match per_peer_state.entry(counterparty_node_id) {
3372+
hash_map::Entry::Occupied(entry) => {
3373+
// Remove the entry if the peer is still disconnected and we still
3374+
// have no channels to the peer.
3375+
let remove_entry = {
3376+
let peer_state = entry.get().lock().unwrap();
3377+
!peer_state.is_connected && peer_state.channel_by_id.len() == 0
3378+
};
3379+
if remove_entry {
3380+
entry.remove_entry();
3381+
}
3382+
},
3383+
hash_map::Entry::Vacant(_) => { /* The PeerState has already been removed */ }
3384+
}
3385+
}
3386+
}
3387+
}
3388+
33403389
#[cfg(any(test, feature = "_test_utils"))]
33413390
/// Process background events, for functional testing
33423391
pub fn test_process_background_events(&self) {
@@ -3415,13 +3464,14 @@ where
34153464
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
34163465
let peer_state = &mut *peer_state_lock;
34173466
let pending_msg_events = &mut peer_state.pending_msg_events;
3467+
let counterparty_node_id = *counterparty_node_id;
34183468
peer_state.channel_by_id.retain(|chan_id, chan| {
34193469
let chan_needs_persist = self.update_channel_fee(chan_id, chan, new_feerate);
34203470
if chan_needs_persist == NotifyOption::DoPersist { should_persist = NotifyOption::DoPersist; }
34213471

34223472
if let Err(e) = chan.timer_check_closing_negotiation_progress() {
34233473
let (needs_close, err) = convert_chan_err!(self, e, chan, chan_id);
3424-
handle_errors.push((Err(err), *counterparty_node_id));
3474+
handle_errors.push((Err(err), counterparty_node_id));
34253475
if needs_close { return false; }
34263476
}
34273477

@@ -3455,8 +3505,10 @@ where
34553505

34563506
true
34573507
});
3508+
self.add_pending_peer_to_be_removed(counterparty_node_id, peer_state);
34583509
}
34593510
}
3511+
self.remove_peers_awaiting_removal();
34603512

34613513
self.claimable_payments.lock().unwrap().claimable_htlcs.retain(|payment_hash, (_, htlcs)| {
34623514
if htlcs.is_empty() {
@@ -4211,7 +4263,7 @@ where
42114263
}
42124264
};
42134265
peer_state.pending_msg_events.push(send_msg_err_event);
4214-
let _ = remove_channel!(self, channel);
4266+
let _ = remove_channel!(self, channel, peer_state);
42154267
return Err(APIError::APIMisuseError { err: "Please use accept_inbound_channel_from_trusted_peer_0conf to accept channels with zero confirmations.".to_owned() });
42164268
}
42174269

@@ -4497,7 +4549,7 @@ where
44974549
let (result, is_permanent) =
44984550
handle_monitor_update_res!(self, update_res, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, chan_entry.key(), NO_UPDATE);
44994551
if is_permanent {
4500-
remove_channel!(self, chan_entry);
4552+
remove_channel!(self, chan_entry, peer_state);
45014553
break result;
45024554
}
45034555
}
@@ -4546,7 +4598,7 @@ where
45464598
// also implies there are no pending HTLCs left on the channel, so we can
45474599
// fully delete it from tracking (the channel monitor is still around to
45484600
// watch for old state broadcasts)!
4549-
(tx, Some(remove_channel!(self, chan_entry)))
4601+
(tx, Some(remove_channel!(self, chan_entry, peer_state)))
45504602
} else { (tx, None) }
45514603
},
45524604
hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
@@ -5049,12 +5101,11 @@ where
50495101
if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) {
50505102
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
50515103
let peer_state = &mut *peer_state_lock;
5052-
let pending_msg_events = &mut peer_state.pending_msg_events;
50535104
if let hash_map::Entry::Occupied(chan_entry) = peer_state.channel_by_id.entry(funding_outpoint.to_channel_id()) {
5054-
let mut chan = remove_channel!(self, chan_entry);
5105+
let mut chan = remove_channel!(self, chan_entry, peer_state);
50555106
failed_channels.push(chan.force_shutdown(false));
50565107
if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
5057-
pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
5108+
peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
50585109
msg: update
50595110
});
50605111
}
@@ -5064,7 +5115,7 @@ where
50645115
ClosureReason::CommitmentTxConfirmed
50655116
};
50665117
self.issue_channel_close_events(&chan, reason);
5067-
pending_msg_events.push(events::MessageSendEvent::HandleError {
5118+
peer_state.pending_msg_events.push(events::MessageSendEvent::HandleError {
50685119
node_id: chan.get_counterparty_node_id(),
50695120
action: msgs::ErrorAction::SendErrorMessage {
50705121
msg: msgs::ErrorMessage { channel_id: chan.channel_id(), data: "Channel force-closed".to_owned() }
@@ -5106,7 +5157,7 @@ where
51065157
{
51075158
let per_peer_state = self.per_peer_state.read().unwrap();
51085159

5109-
for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
5160+
for (cp_id, peer_state_mutex) in per_peer_state.iter() {
51105161
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
51115162
let peer_state = &mut *peer_state_lock;
51125163
let pending_msg_events = &mut peer_state.pending_msg_events;
@@ -5146,6 +5197,7 @@ where
51465197
}
51475198
}
51485199
});
5200+
self.add_pending_peer_to_be_removed(*cp_id, peer_state);
51495201
}
51505202
}
51515203

@@ -5170,7 +5222,7 @@ where
51705222
{
51715223
let per_peer_state = self.per_peer_state.read().unwrap();
51725224

5173-
for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
5225+
for (cp_id, peer_state_mutex) in per_peer_state.iter() {
51745226
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
51755227
let peer_state = &mut *peer_state_lock;
51765228
let pending_msg_events = &mut peer_state.pending_msg_events;
@@ -5208,6 +5260,7 @@ where
52085260
}
52095261
}
52105262
});
5263+
self.add_pending_peer_to_be_removed(*cp_id, peer_state);
52115264
}
52125265
}
52135266

@@ -5783,7 +5836,7 @@ where
57835836
let mut timed_out_htlcs = Vec::new();
57845837
{
57855838
let per_peer_state = self.per_peer_state.read().unwrap();
5786-
for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
5839+
for (cp_id, peer_state_mutex) in per_peer_state.iter() {
57875840
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
57885841
let peer_state = &mut *peer_state_lock;
57895842
let pending_msg_events = &mut peer_state.pending_msg_events;
@@ -5867,6 +5920,7 @@ where
58675920
}
58685921
true
58695922
});
5923+
self.add_pending_peer_to_be_removed(*cp_id, peer_state);
58705924
}
58715925
}
58725926

@@ -6194,7 +6248,7 @@ where
61946248

61956249
let per_peer_state = self.per_peer_state.read().unwrap();
61966250

6197-
for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
6251+
for (cp_id, peer_state_mutex) in per_peer_state.iter() {
61986252
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
61996253
let peer_state = &mut *peer_state_lock;
62006254
let pending_msg_events = &mut peer_state.pending_msg_events;
@@ -6226,6 +6280,7 @@ where
62266280
}
62276281
retain
62286282
});
6283+
self.add_pending_peer_to_be_removed(*cp_id, peer_state);
62296284
}
62306285
//TODO: Also re-broadcast announcement_signatures
62316286
Ok(())
@@ -6739,6 +6794,8 @@ where
67396794

67406795
write_ver_prefix!(writer, SERIALIZATION_VERSION, MIN_SERIALIZATION_VERSION);
67416796

6797+
self.remove_peers_awaiting_removal();
6798+
67426799
self.genesis_hash.write(writer)?;
67436800
{
67446801
let best_block = self.best_block.read().unwrap();
@@ -7563,6 +7620,7 @@ where
75637620
per_peer_state: FairRwLock::new(per_peer_state),
75647621

75657622
pending_events: Mutex::new(pending_events_read),
7623+
pending_peers_awaiting_removal: Mutex::new(HashSet::new()),
75667624
pending_background_events: Mutex::new(pending_background_events_read),
75677625
total_consistency_lock: RwLock::new(()),
75687626
persistence_notifier: Notifier::new(),
@@ -8026,6 +8084,44 @@ mod tests {
80268084
}
80278085
}
80288086

8087+
#[test]
8088+
fn test_drop_disconnected_peers_when_removing_channels() {
8089+
let chanmon_cfgs = create_chanmon_cfgs(2);
8090+
let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
8091+
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
8092+
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
8093+
8094+
let chan = create_announced_chan_between_nodes(&nodes, 0, 1);
8095+
8096+
nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false);
8097+
nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
8098+
8099+
nodes[0].node.force_close_broadcasting_latest_txn(&chan.2, &nodes[1].node.get_our_node_id()).unwrap();
8100+
check_closed_broadcast!(nodes[0], true);
8101+
check_added_monitors!(nodes[0], 1);
8102+
check_closed_event!(nodes[0], 1, ClosureReason::HolderForceClosed);
8103+
8104+
{
8105+
// Assert that nodes[0] has nodes[1] in `pending_peers_awaiting_removal` once the peers
8105+
// have been disconnected and the channel between them has been force closed.
8107+
let nodes_0_per_peer_state = nodes[0].node.per_peer_state.read().unwrap();
8108+
let nodes_0_pending_peers_awaiting_removal = nodes[0].node.pending_peers_awaiting_removal.lock().unwrap();
8109+
assert_eq!(nodes_0_pending_peers_awaiting_removal.len(), 1);
8110+
assert!(nodes_0_pending_peers_awaiting_removal.get(&nodes[1].node.get_our_node_id()).is_some());
8111+
// Assert that nodes[1] isn't removed before `timer_tick_occurred` has been executed.
8112+
assert_eq!(nodes_0_per_peer_state.len(), 1);
8113+
assert!(nodes_0_per_peer_state.get(&nodes[1].node.get_our_node_id()).is_some());
8114+
}
8115+
8116+
nodes[0].node.timer_tick_occurred();
8117+
8118+
{
8119+
// Assert that nodes[1] has now been removed.
8120+
assert_eq!(nodes[0].node.per_peer_state.read().unwrap().len(), 0);
8121+
assert_eq!(nodes[0].node.pending_peers_awaiting_removal.lock().unwrap().len(), 0);
8122+
}
8123+
}
8124+
80298125
#[test]
80308126
fn bad_inbound_payment_hash() {
80318127
// Add coverage for checking that a user-provided payment hash matches the payment secret.

0 commit comments

Comments
 (0)