@@ -293,6 +293,12 @@ impl<ChanSigner: ChannelKeys> ChannelHolder<ChanSigner> {
293
293
}
294
294
}
295
295
296
+ /// State we hold per-peer. In the future we should put channels in here, but for now we only hold
297
+ /// the latest Init features we heard from the peer.
298
+ struct PeerState {
299
+ latest_features : InitFeatures ,
300
+ }
301
+
296
302
#[ cfg( not( any( target_pointer_width = "32" , target_pointer_width = "64" ) ) ) ]
297
303
const ERR : ( ) = "You need at least 32 bit pointers (well, usize, but we'll assume they're the same) for ChannelManager::latest_block_height" ;
298
304
@@ -346,6 +352,14 @@ pub struct ChannelManager<ChanSigner: ChannelKeys> {
346
352
channel_state : Mutex < ChannelHolder < ChanSigner > > ,
347
353
our_network_key : SecretKey ,
348
354
355
+ /// Per-peer state storage.
356
+ /// Because adding or removing an entry is rare, we usually take an outer read lock and then
357
+ /// operate on the inner value freely. Sadly, this prevents parallel operation when opening a
358
+ /// new channel.
359
+ /// If we are connected to a peer we always at least have an entry here, even if no channels
360
+ /// are currently open with that peer.
361
+ per_peer_state : RwLock < HashMap < PublicKey , Mutex < PeerState > > > ,
362
+
349
363
pending_events : Mutex < Vec < events:: Event > > ,
350
364
/// Used when we have to take a BIG lock to make sure everything is self-consistent.
351
365
/// Essentially just when we're serializing ourselves out.
@@ -628,6 +642,8 @@ impl<ChanSigner: ChannelKeys> ChannelManager<ChanSigner> {
628
642
} ) ,
629
643
our_network_key : keys_manager. get_node_secret ( ) ,
630
644
645
+ per_peer_state : RwLock :: new ( HashMap :: new ( ) ) ,
646
+
631
647
pending_events : Mutex :: new ( Vec :: new ( ) ) ,
632
648
total_consistency_lock : RwLock :: new ( ( ) ) ,
633
649
@@ -2798,6 +2814,7 @@ impl<ChanSigner: ChannelKeys> ChannelMessageHandler for ChannelManager<ChanSigne
2798
2814
let _ = self . total_consistency_lock . read ( ) . unwrap ( ) ;
2799
2815
let mut failed_channels = Vec :: new ( ) ;
2800
2816
let mut failed_payments = Vec :: new ( ) ;
2817
+ let mut no_channels_remain = true ;
2801
2818
{
2802
2819
let mut channel_state_lock = self . channel_state . lock ( ) . unwrap ( ) ;
2803
2820
let channel_state = channel_state_lock. borrow_parts ( ) ;
@@ -2836,6 +2853,8 @@ impl<ChanSigner: ChannelKeys> ChannelMessageHandler for ChannelManager<ChanSigne
2836
2853
short_to_id. remove ( & short_id) ;
2837
2854
}
2838
2855
return false ;
2856
+ } else {
2857
+ no_channels_remain = false ;
2839
2858
}
2840
2859
}
2841
2860
true
@@ -2861,6 +2880,10 @@ impl<ChanSigner: ChannelKeys> ChannelMessageHandler for ChannelManager<ChanSigne
2861
2880
}
2862
2881
} ) ;
2863
2882
}
2883
+ if no_channels_remain {
2884
+ self . per_peer_state . write ( ) . unwrap ( ) . remove ( their_node_id) ;
2885
+ }
2886
+
2864
2887
for failure in failed_channels. drain ( ..) {
2865
2888
self . finish_force_close_channel ( failure) ;
2866
2889
}
@@ -2871,10 +2894,25 @@ impl<ChanSigner: ChannelKeys> ChannelMessageHandler for ChannelManager<ChanSigne
2871
2894
}
2872
2895
}
2873
2896
2874
- fn peer_connected ( & self , their_node_id : & PublicKey , _init_msg : & msgs:: Init ) {
2897
+ fn peer_connected ( & self , their_node_id : & PublicKey , init_msg : & msgs:: Init ) {
2875
2898
log_debug ! ( self , "Generating channel_reestablish events for {}" , log_pubkey!( their_node_id) ) ;
2876
2899
2877
2900
let _ = self . total_consistency_lock . read ( ) . unwrap ( ) ;
2901
+
2902
+ {
2903
+ let mut peer_state_lock = self . per_peer_state . write ( ) . unwrap ( ) ;
2904
+ match peer_state_lock. entry ( their_node_id. clone ( ) ) {
2905
+ hash_map:: Entry :: Vacant ( e) => {
2906
+ e. insert ( Mutex :: new ( PeerState {
2907
+ latest_features : init_msg. features . clone ( ) ,
2908
+ } ) ) ;
2909
+ } ,
2910
+ hash_map:: Entry :: Occupied ( e) => {
2911
+ e. get ( ) . lock ( ) . unwrap ( ) . latest_features = init_msg. features . clone ( ) ;
2912
+ } ,
2913
+ }
2914
+ }
2915
+
2878
2916
let mut channel_state_lock = self . channel_state . lock ( ) . unwrap ( ) ;
2879
2917
let channel_state = channel_state_lock. borrow_parts ( ) ;
2880
2918
let pending_msg_events = channel_state. pending_msg_events ;
@@ -3141,6 +3179,14 @@ impl<ChanSigner: ChannelKeys + Writeable> Writeable for ChannelManager<ChanSigne
3141
3179
}
3142
3180
}
3143
3181
3182
+ let per_peer_state = self . per_peer_state . write ( ) . unwrap ( ) ;
3183
+ ( per_peer_state. len ( ) as u64 ) . write ( writer) ?;
3184
+ for ( peer_pubkey, peer_state_mutex) in per_peer_state. iter ( ) {
3185
+ peer_pubkey. write ( writer) ?;
3186
+ let peer_state = peer_state_mutex. lock ( ) . unwrap ( ) ;
3187
+ peer_state. latest_features . write ( writer) ?;
3188
+ }
3189
+
3144
3190
Ok ( ( ) )
3145
3191
}
3146
3192
}
@@ -3274,6 +3320,16 @@ impl<'a, R : ::std::io::Read, ChanSigner: ChannelKeys + Readable<R>> ReadableArg
3274
3320
claimable_htlcs. insert ( payment_hash, previous_hops) ;
3275
3321
}
3276
3322
3323
+ let peer_count: u64 = Readable :: read ( reader) ?;
3324
+ let mut per_peer_state = HashMap :: with_capacity ( cmp:: min ( peer_count as usize , 128 ) ) ;
3325
+ for _ in 0 ..peer_count {
3326
+ let peer_pubkey = Readable :: read ( reader) ?;
3327
+ let peer_state = PeerState {
3328
+ latest_features : Readable :: read ( reader) ?,
3329
+ } ;
3330
+ per_peer_state. insert ( peer_pubkey, Mutex :: new ( peer_state) ) ;
3331
+ }
3332
+
3277
3333
let channel_manager = ChannelManager {
3278
3334
genesis_hash,
3279
3335
fee_estimator : args. fee_estimator ,
@@ -3293,6 +3349,8 @@ impl<'a, R : ::std::io::Read, ChanSigner: ChannelKeys + Readable<R>> ReadableArg
3293
3349
} ) ,
3294
3350
our_network_key : args. keys_manager . get_node_secret ( ) ,
3295
3351
3352
+ per_peer_state : RwLock :: new ( per_peer_state) ,
3353
+
3296
3354
pending_events : Mutex :: new ( Vec :: new ( ) ) ,
3297
3355
total_consistency_lock : RwLock :: new ( ( ) ) ,
3298
3356
keys_manager : args. keys_manager ,
0 commit comments