@@ -7,7 +7,7 @@ use util::byte_utils;
7
7
use util:: events:: { EventsProvider , Event } ;
8
8
use util:: logger:: Logger ;
9
9
10
- use std:: collections:: { HashMap , LinkedList } ;
10
+ use std:: collections:: { HashMap , hash_map , LinkedList } ;
11
11
use std:: sync:: { Arc , Mutex } ;
12
12
use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
13
13
use std:: { cmp, error, mem, hash, fmt} ;
@@ -90,6 +90,18 @@ struct PeerHolder<Descriptor: SocketDescriptor> {
90
90
/// Only add to this set when noise completes:
91
91
node_id_to_descriptor : HashMap < PublicKey , Descriptor > ,
92
92
}
93
+ struct MutPeerHolder < ' a , Descriptor : SocketDescriptor + ' a > {
94
+ peers : & ' a mut HashMap < Descriptor , Peer > ,
95
+ node_id_to_descriptor : & ' a mut HashMap < PublicKey , Descriptor > ,
96
+ }
97
+ impl < Descriptor : SocketDescriptor > PeerHolder < Descriptor > {
98
+ fn borrow_parts ( & mut self ) -> MutPeerHolder < Descriptor > {
99
+ MutPeerHolder {
100
+ peers : & mut self . peers ,
101
+ node_id_to_descriptor : & mut self . node_id_to_descriptor ,
102
+ }
103
+ }
104
+ }
93
105
94
106
pub struct PeerManager < Descriptor : SocketDescriptor > {
95
107
message_handler : MessageHandler ,
@@ -100,7 +112,6 @@ pub struct PeerManager<Descriptor: SocketDescriptor> {
100
112
logger : Arc < Logger > ,
101
113
}
102
114
103
-
104
115
macro_rules! encode_msg {
105
116
( $msg: expr, $msg_code: expr) => {
106
117
{
@@ -267,14 +278,14 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
267
278
268
279
fn do_read_event ( & self , peer_descriptor : & mut Descriptor , data : Vec < u8 > ) -> Result < bool , PeerHandleError > {
269
280
let pause_read = {
270
- let mut peers = self . peers . lock ( ) . unwrap ( ) ;
271
- let ( should_insert_node_id, pause_read) = match peers. peers . get_mut ( peer_descriptor) {
281
+ let mut peers_lock = self . peers . lock ( ) . unwrap ( ) ;
282
+ let peers = peers_lock. borrow_parts ( ) ;
283
+ let pause_read = match peers. peers . get_mut ( peer_descriptor) {
272
284
None => panic ! ( "Descriptor for read_event is not already known to PeerManager" ) ,
273
285
Some ( peer) => {
274
286
assert ! ( peer. pending_read_buffer. len( ) > 0 ) ;
275
287
assert ! ( peer. pending_read_buffer. len( ) > peer. pending_read_buffer_pos) ;
276
288
277
- let mut insert_node_id = None ;
278
289
let mut read_pos = 0 ;
279
290
while read_pos < data. len ( ) {
280
291
{
@@ -353,6 +364,18 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
353
364
}
354
365
}
355
366
367
+ macro_rules! insert_node_id {
368
+ ( ) => {
369
+ match peers. node_id_to_descriptor. entry( peer. their_node_id. unwrap( ) ) {
370
+ hash_map:: Entry :: Occupied ( _) => {
371
+ peer. their_node_id = None ; // Unset so that we don't generate a peer_disconnected event
372
+ return Err ( PeerHandleError { no_connection_possible: false } )
373
+ } ,
374
+ hash_map:: Entry :: Vacant ( entry) => entry. insert( peer_descriptor. clone( ) ) ,
375
+ } ;
376
+ }
377
+ }
378
+
356
379
let next_step = peer. channel_encryptor . get_noise_step ( ) ;
357
380
match next_step {
358
381
NextNoiseStep :: ActOne => {
@@ -366,7 +389,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
366
389
peer. pending_read_buffer = [ 0 ; 18 ] . to_vec ( ) ; // Message length header is 18 bytes
367
390
peer. pending_read_is_header = true ;
368
391
369
- insert_node_id = Some ( peer . their_node_id . unwrap ( ) ) ;
392
+ insert_node_id ! ( ) ;
370
393
let mut local_features = msgs:: LocalFeatures :: new ( ) ;
371
394
if self . initial_syncs_sent . load ( Ordering :: Acquire ) < INITIAL_SYNCS_TO_SEND {
372
395
self . initial_syncs_sent . fetch_add ( 1 , Ordering :: AcqRel ) ;
@@ -382,7 +405,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
382
405
peer. pending_read_buffer = [ 0 ; 18 ] . to_vec ( ) ; // Message length header is 18 bytes
383
406
peer. pending_read_is_header = true ;
384
407
peer. their_node_id = Some ( their_node_id) ;
385
- insert_node_id = Some ( peer . their_node_id . unwrap ( ) ) ;
408
+ insert_node_id ! ( ) ;
386
409
} ,
387
410
NextNoiseStep :: NoiseComplete => {
388
411
if peer. pending_read_is_header {
@@ -607,15 +630,10 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
607
630
608
631
Self :: do_attempt_write_data ( peer_descriptor, peer) ;
609
632
610
- ( insert_node_id /* should_insert_node_id */ , peer. pending_outbound_buffer . len ( ) > 10 ) // pause_read
633
+ peer. pending_outbound_buffer . len ( ) > 10 // pause_read
611
634
}
612
635
} ;
613
636
614
- match should_insert_node_id {
615
- Some ( node_id) => { peers. node_id_to_descriptor . insert ( node_id, peer_descriptor. clone ( ) ) ; } ,
616
- None => { }
617
- } ;
618
-
619
637
pause_read
620
638
} ;
621
639
0 commit comments