Skip to content

Test that do_attempt_write_data does not infinitely loop #556

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 27, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 153 additions & 8 deletions lightning/src/ln/peer_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
InitSyncTracker::NoSyncRequested => {},
InitSyncTracker::ChannelsSyncing(c) if c < 0xffff_ffff_ffff_ffff => {
let steps = ((MSG_BUFF_SIZE - peer.pending_outbound_buffer.len() + 2) / 3) as u8;
let all_messages = self.message_handler.route_handler.get_next_channel_announcements(0, steps);
let all_messages = self.message_handler.route_handler.get_next_channel_announcements(c, steps);
for &(ref announce, ref update_a, ref update_b) in all_messages.iter() {
encode_and_send_msg!(announce);
encode_and_send_msg!(update_a);
Expand Down Expand Up @@ -1149,8 +1149,13 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where

#[cfg(test)]
mod tests {
use secp256k1::Signature;
use bitcoin::BitcoinHash;
use bitcoin::network::constants::Network;
use bitcoin::blockdata::constants::genesis_block;
use ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor};
use ln::msgs;
use ln::features::ChannelFeatures;
use util::events;
use util::test_utils;
use util::logger::Logger;
Expand All @@ -1161,7 +1166,9 @@ mod tests {
use rand::{thread_rng, Rng};

use std;
use std::cmp::min;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering};

#[derive(Clone)]
struct FileDescriptor {
Expand Down Expand Up @@ -1199,29 +1206,31 @@ mod tests {
chan_handlers
}

fn create_network<'a>(peer_count: usize, chan_handlers: &'a Vec<test_utils::TestChannelMessageHandler>) -> Vec<PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler>> {
fn create_network<'a>(peer_count: usize, chan_handlers: &'a Vec<test_utils::TestChannelMessageHandler>, routing_handlers: Option<&'a Vec<Arc<msgs::RoutingMessageHandler>>>) -> Vec<PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler>> {
let mut peers = Vec::new();
let mut rng = thread_rng();
let logger : Arc<Logger> = Arc::new(test_utils::TestLogger::new());
let mut ephemeral_bytes = [0; 32];
rng.fill_bytes(&mut ephemeral_bytes);

for i in 0..peer_count {
let router = test_utils::TestRoutingMessageHandler::new();
let router = if let Some(routers) = routing_handlers { routers[i].clone() } else {
Arc::new(test_utils::TestRoutingMessageHandler::new())
};
let node_id = {
let mut key_slice = [0;32];
rng.fill_bytes(&mut key_slice);
SecretKey::from_slice(&key_slice).unwrap()
};
let msg_handler = MessageHandler { chan_handler: &chan_handlers[i], route_handler: Arc::new(router) };
let msg_handler = MessageHandler { chan_handler: &chan_handlers[i], route_handler: router };
let peer = PeerManager::new(msg_handler, node_id, &ephemeral_bytes, Arc::clone(&logger));
peers.push(peer);
}

peers
}

fn establish_connection<'a>(peer_a: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler>, peer_b: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler>) {
fn establish_connection<'a>(peer_a: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler>, peer_b: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler>) -> (FileDescriptor, FileDescriptor) {
let secp_ctx = Secp256k1::new();
let a_id = PublicKey::from_secret_key(&secp_ctx, &peer_a.our_node_secret);
let mut fd_a = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) };
Expand All @@ -1231,6 +1240,7 @@ mod tests {
assert_eq!(peer_a.read_event(&mut fd_a, &initial_data).unwrap(), false);
assert_eq!(peer_b.read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
assert_eq!(peer_a.read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
(fd_a.clone(), fd_b.clone())
}

#[test]
Expand All @@ -1239,7 +1249,7 @@ mod tests {
// push a DisconnectPeer event to remove the node flagged by id
let chan_handlers = create_chan_handlers(2);
let chan_handler = test_utils::TestChannelMessageHandler::new();
let mut peers = create_network(2, &chan_handlers);
let mut peers = create_network(2, &chan_handlers, None);
establish_connection(&peers[0], &peers[1]);
assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1);

Expand All @@ -1256,11 +1266,12 @@ mod tests {
peers[0].process_events();
assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 0);
}

#[test]
fn test_timer_tick_occured(){
fn test_timer_tick_occurred() {
// Create peers, a vector of two peer managers, perform initial set up and check that peers[0] has one Peer.
let chan_handlers = create_chan_handlers(2);
let peers = create_network(2, &chan_handlers);
let peers = create_network(2, &chan_handlers, None);
establish_connection(&peers[0], &peers[1]);
assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1);

Expand All @@ -1272,4 +1283,138 @@ mod tests {
peers[0].timer_tick_occured();
assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 0);
}

pub struct TestRoutingMessageHandler {
pub chan_upds_recvd: AtomicUsize,
pub chan_anns_recvd: AtomicUsize,
pub chan_anns_sent: AtomicUsize,
}

impl TestRoutingMessageHandler {
pub fn new() -> Self {
TestRoutingMessageHandler {
chan_upds_recvd: AtomicUsize::new(0),
chan_anns_recvd: AtomicUsize::new(0),
chan_anns_sent: AtomicUsize::new(0),
}
}

}
impl msgs::RoutingMessageHandler for TestRoutingMessageHandler {
fn handle_node_announcement(&self, _msg: &msgs::NodeAnnouncement) -> Result<bool, msgs::LightningError> {
Err(msgs::LightningError { err: "", action: msgs::ErrorAction::IgnoreError })
}
fn handle_channel_announcement(&self, _msg: &msgs::ChannelAnnouncement) -> Result<bool, msgs::LightningError> {
self.chan_anns_recvd.fetch_add(1, Ordering::AcqRel);
Err(msgs::LightningError { err: "", action: msgs::ErrorAction::IgnoreError })
}
fn handle_channel_update(&self, _msg: &msgs::ChannelUpdate) -> Result<bool, msgs::LightningError> {
self.chan_upds_recvd.fetch_add(1, Ordering::AcqRel);
Err(msgs::LightningError { err: "", action: msgs::ErrorAction::IgnoreError })
}
fn handle_htlc_fail_channel_update(&self, _update: &msgs::HTLCFailChannelUpdate) {}
fn get_next_channel_announcements(&self, starting_point: u64, batch_amount: u8) -> Vec<(msgs::ChannelAnnouncement, msgs::ChannelUpdate,msgs::ChannelUpdate)> {
let mut chan_anns = Vec::new();
const TOTAL_UPDS: u64 = 100;
let end: u64 = min(starting_point + batch_amount as u64, TOTAL_UPDS - self.chan_anns_sent.load(Ordering::Acquire) as u64);
for i in starting_point..end {
let chan_upd_1 = get_dummy_channel_update(i);
let chan_upd_2 = get_dummy_channel_update(i);
let chan_ann = get_dummy_channel_announcement(i);

chan_anns.push((chan_ann, chan_upd_1, chan_upd_2));
}

self.chan_anns_sent.fetch_add(chan_anns.len(), Ordering::AcqRel);
chan_anns
}

fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec<msgs::NodeAnnouncement> {
Vec::new()
}

fn should_request_full_sync(&self, _node_id: &PublicKey) -> bool {
true
}
}

fn get_dummy_channel_announcement(short_chan_id: u64) -> msgs::ChannelAnnouncement {
use secp256k1::ffi::Signature as FFISignature;
let secp_ctx = Secp256k1::new();
let network = Network::Testnet;
let node_1_privkey = SecretKey::from_slice(&[42; 32]).unwrap();
let node_2_privkey = SecretKey::from_slice(&[41; 32]).unwrap();
let node_1_btckey = SecretKey::from_slice(&[40; 32]).unwrap();
let node_2_btckey = SecretKey::from_slice(&[39; 32]).unwrap();
let unsigned_ann = msgs::UnsignedChannelAnnouncement {
features: ChannelFeatures::supported(),
chain_hash: genesis_block(network).header.bitcoin_hash(),
short_channel_id: short_chan_id,
node_id_1: PublicKey::from_secret_key(&secp_ctx, &node_1_privkey),
node_id_2: PublicKey::from_secret_key(&secp_ctx, &node_2_privkey),
bitcoin_key_1: PublicKey::from_secret_key(&secp_ctx, &node_1_btckey),
bitcoin_key_2: PublicKey::from_secret_key(&secp_ctx, &node_2_btckey),
excess_data: Vec::new(),
};

msgs::ChannelAnnouncement {
node_signature_1: Signature::from(FFISignature::new()),
node_signature_2: Signature::from(FFISignature::new()),
bitcoin_signature_1: Signature::from(FFISignature::new()),
bitcoin_signature_2: Signature::from(FFISignature::new()),
contents: unsigned_ann,
}
}

fn get_dummy_channel_update(short_chan_id: u64) -> msgs::ChannelUpdate {
use secp256k1::ffi::Signature as FFISignature;
let network = Network::Testnet;
msgs::ChannelUpdate {
signature: Signature::from(FFISignature::new()),
contents: msgs::UnsignedChannelUpdate {
chain_hash: genesis_block(network).header.bitcoin_hash(),
short_channel_id: short_chan_id,
timestamp: 0,
flags: 0,
cltv_expiry_delta: 0,
htlc_minimum_msat: 0,
fee_base_msat: 0,
fee_proportional_millionths: 0,
excess_data: vec![],
}
}
}

#[test]
fn test_do_attempt_write_data() {
// Create 2 peers with custom TestRoutingMessageHandlers and connect them.
let chan_handlers = create_chan_handlers(2);
let mut routing_handlers: Vec<Arc<msgs::RoutingMessageHandler>> = Vec::new();
let mut routing_handlers_concrete: Vec<Arc<TestRoutingMessageHandler>> = Vec::new();
for _ in 0..2 {
let routing_handler = Arc::new(TestRoutingMessageHandler::new());
routing_handlers.push(routing_handler.clone());
routing_handlers_concrete.push(routing_handler.clone());
}
let peers = create_network(2, &chan_handlers, Some(&routing_handlers));

// By calling establish_connect, we trigger do_attempt_write_data between
// the peers. Previously this function would mistakenly enter an infinite loop
// when there were more channel messages available than could fit into a peer's
// buffer. This issue would now be detected by this test (because we use custom
// RoutingMessageHandlers that intentionally return more channel messages
// than can fit into a peer's buffer).
let (mut fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]);

// Make each peer to read the messages that the other peer just wrote to them.
peers[1].read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap();
peers[0].read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap();

// Check that each peer has received the expected number of channel updates and channel
// announcements.
assert_eq!(routing_handlers_concrete[0].clone().chan_upds_recvd.load(Ordering::Acquire), 100);
assert_eq!(routing_handlers_concrete[0].clone().chan_anns_recvd.load(Ordering::Acquire), 50);
assert_eq!(routing_handlers_concrete[1].clone().chan_upds_recvd.load(Ordering::Acquire), 100);
assert_eq!(routing_handlers_concrete[1].clone().chan_anns_recvd.load(Ordering::Acquire), 50);
}
}