@@ -466,6 +466,8 @@ pub(super) struct PeerState<Signer: Sign> {
466
466
/// Messages to send to the peer - pushed to in the same lock that they are generated in (except
467
467
/// for broadcast messages, where ordering isn't as strict).
468
468
pub ( super ) pending_msg_events : Vec < MessageSendEvent > ,
469
+ /// Represents wether we're connected to the node or not.
470
+ connected : bool ,
469
471
}
470
472
471
473
/// Stores a PaymentSecret and any other data we may need to validate an inbound payment is
@@ -595,6 +597,8 @@ pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L> = C
595
597
// | |
596
598
// | |__`best_block`
597
599
// | |
600
+ // | |__`pending_peers_awaiting_removal`
601
+ // | |
598
602
// | |__`pending_events`
599
603
// | |
600
604
// | |__`pending_background_events`
@@ -763,6 +767,16 @@ where
763
767
764
768
/// See `ChannelManager` struct-level documentation for lock order requirements.
765
769
pending_events : Mutex < Vec < events:: Event > > ,
770
+ /// When a peer disconnects but still has channels, the peer's `peer_state` entry in the
771
+ /// `per_peer_state` is not removed by the `peer_disconnected` function. If the channels of
772
+ /// to that peer is later closed while still being disconnected (i.e. force closed), we
773
+ /// therefore need to remove the peer from `peer_state` separately.
774
+ /// To avoid having to take the `per_peer_state` `write` lock once the channels are closed, we
775
+ /// instead store such peers awaiting removal in this field, and remove them on a timer to
776
+ /// limit the negative effects on parallelism as much as possible.
777
+ ///
778
+ /// See `ChannelManager` struct-level documentation for lock order requirements.
779
+ pending_peers_awaiting_removal : Mutex < HashSet < PublicKey > > ,
766
780
/// See `ChannelManager` struct-level documentation for lock order requirements.
767
781
pending_background_events : Mutex < Vec < BackgroundEvent > > ,
768
782
/// Used when we have to take a BIG lock to make sure everything is self-consistent.
@@ -1293,10 +1307,11 @@ macro_rules! try_chan_entry {
1293
1307
}
1294
1308
1295
1309
macro_rules! remove_channel {
1296
- ( $self: expr, $entry: expr) => {
1310
+ ( $self: expr, $entry: expr, $peer_state : expr ) => {
1297
1311
{
1298
1312
let channel = $entry. remove_entry( ) . 1 ;
1299
1313
update_maps_on_chan_removal!( $self, channel) ;
1314
+ $self. add_pending_peer_to_be_removed( channel. get_counterparty_node_id( ) , $peer_state) ;
1300
1315
channel
1301
1316
}
1302
1317
}
@@ -1470,6 +1485,7 @@ where
1470
1485
per_peer_state : FairRwLock :: new ( HashMap :: new ( ) ) ,
1471
1486
1472
1487
pending_events : Mutex :: new ( Vec :: new ( ) ) ,
1488
+ pending_peers_awaiting_removal : Mutex :: new ( HashSet :: new ( ) ) ,
1473
1489
pending_background_events : Mutex :: new ( Vec :: new ( ) ) ,
1474
1490
total_consistency_lock : RwLock :: new ( ( ) ) ,
1475
1491
persistence_notifier : Notifier :: new ( ) ,
@@ -1708,7 +1724,7 @@ where
1708
1724
let ( result, is_permanent) =
1709
1725
handle_monitor_update_res ! ( self , update_res, chan_entry. get_mut( ) , RAACommitmentOrder :: CommitmentFirst , chan_entry. key( ) , NO_UPDATE ) ;
1710
1726
if is_permanent {
1711
- remove_channel ! ( self , chan_entry) ;
1727
+ remove_channel ! ( self , chan_entry, peer_state ) ;
1712
1728
break result;
1713
1729
}
1714
1730
}
@@ -1719,7 +1735,7 @@ where
1719
1735
} ) ;
1720
1736
1721
1737
if chan_entry. get ( ) . is_shutdown ( ) {
1722
- let channel = remove_channel ! ( self , chan_entry) ;
1738
+ let channel = remove_channel ! ( self , chan_entry, peer_state ) ;
1723
1739
if let Ok ( channel_update) = self . get_channel_update_for_broadcast ( & channel) {
1724
1740
peer_state. pending_msg_events . push ( events:: MessageSendEvent :: BroadcastChannelUpdate {
1725
1741
msg : channel_update
@@ -1822,7 +1838,7 @@ where
1822
1838
} else {
1823
1839
self . issue_channel_close_events ( chan. get ( ) , ClosureReason :: HolderForceClosed ) ;
1824
1840
}
1825
- remove_channel ! ( self , chan)
1841
+ remove_channel ! ( self , chan, peer_state )
1826
1842
} else {
1827
1843
return Err ( APIError :: ChannelUnavailable { err : format ! ( "Channel with id {} not found for the passed counterparty node_id {}" , log_bytes!( * channel_id) , peer_node_id) } ) ;
1828
1844
}
@@ -1861,6 +1877,13 @@ where
1861
1877
}
1862
1878
}
1863
1879
1880
+ fn add_pending_peer_to_be_removed ( & self , counterparty_node_id : PublicKey , peer_state : & mut PeerState < <SP :: Target as SignerProvider >:: Signer > ) {
1881
+ let peer_should_be_removed = !peer_state. connected && peer_state. channel_by_id . len ( ) == 0 ;
1882
+ if peer_should_be_removed {
1883
+ self . pending_peers_awaiting_removal . lock ( ) . unwrap ( ) . insert ( counterparty_node_id) ;
1884
+ }
1885
+ }
1886
+
1864
1887
/// Force closes a channel, immediately broadcasting the latest local transaction(s) and
1865
1888
/// rejecting new HTLCs on the given channel. Fails if `channel_id` is unknown to
1866
1889
/// the manager, or if the `counterparty_node_id` isn't the counterparty of the corresponding
@@ -3278,6 +3301,34 @@ where
3278
3301
true
3279
3302
}
3280
3303
3304
+ /// Removes peers which have been been added to `pending_peers_awaiting_removal` which are
3305
+ /// still disconnected and we have no channels to.
3306
+ ///
3307
+ /// Must be called without the `per_peer_state` lock acquired.
3308
+ fn remove_peers_awaiting_removal ( & self ) {
3309
+ let mut pending_peers_awaiting_removal = HashSet :: new ( ) ;
3310
+ mem:: swap ( & mut * self . pending_peers_awaiting_removal . lock ( ) . unwrap ( ) , & mut pending_peers_awaiting_removal) ;
3311
+ if pending_peers_awaiting_removal. len ( ) > 0 {
3312
+ let mut per_peer_state = self . per_peer_state . write ( ) . unwrap ( ) ;
3313
+ for counterparty_node_id in pending_peers_awaiting_removal. drain ( ) {
3314
+ match per_peer_state. entry ( counterparty_node_id) {
3315
+ hash_map:: Entry :: Occupied ( entry) => {
3316
+ // Remove the entry if the peer is still disconnected and we still
3317
+ // have no channels to the peer.
3318
+ let remove_entry = {
3319
+ let peer_state = entry. get ( ) . lock ( ) . unwrap ( ) ;
3320
+ !peer_state. connected && peer_state. channel_by_id . len ( ) == 0
3321
+ } ;
3322
+ if remove_entry {
3323
+ entry. remove_entry ( ) ;
3324
+ }
3325
+ } ,
3326
+ hash_map:: Entry :: Vacant ( _) => { /* The PeerState has already been removed */ }
3327
+ }
3328
+ }
3329
+ }
3330
+ }
3331
+
3281
3332
#[ cfg( any( test, feature = "_test_utils" ) ) ]
3282
3333
/// Process background events, for functional testing
3283
3334
pub fn test_process_background_events ( & self ) {
@@ -3356,13 +3407,14 @@ where
3356
3407
let mut peer_state_lock = peer_state_mutex. lock ( ) . unwrap ( ) ;
3357
3408
let peer_state = & mut * peer_state_lock;
3358
3409
let pending_msg_events = & mut peer_state. pending_msg_events ;
3410
+ let counterparty_node_id = * counterparty_node_id;
3359
3411
peer_state. channel_by_id . retain ( |chan_id, chan| {
3360
3412
let chan_needs_persist = self . update_channel_fee ( chan_id, chan, new_feerate) ;
3361
3413
if chan_needs_persist == NotifyOption :: DoPersist { should_persist = NotifyOption :: DoPersist ; }
3362
3414
3363
3415
if let Err ( e) = chan. timer_check_closing_negotiation_progress ( ) {
3364
3416
let ( needs_close, err) = convert_chan_err ! ( self , e, chan, chan_id) ;
3365
- handle_errors. push ( ( Err ( err) , * counterparty_node_id) ) ;
3417
+ handle_errors. push ( ( Err ( err) , counterparty_node_id) ) ;
3366
3418
if needs_close { return false ; }
3367
3419
}
3368
3420
@@ -3396,8 +3448,10 @@ where
3396
3448
3397
3449
true
3398
3450
} ) ;
3451
+ self . add_pending_peer_to_be_removed ( counterparty_node_id, peer_state) ;
3399
3452
}
3400
3453
}
3454
+ self . remove_peers_awaiting_removal ( ) ;
3401
3455
3402
3456
self . claimable_payments . lock ( ) . unwrap ( ) . claimable_htlcs . retain ( |payment_hash, ( _, htlcs) | {
3403
3457
if htlcs. is_empty ( ) {
@@ -4133,7 +4187,7 @@ where
4133
4187
}
4134
4188
} ;
4135
4189
peer_state. pending_msg_events . push ( send_msg_err_event) ;
4136
- let _ = remove_channel ! ( self , channel) ;
4190
+ let _ = remove_channel ! ( self , channel, peer_state ) ;
4137
4191
return Err ( APIError :: APIMisuseError { err : "Please use accept_inbound_channel_from_trusted_peer_0conf to accept channels with zero confirmations." . to_owned ( ) } ) ;
4138
4192
}
4139
4193
@@ -4419,7 +4473,7 @@ where
4419
4473
let ( result, is_permanent) =
4420
4474
handle_monitor_update_res ! ( self , update_res, chan_entry. get_mut( ) , RAACommitmentOrder :: CommitmentFirst , chan_entry. key( ) , NO_UPDATE ) ;
4421
4475
if is_permanent {
4422
- remove_channel ! ( self , chan_entry) ;
4476
+ remove_channel ! ( self , chan_entry, peer_state ) ;
4423
4477
break result;
4424
4478
}
4425
4479
}
@@ -4468,7 +4522,7 @@ where
4468
4522
// also implies there are no pending HTLCs left on the channel, so we can
4469
4523
// fully delete it from tracking (the channel monitor is still around to
4470
4524
// watch for old state broadcasts)!
4471
- ( tx, Some ( remove_channel ! ( self , chan_entry) ) )
4525
+ ( tx, Some ( remove_channel ! ( self , chan_entry, peer_state ) ) )
4472
4526
} else { ( tx, None ) }
4473
4527
} ,
4474
4528
hash_map:: Entry :: Vacant ( _) => return Err ( MsgHandleErrInternal :: send_err_msg_no_close ( format ! ( "Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}" , counterparty_node_id) , msg. channel_id ) )
@@ -4971,12 +5025,11 @@ where
4971
5025
if let Some ( peer_state_mutex) = per_peer_state. get ( & counterparty_node_id) {
4972
5026
let mut peer_state_lock = peer_state_mutex. lock ( ) . unwrap ( ) ;
4973
5027
let peer_state = & mut * peer_state_lock;
4974
- let pending_msg_events = & mut peer_state. pending_msg_events ;
4975
5028
if let hash_map:: Entry :: Occupied ( chan_entry) = peer_state. channel_by_id . entry ( funding_outpoint. to_channel_id ( ) ) {
4976
- let mut chan = remove_channel ! ( self , chan_entry) ;
5029
+ let mut chan = remove_channel ! ( self , chan_entry, peer_state ) ;
4977
5030
failed_channels. push ( chan. force_shutdown ( false ) ) ;
4978
5031
if let Ok ( update) = self . get_channel_update_for_broadcast ( & chan) {
4979
- pending_msg_events. push ( events:: MessageSendEvent :: BroadcastChannelUpdate {
5032
+ peer_state . pending_msg_events . push ( events:: MessageSendEvent :: BroadcastChannelUpdate {
4980
5033
msg : update
4981
5034
} ) ;
4982
5035
}
@@ -4986,7 +5039,7 @@ where
4986
5039
ClosureReason :: CommitmentTxConfirmed
4987
5040
} ;
4988
5041
self . issue_channel_close_events ( & chan, reason) ;
4989
- pending_msg_events. push ( events:: MessageSendEvent :: HandleError {
5042
+ peer_state . pending_msg_events . push ( events:: MessageSendEvent :: HandleError {
4990
5043
node_id : chan. get_counterparty_node_id ( ) ,
4991
5044
action : msgs:: ErrorAction :: SendErrorMessage {
4992
5045
msg : msgs:: ErrorMessage { channel_id : chan. channel_id ( ) , data : "Channel force-closed" . to_owned ( ) }
@@ -5028,7 +5081,7 @@ where
5028
5081
{
5029
5082
let per_peer_state = self . per_peer_state . read ( ) . unwrap ( ) ;
5030
5083
5031
- for ( _cp_id , peer_state_mutex) in per_peer_state. iter ( ) {
5084
+ for ( cp_id , peer_state_mutex) in per_peer_state. iter ( ) {
5032
5085
let mut peer_state_lock = peer_state_mutex. lock ( ) . unwrap ( ) ;
5033
5086
let peer_state = & mut * peer_state_lock;
5034
5087
let pending_msg_events = & mut peer_state. pending_msg_events ;
@@ -5068,6 +5121,7 @@ where
5068
5121
}
5069
5122
}
5070
5123
} ) ;
5124
+ self . add_pending_peer_to_be_removed ( * cp_id, peer_state) ;
5071
5125
}
5072
5126
}
5073
5127
@@ -5092,7 +5146,7 @@ where
5092
5146
{
5093
5147
let per_peer_state = self . per_peer_state . read ( ) . unwrap ( ) ;
5094
5148
5095
- for ( _cp_id , peer_state_mutex) in per_peer_state. iter ( ) {
5149
+ for ( cp_id , peer_state_mutex) in per_peer_state. iter ( ) {
5096
5150
let mut peer_state_lock = peer_state_mutex. lock ( ) . unwrap ( ) ;
5097
5151
let peer_state = & mut * peer_state_lock;
5098
5152
let pending_msg_events = & mut peer_state. pending_msg_events ;
@@ -5130,6 +5184,7 @@ where
5130
5184
}
5131
5185
}
5132
5186
} ) ;
5187
+ self . add_pending_peer_to_be_removed ( * cp_id, peer_state) ;
5133
5188
}
5134
5189
}
5135
5190
@@ -5693,7 +5748,7 @@ where
5693
5748
let mut timed_out_htlcs = Vec :: new ( ) ;
5694
5749
{
5695
5750
let per_peer_state = self . per_peer_state . read ( ) . unwrap ( ) ;
5696
- for ( _cp_id , peer_state_mutex) in per_peer_state. iter ( ) {
5751
+ for ( cp_id , peer_state_mutex) in per_peer_state. iter ( ) {
5697
5752
let mut peer_state_lock = peer_state_mutex. lock ( ) . unwrap ( ) ;
5698
5753
let peer_state = & mut * peer_state_lock;
5699
5754
let pending_msg_events = & mut peer_state. pending_msg_events ;
@@ -5777,6 +5832,7 @@ where
5777
5832
}
5778
5833
true
5779
5834
} ) ;
5835
+ self . add_pending_peer_to_be_removed ( * cp_id, peer_state) ;
5780
5836
}
5781
5837
}
5782
5838
@@ -6023,6 +6079,7 @@ where
6023
6079
let mut peer_state_lock = peer_state_mutex. lock ( ) . unwrap ( ) ;
6024
6080
let peer_state = & mut * peer_state_lock;
6025
6081
let pending_msg_events = & mut peer_state. pending_msg_events ;
6082
+ peer_state. connected = false ;
6026
6083
peer_state. channel_by_id . retain ( |_, chan| {
6027
6084
chan. remove_uncommitted_htlcs_and_mark_paused ( & self . logger ) ;
6028
6085
if chan. is_shutdown ( ) {
@@ -6088,17 +6145,20 @@ where
6088
6145
channel_by_id : HashMap :: new ( ) ,
6089
6146
latest_features : init_msg. features . clone ( ) ,
6090
6147
pending_msg_events : Vec :: new ( ) ,
6148
+ connected : true ,
6091
6149
} ) ) ;
6092
6150
} ,
6093
6151
hash_map:: Entry :: Occupied ( e) => {
6094
- e. get ( ) . lock ( ) . unwrap ( ) . latest_features = init_msg. features . clone ( ) ;
6152
+ let mut peer_state = e. get ( ) . lock ( ) . unwrap ( ) ;
6153
+ peer_state. latest_features = init_msg. features . clone ( ) ;
6154
+ peer_state. connected = true ;
6095
6155
} ,
6096
6156
}
6097
6157
}
6098
6158
6099
6159
let per_peer_state = self . per_peer_state . read ( ) . unwrap ( ) ;
6100
6160
6101
- for ( _cp_id , peer_state_mutex) in per_peer_state. iter ( ) {
6161
+ for ( cp_id , peer_state_mutex) in per_peer_state. iter ( ) {
6102
6162
let mut peer_state_lock = peer_state_mutex. lock ( ) . unwrap ( ) ;
6103
6163
let peer_state = & mut * peer_state_lock;
6104
6164
let pending_msg_events = & mut peer_state. pending_msg_events ;
@@ -6130,6 +6190,7 @@ where
6130
6190
}
6131
6191
retain
6132
6192
} ) ;
6193
+ self . add_pending_peer_to_be_removed ( * cp_id, peer_state) ;
6133
6194
}
6134
6195
//TODO: Also re-broadcast announcement_signatures
6135
6196
Ok ( ( ) )
@@ -6637,6 +6698,8 @@ where
6637
6698
6638
6699
write_ver_prefix ! ( writer, SERIALIZATION_VERSION , MIN_SERIALIZATION_VERSION ) ;
6639
6700
6701
+ self . remove_peers_awaiting_removal ( ) ;
6702
+
6640
6703
self . genesis_hash . write ( writer) ?;
6641
6704
{
6642
6705
let best_block = self . best_block . read ( ) . unwrap ( ) ;
@@ -7102,6 +7165,7 @@ where
7102
7165
channel_by_id : peer_channels. remove ( & peer_pubkey) . unwrap_or ( HashMap :: new ( ) ) ,
7103
7166
latest_features : Readable :: read ( reader) ?,
7104
7167
pending_msg_events : Vec :: new ( ) ,
7168
+ connected : false ,
7105
7169
} ;
7106
7170
per_peer_state. insert ( peer_pubkey, Mutex :: new ( peer_state) ) ;
7107
7171
}
@@ -7457,6 +7521,7 @@ where
7457
7521
per_peer_state : FairRwLock :: new ( per_peer_state) ,
7458
7522
7459
7523
pending_events : Mutex :: new ( pending_events_read) ,
7524
+ pending_peers_awaiting_removal : Mutex :: new ( HashSet :: new ( ) ) ,
7460
7525
pending_background_events : Mutex :: new ( pending_background_events_read) ,
7461
7526
total_consistency_lock : RwLock :: new ( ( ) ) ,
7462
7527
persistence_notifier : Notifier :: new ( ) ,
@@ -7924,6 +7989,44 @@ mod tests {
7924
7989
}
7925
7990
}
7926
7991
7992
+ #[ test]
7993
+ fn test_drop_disconnected_peers_when_removing_channels ( ) {
7994
+ let chanmon_cfgs = create_chanmon_cfgs ( 2 ) ;
7995
+ let node_cfgs = create_node_cfgs ( 2 , & chanmon_cfgs) ;
7996
+ let node_chanmgrs = create_node_chanmgrs ( 2 , & node_cfgs, & [ None , None ] ) ;
7997
+ let nodes = create_network ( 2 , & node_cfgs, & node_chanmgrs) ;
7998
+
7999
+ let chan = create_announced_chan_between_nodes ( & nodes, 0 , 1 ) ;
8000
+
8001
+ nodes[ 0 ] . node . peer_disconnected ( & nodes[ 1 ] . node . get_our_node_id ( ) , false ) ;
8002
+ nodes[ 1 ] . node . peer_disconnected ( & nodes[ 0 ] . node . get_our_node_id ( ) , false ) ;
8003
+
8004
+ nodes[ 0 ] . node . force_close_broadcasting_latest_txn ( & chan. 2 , & nodes[ 1 ] . node . get_our_node_id ( ) ) . unwrap ( ) ;
8005
+ check_closed_broadcast ! ( nodes[ 0 ] , true ) ;
8006
+ check_added_monitors ! ( nodes[ 0 ] , 1 ) ;
8007
+ check_closed_event ! ( nodes[ 0 ] , 1 , ClosureReason :: HolderForceClosed ) ;
8008
+
8009
+ {
8010
+ // Assert that nodes[1] is awaiting removal for nodes[0] once nodes[1] has been
8011
+ // disconnected and the channel between has been force closed.
8012
+ let nodes_0_per_peer_state = nodes[ 0 ] . node . per_peer_state . read ( ) . unwrap ( ) ;
8013
+ let nodes_0_pending_peers_awaiting_removal = nodes[ 0 ] . node . pending_peers_awaiting_removal . lock ( ) . unwrap ( ) ;
8014
+ assert_eq ! ( nodes_0_pending_peers_awaiting_removal. len( ) , 1 ) ;
8015
+ assert ! ( nodes_0_pending_peers_awaiting_removal. get( & nodes[ 1 ] . node. get_our_node_id( ) ) . is_some( ) ) ;
8016
+ // Assert that nodes[1] isn't removed before `timer_tick_occurred` has been executed.
8017
+ assert_eq ! ( nodes_0_per_peer_state. len( ) , 1 ) ;
8018
+ assert ! ( nodes_0_per_peer_state. get( & nodes[ 1 ] . node. get_our_node_id( ) ) . is_some( ) ) ;
8019
+ }
8020
+
8021
+ nodes[ 0 ] . node . timer_tick_occurred ( ) ;
8022
+
8023
+ {
8024
+ // Assert that nodes[1] has now been removed.
8025
+ assert_eq ! ( nodes[ 0 ] . node. per_peer_state. read( ) . unwrap( ) . len( ) , 0 ) ;
8026
+ assert_eq ! ( nodes[ 0 ] . node. pending_peers_awaiting_removal. lock( ) . unwrap( ) . len( ) , 0 ) ;
8027
+ }
8028
+ }
8029
+
7927
8030
#[ test]
7928
8031
fn bad_inbound_payment_hash ( ) {
7929
8032
// Add coverage for checking that a user-provided payment hash matches the payment secret.
0 commit comments