Skip to content

Commit 3cd1cb5

Browse files
authored
Merge pull request #3060 from TheBlueMatt/2024-05-parallel-async-om-events
Add a parallel async event handler to OnionMessenger and pass it directly to BackgroundProcessor
2 parents 96fe185 + fadb268 commit 3cd1cb5

File tree

6 files changed

+233
-69
lines changed

6 files changed

+233
-69
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 47 additions & 47 deletions
Large diffs are not rendered by default.

lightning/src/ln/msgs.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ use core::fmt::Display;
5252
use crate::io::{self, Cursor, Read};
5353
use crate::io_extras::read_to_end;
5454

55-
use crate::events::{EventsProvider, MessageSendEventsProvider};
55+
use crate::events::MessageSendEventsProvider;
5656
use crate::crypto::streams::ChaChaPolyReadAdapter;
5757
use crate::util::logger;
5858
use crate::util::ser::{BigSize, FixedLengthReader, HighZeroBytesDroppedBigSize, Hostname, LengthRead, LengthReadable, LengthReadableArgs, Readable, ReadableArgs, TransactionU16LenLimited, WithoutLength, Writeable, Writer};
@@ -1623,7 +1623,7 @@ pub trait RoutingMessageHandler : MessageSendEventsProvider {
16231623
}
16241624

16251625
/// A handler for received [`OnionMessage`]s and for providing generated ones to send.
1626-
pub trait OnionMessageHandler: EventsProvider {
1626+
pub trait OnionMessageHandler {
16271627
/// Handle an incoming `onion_message` message from the given peer.
16281628
fn handle_onion_message(&self, peer_node_id: &PublicKey, msg: &OnionMessage);
16291629

lightning/src/ln/peer_handler.rs

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use bitcoin::blockdata::constants::ChainHash;
1919
use bitcoin::secp256k1::{self, Secp256k1, SecretKey, PublicKey};
2020

2121
use crate::sign::{NodeSigner, Recipient};
22-
use crate::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
22+
use crate::events::{MessageSendEvent, MessageSendEventsProvider};
2323
use crate::ln::types::ChannelId;
2424
use crate::ln::features::{InitFeatures, NodeFeatures};
2525
use crate::ln::msgs;
@@ -97,9 +97,6 @@ pub trait CustomMessageHandler: wire::CustomMessageReader {
9797
/// A dummy struct which implements `RoutingMessageHandler` without storing any routing information
9898
/// or doing any processing. You can provide one of these as the route_handler in a MessageHandler.
9999
pub struct IgnoringMessageHandler{}
100-
impl EventsProvider for IgnoringMessageHandler {
101-
fn process_pending_events<H: Deref>(&self, _handler: H) where H::Target: EventHandler {}
102-
}
103100
impl MessageSendEventsProvider for IgnoringMessageHandler {
104101
fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> { Vec::new() }
105102
}
@@ -723,8 +720,6 @@ pub trait APeerManager {
723720
type NS: Deref<Target=Self::NST>;
724721
/// Gets a reference to the underlying [`PeerManager`].
725722
fn as_ref(&self) -> &PeerManager<Self::Descriptor, Self::CM, Self::RM, Self::OM, Self::L, Self::CMH, Self::NS>;
726-
/// Returns the peer manager's [`OnionMessageHandler`].
727-
fn onion_message_handler(&self) -> &Self::OMT;
728723
}
729724

730725
impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CMH: Deref, NS: Deref>
@@ -750,9 +745,6 @@ APeerManager for PeerManager<Descriptor, CM, RM, OM, L, CMH, NS> where
750745
type NST = <NS as Deref>::Target;
751746
type NS = NS;
752747
fn as_ref(&self) -> &PeerManager<Descriptor, CM, RM, OM, L, CMH, NS> { self }
753-
fn onion_message_handler(&self) -> &Self::OMT {
754-
self.message_handler.onion_message_handler.deref()
755-
}
756748
}
757749

758750
/// A PeerManager manages a set of peers, described by their [`SocketDescriptor`] and marshalls

lightning/src/onion_message/messenger.rs

Lines changed: 139 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,70 @@ use {
4747

4848
pub(super) const MAX_TIMER_TICKS: usize = 2;
4949

50+
/// A trivial trait which describes any [`OnionMessenger`].
51+
///
52+
/// This is not exported to bindings users as general cover traits aren't useful in other
53+
/// languages.
54+
pub trait AOnionMessenger {
55+
/// A type implementing [`EntropySource`]
56+
type EntropySource: EntropySource + ?Sized;
57+
/// A type that may be dereferenced to [`Self::EntropySource`]
58+
type ES: Deref<Target = Self::EntropySource>;
59+
/// A type implementing [`NodeSigner`]
60+
type NodeSigner: NodeSigner + ?Sized;
61+
/// A type that may be dereferenced to [`Self::NodeSigner`]
62+
type NS: Deref<Target = Self::NodeSigner>;
63+
/// A type implementing [`Logger`]
64+
type Logger: Logger + ?Sized;
65+
/// A type that may be dereferenced to [`Self::Logger`]
66+
type L: Deref<Target = Self::Logger>;
67+
/// A type implementing [`NodeIdLookUp`]
68+
type NodeIdLookUp: NodeIdLookUp + ?Sized;
69+
/// A type that may be dereferenced to [`Self::NodeIdLookUp`]
70+
type NL: Deref<Target = Self::NodeIdLookUp>;
71+
/// A type implementing [`MessageRouter`]
72+
type MessageRouter: MessageRouter + ?Sized;
73+
/// A type that may be dereferenced to [`Self::MessageRouter`]
74+
type MR: Deref<Target = Self::MessageRouter>;
75+
/// A type implementing [`OffersMessageHandler`]
76+
type OffersMessageHandler: OffersMessageHandler + ?Sized;
77+
/// A type that may be dereferenced to [`Self::OffersMessageHandler`]
78+
type OMH: Deref<Target = Self::OffersMessageHandler>;
79+
/// A type implementing [`CustomOnionMessageHandler`]
80+
type CustomOnionMessageHandler: CustomOnionMessageHandler + ?Sized;
81+
/// A type that may be dereferenced to [`Self::CustomOnionMessageHandler`]
82+
type CMH: Deref<Target = Self::CustomOnionMessageHandler>;
83+
/// Returns a reference to the actual [`OnionMessenger`] object.
84+
fn get_om(&self) -> &OnionMessenger<Self::ES, Self::NS, Self::L, Self::NL, Self::MR, Self::OMH, Self::CMH>;
85+
}
86+
87+
impl<ES: Deref, NS: Deref, L: Deref, NL: Deref, MR: Deref, OMH: Deref, CMH: Deref> AOnionMessenger
88+
for OnionMessenger<ES, NS, L, NL, MR, OMH, CMH> where
89+
ES::Target: EntropySource,
90+
NS::Target: NodeSigner,
91+
L::Target: Logger,
92+
NL::Target: NodeIdLookUp,
93+
MR::Target: MessageRouter,
94+
OMH::Target: OffersMessageHandler,
95+
CMH::Target: CustomOnionMessageHandler,
96+
{
97+
type EntropySource = ES::Target;
98+
type ES = ES;
99+
type NodeSigner = NS::Target;
100+
type NS = NS;
101+
type Logger = L::Target;
102+
type L = L;
103+
type NodeIdLookUp = NL::Target;
104+
type NL = NL;
105+
type MessageRouter = MR::Target;
106+
type MR = MR;
107+
type OffersMessageHandler = OMH::Target;
108+
type OMH = OMH;
109+
type CustomOnionMessageHandler = CMH::Target;
110+
type CMH = CMH;
111+
fn get_om(&self) -> &OnionMessenger<ES, NS, L, NL, MR, OMH, CMH> { self }
112+
}
113+
50114
/// A sender, receiver and forwarder of [`OnionMessage`]s.
51115
///
52116
/// # Handling Messages
@@ -181,7 +245,12 @@ where
181245
offers_handler: OMH,
182246
custom_handler: CMH,
183247
intercept_messages_for_offline_peers: bool,
184-
pending_events: Mutex<Vec<Event>>,
248+
pending_events: Mutex<PendingEvents>,
249+
}
250+
251+
struct PendingEvents {
252+
intercepted_msgs: Vec<Event>,
253+
peer_connecteds: Vec<Event>,
185254
}
186255

187256
/// [`OnionMessage`]s buffered to be sent.
@@ -929,7 +998,10 @@ where
929998
offers_handler,
930999
custom_handler,
9311000
intercept_messages_for_offline_peers,
932-
pending_events: Mutex::new(Vec::new()),
1001+
pending_events: Mutex::new(PendingEvents {
1002+
intercepted_msgs: Vec::new(),
1003+
peer_connecteds: Vec::new(),
1004+
}),
9331005
}
9341006
}
9351007

@@ -1150,18 +1222,61 @@ where
11501222
msgs
11511223
}
11521224

1153-
fn enqueue_event(&self, event: Event) {
1225+
fn enqueue_intercepted_event(&self, event: Event) {
11541226
const MAX_EVENTS_BUFFER_SIZE: usize = (1 << 10) * 256;
11551227
let mut pending_events = self.pending_events.lock().unwrap();
1156-
let total_buffered_bytes: usize = pending_events
1157-
.iter()
1158-
.map(|ev| ev.serialized_length())
1159-
.sum();
1228+
let total_buffered_bytes: usize =
1229+
pending_events.intercepted_msgs.iter().map(|ev| ev.serialized_length()).sum();
11601230
if total_buffered_bytes >= MAX_EVENTS_BUFFER_SIZE {
11611231
log_trace!(self.logger, "Dropping event {:?}: buffer full", event);
11621232
return
11631233
}
1164-
pending_events.push(event);
1234+
pending_events.intercepted_msgs.push(event);
1235+
}
1236+
1237+
/// Processes any events asynchronously using the given handler.
1238+
///
1239+
/// Note that the event handler is called in the order each event was generated, however
1240+
/// futures are polled in parallel for some events to allow for parallelism where events do not
1241+
/// have an ordering requirement.
1242+
///
1243+
/// See the trait-level documentation of [`EventsProvider`] for requirements.
1244+
pub async fn process_pending_events_async<Future: core::future::Future<Output = ()> + core::marker::Unpin, H: Fn(Event) -> Future>(
1245+
&self, handler: H
1246+
) {
1247+
let mut intercepted_msgs = Vec::new();
1248+
let mut peer_connecteds = Vec::new();
1249+
{
1250+
let mut pending_events = self.pending_events.lock().unwrap();
1251+
core::mem::swap(&mut pending_events.intercepted_msgs, &mut intercepted_msgs);
1252+
core::mem::swap(&mut pending_events.peer_connecteds, &mut peer_connecteds);
1253+
}
1254+
1255+
let mut futures = Vec::with_capacity(intercepted_msgs.len());
1256+
for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() {
1257+
if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient {
1258+
if let Some(addresses) = addresses.take() {
1259+
futures.push(Some(handler(Event::ConnectionNeeded { node_id: *node_id, addresses })));
1260+
}
1261+
}
1262+
}
1263+
1264+
for ev in intercepted_msgs {
1265+
if let Event::OnionMessageIntercepted { .. } = ev {} else { debug_assert!(false); }
1266+
futures.push(Some(handler(ev)));
1267+
}
1268+
// Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds
1269+
crate::util::async_poll::MultiFuturePoller(futures).await;
1270+
1271+
if peer_connecteds.len() <= 1 {
1272+
for event in peer_connecteds { handler(event).await; }
1273+
} else {
1274+
let mut futures = Vec::new();
1275+
for event in peer_connecteds {
1276+
futures.push(Some(handler(event)));
1277+
}
1278+
crate::util::async_poll::MultiFuturePoller(futures).await;
1279+
}
11651280
}
11661281
}
11671282

@@ -1208,7 +1323,20 @@ where
12081323
}
12091324
}
12101325
let mut events = Vec::new();
1211-
core::mem::swap(&mut *self.pending_events.lock().unwrap(), &mut events);
1326+
{
1327+
let mut pending_events = self.pending_events.lock().unwrap();
1328+
#[cfg(debug_assertions)] {
1329+
for ev in pending_events.intercepted_msgs.iter() {
1330+
if let Event::OnionMessageIntercepted { .. } = ev {} else { panic!(); }
1331+
}
1332+
for ev in pending_events.peer_connecteds.iter() {
1333+
if let Event::OnionMessagePeerConnected { .. } = ev {} else { panic!(); }
1334+
}
1335+
}
1336+
core::mem::swap(&mut pending_events.intercepted_msgs, &mut events);
1337+
events.append(&mut pending_events.peer_connecteds);
1338+
pending_events.peer_connecteds.shrink_to(10); // Limit total heap usage
1339+
}
12121340
for ev in events {
12131341
handler.handle_event(ev);
12141342
}
@@ -1286,7 +1414,7 @@ where
12861414
log_trace!(logger, "Forwarding an onion message to peer {}", next_node_id);
12871415
},
12881416
_ if self.intercept_messages_for_offline_peers => {
1289-
self.enqueue_event(
1417+
self.enqueue_intercepted_event(
12901418
Event::OnionMessageIntercepted {
12911419
peer_node_id: next_node_id, message: onion_message
12921420
}
@@ -1314,7 +1442,7 @@ where
13141442
.or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new()))
13151443
.mark_connected();
13161444
if self.intercept_messages_for_offline_peers {
1317-
self.enqueue_event(
1445+
self.pending_events.lock().unwrap().peer_connecteds.push(
13181446
Event::OnionMessagePeerConnected { peer_node_id: *their_node_id }
13191447
);
13201448
}

lightning/src/util/async_poll.rs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// This file is Copyright its original authors, visible in version control
2+
// history.
3+
//
4+
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
5+
// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
6+
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
7+
// You may not use this file except in accordance with one or both of these
8+
// licenses.
9+
10+
//! Some utilities to make working with the standard library's [`Future`]s easier
11+
12+
use crate::prelude::*;
13+
use core::future::Future;
14+
use core::marker::Unpin;
15+
use core::pin::Pin;
16+
use core::task::{Context, Poll};
17+
18+
pub(crate) struct MultiFuturePoller<F: Future<Output = ()> + Unpin>(pub Vec<Option<F>>);
19+
20+
impl<F: Future<Output = ()> + Unpin> Future for MultiFuturePoller<F> {
21+
type Output = ();
22+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
23+
let mut have_pending_futures = false;
24+
for fut_option in self.get_mut().0.iter_mut() {
25+
let mut fut = match fut_option.take() {
26+
None => continue,
27+
Some(fut) => fut,
28+
};
29+
match Pin::new(&mut fut).poll(cx) {
30+
Poll::Ready(()) => {},
31+
Poll::Pending => {
32+
have_pending_futures = true;
33+
*fut_option = Some(fut);
34+
},
35+
}
36+
}
37+
if have_pending_futures {
38+
Poll::Pending
39+
} else {
40+
Poll::Ready(())
41+
}
42+
}
43+
}

lightning/src/util/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ pub mod base32;
3030
pub(crate) mod base32;
3131

3232
pub(crate) mod atomic_counter;
33+
pub(crate) mod async_poll;
3334
pub(crate) mod byte_utils;
3435
pub(crate) mod transaction_utils;
3536
pub(crate) mod time;

0 commit comments

Comments
 (0)