Skip to content

Commit 4ade519

Browse files
committed
refactor: Clean up the previously destructured read_event() path
Now that the Init handling has been moved and the dependencies are more clear, deduplicate the parameters of the read_event() and helper functions to make use of Peer. The overall pattern remains the same, read_event() does the locking and passes in the separate items (peer, peers_needing_send, node_id_to_descriptor) to do_read_event(). The handle_message() path is also cleaned up now that post_init_state is guaranteed to be valid at that point.
1 parent dda89e3 commit 4ade519

File tree

1 file changed

+33
-54
lines changed

1 file changed

+33
-54
lines changed

lightning/src/ln/peers/handler.rs

Lines changed: 33 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -632,7 +632,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
632632
None => panic!("Descriptor for read_event is not already known to PeerManager"),
633633
Some(peer) => peer
634634
};
635-
self.do_read_event(peer_descriptor, data, peer.outbound, &mut peer.post_init_state, &mut peer.transport, &mut peer.pending_outbound_buffer, &mut peers.node_id_to_descriptor, &mut peers.peers_needing_send)
635+
self.do_read_event(peer_descriptor, peer, &mut peers.peers_needing_send, &mut peers.node_id_to_descriptor, data)
636636
};
637637

638638
match result {
@@ -685,19 +685,19 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
685685
}
686686

687687
// Add an Init message to the outbound queue
688-
fn queue_init_message(&self, peers_needing_send: &mut HashSet<Descriptor>, message_queuer: &mut impl MessageQueuer, pending_outbound_buffer: &mut impl PayloadQueuer, descriptor: &Descriptor, should_request_full_sync: bool) {
688+
fn queue_init_message(&self, descriptor: &Descriptor, peer: &mut Peer<TransportImpl>, peers_needing_send: &mut HashSet<Descriptor>) {
689689
let mut features = InitFeatures::known();
690-
if !should_request_full_sync {
690+
if !self.message_handler.route_handler.should_request_full_sync(&peer.transport.get_their_node_id()) {
691691
features.clear_initial_routing_sync();
692692
}
693693

694694
let resp = msgs::Init { features };
695-
self.enqueue_message(peers_needing_send, message_queuer, pending_outbound_buffer, descriptor, &resp);
695+
self.enqueue_message(peers_needing_send, &mut peer.transport, &mut peer.pending_outbound_buffer, descriptor, &resp);
696696
}
697697

698698
// Process an incoming Init message and set Peer and PeerManager state accordingly
699-
fn process_init_message(&self, message: Message, descriptor: &Descriptor, peers_needing_send: &mut HashSet<Descriptor>, node_id_to_descriptor: &mut HashMap<PublicKey, Descriptor>, pending_outbound_buffer: &mut impl PayloadQueuer, outbound: bool, transport: &mut impl ITransport, post_init_state: &mut Option<PostInitState>) -> Result<(), PeerHandleError> {
700-
let their_node_id = transport.get_their_node_id();
699+
fn process_init_message(&self, message: Message, descriptor: &Descriptor, peer: &mut Peer<TransportImpl>, peers_needing_send: &mut HashSet<Descriptor>, node_id_to_descriptor: &mut HashMap<PublicKey, Descriptor>) -> Result<(), PeerHandleError> {
700+
let their_node_id = peer.transport.get_their_node_id();
701701

702702
match message {
703703
Message::Init(ref init_message) => {
@@ -713,12 +713,14 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
713713
peers_needing_send.insert(descriptor.clone());
714714
}
715715

716-
if !outbound {
717-
self.queue_init_message(peers_needing_send, transport, pending_outbound_buffer, descriptor, self.message_handler.route_handler.should_request_full_sync(&their_node_id));
716+
if !peer.outbound {
717+
self.queue_init_message(descriptor, peer, peers_needing_send);
718718
}
719719
node_id_to_descriptor.insert(their_node_id.clone(), descriptor.clone());
720720
self.message_handler.chan_handler.peer_connected(&their_node_id, init_message);
721-
*post_init_state = Some(new_post_init_state);
721+
722+
assert!(peer.post_init_state.is_none());
723+
peer.post_init_state = Some(new_post_init_state);
722724
}
723725
_ => {
724726
log_trace!(self.logger, "Peer {} sent non-Init first message", log_pubkey!(&their_node_id));
@@ -729,43 +731,35 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
729731
Ok(())
730732
}
731733

732-
fn do_read_event(&self,
733-
peer_descriptor: &mut Descriptor,
734-
data: &[u8],
735-
outbound: bool,
736-
post_init_state: &mut Option<PostInitState>,
737-
transport: &mut impl ITransport,
738-
pending_outbound_buffer: &mut impl PayloadQueuer,
739-
node_id_to_descriptor: &mut HashMap<PublicKey, Descriptor>,
740-
peers_needing_send: &mut HashSet<Descriptor>) -> Result<bool, PeerHandleError> {
734+
fn do_read_event(&self, peer_descriptor: &mut Descriptor, peer: &mut Peer<TransportImpl>, peers_needing_send: &mut HashSet<Descriptor>, node_id_to_descriptor: &mut HashMap<PublicKey, Descriptor>, data: &[u8]) -> Result<bool, PeerHandleError> {
741735
let pause_read = {
742-
match transport.process_input(data, pending_outbound_buffer) {
736+
match peer.transport.process_input(data, &mut peer.pending_outbound_buffer) {
743737
Err(e) => {
744738
log_trace!(self.logger, "Error while processing input: {}", e);
745739
return Err(PeerHandleError { no_connection_possible: false })
746740
},
747741
Ok(newly_connected) => {
748742
if newly_connected {
749-
log_trace!(self.logger, "Finished noise handshake for connection with {}", log_pubkey!(&transport.get_their_node_id()));
743+
log_trace!(self.logger, "Finished noise handshake for connection with {}", log_pubkey!(&peer.transport.get_their_node_id()));
750744
}
751745

752-
if newly_connected && outbound {
753-
self.queue_init_message(peers_needing_send, transport, pending_outbound_buffer, peer_descriptor, self.message_handler.route_handler.should_request_full_sync(&transport.get_their_node_id()));
746+
if newly_connected && peer.outbound {
747+
self.queue_init_message(peer_descriptor, peer, peers_needing_send);
754748
}
755749

756750
// If the transport layer placed items in the outbound queue, we need
757751
// to schedule ourselves for flush during the next process_events()
758-
if !pending_outbound_buffer.is_empty() {
752+
if !peer.pending_outbound_buffer.is_empty() {
759753
peers_needing_send.insert(peer_descriptor.clone());
760754
}
761755
}
762756
}
763757

764-
let mut received_messages = transport.drain_messages(&*self.logger)?;
758+
let mut received_messages = peer.transport.drain_messages(&*self.logger)?;
765759

766-
if transport.is_connected() && post_init_state.is_none() && received_messages.len() > 0 {
760+
if peer.transport.is_connected() && peer.post_init_state.is_none() && received_messages.len() > 0 {
767761
let init_message = received_messages.remove(0);
768-
self.process_init_message(init_message, peer_descriptor, peers_needing_send, node_id_to_descriptor, pending_outbound_buffer, outbound, transport, post_init_state)?;
762+
self.process_init_message(init_message, peer_descriptor, peer, peers_needing_send, node_id_to_descriptor)?;
769763
}
770764

771765
for message in received_messages {
@@ -786,7 +780,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
786780
},
787781
msgs::ErrorAction::SendErrorMessage { msg } => {
788782
log_trace!(self.logger, "Got Err handling message, sending Error message because {}", e.err);
789-
self.enqueue_message(peers_needing_send, transport, pending_outbound_buffer, peer_descriptor, &msg);
783+
self.enqueue_message(peers_needing_send, &mut peer.transport, &mut peer.pending_outbound_buffer, peer_descriptor, &msg);
790784
continue;
791785
},
792786
}
@@ -795,7 +789,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
795789
}
796790
}
797791

798-
if let Err(handling_error) = self.handle_message(peers_needing_send, peer_descriptor, message, post_init_state, transport.get_their_node_id(), transport, pending_outbound_buffer) {
792+
if let Err(handling_error) = self.handle_message(message, peer_descriptor, peer, peers_needing_send) {
799793
match handling_error {
800794
MessageHandlingError::PeerHandleError(e) => { return Err(e) },
801795
MessageHandlingError::LightningError(e) => {
@@ -805,22 +799,21 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
805799
}
806800
}
807801

808-
pending_outbound_buffer.queue_space() == 0 // pause_read
802+
peer.pending_outbound_buffer.queue_space() == 0 // pause_read
809803
};
810804

811805
Ok(pause_read)
812806
}
813807

814808
/// Process an incoming message and return a decision (ok, lightning error, peer handling error) regarding the next action with the peer
815809
fn handle_message(&self,
816-
peers_needing_send: &mut HashSet<Descriptor>,
817-
peer_descriptor: &mut Descriptor,
818810
message: wire::Message,
819-
post_init_state: &mut Option<PostInitState>,
820-
their_node_id: PublicKey,
821-
message_queuer: &mut impl MessageQueuer,
822-
pending_outbound_buffer: &mut impl PayloadQueuer
823-
) -> Result<(), MessageHandlingError> {
811+
peer_descriptor: &mut Descriptor,
812+
peer: &mut Peer<TransportImpl>,
813+
peers_needing_send: &mut HashSet<Descriptor>) -> Result<(), MessageHandlingError> {
814+
815+
let their_node_id = peer.transport.get_their_node_id();
816+
let post_init_state = peer.post_init_state.as_mut().unwrap();
824817
log_trace!(self.logger, "Received message of type {} from {}", message.type_id(), log_pubkey!(&their_node_id));
825818

826819
match message {
@@ -852,33 +845,19 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
852845
wire::Message::Ping(msg) => {
853846
if msg.ponglen < 65532 {
854847
let resp = msgs::Pong { byteslen: msg.ponglen };
855-
self.enqueue_message(peers_needing_send, message_queuer, pending_outbound_buffer, &peer_descriptor, &resp);
848+
self.enqueue_message(peers_needing_send, &mut peer.transport, &mut peer.pending_outbound_buffer, &peer_descriptor, &resp);
856849
}
857850
},
858851
wire::Message::Pong(_msg) => {
859-
if let Some(ref mut state) = post_init_state {
860-
state.awaiting_pong = false;
861-
} else {
862-
panic!("Received Pong before Init and didn't catch it earlier!");
863-
}
852+
post_init_state.awaiting_pong = false;
864853
},
865854

866855
// Channel messages:
867856
wire::Message::OpenChannel(msg) => {
868-
match post_init_state {
869-
None => panic!("Received OpenChannel before Init and didn't catch it earlier!"),
870-
Some(state) => {
871-
self.message_handler.chan_handler.handle_open_channel(&their_node_id, state.their_features.clone(), &msg);
872-
}
873-
}
857+
self.message_handler.chan_handler.handle_open_channel(&their_node_id, post_init_state.their_features.clone(), &msg);
874858
},
875859
wire::Message::AcceptChannel(msg) => {
876-
match post_init_state {
877-
None => panic!("Received AcceptChannel before Init and didn't catch it earlier!"),
878-
Some(state) => {
879-
self.message_handler.chan_handler.handle_accept_channel(&their_node_id, state.their_features.clone(), &msg);
880-
}
881-
}
860+
self.message_handler.chan_handler.handle_accept_channel(&their_node_id, post_init_state.their_features.clone(), &msg);
882861
},
883862

884863
wire::Message::FundingCreated(msg) => {

0 commit comments

Comments
 (0)