Skip to content

Commit f8a196c

Browse files
committed
Initiate sync only after receiving GossipTimestampFilter.
1 parent 72069bf commit f8a196c

File tree

2 files changed

+70
-14
lines changed

2 files changed

+70
-14
lines changed

lightning/src/ln/peer_handler.rs

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,7 @@ struct Peer {
339339
msgs_sent_since_pong: usize,
340340
awaiting_pong_timer_tick_intervals: i8,
341341
received_message_since_timer_tick: bool,
342+
sent_gossip_timestamp_filter: bool,
342343
}
343344

344345
impl Peer {
@@ -348,7 +349,11 @@ impl Peer {
348349
/// announcements/updates for the given channel_id then we will send it when we get to that
349350
/// point and we shouldn't send it yet to avoid sending duplicate updates. If we've already
350351
/// sent the old versions, we should send the update, and so return true here.
351-
fn should_forward_channel_announcement(&self, channel_id: u64)->bool{
352+
fn should_forward_channel_announcement(&self, channel_id: u64) -> bool {
353+
if self.their_features.as_ref().unwrap().supports_gossip_queries() &&
354+
!self.sent_gossip_timestamp_filter {
355+
return false;
356+
}
352357
match self.sync_status {
353358
InitSyncTracker::NoSyncRequested => true,
354359
InitSyncTracker::ChannelsSyncing(i) => i < channel_id,
@@ -358,6 +363,10 @@ impl Peer {
358363

359364
/// Similar to the above, but for node announcements indexed by node_id.
360365
fn should_forward_node_announcement(&self, node_id: PublicKey) -> bool {
366+
if self.their_features.as_ref().unwrap().supports_gossip_queries() &&
367+
!self.sent_gossip_timestamp_filter {
368+
return false;
369+
}
361370
match self.sync_status {
362371
InitSyncTracker::NoSyncRequested => true,
363372
InitSyncTracker::ChannelsSyncing(_) => false,
@@ -619,6 +628,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
619628
msgs_sent_since_pong: 0,
620629
awaiting_pong_timer_tick_intervals: 0,
621630
received_message_since_timer_tick: false,
631+
sent_gossip_timestamp_filter: false,
622632
}).is_some() {
623633
panic!("PeerManager driver duplicated descriptors!");
624634
};
@@ -665,6 +675,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
665675
msgs_sent_since_pong: 0,
666676
awaiting_pong_timer_tick_intervals: 0,
667677
received_message_since_timer_tick: false,
678+
sent_gossip_timestamp_filter: false,
668679
}).is_some() {
669680
panic!("PeerManager driver duplicated descriptors!");
670681
};
@@ -1058,7 +1069,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
10581069

10591070
log_info!(self.logger, "Received peer Init message from {}: {}", log_pubkey!(peer.their_node_id.unwrap()), msg.features);
10601071

1061-
if msg.features.initial_routing_sync() {
1072+
// For peers not supporting gossip queries start sync now, otherwise wait until we receive a filter.
1073+
if msg.features.initial_routing_sync() && !msg.features.supports_gossip_queries() {
10621074
peer.sync_status = InitSyncTracker::ChannelsSyncing(0);
10631075
}
10641076
if !msg.features.supports_static_remote_key() {
@@ -1205,7 +1217,13 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
12051217
self.message_handler.route_handler.handle_reply_channel_range(&peer.their_node_id.unwrap(), msg)?;
12061218
},
12071219
wire::Message::GossipTimestampFilter(_msg) => {
1208-
// TODO: handle message
1220+
// When supporting gossip messages, start inital gossip sync only after we receive
1221+
// a GossipTimestampFilter
1222+
if peer.their_features.as_ref().unwrap().supports_gossip_queries() &&
1223+
!peer.sent_gossip_timestamp_filter {
1224+
peer.sent_gossip_timestamp_filter = true;
1225+
peer.sync_status = InitSyncTracker::ChannelsSyncing(0);
1226+
}
12091227
},
12101228

12111229
// Unknown messages:
@@ -1799,6 +1817,8 @@ mod tests {
17991817
assert_eq!(peer_b.read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
18001818
peer_b.process_events();
18011819
assert_eq!(peer_a.read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
1820+
peer_a.process_events();
1821+
assert_eq!(peer_b.read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
18021822
(fd_a.clone(), fd_b.clone())
18031823
}
18041824

@@ -1862,21 +1882,21 @@ mod tests {
18621882
let (mut fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]);
18631883

18641884
// Make each peer to read the messages that the other peer just wrote to them. Note that
1865-
// due to the max-messagse-before-ping limits this may take a few iterations to complete.
1885+
// due to the max-message-before-ping limits this may take a few iterations to complete.
18661886
for _ in 0..150/super::BUFFER_DRAIN_MSGS_PER_TICK + 1 {
1867-
peers[0].process_events();
1868-
let b_read_data = fd_a.outbound_data.lock().unwrap().split_off(0);
1869-
assert!(!b_read_data.is_empty());
1870-
1871-
peers[1].read_event(&mut fd_b, &b_read_data).unwrap();
18721887
peers[1].process_events();
1873-
18741888
let a_read_data = fd_b.outbound_data.lock().unwrap().split_off(0);
18751889
assert!(!a_read_data.is_empty());
1890+
18761891
peers[0].read_event(&mut fd_a, &a_read_data).unwrap();
1892+
peers[0].process_events();
18771893

1878-
peers[1].process_events();
1879-
assert_eq!(fd_b.outbound_data.lock().unwrap().len(), 0, "Until B receives data, it shouldn't send more messages");
1894+
let b_read_data = fd_a.outbound_data.lock().unwrap().split_off(0);
1895+
assert!(!b_read_data.is_empty());
1896+
peers[1].read_event(&mut fd_b, &b_read_data).unwrap();
1897+
1898+
peers[0].process_events();
1899+
assert_eq!(fd_a.outbound_data.lock().unwrap().len(), 0, "Until A receives data, it shouldn't send more messages");
18801900
}
18811901

18821902
// Check that each peer has received the expected number of channel updates and channel

lightning/src/util/test_utils.rs

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ use core::{cmp, mem};
4949
use bitcoin::bech32::u5;
5050
use chain::keysinterface::{InMemorySigner, Recipient, KeyMaterial};
5151

52+
#[cfg(feature = "std")]
53+
use std::time::{SystemTime, UNIX_EPOCH};
54+
5255
pub struct TestVecWriter(pub Vec<u8>);
5356
impl Writer for TestVecWriter {
5457
fn write_all(&mut self, buf: &[u8]) -> Result<(), io::Error> {
@@ -341,6 +344,7 @@ fn get_dummy_channel_update(short_chan_id: u64) -> msgs::ChannelUpdate {
341344
pub struct TestRoutingMessageHandler {
342345
pub chan_upds_recvd: AtomicUsize,
343346
pub chan_anns_recvd: AtomicUsize,
347+
pub pending_events: Mutex<Vec<events::MessageSendEvent>>,
344348
pub request_full_sync: AtomicBool,
345349
}
346350

@@ -349,6 +353,7 @@ impl TestRoutingMessageHandler {
349353
TestRoutingMessageHandler {
350354
chan_upds_recvd: AtomicUsize::new(0),
351355
chan_anns_recvd: AtomicUsize::new(0),
356+
pending_events: Mutex::new(vec![]),
352357
request_full_sync: AtomicBool::new(false),
353358
}
354359
}
@@ -384,7 +389,35 @@ impl msgs::RoutingMessageHandler for TestRoutingMessageHandler {
384389
Vec::new()
385390
}
386391

387-
fn peer_connected(&self, _their_node_id: &PublicKey, _init_msg: &msgs::Init) {}
392+
fn peer_connected(&self, their_node_id: &PublicKey, init_msg: &msgs::Init) {
393+
if !init_msg.features.supports_gossip_queries() {
394+
return ();
395+
}
396+
397+
let should_request_full_sync = self.request_full_sync.load(Ordering::Acquire);
398+
399+
#[allow(unused_mut, unused_assignments)]
400+
let mut gossip_start_time = 0;
401+
#[cfg(feature = "std")]
402+
{
403+
gossip_start_time = SystemTime::now().duration_since(UNIX_EPOCH).expect("Time must be > 1970").as_secs();
404+
if should_request_full_sync {
405+
gossip_start_time -= 60 * 60 * 24 * 7 * 2; // 2 weeks ago
406+
} else {
407+
gossip_start_time -= 60 * 60; // an hour ago
408+
}
409+
}
410+
411+
let mut pending_events = self.pending_events.lock().unwrap();
412+
pending_events.push(events::MessageSendEvent::SendGossipTimestampFilter {
413+
node_id: their_node_id.clone(),
414+
msg: msgs::GossipTimestampFilter {
415+
chain_hash: genesis_block(Network::Testnet).header.block_hash(),
416+
first_timestamp: gossip_start_time as u32,
417+
timestamp_range: u32::max_value(),
418+
},
419+
});
420+
}
388421

389422
fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyChannelRange) -> Result<(), msgs::LightningError> {
390423
Ok(())
@@ -405,7 +438,10 @@ impl msgs::RoutingMessageHandler for TestRoutingMessageHandler {
405438

406439
impl events::MessageSendEventsProvider for TestRoutingMessageHandler {
407440
fn get_and_clear_pending_msg_events(&self) -> Vec<events::MessageSendEvent> {
408-
vec![]
441+
let mut ret = Vec::new();
442+
let mut pending_events = self.pending_events.lock().unwrap();
443+
core::mem::swap(&mut ret, &mut pending_events);
444+
ret
409445
}
410446
}
411447

0 commit comments

Comments
 (0)