Skip to content

Commit 6d05572

Browse files
committed
refactor: Clean up event handling path w.r.t connected state
Many of the is_connected() checks were duplicative due to the macro continuing out of the match statement if the peer referenced by the node_id had not seen an Init message yet. Remove the macro in favor of a function on PeerHolder that will return an Option<> and use it instead. Split out a new helper function Peer::is_initialized() that will return true if the peer has seen an Init message.
1 parent 41d1439 commit 6d05572

File tree

1 file changed

+90
-100
lines changed

1 file changed

+90
-100
lines changed

lightning/src/ln/peers/handler.rs

Lines changed: 90 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,13 @@ struct Peer<TransportImpl: ITransport> {
181181
}
182182

183183
impl<TransportImpl: ITransport> Peer<TransportImpl> {
184+
185+
/// Returns true if an INIT message has been received from this peer. Implies that this node
186+
/// can send and receive encrypted messages.
187+
fn is_initialized(&self) -> bool {
188+
self.their_features.is_some()
189+
}
190+
184191
/// Returns true if the channel announcements/updates for the given channel should be
185192
/// forwarded to this peer.
186193
/// If we are sending our routing table to this peer and we have not yet sent channel
@@ -214,6 +221,30 @@ struct PeerHolder<Descriptor: SocketDescriptor, TransportImpl: ITransport> {
214221
node_id_to_descriptor: HashMap<PublicKey, Descriptor>,
215222
}
216223

224+
impl<Descriptor: SocketDescriptor, TransportImpl: ITransport> PeerHolder<Descriptor, TransportImpl> {
225+
fn initialized_peer_by_node_id(&mut self, node_id: &PublicKey) -> Option<(Descriptor, &mut Peer<TransportImpl>)> {
226+
match self.node_id_to_descriptor.get_mut(node_id) {
227+
None => None,
228+
Some(descriptor) => {
229+
assert!(self.peers.contains_key(descriptor), "Invalid PeerHolder state");
230+
231+
match self.peers.get_mut(&descriptor) {
232+
None => panic!("Invalid PeerHolder state!"),
233+
Some(peer) => {
234+
235+
// their_features is set after receiving an Init message
236+
if !peer.is_initialized() {
237+
None
238+
} else {
239+
Some((descriptor.clone(), peer))
240+
}
241+
}
242+
}
243+
}
244+
}
245+
}
246+
}
247+
217248
#[cfg(not(any(target_pointer_width = "32", target_pointer_width = "64")))]
218249
fn _check_usize_is_32_or_64() {
219250
// See below, less than 32 bit pointers may be unsafe here!
@@ -425,7 +456,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
425456
fn get_peer_node_ids(&self) -> Vec<PublicKey> {
426457
let peers = self.peers.lock().unwrap();
427458
peers.peers.values().filter_map(|p| {
428-
if !p.transport.is_connected() || p.their_features.is_none() {
459+
if !p.is_initialized() {
429460
return None;
430461
}
431462
Some(p.transport.get_their_node_id())
@@ -904,105 +935,76 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
904935
let mut peers_lock = self.peers.lock().unwrap();
905936
let peers = &mut *peers_lock;
906937
for event in events_generated.drain(..) {
907-
macro_rules! get_peer_for_forwarding {
908-
($node_id: expr, $handle_no_such_peer: block) => {
909-
{
910-
let descriptor = match peers.node_id_to_descriptor.get($node_id) {
911-
Some(descriptor) => descriptor.clone(),
912-
None => {
913-
$handle_no_such_peer;
914-
continue;
915-
},
916-
};
917-
match peers.peers.get_mut(&descriptor) {
918-
Some(peer) => {
919-
if peer.their_features.is_none() {
920-
$handle_no_such_peer;
921-
continue;
922-
}
923-
(descriptor, peer)
924-
},
925-
None => panic!("Inconsistent peers set state!"),
926-
}
927-
}
928-
}
929-
}
930938
match event {
931939
MessageSendEvent::SendAcceptChannel { ref node_id, ref msg } => {
932940
log_trace!(self.logger, "Handling SendAcceptChannel event in peer_handler for node {} for channel {}",
933941
log_pubkey!(node_id),
934942
log_bytes!(msg.temporary_channel_id));
935-
let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
936-
//TODO: Drop the pending channel? (or just let it timeout, but that sucks)
937-
});
938-
if peer.transport.is_connected() {
943+
if let Some((mut descriptor, peer)) = peers.initialized_peer_by_node_id(node_id) {
939944
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
945+
self.do_attempt_write_data(&mut descriptor, &mut peer.sync_status, &mut peer.transport, &mut peer.pending_outbound_buffer);
946+
} else {
947+
//TODO: Drop the pending channel? (or just let it timeout, but that sucks)
940948
}
941-
self.do_attempt_write_data(&mut descriptor, &mut peer.sync_status, &mut peer.transport, &mut peer.pending_outbound_buffer);
942949
},
943950
MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => {
944951
log_trace!(self.logger, "Handling SendOpenChannel event in peer_handler for node {} for channel {}",
945952
log_pubkey!(node_id),
946953
log_bytes!(msg.temporary_channel_id));
947-
let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
948-
//TODO: Drop the pending channel? (or just let it timeout, but that sucks)
949-
});
950-
if peer.transport.is_connected() {
954+
if let Some((mut descriptor, peer)) = peers.initialized_peer_by_node_id(node_id) {
951955
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
956+
self.do_attempt_write_data(&mut descriptor, &mut peer.sync_status, &mut peer.transport, &mut peer.pending_outbound_buffer);
957+
} else {
958+
//TODO: Drop the pending channel? (or just let it timeout, but that sucks)
952959
}
953-
self.do_attempt_write_data(&mut descriptor, &mut peer.sync_status, &mut peer.transport, &mut peer.pending_outbound_buffer);
954960
},
955961
MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => {
956962
log_trace!(self.logger, "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})",
957963
log_pubkey!(node_id),
958964
log_bytes!(msg.temporary_channel_id),
959965
log_funding_channel_id!(msg.funding_txid, msg.funding_output_index));
960-
let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
961-
//TODO: generate a DiscardFunding event indicating to the wallet that
962-
//they should just throw away this funding transaction
963-
});
964-
if peer.transport.is_connected() {
966+
if let Some((mut descriptor, peer)) = peers.initialized_peer_by_node_id(node_id) {
965967
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
968+
self.do_attempt_write_data(&mut descriptor, &mut peer.sync_status, &mut peer.transport, &mut peer.pending_outbound_buffer);
969+
} else {
970+
//TODO: generate a DiscardFunding event indicating to the wallet that
971+
//they should just throw away this funding transaction
966972
}
967-
self.do_attempt_write_data(&mut descriptor, &mut peer.sync_status, &mut peer.transport, &mut peer.pending_outbound_buffer);
968973
},
969974
MessageSendEvent::SendFundingSigned { ref node_id, ref msg } => {
970975
log_trace!(self.logger, "Handling SendFundingSigned event in peer_handler for node {} for channel {}",
971976
log_pubkey!(node_id),
972977
log_bytes!(msg.channel_id));
973-
let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
974-
//TODO: generate a DiscardFunding event indicating to the wallet that
975-
//they should just throw away this funding transaction
976-
});
977-
if peer.transport.is_connected() {
978+
if let Some((mut descriptor, peer)) = peers.initialized_peer_by_node_id(node_id) {
978979
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
980+
self.do_attempt_write_data(&mut descriptor, &mut peer.sync_status, &mut peer.transport, &mut peer.pending_outbound_buffer);
981+
} else {
982+
//TODO: generate a DiscardFunding event indicating to the wallet that
983+
//they should just throw away this funding transaction
979984
}
980-
self.do_attempt_write_data(&mut descriptor, &mut peer.sync_status, &mut peer.transport, &mut peer.pending_outbound_buffer);
981985
},
982986
MessageSendEvent::SendFundingLocked { ref node_id, ref msg } => {
983987
log_trace!(self.logger, "Handling SendFundingLocked event in peer_handler for node {} for channel {}",
984988
log_pubkey!(node_id),
985989
log_bytes!(msg.channel_id));
986-
let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
987-
//TODO: Do whatever we're gonna do for handling dropped messages
988-
});
989-
if peer.transport.is_connected() {
990+
if let Some((mut descriptor, peer)) = peers.initialized_peer_by_node_id(node_id) {
990991
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
992+
self.do_attempt_write_data(&mut descriptor, &mut peer.sync_status, &mut peer.transport, &mut peer.pending_outbound_buffer);
993+
} else {
994+
//TODO: Do whatever we're gonna do for handling dropped messages
991995
}
992-
self.do_attempt_write_data(&mut descriptor, &mut peer.sync_status, &mut peer.transport, &mut peer.pending_outbound_buffer);
993996
},
994997
MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => {
995998
log_trace!(self.logger, "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})",
996999
log_pubkey!(node_id),
9971000
log_bytes!(msg.channel_id));
998-
let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
999-
//TODO: generate a DiscardFunding event indicating to the wallet that
1000-
//they should just throw away this funding transaction
1001-
});
1002-
if peer.transport.is_connected() {
1001+
if let Some((mut descriptor, peer)) = peers.initialized_peer_by_node_id(node_id) {
10031002
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
1003+
self.do_attempt_write_data(&mut descriptor, &mut peer.sync_status, &mut peer.transport, &mut peer.pending_outbound_buffer);
1004+
} else {
1005+
//TODO: generate a DiscardFunding event indicating to the wallet that
1006+
//they should just throw away this funding transaction
10041007
}
1005-
self.do_attempt_write_data(&mut descriptor, &mut peer.sync_status, &mut peer.transport, &mut peer.pending_outbound_buffer);
10061008
},
10071009
MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
10081010
log_trace!(self.logger, "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}",
@@ -1011,10 +1013,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
10111013
update_fulfill_htlcs.len(),
10121014
update_fail_htlcs.len(),
10131015
log_bytes!(commitment_signed.channel_id));
1014-
let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
1015-
//TODO: Do whatever we're gonna do for handling dropped messages
1016-
});
1017-
if peer.transport.is_connected() {
1016+
if let Some((mut descriptor, peer)) = peers.initialized_peer_by_node_id(node_id) {
10181017
for msg in update_add_htlcs {
10191018
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
10201019
}
@@ -1031,62 +1030,60 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
10311030
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
10321031
}
10331032
peer.transport.enqueue_message(commitment_signed, &mut peer.pending_outbound_buffer, &*self.logger);
1033+
self.do_attempt_write_data(&mut descriptor, &mut peer.sync_status, &mut peer.transport, &mut peer.pending_outbound_buffer);
1034+
} else {
1035+
//TODO: Do whatever we're gonna do for handling dropped messages
10341036
}
1035-
self.do_attempt_write_data(&mut descriptor, &mut peer.sync_status, &mut peer.transport, &mut peer.pending_outbound_buffer);
10361037
},
10371038
MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => {
10381039
log_trace!(self.logger, "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}",
10391040
log_pubkey!(node_id),
10401041
log_bytes!(msg.channel_id));
1041-
let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
1042-
//TODO: Do whatever we're gonna do for handling dropped messages
1043-
});
1044-
if peer.transport.is_connected() {
1042+
if let Some((mut descriptor, peer)) = peers.initialized_peer_by_node_id(node_id) {
10451043
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
1044+
self.do_attempt_write_data(&mut descriptor, &mut peer.sync_status, &mut peer.transport, &mut peer.pending_outbound_buffer);
1045+
} else {
1046+
//TODO: Do whatever we're gonna do for handling dropped messages
10461047
}
1047-
self.do_attempt_write_data(&mut descriptor, &mut peer.sync_status, &mut peer.transport, &mut peer.pending_outbound_buffer);
10481048
},
10491049
MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => {
10501050
log_trace!(self.logger, "Handling SendClosingSigned event in peer_handler for node {} for channel {}",
10511051
log_pubkey!(node_id),
10521052
log_bytes!(msg.channel_id));
1053-
let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
1054-
//TODO: Do whatever we're gonna do for handling dropped messages
1055-
});
1056-
if peer.transport.is_connected() {
1053+
if let Some((mut descriptor, peer)) = peers.initialized_peer_by_node_id(node_id) {
10571054
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
1055+
self.do_attempt_write_data(&mut descriptor, &mut peer.sync_status, &mut peer.transport, &mut peer.pending_outbound_buffer);
1056+
} else {
1057+
//TODO: Do whatever we're gonna do for handling dropped messages
10581058
}
1059-
self.do_attempt_write_data(&mut descriptor, &mut peer.sync_status, &mut peer.transport, &mut peer.pending_outbound_buffer);
10601059
},
10611060
MessageSendEvent::SendShutdown { ref node_id, ref msg } => {
10621061
log_trace!(self.logger, "Handling Shutdown event in peer_handler for node {} for channel {}",
10631062
log_pubkey!(node_id),
10641063
log_bytes!(msg.channel_id));
1065-
let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
1066-
//TODO: Do whatever we're gonna do for handling dropped messages
1067-
});
1068-
if peer.transport.is_connected() {
1064+
if let Some((mut descriptor, peer)) = peers.initialized_peer_by_node_id(node_id) {
10691065
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
1066+
self.do_attempt_write_data(&mut descriptor, &mut peer.sync_status, &mut peer.transport, &mut peer.pending_outbound_buffer);
1067+
} else {
1068+
//TODO: Do whatever we're gonna do for handling dropped messages
10701069
}
1071-
self.do_attempt_write_data(&mut descriptor, &mut peer.sync_status, &mut peer.transport, &mut peer.pending_outbound_buffer);
10721070
},
10731071
MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => {
10741072
log_trace!(self.logger, "Handling SendChannelReestablish event in peer_handler for node {} for channel {}",
10751073
log_pubkey!(node_id),
10761074
log_bytes!(msg.channel_id));
1077-
let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
1078-
//TODO: Do whatever we're gonna do for handling dropped messages
1079-
});
1080-
if peer.transport.is_connected() {
1075+
if let Some((mut descriptor, peer)) = peers.initialized_peer_by_node_id(node_id) {
10811076
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
1077+
self.do_attempt_write_data(&mut descriptor, &mut peer.sync_status, &mut peer.transport, &mut peer.pending_outbound_buffer);
1078+
} else {
1079+
//TODO: Do whatever we're gonna do for handling dropped messages
10821080
}
1083-
self.do_attempt_write_data(&mut descriptor, &mut peer.sync_status, &mut peer.transport, &mut peer.pending_outbound_buffer);
10841081
},
10851082
MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
10861083
log_trace!(self.logger, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id);
10871084
if self.message_handler.route_handler.handle_channel_announcement(msg).is_ok() && self.message_handler.route_handler.handle_channel_update(update_msg).is_ok() {
10881085
for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
1089-
if !peer.transport.is_connected() || peer.their_features.is_none() ||
1086+
if !peer.is_initialized() ||
10901087
!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
10911088
continue
10921089
}
@@ -1095,10 +1092,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
10951092
if their_node_id == msg.contents.node_id_1 || their_node_id == msg.contents.node_id_2 {
10961093
continue
10971094
}
1098-
if peer.transport.is_connected() {
1099-
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
1100-
peer.transport.enqueue_message(update_msg, &mut peer.pending_outbound_buffer, &*self.logger);
1101-
}
1095+
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
1096+
peer.transport.enqueue_message(update_msg, &mut peer.pending_outbound_buffer, &*self.logger);
11021097
self.do_attempt_write_data(&mut (*descriptor).clone(), &mut peer.sync_status, &mut peer.transport, &mut peer.pending_outbound_buffer);
11031098
}
11041099
}
@@ -1107,13 +1102,11 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
11071102
log_trace!(self.logger, "Handling BroadcastNodeAnnouncement event in peer_handler");
11081103
if self.message_handler.route_handler.handle_node_announcement(msg).is_ok() {
11091104
for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
1110-
if !peer.transport.is_connected() || peer.their_features.is_none() ||
1105+
if !peer.is_initialized() ||
11111106
!peer.should_forward_node_announcement(msg.contents.node_id) {
11121107
continue
11131108
}
1114-
if peer.transport.is_connected() {
1115-
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
1116-
}
1109+
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
11171110
self.do_attempt_write_data(&mut (*descriptor).clone(), &mut peer.sync_status, &mut peer.transport, &mut peer.pending_outbound_buffer);
11181111
}
11191112
}
@@ -1122,13 +1115,11 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
11221115
log_trace!(self.logger, "Handling BroadcastChannelUpdate event in peer_handler for short channel id {}", msg.contents.short_channel_id);
11231116
if self.message_handler.route_handler.handle_channel_update(msg).is_ok() {
11241117
for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
1125-
if !peer.transport.is_connected() || peer.their_features.is_none() ||
1118+
if !peer.is_initialized() ||
11261119
!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
11271120
continue
11281121
}
1129-
if peer.transport.is_connected() {
1130-
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
1131-
}
1122+
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
11321123
self.do_attempt_write_data(&mut (*descriptor).clone(), &mut peer.sync_status, &mut peer.transport, &mut peer.pending_outbound_buffer);
11331124
}
11341125
}
@@ -1165,13 +1156,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
11651156
log_trace!(self.logger, "Handling SendErrorMessage HandleError event in peer_handler for node {} with message {}",
11661157
log_pubkey!(node_id),
11671158
msg.data);
1168-
let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
1169-
//TODO: Do whatever we're gonna do for handling dropped messages
1170-
});
1171-
if peer.transport.is_connected() {
1159+
if let Some((mut descriptor, peer)) = peers.initialized_peer_by_node_id(node_id) {
11721160
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
1161+
self.do_attempt_write_data(&mut descriptor, &mut peer.sync_status, &mut peer.transport, &mut peer.pending_outbound_buffer);
1162+
} else {
1163+
//TODO: Do whatever we're gonna do for handling dropped messages
11731164
}
1174-
self.do_attempt_write_data(&mut descriptor, &mut peer.sync_status, &mut peer.transport, &mut peer.pending_outbound_buffer);
11751165
},
11761166
}
11771167
}

0 commit comments

Comments
 (0)