Skip to content

Commit 2358b0b

Browse files
authored
Merge pull request #1683 from valentinewallace/2022-08-multiqueue-peerman
2 parents fa62b00 + 47e818f commit 2358b0b

File tree

1 file changed

+42
-25
lines changed

1 file changed

+42
-25
lines changed

lightning/src/ln/peer_handler.rs

Lines changed: 42 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,9 @@ struct Peer {
337337

338338
pending_outbound_buffer: LinkedList<Vec<u8>>,
339339
pending_outbound_buffer_first_msg_offset: usize,
340+
// Queue gossip broadcasts separately from `pending_outbound_buffer` so we can easily prioritize
341+
// channel messages over them.
342+
gossip_broadcast_buffer: LinkedList<Vec<u8>>,
340343
awaiting_write_event: bool,
341344

342345
pending_read_buffer: Vec<u8>,
@@ -389,21 +392,27 @@ impl Peer {
389392
self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE
390393
}
391394

392-
/// Determines if we should push additional gossip messages onto a peer's outbound buffer for
393-
/// backfilling gossip data to the peer. This is checked every time the peer's buffer may have
394-
/// been drained.
395+
/// Determines if we should push additional gossip background sync (aka "backfill") onto a peer's
396+
/// outbound buffer. This is checked every time the peer's buffer may have been drained.
395397
fn should_buffer_gossip_backfill(&self) -> bool {
396-
self.pending_outbound_buffer.is_empty() &&
397-
self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
398+
self.pending_outbound_buffer.is_empty() && self.gossip_broadcast_buffer.is_empty()
399+
&& self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
398400
}
399401

400-
/// Returns whether this peer's buffer is full and we should drop gossip messages.
401-
fn buffer_full_drop_gossip(&self) -> bool {
402-
if self.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
403-
|| self.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO {
404-
return false
405-
}
406-
true
402+
/// Determines if we should push additional gossip broadcast messages onto a peer's outbound
403+
/// buffer. This is checked every time the peer's buffer may have been drained.
404+
fn should_buffer_gossip_broadcast(&self) -> bool {
405+
self.pending_outbound_buffer.is_empty()
406+
&& self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
407+
}
408+
409+
/// Returns whether this peer's outbound buffers are full and we should drop gossip broadcasts.
410+
fn buffer_full_drop_gossip_broadcast(&self) -> bool {
411+
let total_outbound_buffered =
412+
self.gossip_broadcast_buffer.len() + self.pending_outbound_buffer.len();
413+
414+
total_outbound_buffered > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP ||
415+
self.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
407416
}
408417
}
409418

@@ -671,6 +680,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
671680

672681
pending_outbound_buffer: LinkedList::new(),
673682
pending_outbound_buffer_first_msg_offset: 0,
683+
gossip_broadcast_buffer: LinkedList::new(),
674684
awaiting_write_event: false,
675685

676686
pending_read_buffer,
@@ -717,6 +727,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
717727

718728
pending_outbound_buffer: LinkedList::new(),
719729
pending_outbound_buffer_first_msg_offset: 0,
730+
gossip_broadcast_buffer: LinkedList::new(),
720731
awaiting_write_event: false,
721732

722733
pending_read_buffer,
@@ -737,6 +748,11 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
737748

738749
fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) {
739750
while !peer.awaiting_write_event {
751+
if peer.should_buffer_gossip_broadcast() {
752+
if let Some(msg) = peer.gossip_broadcast_buffer.pop_front() {
753+
peer.pending_outbound_buffer.push_back(msg);
754+
}
755+
}
740756
if peer.should_buffer_gossip_backfill() {
741757
match peer.sync_status {
742758
InitSyncTracker::NoSyncRequested => {},
@@ -851,12 +867,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
851867
}
852868
}
853869

854-
/// Append a message to a peer's pending outbound/write buffer
855-
fn enqueue_encoded_message(&self, peer: &mut Peer, encoded_message: &Vec<u8>) {
856-
peer.msgs_sent_since_pong += 1;
857-
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_message[..]));
858-
}
859-
860870
/// Append a message to a peer's pending outbound/write buffer
861871
fn enqueue_message<M: wire::Type>(&self, peer: &mut Peer, message: &M) {
862872
let mut buffer = VecWriter(Vec::with_capacity(2048));
@@ -867,7 +877,14 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
867877
} else {
868878
log_trace!(self.logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap()))
869879
}
870-
self.enqueue_encoded_message(peer, &buffer.0);
880+
peer.msgs_sent_since_pong += 1;
881+
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&buffer.0[..]));
882+
}
883+
884+
/// Append a message to a peer's pending outbound/write gossip broadcast buffer
885+
fn enqueue_encoded_gossip_broadcast(&self, peer: &mut Peer, encoded_message: &Vec<u8>) {
886+
peer.msgs_sent_since_pong += 1;
887+
peer.gossip_broadcast_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_message[..]));
871888
}
872889

873890
fn do_read_event(&self, peer_descriptor: &mut Descriptor, data: &[u8]) -> Result<bool, PeerHandleError> {
@@ -1325,7 +1342,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
13251342
!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
13261343
continue
13271344
}
1328-
if peer.buffer_full_drop_gossip() {
1345+
if peer.buffer_full_drop_gossip_broadcast() {
13291346
log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
13301347
continue;
13311348
}
@@ -1336,7 +1353,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
13361353
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
13371354
continue;
13381355
}
1339-
self.enqueue_encoded_message(&mut *peer, &encoded_msg);
1356+
self.enqueue_encoded_gossip_broadcast(&mut *peer, &encoded_msg);
13401357
}
13411358
},
13421359
wire::Message::NodeAnnouncement(ref msg) => {
@@ -1349,7 +1366,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
13491366
!peer.should_forward_node_announcement(msg.contents.node_id) {
13501367
continue
13511368
}
1352-
if peer.buffer_full_drop_gossip() {
1369+
if peer.buffer_full_drop_gossip_broadcast() {
13531370
log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
13541371
continue;
13551372
}
@@ -1359,7 +1376,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
13591376
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
13601377
continue;
13611378
}
1362-
self.enqueue_encoded_message(&mut *peer, &encoded_msg);
1379+
self.enqueue_encoded_gossip_broadcast(&mut *peer, &encoded_msg);
13631380
}
13641381
},
13651382
wire::Message::ChannelUpdate(ref msg) => {
@@ -1372,14 +1389,14 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
13721389
!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
13731390
continue
13741391
}
1375-
if peer.buffer_full_drop_gossip() {
1392+
if peer.buffer_full_drop_gossip_broadcast() {
13761393
log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
13771394
continue;
13781395
}
13791396
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
13801397
continue;
13811398
}
1382-
self.enqueue_encoded_message(&mut *peer, &encoded_msg);
1399+
self.enqueue_encoded_gossip_broadcast(&mut *peer, &encoded_msg);
13831400
}
13841401
},
13851402
_ => debug_assert!(false, "We shouldn't attempt to forward anything but gossip messages"),

0 commit comments

Comments
 (0)