@@ -1588,15 +1588,37 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
1588
1588
}
1589
1589
1590
1590
/// Process an incoming message and return a decision (ok, lightning error, peer handling error) regarding the next action with the peer
1591
+ ///
1591
1592
/// Returns the message back if it needs to be broadcasted to all other peers.
1592
1593
fn handle_message (
1593
1594
& self ,
1594
1595
peer_mutex : & Mutex < Peer > ,
1595
- mut peer_lock : MutexGuard < Peer > ,
1596
- message : wire:: Message < <<CMH as core :: ops :: Deref >:: Target as wire:: CustomMessageReader >:: CustomMessage >
1597
- ) -> Result < Option < wire:: Message < <<CMH as core :: ops :: Deref >:: Target as wire:: CustomMessageReader >:: CustomMessage > > , MessageHandlingError > {
1596
+ peer_lock : MutexGuard < Peer > ,
1597
+ message : wire:: Message < <<CMH as Deref >:: Target as wire:: CustomMessageReader >:: CustomMessage >
1598
+ ) -> Result < Option < wire:: Message < <<CMH as Deref >:: Target as wire:: CustomMessageReader >:: CustomMessage > > , MessageHandlingError > {
1598
1599
let their_node_id = peer_lock. their_node_id . clone ( ) . expect ( "We know the peer's public key by the time we receive messages" ) . 0 ;
1599
1600
let logger = WithContext :: from ( & self . logger , Some ( their_node_id) , None ) ;
1601
+
1602
+ let message = match self . do_handle_message_holding_peer_lock ( peer_lock, message, & their_node_id, & logger) ? {
1603
+ Some ( processed_message) => processed_message,
1604
+ None => return Ok ( None ) ,
1605
+ } ;
1606
+
1607
+ self . do_handle_message_without_peer_lock ( peer_mutex, message, & their_node_id, & logger)
1608
+ }
1609
+
1610
+ // Conducts all message processing that requires us to hold the `peer_lock`.
1611
+ //
1612
+ // Returns `None` if the message was fully processed and otherwise returns the message back to
1613
+ // allow it to be subsequently processed by `do_handle_message_without_peer_lock`.
1614
+ fn do_handle_message_holding_peer_lock < ' a > (
1615
+ & self ,
1616
+ mut peer_lock : MutexGuard < Peer > ,
1617
+ message : wire:: Message < <<CMH as Deref >:: Target as wire:: CustomMessageReader >:: CustomMessage > ,
1618
+ their_node_id : & PublicKey ,
1619
+ logger : & WithContext < ' a , L >
1620
+ ) -> Result < Option < wire:: Message < <<CMH as Deref >:: Target as wire:: CustomMessageReader >:: CustomMessage > > , MessageHandlingError >
1621
+ {
1600
1622
peer_lock. received_message_since_timer_tick = true ;
1601
1623
1602
1624
// Need an Init as first message
@@ -1677,8 +1699,20 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
1677
1699
peer_lock. received_channel_announce_since_backlogged = true ;
1678
1700
}
1679
1701
1680
- mem:: drop ( peer_lock) ;
1702
+ Ok ( Some ( message) )
1703
+ }
1681
1704
1705
+ // Conducts all message processing that doesn't require us to hold the `peer_lock`.
1706
+ //
1707
+ // Returns the message back if it needs to be broadcasted to all other peers.
1708
+ fn do_handle_message_without_peer_lock < ' a > (
1709
+ & self ,
1710
+ peer_mutex : & Mutex < Peer > ,
1711
+ message : wire:: Message < <<CMH as Deref >:: Target as wire:: CustomMessageReader >:: CustomMessage > ,
1712
+ their_node_id : & PublicKey ,
1713
+ logger : & WithContext < ' a , L >
1714
+ ) -> Result < Option < wire:: Message < <<CMH as Deref >:: Target as wire:: CustomMessageReader >:: CustomMessage > > , MessageHandlingError >
1715
+ {
1682
1716
if is_gossip_msg ( message. type_id ( ) ) {
1683
1717
log_gossip ! ( logger, "Received message {:?} from {}" , message, log_pubkey!( their_node_id) ) ;
1684
1718
} else {
@@ -1880,7 +1914,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
1880
1914
Ok ( should_forward)
1881
1915
}
1882
1916
1883
- fn forward_broadcast_msg ( & self , peers : & HashMap < Descriptor , Mutex < Peer > > , msg : & wire:: Message < <<CMH as core :: ops :: Deref >:: Target as wire:: CustomMessageReader >:: CustomMessage > , except_node : Option < & PublicKey > ) {
1917
+ fn forward_broadcast_msg ( & self , peers : & HashMap < Descriptor , Mutex < Peer > > , msg : & wire:: Message < <<CMH as Deref >:: Target as wire:: CustomMessageReader >:: CustomMessage > , except_node : Option < & PublicKey > ) {
1884
1918
match msg {
1885
1919
wire:: Message :: ChannelAnnouncement ( ref msg) => {
1886
1920
log_gossip ! ( self . logger, "Sending message to all peers except {:?} or the announced channel's counterparties: {:?}" , except_node, msg) ;
@@ -2272,7 +2306,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
2272
2306
// We do not have the peers write lock, so we just store that we're
2273
2307
// about to disconnect the peer and do it after we finish
2274
2308
// processing most messages.
2275
- let msg = msg. map ( |msg| wire:: Message :: < <<CMH as core :: ops :: Deref >:: Target as wire:: CustomMessageReader >:: CustomMessage > :: Error ( msg) ) ;
2309
+ let msg = msg. map ( |msg| wire:: Message :: < <<CMH as Deref >:: Target as wire:: CustomMessageReader >:: CustomMessage > :: Error ( msg) ) ;
2276
2310
peers_to_disconnect. insert ( node_id, msg) ;
2277
2311
} ,
2278
2312
msgs:: ErrorAction :: DisconnectPeerWithWarning { msg } => {
0 commit comments