Skip to content

Commit 89e630b

Browse files
committed
Test pending connection onion message buffering
Add tests for onion message buffering checking that messages are cleared upon disconnection and timed out after MAX_TIMER_TICKS. Also, checks that ConnectionNeeded events are generated.
1 parent 210407e commit 89e630b

File tree

2 files changed

+93
-10
lines changed

2 files changed

+93
-10
lines changed

lightning/src/onion_message/functional_tests.rs

Lines changed: 89 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@
1010
//! Onion message testing and test utilities live here.
1111
1212
use crate::blinded_path::BlindedPath;
13+
use crate::events::{Event, EventsProvider};
1314
use crate::ln::features::InitFeatures;
14-
use crate::ln::msgs::{self, DecodeError, OnionMessageHandler};
15+
use crate::ln::msgs::{self, DecodeError, OnionMessageHandler, SocketAddress};
1516
use crate::sign::{NodeSigner, Recipient};
1617
use crate::util::ser::{FixedLengthReader, LengthReadable, Writeable, Writer};
1718
use crate::util::test_utils;
@@ -50,7 +51,7 @@ impl MessageRouter for TestMessageRouter {
5051
Ok(OnionMessagePath {
5152
intermediate_nodes: vec![],
5253
destination,
53-
addresses: None,
54+
addresses: Some(vec![SocketAddress::TcpIpV4 { addr: [127, 0, 0, 1], port: 1000 }]),
5455
})
5556
}
5657
}
@@ -180,15 +181,30 @@ fn create_nodes_using_secrets(secrets: Vec<SecretKey>) -> Vec<MessengerNode> {
180181
});
181182
}
182183
for i in 0..nodes.len() - 1 {
183-
let mut features = InitFeatures::empty();
184-
features.set_onion_messages_optional();
185-
let init_msg = msgs::Init { features, networks: None, remote_network_address: None };
186-
nodes[i].messenger.peer_connected(&nodes[i + 1].node_id, &init_msg.clone(), true).unwrap();
187-
nodes[i + 1].messenger.peer_connected(&nodes[i].node_id, &init_msg.clone(), false).unwrap();
184+
connect_peers(&nodes[i], &nodes[i + 1]);
188185
}
189186
nodes
190187
}
191188

189+
fn connect_peers(node_a: &MessengerNode, node_b: &MessengerNode) {
190+
let mut features = InitFeatures::empty();
191+
features.set_onion_messages_optional();
192+
let init_msg = msgs::Init { features, networks: None, remote_network_address: None };
193+
node_a.messenger.peer_connected(&node_b.node_id, &init_msg.clone(), true).unwrap();
194+
node_b.messenger.peer_connected(&node_a.node_id, &init_msg.clone(), false).unwrap();
195+
}
196+
197+
fn disconnect_peers(node_a: &MessengerNode, node_b: &MessengerNode) {
198+
node_a.messenger.peer_disconnected(&node_b.node_id);
199+
node_b.messenger.peer_disconnected(&node_a.node_id);
200+
}
201+
202+
fn release_events(node: &MessengerNode) -> Vec<Event> {
203+
let events = core::cell::RefCell::new(Vec::new());
204+
node.messenger.process_pending_events(&|e| events.borrow_mut().push(e));
205+
events.into_inner()
206+
}
207+
192208
fn pass_along_path(path: &Vec<MessengerNode>) {
193209
let mut prev_node = &path[0];
194210
for node in path.into_iter().skip(1) {
@@ -460,6 +476,72 @@ fn many_hops() {
460476
pass_along_path(&nodes);
461477
}
462478

479+
#[test]
480+
fn requests_peer_connection_for_buffered_messages() {
481+
let nodes = create_nodes(3);
482+
let message = TestCustomMessage::Request;
483+
let secp_ctx = Secp256k1::new();
484+
let blinded_path = BlindedPath::new_for_message(
485+
&[nodes[1].node_id, nodes[2].node_id], &*nodes[0].entropy_source, &secp_ctx
486+
).unwrap();
487+
let destination = Destination::BlindedPath(blinded_path);
488+
489+
// Buffer an onion message for a connected peer
490+
nodes[0].messenger.send_onion_message(message.clone(), destination.clone(), None).unwrap();
491+
assert!(release_events(&nodes[0]).is_empty());
492+
assert!(nodes[0].messenger.next_onion_message_for_peer(nodes[1].node_id).is_some());
493+
assert!(nodes[0].messenger.next_onion_message_for_peer(nodes[1].node_id).is_none());
494+
495+
// Buffer an onion message for a disconnected peer
496+
disconnect_peers(&nodes[0], &nodes[1]);
497+
assert!(nodes[0].messenger.next_onion_message_for_peer(nodes[1].node_id).is_none());
498+
nodes[0].messenger.send_onion_message(message, destination, None).unwrap();
499+
500+
// Check that a ConnectionNeeded event for the peer is provided
501+
let events = release_events(&nodes[0]);
502+
assert_eq!(events.len(), 1);
503+
match &events[0] {
504+
Event::ConnectionNeeded { node_id, .. } => assert_eq!(*node_id, nodes[1].node_id),
505+
e => panic!("Unexpected event: {:?}", e),
506+
}
507+
508+
// Release the buffered onion message when reconnected
509+
connect_peers(&nodes[0], &nodes[1]);
510+
assert!(nodes[0].messenger.next_onion_message_for_peer(nodes[1].node_id).is_some());
511+
assert!(nodes[0].messenger.next_onion_message_for_peer(nodes[1].node_id).is_none());
512+
}
513+
514+
#[test]
515+
fn drops_buffered_messages_waiting_for_peer_connection() {
516+
let nodes = create_nodes(3);
517+
let message = TestCustomMessage::Request;
518+
let secp_ctx = Secp256k1::new();
519+
let blinded_path = BlindedPath::new_for_message(
520+
&[nodes[1].node_id, nodes[2].node_id], &*nodes[0].entropy_source, &secp_ctx
521+
).unwrap();
522+
let destination = Destination::BlindedPath(blinded_path);
523+
524+
// Buffer an onion message for a disconnected peer
525+
disconnect_peers(&nodes[0], &nodes[1]);
526+
nodes[0].messenger.send_onion_message(message, destination, None).unwrap();
527+
528+
// Release the event so the timer can start ticking
529+
let events = release_events(&nodes[0]);
530+
assert_eq!(events.len(), 1);
531+
match &events[0] {
532+
Event::ConnectionNeeded { node_id, .. } => assert_eq!(*node_id, nodes[1].node_id),
533+
e => panic!("Unexpected event: {:?}", e),
534+
}
535+
536+
// Drop buffered messages for a disconnected peer after some timer ticks
537+
use crate::onion_message::messenger::MAX_TIMER_TICKS;
538+
for _ in 0..=MAX_TIMER_TICKS {
539+
nodes[0].messenger.timer_tick_occurred();
540+
}
541+
connect_peers(&nodes[0], &nodes[1]);
542+
assert!(nodes[0].messenger.next_onion_message_for_peer(nodes[1].node_id).is_none());
543+
}
544+
463545
#[test]
464546
fn spec_test_vector() {
465547
let secret_keys = [

lightning/src/onion_message/messenger.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ use crate::io;
4040
use crate::sync::{Arc, Mutex};
4141
use crate::prelude::*;
4242

43+
pub(super) const MAX_TIMER_TICKS: usize = 2;
44+
4345
/// A sender, receiver and forwarder of [`OnionMessage`]s.
4446
///
4547
/// # Handling Messages
@@ -166,8 +168,8 @@ enum OnionMessageBuffer {
166168
/// Messages for a node connected as a peer.
167169
ConnectedPeer(VecDeque<OnionMessage>),
168170

169-
/// Messages for a node that is not yet connected, which are dropped after a certain number of
170-
/// timer ticks defined in [`OnionMessenger::timer_tick_occurred`] and tracked here.
171+
/// Messages for a node that is not yet connected, which are dropped after [`MAX_TIMER_TICKS`]
172+
/// and tracked here.
171173
PendingConnection(VecDeque<OnionMessage>, Option<Vec<SocketAddress>>, usize),
172174
}
173175

@@ -901,7 +903,6 @@ where
901903
}
902904

903905
fn timer_tick_occurred(&self) {
904-
const MAX_TIMER_TICKS: usize = 2;
905906
let mut message_buffers = self.message_buffers.lock().unwrap();
906907

907908
// Drop any pending recipients since the last call to avoid retaining buffered messages for

0 commit comments

Comments
 (0)