Commit e9b3293

Implement (v1 of) rate limiting for onion messages.
In this commit, we add business logic for checking whether a peer's outbound buffer has room for onion messages and, if so, pulling them from an implementer of a new trait, OnionMessageProvider. Because other steps need to be taken before the onion_message module is made public and the PeerManager is parameterized by an OnionMessageHandler (which causes a lot of cascading boilerplate changes and affects the public API), we implement OnionMessageProvider on () and store that in PeerManager as a stopgap. This ensures channel messages are prioritized over onion messages (OMs), and that we only write OMs when there is sufficient space in the peer's buffer.
1 parent f38f96d commit e9b3293

File tree

  lightning/src/ln/peer_handler.rs
  lightning/src/onion_message/messenger.rs
  lightning/src/util/events.rs

3 files changed: +78 -3 lines changed
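
Before the per-file diffs, here is a minimal, self-contained sketch of the flow the commit message describes: compute how many onion-message slots a peer's outbound buffer has left, then pull at most that many messages from a provider, and only after channel/gossip traffic has been written. The ToyProvider type, the OnionMessage and Peer definitions, and the BUFFER_DRAIN_MSGS_PER_TICK value below are simplified stand-ins for illustration, not the actual LDK types:

    use std::collections::VecDeque;

    // Constants mirroring the diff below; BUFFER_DRAIN_MSGS_PER_TICK's value is assumed here.
    const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 10;
    const OM_BUFFER_LIMIT_RATIO: usize = 2;
    const OUTBOUND_BUFFER_LIMIT_PAUSE_OMS: usize =
        OUTBOUND_BUFFER_LIMIT_READ_PAUSE * OM_BUFFER_LIMIT_RATIO;
    const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32;

    /// Stand-in for msgs::OnionMessage.
    #[derive(Clone, Debug)]
    struct OnionMessage(u32);

    /// Simplified version of the trait added in util/events.rs (node id parameter omitted).
    trait OnionMessageProvider {
        fn next_onion_messages_for_peer(&self, max_messages: usize) -> Vec<OnionMessage>;
    }

    /// Toy provider holding a fixed queue of pending onion messages.
    struct ToyProvider { queue: Vec<OnionMessage> }

    impl OnionMessageProvider for ToyProvider {
        fn next_onion_messages_for_peer(&self, max_messages: usize) -> Vec<OnionMessage> {
            self.queue.iter().take(max_messages).cloned().collect()
        }
    }

    /// Slimmed-down peer state: just the two fields the rate limit looks at.
    struct Peer {
        pending_outbound_buffer: VecDeque<Vec<u8>>,
        msgs_sent_since_pong: usize,
    }

    impl Peer {
        /// Mirrors Peer::onion_message_buffer_slots_available in the diff.
        fn onion_message_buffer_slots_available(&self) -> usize {
            std::cmp::min(
                OUTBOUND_BUFFER_LIMIT_PAUSE_OMS.saturating_sub(self.pending_outbound_buffer.len()),
                (BUFFER_DRAIN_MSGS_PER_TICK * OM_BUFFER_LIMIT_RATIO)
                    .saturating_sub(self.msgs_sent_since_pong))
        }
    }

    fn main() {
        // Channel messages already fill most of the outbound buffer, so only a
        // few onion-message slots remain -- channel traffic keeps priority.
        let peer = Peer {
            pending_outbound_buffer: (0..17).map(|_| vec![0u8; 32]).collect(),
            msgs_sent_since_pong: 5,
        };
        let provider = ToyProvider { queue: (0..10).map(OnionMessage).collect() };

        let slots = peer.onion_message_buffer_slots_available();
        assert_eq!(slots, 3); // min(20 - 17, 64 - 5)
        let oms = provider.next_onion_messages_for_peer(slots);
        println!("would forward {} onion message(s) to this peer", oms.len());
    }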

lightning/src/ln/peer_handler.rs

Lines changed: 49 additions & 3 deletions
@@ -27,7 +27,7 @@ use ln::wire
 use ln::wire::Encode;
 use routing::gossip::{NetworkGraph, P2PGossipSync};
 use util::atomic_counter::AtomicCounter;
-use util::events::{MessageSendEvent, MessageSendEventsProvider};
+use util::events::{MessageSendEvent, MessageSendEventsProvider, OnionMessageProvider};
 use util::logger::Logger;
 
 use prelude::*;
@@ -81,6 +81,11 @@ impl Deref for IgnoringMessageHandler {
 	fn deref(&self) -> &Self { self }
 }
 
+// TODO: implement `OnionMessageProvider` for `IgnoringMessageHandler` instead
+impl OnionMessageProvider for () {
+	fn next_onion_messages_for_peer(&self, _peer_node_id: PublicKey, _max_messages: usize) -> Vec<msgs::OnionMessage> { Vec::new() }
+}
+
 // Implement Type for Infallible, note that it cannot be constructed, and thus you can never call a
 // method that takes self for it.
 impl wire::Type for Infallible {
@@ -294,15 +299,23 @@ enum InitSyncTracker{
 /// forwarding gossip messages to peers altogether.
 const FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO: usize = 2;
 
+/// The ratio between buffer sizes at which we stop sending initial sync messages vs when we pause
+/// forwarding onion messages to peers altogether.
+const OM_BUFFER_LIMIT_RATIO: usize = 2;
+
 /// When the outbound buffer has this many messages, we'll stop reading bytes from the peer until
 /// we have fewer than this many messages in the outbound buffer again.
-/// We also use this as the target number of outbound gossip messages to keep in the write buffer,
-/// refilled as we send bytes.
+/// We also use this as the target number of outbound gossip and onion messages to keep in the write
+/// buffer, refilled as we send bytes.
 const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 10;
 /// When the outbound buffer has this many messages, we'll simply skip relaying gossip messages to
 /// the peer.
 const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize = OUTBOUND_BUFFER_LIMIT_READ_PAUSE * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO;
 
+/// When the outbound buffer has this many messages, we won't poll for new onion messages for this
+/// peer.
+const OUTBOUND_BUFFER_LIMIT_PAUSE_OMS: usize = OUTBOUND_BUFFER_LIMIT_READ_PAUSE * OM_BUFFER_LIMIT_RATIO;
+
 /// If we've sent a ping, and are still awaiting a response, we may need to churn our way through
 /// the socket receive buffer before receiving the ping.
 ///
@@ -378,6 +391,14 @@ impl Peer {
 			InitSyncTracker::NodesSyncing(pk) => pk < node_id,
 		}
 	}
+
+	/// Returns the number of onion messages we can fit in this peer's buffer.
+	fn onion_message_buffer_slots_available(&self) -> usize {
+		cmp::min(
+			OUTBOUND_BUFFER_LIMIT_PAUSE_OMS.saturating_sub(self.pending_outbound_buffer.len()),
+			(BUFFER_DRAIN_MSGS_PER_TICK * OM_BUFFER_LIMIT_RATIO).saturating_sub(self.msgs_sent_since_pong))
+	}
+
 	/// Returns whether this peer's buffer is full and we should drop gossip messages.
 	fn buffer_full_drop_gossip(&self) -> bool {
 		if self.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
@@ -432,6 +453,8 @@ pub struct PeerManager<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: De
 		L::Target: Logger,
 		CMH::Target: CustomMessageHandler {
 	message_handler: MessageHandler<CM, RM>,
+	onion_message_handler: (), // TODO: add `OnionMessageHandler` trait to
+	                           // `MessageHandler` and get rid of this
 	/// Connection state for each connected peer - we have an outer read-write lock which is taken
 	/// as read while we're doing processing for a peer and taken write when a peer is being added
 	/// or removed.
@@ -587,6 +610,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 
 		PeerManager {
 			message_handler,
+			onion_message_handler: (),
 			peers: FairRwLock::new(HashMap::new()),
 			node_id_to_descriptor: Mutex::new(HashMap::new()),
 			event_processing_lock: Mutex::new(()),
@@ -797,8 +821,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 	/// ready to call `[write_buffer_space_avail`] again if a write call generated here isn't
 	/// sufficient!
 	///
+	/// If any bytes are written, [`process_events`] should be called afterwards.
+	// TODO: why?
+	///
 	/// [`send_data`]: SocketDescriptor::send_data
 	/// [`write_buffer_space_avail`]: PeerManager::write_buffer_space_avail
+	/// [`process_events`]: PeerManager::process_events
 	pub fn write_buffer_space_avail(&self, descriptor: &mut Descriptor) -> Result<(), PeerHandleError> {
 		let peers = self.peers.read().unwrap();
 		match peers.get(descriptor) {
@@ -1387,6 +1415,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 	/// You don't have to call this function explicitly if you are using [`lightning-net-tokio`]
 	/// or one of the other clients provided in our language bindings.
 	///
+	/// Note that this method should be called again if any bytes are written.
+	///
 	/// Note that if there are any other calls to this function waiting on lock(s) this may return
 	/// without doing any work. All available events that need handling will be handled before the
 	/// other calls return.
@@ -1637,6 +1667,22 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 
 			for (descriptor, peer_mutex) in peers.iter() {
 				self.do_attempt_write_data(&mut (*descriptor).clone(), &mut *peer_mutex.lock().unwrap());
+
+				// Only see if we have room for onion messages after we've written all channel messages, to
+				// ensure they take priority.
+				let (peer_node_id, om_buffer_slots_avail) = {
+					let peer = peer_mutex.lock().unwrap();
+					if let Some(peer_node_id) = peer.their_node_id {
+						(Some(peer_node_id.clone()), peer.onion_message_buffer_slots_available())
+					} else { (None, 0) }
+				};
+				if peer_node_id.is_some() && om_buffer_slots_avail > 0 {
+					for event in self.onion_message_handler.next_onion_messages_for_peer(
+						peer_node_id.unwrap(), om_buffer_slots_avail)
+					{
+						// TODO: forward onion message to peer
+					}
+				}
 			}
 		}
 		if !peers_to_disconnect.is_empty() {

lightning/src/onion_message/messenger.rs

Lines changed: 21 additions & 0 deletions
@@ -21,8 +21,10 @@ use ln::onion_utils;
 use super::blinded_route::{BlindedRoute, ForwardTlvs, ReceiveTlvs};
 use super::packet::{BIG_PACKET_HOP_DATA_LEN, ForwardControlTlvs, Packet, Payload, ReceiveControlTlvs, SMALL_PACKET_HOP_DATA_LEN};
 use super::utils;
+use util::events::OnionMessageProvider;
 use util::logger::Logger;
 
+use core::mem;
 use core::ops::Deref;
 use sync::{Arc, Mutex};
 use prelude::*;
@@ -278,6 +280,25 @@ impl<Signer: Sign, K: Deref, L: Deref> OnionMessenger<Signer, K, L>
 	}
 }
 
+impl<Signer: Sign, K: Deref, L: Deref> OnionMessageProvider for OnionMessenger<Signer, K, L>
+	where K::Target: KeysInterface<Signer = Signer>,
+	      L::Target: Logger,
+{
+	fn next_onion_messages_for_peer(&self, peer_node_id: PublicKey, max_messages: usize) -> Vec<msgs::OnionMessage> {
+		let mut pending_msgs = self.pending_messages.lock().unwrap();
+		if let Some(msgs) = pending_msgs.get_mut(&peer_node_id) {
+			if max_messages >= msgs.len() {
+				let mut peer_pending_msgs = Vec::new();
+				mem::swap(msgs, &mut peer_pending_msgs);
+				return peer_pending_msgs
+			} else {
+				return msgs.split_off(max_messages)
+			}
+		}
+		Vec::new()
+	}
+}
+
 // TODO: parameterize the below Simple* types with OnionMessenger and handle the messages it
 // produces
 /// Useful for simplifying the parameters of [`SimpleArcChannelManager`] and
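
One note on the standard-library behavior the new next_onion_messages_for_peer above relies on: mem::swap drains the whole per-peer queue when it fits under max_messages, while Vec::split_off(at) keeps the first at elements in place and returns the tail. A standalone snippet (plain std, not LDK code) demonstrating both:

    use std::mem;

    fn main() {
        // Queue fits within the limit: swap it out wholesale, leaving it empty.
        let mut queue = vec!["a", "b"];
        let mut drained = Vec::new();
        mem::swap(&mut queue, &mut drained);
        assert!(queue.is_empty());
        assert_eq!(drained, ["a", "b"]);

        // Queue larger than the limit: split_off(3) keeps elements 0..3 in the
        // queue and returns everything from index 3 onward.
        let mut queue = vec![1, 2, 3, 4, 5];
        let tail = queue.split_off(3);
        assert_eq!(queue, [1, 2, 3]);
        assert_eq!(tail, [4, 5]);
    }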

lightning/src/util/events.rs

Lines changed: 8 additions & 0 deletions
@@ -1195,6 +1195,14 @@ pub trait MessageSendEventsProvider {
 	fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent>;
 }
 
+/// A trait indicating an object may generate onion message send events
+// TODO: make this `pub` when we make onion message functionality public
+pub(crate) trait OnionMessageProvider {
+	/// Gets up to `max_messages` pending onion message events.
+	// TODO: make this return `MessageSendEvent`s rather than `msgs::OnionMessage`s
+	fn next_onion_messages_for_peer(&self, peer_node_id: PublicKey, max_messages: usize) -> Vec<msgs::OnionMessage>;
+}
+
 /// A trait indicating an object may generate events.
 ///
 /// Events are processed by passing an [`EventHandler`] to [`process_pending_events`].
