Skip to content

Handle-initial_routing_sync-requests-from-peers-in-their-Init-messages #247

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/ln/msgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ impl NetAddress {
}
}

#[derive(Clone)]
// Only exposed as broadcast of node_announcement should be filtered by node_id
/// The unsigned part of a node_announcement
pub struct UnsignedNodeAnnouncement {
Expand All @@ -398,6 +399,7 @@ pub struct UnsignedNodeAnnouncement {
pub(crate) excess_address_data: Vec<u8>,
pub(crate) excess_data: Vec<u8>,
}
#[derive(Clone)]
/// A node_announcement message to be sent or received from a peer
pub struct NodeAnnouncement {
pub(crate) signature: Signature,
Expand Down Expand Up @@ -588,6 +590,14 @@ pub trait RoutingMessageHandler : Send + Sync {
fn handle_channel_update(&self, msg: &ChannelUpdate) -> Result<bool, HandleError>;
/// Handle some updates to the route graph that we learned due to an outbound failed payment.
fn handle_htlc_fail_channel_update(&self, update: &HTLCFailChannelUpdate);
/// Gets a subset of the channel announcements and updates required to dump our routing table
/// to a remote node, starting at the short_channel_id indicated by starting_point and
/// including batch_amount entries.
fn get_next_channel_announcements(&self, starting_point: u64, batch_amount: u8) -> Vec<(ChannelAnnouncement, ChannelUpdate, ChannelUpdate)>;
/// Gets a subset of the node announcements required to dump our routing table to a remote node,
/// starting at the node *after* the provided publickey and including batch_amount entries.
/// If None is provided for starting_point, we start at the first node.
fn get_next_node_announcements(&self, starting_point: Option<&PublicKey>, batch_amount: u8) -> Vec<NodeAnnouncement>;
}

pub(crate) struct OnionRealm0HopData {
Expand Down
130 changes: 108 additions & 22 deletions src/ln/peer_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ impl error::Error for PeerHandleError {
}
}

enum InitSyncTracker{
NoSyncRequested,
ChannelsSyncing(u64),
NodesSyncing(PublicKey),
}

struct Peer {
channel_encryptor: PeerChannelEncryptor,
outbound: bool,
Expand All @@ -102,6 +108,24 @@ struct Peer {
pending_read_buffer: Vec<u8>,
pending_read_buffer_pos: usize,
pending_read_is_header: bool,

sync_status: InitSyncTracker,
}

impl Peer {
/// Returns true if the the channel announcements/updates for the given channel should be
/// forwarded to this peer.
/// If we are sending our routing table to this peer and we have not yet sent channel
/// announcements/updates for the given channel_id then we will send it when we get to that
/// point and we shouldn't send it yet to avoid sending duplicate updates. If we've already
/// sent the old versions, we should send the update, and so return true here.
fn should_forward_channel(&self, channel_id: u64)->bool{
match self.sync_status {
InitSyncTracker::NoSyncRequested => true,
InitSyncTracker::ChannelsSyncing(i) => i < channel_id,
InitSyncTracker::NodesSyncing(_) => true,
}
}
}

struct PeerHolder<Descriptor: SocketDescriptor> {
Expand Down Expand Up @@ -221,6 +245,8 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
pending_read_buffer: pending_read_buffer,
pending_read_buffer_pos: 0,
pending_read_is_header: false,

sync_status: InitSyncTracker::NoSyncRequested,
}).is_some() {
panic!("PeerManager driver duplicated descriptors!");
};
Expand Down Expand Up @@ -255,21 +281,74 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
pending_read_buffer: pending_read_buffer,
pending_read_buffer_pos: 0,
pending_read_is_header: false,

sync_status: InitSyncTracker::NoSyncRequested,
}).is_some() {
panic!("PeerManager driver duplicated descriptors!");
};
Ok(())
}

fn do_attempt_write_data(descriptor: &mut Descriptor, peer: &mut Peer) {
fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) {
macro_rules! encode_and_send_msg {
($msg: expr, $msg_code: expr) => {
{
log_trace!(self, "Encoding and sending sync update message of type {} to {}", $msg_code, log_pubkey!(peer.their_node_id.unwrap()));
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!($msg, $msg_code)[..]));
}
}
}
const MSG_BUFF_SIZE: usize = 10;
while !peer.awaiting_write_event {
if peer.pending_outbound_buffer.len() < MSG_BUFF_SIZE {
match peer.sync_status {
InitSyncTracker::NoSyncRequested => {},
InitSyncTracker::ChannelsSyncing(c) if c < 0xffff_ffff_ffff_ffff => {
let steps = ((MSG_BUFF_SIZE - peer.pending_outbound_buffer.len() + 2) / 3) as u8;
let all_messages = self.message_handler.route_handler.get_next_channel_announcements(0, steps);
for &(ref announce, ref update_a, ref update_b) in all_messages.iter() {
encode_and_send_msg!(announce, 256);
encode_and_send_msg!(update_a, 258);
encode_and_send_msg!(update_b, 258);
peer.sync_status = InitSyncTracker::ChannelsSyncing(announce.contents.short_channel_id + 1);
}
if all_messages.is_empty() || all_messages.len() != steps as usize {
peer.sync_status = InitSyncTracker::ChannelsSyncing(0xffff_ffff_ffff_ffff);
}
},
InitSyncTracker::ChannelsSyncing(c) if c == 0xffff_ffff_ffff_ffff => {
let steps = (MSG_BUFF_SIZE - peer.pending_outbound_buffer.len()) as u8;
let all_messages = self.message_handler.route_handler.get_next_node_announcements(None, steps);
for msg in all_messages.iter() {
encode_and_send_msg!(msg, 256);
peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
}
if all_messages.is_empty() || all_messages.len() != steps as usize {
peer.sync_status = InitSyncTracker::NoSyncRequested;
}
},
InitSyncTracker::ChannelsSyncing(_) => unreachable!(),
InitSyncTracker::NodesSyncing(key) => {
let steps = (MSG_BUFF_SIZE - peer.pending_outbound_buffer.len()) as u8;
let all_messages = self.message_handler.route_handler.get_next_node_announcements(Some(&key), steps);
for msg in all_messages.iter() {
encode_and_send_msg!(msg, 256);
peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
}
if all_messages.is_empty() || all_messages.len() != steps as usize {
peer.sync_status = InitSyncTracker::NoSyncRequested;
}
},
}
}

if {
let next_buff = match peer.pending_outbound_buffer.front() {
None => return,
Some(buff) => buff,
};
let should_be_reading = peer.pending_outbound_buffer.len() < 10;

let should_be_reading = peer.pending_outbound_buffer.len() < MSG_BUFF_SIZE;
let data_sent = descriptor.send_data(next_buff, peer.pending_outbound_buffer_first_msg_offset, should_be_reading);
peer.pending_outbound_buffer_first_msg_offset += data_sent;
if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() { true } else { false }
Expand Down Expand Up @@ -297,7 +376,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
None => panic!("Descriptor for write_event is not already known to PeerManager"),
Some(peer) => {
peer.awaiting_write_event = false;
Self::do_attempt_write_data(descriptor, peer);
self.do_attempt_write_data(descriptor, peer);
}
};
Ok(())
Expand Down Expand Up @@ -522,6 +601,10 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
if msg.local_features.supports_unknown_bits() { "present" } else { "none" },
if msg.global_features.supports_unknown_bits() { "present" } else { "none" });

if msg.local_features.initial_routing_sync() {
peer.sync_status = InitSyncTracker::ChannelsSyncing(0);
peers.peers_needing_send.insert(peer_descriptor.clone());
}
peer.their_global_features = Some(msg.global_features);
peer.their_local_features = Some(msg.local_features);

Expand All @@ -531,6 +614,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
self.initial_syncs_sent.fetch_add(1, Ordering::AcqRel);
local_features.set_initial_routing_sync();
}

encode_and_send_msg!(msgs::Init {
global_features: msgs::GlobalFeatures::new(),
local_features,
Expand Down Expand Up @@ -678,7 +762,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
}
}

Self::do_attempt_write_data(peer_descriptor, peer);
self.do_attempt_write_data(peer_descriptor, peer);

peer.pending_outbound_buffer.len() > 10 // pause_read
}
Expand Down Expand Up @@ -735,7 +819,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
//TODO: Drop the pending channel? (or just let it timeout, but that sucks)
});
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 33)));
Self::do_attempt_write_data(&mut descriptor, peer);
self.do_attempt_write_data(&mut descriptor, peer);
},
MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => {
log_trace!(self, "Handling SendOpenChannel event in peer_handler for node {} for channel {}",
Expand All @@ -745,7 +829,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
//TODO: Drop the pending channel? (or just let it timeout, but that sucks)
});
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 32)));
Self::do_attempt_write_data(&mut descriptor, peer);
self.do_attempt_write_data(&mut descriptor, peer);
},
MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => {
log_trace!(self, "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})",
Expand All @@ -757,7 +841,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
//they should just throw away this funding transaction
});
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 34)));
Self::do_attempt_write_data(&mut descriptor, peer);
self.do_attempt_write_data(&mut descriptor, peer);
},
MessageSendEvent::SendFundingSigned { ref node_id, ref msg } => {
log_trace!(self, "Handling SendFundingSigned event in peer_handler for node {} for channel {}",
Expand All @@ -768,7 +852,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
//they should just throw away this funding transaction
});
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 35)));
Self::do_attempt_write_data(&mut descriptor, peer);
self.do_attempt_write_data(&mut descriptor, peer);
},
MessageSendEvent::SendFundingLocked { ref node_id, ref msg } => {
log_trace!(self, "Handling SendFundingLocked event in peer_handler for node {} for channel {}",
Expand All @@ -778,7 +862,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
//TODO: Do whatever we're gonna do for handling dropped messages
});
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 36)));
Self::do_attempt_write_data(&mut descriptor, peer);
self.do_attempt_write_data(&mut descriptor, peer);
},
MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => {
log_trace!(self, "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})",
Expand All @@ -789,7 +873,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
//they should just throw away this funding transaction
});
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 259)));
Self::do_attempt_write_data(&mut descriptor, peer);
self.do_attempt_write_data(&mut descriptor, peer);
},
MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
log_trace!(self, "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}",
Expand Down Expand Up @@ -817,7 +901,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 134)));
}
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(commitment_signed, 132)));
Self::do_attempt_write_data(&mut descriptor, peer);
self.do_attempt_write_data(&mut descriptor, peer);
},
MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => {
log_trace!(self, "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}",
Expand All @@ -827,7 +911,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
//TODO: Do whatever we're gonna do for handling dropped messages
});
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 133)));
Self::do_attempt_write_data(&mut descriptor, peer);
self.do_attempt_write_data(&mut descriptor, peer);
},
MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => {
log_trace!(self, "Handling SendClosingSigned event in peer_handler for node {} for channel {}",
Expand All @@ -837,7 +921,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
//TODO: Do whatever we're gonna do for handling dropped messages
});
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 39)));
Self::do_attempt_write_data(&mut descriptor, peer);
self.do_attempt_write_data(&mut descriptor, peer);
},
MessageSendEvent::SendShutdown { ref node_id, ref msg } => {
log_trace!(self, "Handling Shutdown event in peer_handler for node {} for channel {}",
Expand All @@ -847,7 +931,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
//TODO: Do whatever we're gonna do for handling dropped messages
});
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 38)));
Self::do_attempt_write_data(&mut descriptor, peer);
self.do_attempt_write_data(&mut descriptor, peer);
},
MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => {
log_trace!(self, "Handling SendChannelReestablish event in peer_handler for node {} for channel {}",
Expand All @@ -857,7 +941,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
//TODO: Do whatever we're gonna do for handling dropped messages
});
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 136)));
Self::do_attempt_write_data(&mut descriptor, peer);
self.do_attempt_write_data(&mut descriptor, peer);
},
MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
log_trace!(self, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id);
Expand All @@ -866,7 +950,8 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
let encoded_update_msg = encode_msg!(update_msg, 258);

for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_global_features.is_none() {
if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_global_features.is_none() ||
!peer.should_forward_channel(msg.contents.short_channel_id) {
continue
}
match peer.their_node_id {
Expand All @@ -879,7 +964,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
}
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_update_msg[..]));
Self::do_attempt_write_data(&mut (*descriptor).clone(), peer);
self.do_attempt_write_data(&mut (*descriptor).clone(), peer);
}
}
},
Expand All @@ -889,11 +974,12 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
let encoded_msg = encode_msg!(msg, 258);

for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_global_features.is_none() {
if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_global_features.is_none() ||
!peer.should_forward_channel(msg.contents.short_channel_id) {
continue
}
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
Self::do_attempt_write_data(&mut (*descriptor).clone(), peer);
self.do_attempt_write_data(&mut (*descriptor).clone(), peer);
}
}
},
Expand All @@ -914,7 +1000,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 17)));
// This isn't guaranteed to work, but if there is enough free
// room in the send buffer, put the error message there...
Self::do_attempt_write_data(&mut descriptor, &mut peer);
self.do_attempt_write_data(&mut descriptor, &mut peer);
} else {
log_trace!(self, "Handling DisconnectPeer HandleError event in peer_handler for node {} with no message", log_pubkey!(node_id));
}
Expand All @@ -932,7 +1018,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
//TODO: Do whatever we're gonna do for handling dropped messages
});
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 17)));
Self::do_attempt_write_data(&mut descriptor, peer);
self.do_attempt_write_data(&mut descriptor, peer);
},
}
} else {
Expand All @@ -944,7 +1030,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {

for mut descriptor in peers.peers_needing_send.drain() {
match peers.peers.get_mut(&descriptor) {
Some(peer) => Self::do_attempt_write_data(&mut descriptor, peer),
Some(peer) => self.do_attempt_write_data(&mut descriptor, peer),
None => panic!("Inconsistent peers set state!"),
}
}
Expand Down
Loading