Skip to content

Commit 8e75a53

Browse files
committed
refactor: Move all enqueue logging to Transport
1 parent c62dcc4 commit 8e75a53

File tree

2 files changed

+49
-51
lines changed

2 files changed

+49
-51
lines changed

lightning/src/ln/peers/handler.rs

Lines changed: 36 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ pub(super) trait ITransport {
6565

6666
/// Encodes, encrypts, and enqueues a message to the outbound queue. Panics if the connection is
6767
/// not established yet.
68-
fn enqueue_message<M: Encode + Writeable, Q: PayloadQueuer>(&mut self, message: &M, output_buffer: &mut Q);
68+
fn enqueue_message<M: Encode + Writeable, Q: PayloadQueuer, L: Deref>(&mut self, message: &M, output_buffer: &mut Q, logger: L) where L::Target: Logger;
6969
}
7070

7171

@@ -415,15 +415,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
415415
}
416416

417417
fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) {
418-
macro_rules! enqueue_msg {
419-
($msg: expr) => {
420-
{
421-
log_trace!(self.logger, "Encoding and sending sync update message of type {} to {}", $msg.type_id(), log_pubkey!(peer.their_node_id.unwrap()));
422-
peer.transport.enqueue_message($msg, &mut peer.pending_outbound_buffer)
423-
}
424-
}
425-
}
426-
427418
while !peer.pending_outbound_buffer.is_blocked() {
428419
let queue_space = peer.pending_outbound_buffer.queue_space();
429420
if queue_space > 0 {
@@ -433,12 +424,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
433424
let steps = ((queue_space + 2) / 3) as u8;
434425
let all_messages = self.message_handler.route_handler.get_next_channel_announcements(c, steps);
435426
for &(ref announce, ref update_a_option, ref update_b_option) in all_messages.iter() {
436-
enqueue_msg!(announce);
427+
peer.transport.enqueue_message(announce, &mut peer.pending_outbound_buffer, &*self.logger);
437428
if let &Some(ref update_a) = update_a_option {
438-
enqueue_msg!(update_a);
429+
peer.transport.enqueue_message(update_a, &mut peer.pending_outbound_buffer, &*self.logger);
439430
}
440431
if let &Some(ref update_b) = update_b_option {
441-
enqueue_msg!(update_b);
432+
peer.transport.enqueue_message(update_b, &mut peer.pending_outbound_buffer, &*self.logger);
442433
}
443434
peer.sync_status = InitSyncTracker::ChannelsSyncing(announce.contents.short_channel_id + 1);
444435
}
@@ -450,7 +441,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
450441
let steps = queue_space as u8;
451442
let all_messages = self.message_handler.route_handler.get_next_node_announcements(None, steps);
452443
for msg in all_messages.iter() {
453-
enqueue_msg!(msg);
444+
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
454445
peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
455446
}
456447
if all_messages.is_empty() || all_messages.len() != steps as usize {
@@ -462,7 +453,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
462453
let steps = queue_space as u8;
463454
let all_messages = self.message_handler.route_handler.get_next_node_announcements(Some(&key), steps);
464455
for msg in all_messages.iter() {
465-
enqueue_msg!(msg);
456+
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
466457
peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
467458
}
468459
if all_messages.is_empty() || all_messages.len() != steps as usize {
@@ -526,10 +517,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
526517
}
527518

528519
/// Append a message to a peer's pending outbound/write buffer, and update the map of peers needing sends accordingly.
529-
fn enqueue_message<M: Encode + Writeable>(&self, peers_needing_send: &mut HashSet<Descriptor>, peer: &mut Peer, descriptor: Descriptor, message: &M) {
530-
log_trace!(self.logger, "Enqueueing message of type {} to {}", message.type_id(), log_pubkey!(peer.their_node_id.unwrap()));
531-
peer.transport.enqueue_message(message, &mut peer.pending_outbound_buffer);
532-
peers_needing_send.insert(descriptor);
520+
fn enqueue_message<M: Encode + Writeable>(&self, peers_needing_send: &mut HashSet<Descriptor>, transport: &mut impl ITransport, output_buffer: &mut impl PayloadQueuer, descriptor: &Descriptor, message: &M) {
521+
transport.enqueue_message(message, output_buffer, &*self.logger);
522+
peers_needing_send.insert(descriptor.clone());
533523
}
534524

535525
fn do_read_event(&self, peer_descriptor: &mut Descriptor, data: &[u8]) -> Result<bool, PeerHandleError> {
@@ -573,7 +563,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
573563
}
574564

575565
let resp = msgs::Init { features };
576-
self.enqueue_message(&mut peers.peers_needing_send, peer, peer_descriptor.clone(), &resp);
566+
self.enqueue_message(&mut peers.peers_needing_send, &mut peer.transport, &mut peer.pending_outbound_buffer, peer_descriptor, &resp);
577567
}
578568
entry.insert(peer_descriptor.clone());
579569
}
@@ -602,7 +592,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
602592
},
603593
msgs::ErrorAction::SendErrorMessage { msg } => {
604594
log_trace!(self.logger, "Got Err handling message, sending Error message because {}", e.err);
605-
self.enqueue_message(&mut peers.peers_needing_send, peer, peer_descriptor.clone(), &msg);
595+
self.enqueue_message(&mut peers.peers_needing_send, &mut peer.transport, &mut peer.pending_outbound_buffer, peer_descriptor, &msg);
606596
continue;
607597
},
608598
}
@@ -684,7 +674,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
684674
}
685675

686676
let resp = msgs::Init { features };
687-
self.enqueue_message(peers_needing_send, peer, peer_descriptor.clone(), &resp);
677+
self.enqueue_message(peers_needing_send, &mut peer.transport, &mut peer.pending_outbound_buffer, &peer_descriptor, &resp);
688678
}
689679

690680
self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap(), &msg);
@@ -713,7 +703,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
713703
wire::Message::Ping(msg) => {
714704
if msg.ponglen < 65532 {
715705
let resp = msgs::Pong { byteslen: msg.ponglen };
716-
self.enqueue_message(peers_needing_send, peer, peer_descriptor.clone(), &resp);
706+
self.enqueue_message(peers_needing_send, &mut peer.transport, &mut peer.pending_outbound_buffer, &peer_descriptor, &resp);
717707
}
718708
},
719709
wire::Message::Pong(_msg) => {
@@ -865,7 +855,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
865855
//TODO: Drop the pending channel? (or just let it timeout, but that sucks)
866856
});
867857
if peer.transport.is_connected() {
868-
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer);
858+
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
869859
}
870860
self.do_attempt_write_data(&mut descriptor, peer);
871861
},
@@ -877,7 +867,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
877867
//TODO: Drop the pending channel? (or just let it timeout, but that sucks)
878868
});
879869
if peer.transport.is_connected() {
880-
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer);
870+
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
881871
}
882872
self.do_attempt_write_data(&mut descriptor, peer);
883873
},
@@ -891,7 +881,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
891881
//they should just throw away this funding transaction
892882
});
893883
if peer.transport.is_connected() {
894-
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer);
884+
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
895885
}
896886
self.do_attempt_write_data(&mut descriptor, peer);
897887
},
@@ -904,7 +894,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
904894
//they should just throw away this funding transaction
905895
});
906896
if peer.transport.is_connected() {
907-
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer);
897+
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
908898
}
909899
self.do_attempt_write_data(&mut descriptor, peer);
910900
},
@@ -916,7 +906,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
916906
//TODO: Do whatever we're gonna do for handling dropped messages
917907
});
918908
if peer.transport.is_connected() {
919-
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer);
909+
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
920910
}
921911
self.do_attempt_write_data(&mut descriptor, peer);
922912
},
@@ -929,7 +919,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
929919
//they should just throw away this funding transaction
930920
});
931921
if peer.transport.is_connected() {
932-
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer);
922+
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
933923
}
934924
self.do_attempt_write_data(&mut descriptor, peer);
935925
},
@@ -945,21 +935,21 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
945935
});
946936
if peer.transport.is_connected() {
947937
for msg in update_add_htlcs {
948-
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer);
938+
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
949939
}
950940
for msg in update_fulfill_htlcs {
951-
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer);
941+
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
952942
}
953943
for msg in update_fail_htlcs {
954-
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer);
944+
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
955945
}
956946
for msg in update_fail_malformed_htlcs {
957-
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer);
947+
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
958948
}
959949
if let &Some(ref msg) = update_fee {
960-
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer);
950+
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
961951
}
962-
peer.transport.enqueue_message(commitment_signed, &mut peer.pending_outbound_buffer);
952+
peer.transport.enqueue_message(commitment_signed, &mut peer.pending_outbound_buffer, &*self.logger);
963953
}
964954
self.do_attempt_write_data(&mut descriptor, peer);
965955
},
@@ -971,7 +961,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
971961
//TODO: Do whatever we're gonna do for handling dropped messages
972962
});
973963
if peer.transport.is_connected() {
974-
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer);
964+
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
975965
}
976966
self.do_attempt_write_data(&mut descriptor, peer);
977967
},
@@ -983,7 +973,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
983973
//TODO: Do whatever we're gonna do for handling dropped messages
984974
});
985975
if peer.transport.is_connected() {
986-
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer);
976+
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
987977
}
988978
self.do_attempt_write_data(&mut descriptor, peer);
989979
},
@@ -995,7 +985,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
995985
//TODO: Do whatever we're gonna do for handling dropped messages
996986
});
997987
if peer.transport.is_connected() {
998-
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer);
988+
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
999989
}
1000990
self.do_attempt_write_data(&mut descriptor, peer);
1001991
},
@@ -1007,7 +997,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1007997
//TODO: Do whatever we're gonna do for handling dropped messages
1008998
});
1009999
if peer.transport.is_connected() {
1010-
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer);
1000+
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
10111001
}
10121002
self.do_attempt_write_data(&mut descriptor, peer);
10131003
},
@@ -1028,8 +1018,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
10281018
}
10291019
}
10301020
if peer.transport.is_connected() {
1031-
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer);
1032-
peer.transport.enqueue_message(update_msg, &mut peer.pending_outbound_buffer);
1021+
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
1022+
peer.transport.enqueue_message(update_msg, &mut peer.pending_outbound_buffer, &*self.logger);
10331023
}
10341024
self.do_attempt_write_data(&mut (*descriptor).clone(), peer);
10351025
}
@@ -1044,7 +1034,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
10441034
continue
10451035
}
10461036
if peer.transport.is_connected() {
1047-
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer);
1037+
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
10481038
}
10491039
self.do_attempt_write_data(&mut (*descriptor).clone(), peer);
10501040
}
@@ -1059,7 +1049,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
10591049
continue
10601050
}
10611051
if peer.transport.is_connected() {
1062-
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer);
1052+
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
10631053
}
10641054
self.do_attempt_write_data(&mut (*descriptor).clone(), peer);
10651055
}
@@ -1079,7 +1069,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
10791069
log_pubkey!(node_id),
10801070
msg.data);
10811071
if peer.transport.is_connected() {
1082-
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer);
1072+
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
10831073
}
10841074
// This isn't guaranteed to work, but if there is enough free
10851075
// room in the send buffer, put the error message there...
@@ -1101,7 +1091,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
11011091
//TODO: Do whatever we're gonna do for handling dropped messages
11021092
});
11031093
if peer.transport.is_connected() {
1104-
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer);
1094+
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
11051095
}
11061096
self.do_attempt_write_data(&mut descriptor, peer);
11071097
},
@@ -1187,7 +1177,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
11871177
ponglen: 0,
11881178
byteslen: 64,
11891179
};
1190-
peer.transport.enqueue_message(&ping, &mut peer.pending_outbound_buffer);
1180+
peer.transport.enqueue_message(&ping, &mut peer.pending_outbound_buffer, &*self.logger);
11911181
needs_to_write_data = true;
11921182
}
11931183

lightning/src/ln/peers/transport.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -146,10 +146,14 @@ impl<PeerHandshakeImpl: IPeerHandshake> ITransport for Transport<PeerHandshakeIm
146146
self.conduit.is_some()
147147
}
148148

149-
fn enqueue_message<M: Encode + Writeable, Q: PayloadQueuer>(&mut self, message: &M, output_buffer: &mut Q) {
149+
fn enqueue_message<M: Encode + Writeable, Q: PayloadQueuer, L: Deref>(&mut self, message: &M, output_buffer: &mut Q, logger: L)
150+
where L::Target: Logger {
151+
150152
match self.conduit {
151153
None => panic!("Enqueueing messages only supported after transport is connected"),
152154
Some(ref mut conduit) => {
155+
log_trace!(logger, "Enqueueing message of type {} to {}", message.type_id(), log_pubkey!(self.their_node_id.unwrap()));
156+
153157
let mut buffer = VecWriter(Vec::new());
154158
wire::write(message, &mut buffer).unwrap();
155159
output_buffer.push_back(conduit.encrypt(&buffer.0));
@@ -263,33 +267,36 @@ mod tests {
263267
#[test]
264268
#[should_panic(expected = "Enqueueing messages only supported after transport is connected")]
265269
fn inbound_enqueue_message_panic() {
270+
let logger = TestLogger::new();
266271
let mut transport = create_inbound_for_test::<PeerHandshakeTestStubComplete>();
267272
let mut spy = Vec::new();
268273

269274
let ping = msgs::Ping {
270275
ponglen: 0,
271276
byteslen: 64,
272277
};
273-
transport.enqueue_message(&ping, &mut spy);
278+
transport.enqueue_message(&ping, &mut spy, &logger);
274279
}
275280

276281
// Test that enqueue_message() panics in the wrong state
277282
#[test]
278283
#[should_panic(expected = "Enqueueing messages only supported after transport is connected")]
279284
fn outbound_enqueue_message_panic() {
285+
let logger = TestLogger::new();
280286
let mut transport = create_outbound_for_test::<PeerHandshakeTestStubComplete>();
281287
let mut spy = Vec::new();
282288

283289
let ping = msgs::Ping {
284290
ponglen: 0,
285291
byteslen: 64,
286292
};
287-
transport.enqueue_message(&ping, &mut spy);
293+
transport.enqueue_message(&ping, &mut spy, &logger);
288294
}
289295

290296
// Test that enqueue_message() puts something into the outbound buffer
291297
#[test]
292298
fn inbound_enqueue_message_encrypts() {
299+
let logger = TestLogger::new();
293300
let mut transport = create_inbound_for_test::<PeerHandshakeTestStubComplete>();
294301
let mut spy = Vec::new();
295302

@@ -299,13 +306,14 @@ mod tests {
299306
ponglen: 0,
300307
byteslen: 64,
301308
};
302-
transport.enqueue_message(&ping, &mut spy);
309+
transport.enqueue_message(&ping, &mut spy, &logger);
303310

304311
assert_matches!(&spy[..], [_]);
305312
}
306313

307314
#[test]
308315
fn outbound_enqueue_message_encrypts() {
316+
let logger = TestLogger::new();
309317
let mut transport = create_outbound_for_test::<PeerHandshakeTestStubComplete>();
310318
let mut spy = Vec::new();
311319

@@ -315,7 +323,7 @@ mod tests {
315323
ponglen: 0,
316324
byteslen: 64,
317325
};
318-
transport.enqueue_message(&ping, &mut spy);
326+
transport.enqueue_message(&ping, &mut spy, &logger);
319327

320328
assert_matches!(&spy[..], [_]);
321329
}

0 commit comments

Comments
 (0)