@@ -732,77 +732,74 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
732
732
}
733
733
734
734
fn do_read_event ( & self , peer_descriptor : & mut Descriptor , peer : & mut Peer < TransportImpl > , peers_needing_send : & mut HashSet < Descriptor > , node_id_to_descriptor : & mut HashMap < PublicKey , Descriptor > , data : & [ u8 ] ) -> Result < bool , PeerHandleError > {
735
- let pause_read = {
736
- match peer. transport . process_input ( data, & mut peer. pending_outbound_buffer ) {
737
- Err ( e) => {
738
- log_trace ! ( self . logger, "Error while processing input: {}" , e) ;
739
- return Err ( PeerHandleError { no_connection_possible : false } )
740
- } ,
741
- Ok ( newly_connected) => {
742
- if newly_connected {
743
- log_trace ! ( self . logger, "Finished noise handshake for connection with {}" , log_pubkey!( & peer. transport. get_their_node_id( ) ) ) ;
744
- }
745
735
746
- if newly_connected && peer. outbound {
747
- self . queue_init_message ( peer_descriptor, peer, peers_needing_send) ;
748
- }
736
+ match peer. transport . process_input ( data, & mut peer. pending_outbound_buffer ) {
737
+ Err ( e) => {
738
+ log_trace ! ( self . logger, "Error while processing input: {}" , e) ;
739
+ return Err ( PeerHandleError { no_connection_possible : false } )
740
+ } ,
741
+ Ok ( newly_connected) => {
742
+ if newly_connected {
743
+ log_trace ! ( self . logger, "Finished noise handshake for connection with {}" , log_pubkey!( & peer. transport. get_their_node_id( ) ) ) ;
744
+ }
749
745
750
- // If the transport layer placed items in the outbound queue, we need
751
- // to schedule ourselves for flush during the next process_events()
752
- if !peer. pending_outbound_buffer . is_empty ( ) {
753
- peers_needing_send. insert ( peer_descriptor. clone ( ) ) ;
754
- }
746
+ if newly_connected && peer. outbound {
747
+ self . queue_init_message ( peer_descriptor, peer, peers_needing_send) ;
748
+ }
749
+
750
+ // If the transport layer placed items in the outbound queue, we need
751
+ // to schedule ourselves for flush during the next process_events()
752
+ if !peer. pending_outbound_buffer . is_empty ( ) {
753
+ peers_needing_send. insert ( peer_descriptor. clone ( ) ) ;
755
754
}
756
755
}
756
+ }
757
757
758
- let mut received_messages = peer. transport . drain_messages ( & * self . logger ) ?;
758
+ let mut received_messages = peer. transport . drain_messages ( & * self . logger ) ?;
759
759
760
- if peer. transport . is_connected ( ) && peer. post_init_state . is_none ( ) && received_messages. len ( ) > 0 {
761
- let init_message = received_messages. remove ( 0 ) ;
762
- self . process_init_message ( init_message, peer_descriptor, peer, peers_needing_send, node_id_to_descriptor) ?;
763
- }
760
+ if peer. transport . is_connected ( ) && peer. post_init_state . is_none ( ) && received_messages. len ( ) > 0 {
761
+ let init_message = received_messages. remove ( 0 ) ;
762
+ self . process_init_message ( init_message, peer_descriptor, peer, peers_needing_send, node_id_to_descriptor) ?;
763
+ }
764
764
765
- for message in received_messages {
766
- macro_rules! try_potential_handleerror {
767
- ( $thing: expr) => {
768
- match $thing {
769
- Ok ( x) => x,
770
- Err ( e) => {
771
- match e. action {
772
- msgs:: ErrorAction :: DisconnectPeer { msg: _ } => {
773
- //TODO: Try to push msg
774
- log_trace!( self . logger, "Got Err handling message, disconnecting peer because {}" , e. err) ;
775
- return Err ( PeerHandleError { no_connection_possible: false } ) ;
776
- } ,
777
- msgs:: ErrorAction :: IgnoreError => {
778
- log_trace!( self . logger, "Got Err handling message, ignoring because {}" , e. err) ;
779
- continue ;
780
- } ,
781
- msgs:: ErrorAction :: SendErrorMessage { msg } => {
782
- log_trace!( self . logger, "Got Err handling message, sending Error message because {}" , e. err) ;
783
- self . enqueue_message( peers_needing_send, & mut peer. transport, & mut peer. pending_outbound_buffer, peer_descriptor, & msg) ;
784
- continue ;
785
- } ,
786
- }
765
+ for message in received_messages {
766
+ macro_rules! try_potential_handleerror {
767
+ ( $thing: expr) => {
768
+ match $thing {
769
+ Ok ( x) => x,
770
+ Err ( e) => {
771
+ match e. action {
772
+ msgs:: ErrorAction :: DisconnectPeer { msg: _ } => {
773
+ //TODO: Try to push msg
774
+ log_trace!( self . logger, "Got Err handling message, disconnecting peer because {}" , e. err) ;
775
+ return Err ( PeerHandleError { no_connection_possible: false } ) ;
776
+ } ,
777
+ msgs:: ErrorAction :: IgnoreError => {
778
+ log_trace!( self . logger, "Got Err handling message, ignoring because {}" , e. err) ;
779
+ continue ;
780
+ } ,
781
+ msgs:: ErrorAction :: SendErrorMessage { msg } => {
782
+ log_trace!( self . logger, "Got Err handling message, sending Error message because {}" , e. err) ;
783
+ self . enqueue_message( peers_needing_send, & mut peer. transport, & mut peer. pending_outbound_buffer, peer_descriptor, & msg) ;
784
+ continue ;
785
+ } ,
787
786
}
788
- } ;
789
- }
787
+ }
788
+ } ;
790
789
}
790
+ }
791
791
792
- if let Err ( handling_error) = self . handle_message ( message, peer_descriptor, peer, peers_needing_send) {
793
- match handling_error {
794
- MessageHandlingError :: PeerHandleError ( e) => { return Err ( e) } ,
795
- MessageHandlingError :: LightningError ( e) => {
796
- try_potential_handleerror ! ( Err ( e) ) ;
797
- } ,
798
- }
792
+ if let Err ( handling_error) = self . handle_message ( message, peer_descriptor, peer, peers_needing_send) {
793
+ match handling_error {
794
+ MessageHandlingError :: PeerHandleError ( e) => { return Err ( e) } ,
795
+ MessageHandlingError :: LightningError ( e) => {
796
+ try_potential_handleerror ! ( Err ( e) ) ;
797
+ } ,
799
798
}
800
799
}
800
+ }
801
801
802
- peer. pending_outbound_buffer . queue_space ( ) == 0 // pause_read
803
- } ;
804
-
805
- Ok ( pause_read)
802
+ Ok ( peer. pending_outbound_buffer . queue_space ( ) == 0 ) // pause_read
806
803
}
807
804
808
805
/// Process an incoming message and return a decision (ok, lightning error, peer handling error) regarding the next action with the peer
0 commit comments