Skip to content

Commit 424ded7

Browse files
committed
Handle-initial_routing_sync-requests-from-peers-in-their-Init-messages
1 parent 0e098d4 commit 424ded7

File tree

4 files changed

+219
-43
lines changed

4 files changed

+219
-43
lines changed

src/ln/msgs.rs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ pub struct AnnouncementSignatures {
334334
}
335335

336336
/// An address which can be used to connect to a remote peer
337-
#[derive(Clone)]
337+
#[derive(Clone, PartialEq)]
338338
pub enum NetAddress {
339339
/// An IPv4 address/port on which the peer is listenting.
340340
IPv4 {
@@ -381,7 +381,7 @@ impl NetAddress {
381381
}
382382
}
383383
}
384-
384+
#[derive(Clone, PartialEq)]
385385
// Only exposed as broadcast of node_announcement should be filtered by node_id
386386
/// The unsigned part of a node_announcement
387387
pub struct UnsignedNodeAnnouncement {
@@ -398,6 +398,7 @@ pub struct UnsignedNodeAnnouncement {
398398
pub(crate) excess_address_data: Vec<u8>,
399399
pub(crate) excess_data: Vec<u8>,
400400
}
401+
#[derive(Clone, PartialEq)]
401402
/// A node_announcement message to be sent or received from a peer
402403
pub struct NodeAnnouncement {
403404
pub(crate) signature: Signature,
@@ -575,6 +576,18 @@ pub trait ChannelMessageHandler : events::MessageSendEventsProvider + Send + Syn
575576
fn handle_error(&self, their_node_id: &PublicKey, msg: &ErrorMessage);
576577
}
577578

579+
///Enum used to keep track of syncing information/state of peer and if a sync is required
580+
pub enum InitSyncTracker{
581+
///This indicates if a sync is required or not, false is no sync required, true is sync required but not started
582+
Sync(bool),
583+
///This is the last synced node's public key
584+
///During this state it is syncing nodes
585+
NodeCounter(PublicKey),
586+
///This is the last synced channel _id
587+
///During this state it is syncing nodes
588+
ChannelCounter(u64),
589+
}
590+
578591
/// A trait to describe an object which can receive routing messages.
579592
pub trait RoutingMessageHandler : Send + Sync {
580593
/// Handle an incoming node_announcement message, returning true if it should be forwarded on,
@@ -588,6 +601,12 @@ pub trait RoutingMessageHandler : Send + Sync {
588601
fn handle_channel_update(&self, msg: &ChannelUpdate) -> Result<bool, HandleError>;
589602
/// Handle some updates to the route graph that we learned due to an outbound failed payment.
590603
fn handle_htlc_fail_channel_update(&self, update: &HTLCFailChannelUpdate);
604+
///Gets a subset of the channel announcements and updates required to dump our routing table to a remote node, starting at the short_channel_id indicated by starting_point.channelcounter and including batch_amount entries
605+
/// This function will start iterating at 0 if the starting_point is < 0.
606+
fn get_next_channel_announcements(&self, starting_point: &mut InitSyncTracker, batch_amount: u8)->(Vec<(ChannelAnnouncement, ChannelUpdate,ChannelUpdate)>);
607+
///Gets a subset of the node announcements required to dump our routing table to a remote node, starting at the PublicKey indicated by starting_point.nodecounter and including batch_amount entries
608+
/// This function will start iterating at 0 if the starting_point is < 0.
609+
fn get_next_node_announcements(&self, starting_point: &mut InitSyncTracker, batch_amount: u8)->(Vec<NodeAnnouncement>);
591610
}
592611

593612
pub(crate) struct OnionRealm0HopData {

src/ln/peer_handler.rs

Lines changed: 59 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,13 @@ struct Peer {
102102
pending_read_buffer: Vec<u8>,
103103
pending_read_buffer_pos: usize,
104104
pending_read_is_header: bool,
105+
sync_status : msgs::InitSyncTracker,
106+
}
107+
108+
impl Peer {
109+
pub fn require_sync(&self)->bool{
110+
if let msgs::InitSyncTracker::Sync(i) = self.sync_status {i} else {false}
111+
}
105112
}
106113

107114
struct PeerHolder<Descriptor: SocketDescriptor> {
@@ -221,6 +228,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
221228
pending_read_buffer: pending_read_buffer,
222229
pending_read_buffer_pos: 0,
223230
pending_read_is_header: false,
231+
sync_status : msgs::InitSyncTracker::Sync(false),
224232
}).is_some() {
225233
panic!("PeerManager driver duplicated descriptors!");
226234
};
@@ -255,22 +263,47 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
255263
pending_read_buffer: pending_read_buffer,
256264
pending_read_buffer_pos: 0,
257265
pending_read_is_header: false,
266+
sync_status : msgs::InitSyncTracker::Sync(false),
258267
}).is_some() {
259268
panic!("PeerManager driver duplicated descriptors!");
260269
};
261270
Ok(())
262271
}
263272

264-
fn do_attempt_write_data(descriptor: &mut Descriptor, peer: &mut Peer) {
273+
fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) {
274+
macro_rules! encode_and_send_msg {
275+
($msg: expr, $msg_code: expr) => {
276+
{
277+
log_trace!(self, "Encoding and sending message of type {} to {}", $msg_code, log_pubkey!(peer.their_node_id.unwrap()));
278+
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!($msg, $msg_code)[..]));
279+
}
280+
}
281+
}
265282
while !peer.awaiting_write_event {
266283
if {
284+
let should_be_reading = peer.pending_outbound_buffer.len() < 10;
285+
if (peer.require_sync()) &&(should_be_reading){
286+
match peer.sync_status{
287+
msgs::InitSyncTracker::ChannelCounter(_c) => {
288+
let all_messages_tuple = self.message_handler.route_handler.get_next_channel_announcements(&mut peer.sync_status,(10-peer.pending_outbound_buffer.len()) as u8);
289+
for tuple in all_messages_tuple.iter(){
290+
encode_and_send_msg!(tuple.0, 256);
291+
encode_and_send_msg!(tuple.1, 258);
292+
encode_and_send_msg!(tuple.2, 258);
293+
}
294+
},
295+
_=>{let all_messages = self.message_handler.route_handler.get_next_node_announcements(&mut peer.sync_status,(10-peer.pending_outbound_buffer.len()) as u8);
296+
for message in all_messages.iter(){
297+
encode_and_send_msg!(message, 256);
298+
}},
299+
};
300+
}
267301
let next_buff = match peer.pending_outbound_buffer.front() {
268302
None => return,
269303
Some(buff) => buff,
270304
};
271-
let should_be_reading = peer.pending_outbound_buffer.len() < 10;
272305

273-
let data_sent = descriptor.send_data(next_buff, peer.pending_outbound_buffer_first_msg_offset, should_be_reading);
306+
let data_sent = descriptor.send_data(&next_buff, peer.pending_outbound_buffer_first_msg_offset, should_be_reading);
274307
peer.pending_outbound_buffer_first_msg_offset += data_sent;
275308
if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() { true } else { false }
276309
} {
@@ -297,7 +330,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
297330
None => panic!("Descriptor for write_event is not already known to PeerManager"),
298331
Some(peer) => {
299332
peer.awaiting_write_event = false;
300-
Self::do_attempt_write_data(descriptor, peer);
333+
self.do_attempt_write_data(descriptor, peer);
301334
}
302335
};
303336
Ok(())
@@ -522,6 +555,10 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
522555
if msg.local_features.supports_unknown_bits() { "present" } else { "none" },
523556
if msg.global_features.supports_unknown_bits() { "present" } else { "none" });
524557

558+
if msg.local_features.initial_routing_sync() {
559+
peer.sync_status = msgs::InitSyncTracker::Sync(true);
560+
peers.peers_needing_send.insert(peer_descriptor.clone());
561+
}
525562
peer.their_global_features = Some(msg.global_features);
526563
peer.their_local_features = Some(msg.local_features);
527564

@@ -531,6 +568,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
531568
self.initial_syncs_sent.fetch_add(1, Ordering::AcqRel);
532569
local_features.set_initial_routing_sync();
533570
}
571+
534572
encode_and_send_msg!(msgs::Init {
535573
global_features: msgs::GlobalFeatures::new(),
536574
local_features,
@@ -678,7 +716,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
678716
}
679717
}
680718

681-
Self::do_attempt_write_data(peer_descriptor, peer);
719+
self.do_attempt_write_data(peer_descriptor, peer);
682720

683721
peer.pending_outbound_buffer.len() > 10 // pause_read
684722
}
@@ -735,7 +773,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
735773
//TODO: Drop the pending channel? (or just let it timeout, but that sucks)
736774
});
737775
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 33)));
738-
Self::do_attempt_write_data(&mut descriptor, peer);
776+
self.do_attempt_write_data(&mut descriptor, peer);
739777
},
740778
MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => {
741779
log_trace!(self, "Handling SendOpenChannel event in peer_handler for node {} for channel {}",
@@ -745,7 +783,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
745783
//TODO: Drop the pending channel? (or just let it timeout, but that sucks)
746784
});
747785
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 32)));
748-
Self::do_attempt_write_data(&mut descriptor, peer);
786+
self.do_attempt_write_data(&mut descriptor, peer);
749787
},
750788
MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => {
751789
log_trace!(self, "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})",
@@ -757,7 +795,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
757795
//they should just throw away this funding transaction
758796
});
759797
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 34)));
760-
Self::do_attempt_write_data(&mut descriptor, peer);
798+
self.do_attempt_write_data(&mut descriptor, peer);
761799
},
762800
MessageSendEvent::SendFundingSigned { ref node_id, ref msg } => {
763801
log_trace!(self, "Handling SendFundingSigned event in peer_handler for node {} for channel {}",
@@ -768,7 +806,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
768806
//they should just throw away this funding transaction
769807
});
770808
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 35)));
771-
Self::do_attempt_write_data(&mut descriptor, peer);
809+
self.do_attempt_write_data(&mut descriptor, peer);
772810
},
773811
MessageSendEvent::SendFundingLocked { ref node_id, ref msg } => {
774812
log_trace!(self, "Handling SendFundingLocked event in peer_handler for node {} for channel {}",
@@ -778,7 +816,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
778816
//TODO: Do whatever we're gonna do for handling dropped messages
779817
});
780818
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 36)));
781-
Self::do_attempt_write_data(&mut descriptor, peer);
819+
self.do_attempt_write_data(&mut descriptor, peer);
782820
},
783821
MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => {
784822
log_trace!(self, "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})",
@@ -789,7 +827,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
789827
//they should just throw away this funding transaction
790828
});
791829
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 259)));
792-
Self::do_attempt_write_data(&mut descriptor, peer);
830+
self.do_attempt_write_data(&mut descriptor, peer);
793831
},
794832
MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
795833
log_trace!(self, "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}",
@@ -817,7 +855,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
817855
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 134)));
818856
}
819857
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(commitment_signed, 132)));
820-
Self::do_attempt_write_data(&mut descriptor, peer);
858+
self.do_attempt_write_data(&mut descriptor, peer);
821859
},
822860
MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => {
823861
log_trace!(self, "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}",
@@ -827,7 +865,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
827865
//TODO: Do whatever we're gonna do for handling dropped messages
828866
});
829867
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 133)));
830-
Self::do_attempt_write_data(&mut descriptor, peer);
868+
self.do_attempt_write_data(&mut descriptor, peer);
831869
},
832870
MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => {
833871
log_trace!(self, "Handling SendClosingSigned event in peer_handler for node {} for channel {}",
@@ -837,7 +875,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
837875
//TODO: Do whatever we're gonna do for handling dropped messages
838876
});
839877
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 39)));
840-
Self::do_attempt_write_data(&mut descriptor, peer);
878+
self.do_attempt_write_data(&mut descriptor, peer);
841879
},
842880
MessageSendEvent::SendShutdown { ref node_id, ref msg } => {
843881
log_trace!(self, "Handling Shutdown event in peer_handler for node {} for channel {}",
@@ -847,7 +885,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
847885
//TODO: Do whatever we're gonna do for handling dropped messages
848886
});
849887
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 38)));
850-
Self::do_attempt_write_data(&mut descriptor, peer);
888+
self.do_attempt_write_data(&mut descriptor, peer);
851889
},
852890
MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => {
853891
log_trace!(self, "Handling SendChannelReestablish event in peer_handler for node {} for channel {}",
@@ -857,7 +895,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
857895
//TODO: Do whatever we're gonna do for handling dropped messages
858896
});
859897
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 136)));
860-
Self::do_attempt_write_data(&mut descriptor, peer);
898+
self.do_attempt_write_data(&mut descriptor, peer);
861899
},
862900
MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
863901
log_trace!(self, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id);
@@ -879,7 +917,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
879917
}
880918
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
881919
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_update_msg[..]));
882-
Self::do_attempt_write_data(&mut (*descriptor).clone(), peer);
920+
self.do_attempt_write_data(&mut (*descriptor).clone(), peer);
883921
}
884922
}
885923
},
@@ -893,7 +931,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
893931
continue
894932
}
895933
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
896-
Self::do_attempt_write_data(&mut (*descriptor).clone(), peer);
934+
self.do_attempt_write_data(&mut (*descriptor).clone(), peer);
897935
}
898936
}
899937
},
@@ -914,7 +952,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
914952
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 17)));
915953
// This isn't guaranteed to work, but if there is enough free
916954
// room in the send buffer, put the error message there...
917-
Self::do_attempt_write_data(&mut descriptor, &mut peer);
955+
self.do_attempt_write_data(&mut descriptor, &mut peer);
918956
} else {
919957
log_trace!(self, "Handling DisconnectPeer HandleError event in peer_handler for node {} with no message", log_pubkey!(node_id));
920958
}
@@ -932,7 +970,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
932970
//TODO: Do whatever we're gonna do for handling dropped messages
933971
});
934972
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 17)));
935-
Self::do_attempt_write_data(&mut descriptor, peer);
973+
self.do_attempt_write_data(&mut descriptor, peer);
936974
},
937975
}
938976
} else {
@@ -944,7 +982,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
944982

945983
for mut descriptor in peers.peers_needing_send.drain() {
946984
match peers.peers.get_mut(&descriptor) {
947-
Some(peer) => Self::do_attempt_write_data(&mut descriptor, peer),
985+
Some(peer) => self.do_attempt_write_data(&mut descriptor, peer),
948986
None => panic!("Inconsistent peers set state!"),
949987
}
950988
}

0 commit comments

Comments
 (0)