Add deserialize+load steps to chanmon_fail_consistency (fixes #327) #356

Merged
258 changes: 226 additions & 32 deletions fuzz/fuzz_targets/chanmon_fail_consistency.rs
@@ -27,30 +27,32 @@ use bitcoin::network::constants::Network;
use bitcoin_hashes::Hash as TraitImport;
use bitcoin_hashes::hash160::Hash as Hash160;
use bitcoin_hashes::sha256::Hash as Sha256;
use bitcoin_hashes::sha256d::Hash as Sha256d;

use lightning::chain::chaininterface;
use lightning::chain::transaction::OutPoint;
use lightning::chain::chaininterface::{BroadcasterInterface,ConfirmationTarget,ChainListener,FeeEstimator,ChainWatchInterfaceUtil};
use lightning::chain::keysinterface::{ChannelKeys, KeysInterface};
use lightning::ln::channelmonitor;
use lightning::ln::channelmonitor::{ChannelMonitorUpdateErr, HTLCUpdate};
use lightning::ln::channelmanager::{ChannelManager, PaymentHash, PaymentPreimage};
use lightning::ln::channelmonitor::{ChannelMonitor, ChannelMonitorUpdateErr, HTLCUpdate};
use lightning::ln::channelmanager::{ChannelManager, PaymentHash, PaymentPreimage, ChannelManagerReadArgs};
use lightning::ln::router::{Route, RouteHop};
use lightning::ln::msgs::{CommitmentUpdate, ChannelMessageHandler, ErrorAction, HandleError, UpdateAddHTLC, LocalFeatures};
use lightning::util::events;
use lightning::util::logger::Logger;
use lightning::util::config::UserConfig;
use lightning::util::events::{EventsProvider, MessageSendEventsProvider};
use lightning::util::ser::{Readable, Writeable};
use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer};

mod utils;
use utils::test_logger;

use secp256k1::key::{PublicKey,SecretKey};
use secp256k1::Secp256k1;

use std::mem;
use std::cmp::Ordering;
use std::collections::HashSet;
use std::collections::{HashSet, HashMap};
use std::sync::{Arc,Mutex};
use std::sync::atomic;
use std::io::Cursor;
@@ -67,22 +69,51 @@ impl BroadcasterInterface for TestBroadcaster {
fn broadcast_transaction(&self, _tx: &Transaction) { }
}

pub struct VecWriter(pub Vec<u8>);
impl Writer for VecWriter {
fn write_all(&mut self, buf: &[u8]) -> Result<(), ::std::io::Error> {
self.0.extend_from_slice(buf);
Ok(())
}
fn size_hint(&mut self, size: usize) {
self.0.reserve_exact(size);
}
}
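// A minimal round-trip sketch (illustrative only, not part of this change;
// assumes a `monitor: ChannelMonitor` and `logger: Arc<Logger>` in scope),
// mirroring how the harness below persists and later reloads monitor state:
//
//     let mut ser = VecWriter(Vec::new());
//     monitor.write_for_disk(&mut ser).unwrap();
//     let reloaded = <(Sha256d, ChannelMonitor)>::read(
//         &mut Cursor::new(&ser.0), Arc::clone(&logger))
//         .expect("Failed to read monitor").1;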

pub struct TestChannelMonitor {
pub simple_monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint>>,
pub update_ret: Mutex<Result<(), channelmonitor::ChannelMonitorUpdateErr>>,
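// latest_good_update holds the most recent successfully-applied monitor
// serialization per channel; latest_update_good records whether each
// channel's last update succeeded; latest_updates_good_at_last_ser snapshots
// that map each time the ChannelManager itself is re-serialized; and
// should_update_manager flags that the manager's serialized copy is stale.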
pub latest_good_update: Mutex<HashMap<OutPoint, Vec<u8>>>,
pub latest_update_good: Mutex<HashMap<OutPoint, bool>>,
pub latest_updates_good_at_last_ser: Mutex<HashMap<OutPoint, bool>>,
pub should_update_manager: atomic::AtomicBool,
}
impl TestChannelMonitor {
pub fn new(chain_monitor: Arc<chaininterface::ChainWatchInterface>, broadcaster: Arc<chaininterface::BroadcasterInterface>, logger: Arc<Logger>, feeest: Arc<chaininterface::FeeEstimator>) -> Self {
Self {
simple_monitor: channelmonitor::SimpleManyChannelMonitor::new(chain_monitor, broadcaster, logger, feeest),
update_ret: Mutex::new(Ok(())),
latest_good_update: Mutex::new(HashMap::new()),
latest_update_good: Mutex::new(HashMap::new()),
latest_updates_good_at_last_ser: Mutex::new(HashMap::new()),
should_update_manager: atomic::AtomicBool::new(false),
}
}
}
impl channelmonitor::ManyChannelMonitor for TestChannelMonitor {
fn add_update_monitor(&self, funding_txo: OutPoint, monitor: channelmonitor::ChannelMonitor) -> Result<(), channelmonitor::ChannelMonitorUpdateErr> {
let ret = self.update_ret.lock().unwrap().clone();
if let Ok(()) = ret {
let mut ser = VecWriter(Vec::new());
monitor.write_for_disk(&mut ser).unwrap();
self.latest_good_update.lock().unwrap().insert(funding_txo, ser.0);
self.latest_update_good.lock().unwrap().insert(funding_txo, true);
self.should_update_manager.store(true, atomic::Ordering::Relaxed);
} else {
self.latest_update_good.lock().unwrap().insert(funding_txo, false);
}
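// Whether or not we report success above, the update is always applied to
// the underlying simple_monitor; only the result returned to the
// ChannelManager differs.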
assert!(self.simple_monitor.add_update_monitor(funding_txo, monitor).is_ok());
self.update_ret.lock().unwrap().clone()
ret
}

fn fetch_pending_htlc_updated(&self) -> Vec<HTLCUpdate> {
@@ -155,6 +186,55 @@ pub fn do_test(data: &[u8]) {
} }
}

macro_rules! reload_node {
($ser: expr, $node_id: expr, $old_monitors: expr) => { {
let logger: Arc<Logger> = Arc::new(test_logger::TestLogger::new($node_id.to_string()));
let watch = Arc::new(ChainWatchInterfaceUtil::new(Network::Bitcoin, Arc::clone(&logger)));
let monitor = Arc::new(TestChannelMonitor::new(watch.clone(), broadcast.clone(), logger.clone(), fee_est.clone()));

let keys_manager = Arc::new(KeyProvider { node_id: $node_id, session_id: atomic::AtomicU8::new(0), channel_id: atomic::AtomicU8::new(0) });
let mut config = UserConfig::new();
config.channel_options.fee_proportional_millionths = 0;
config.channel_options.announced_channel = true;
config.peer_channel_config_limits.min_dust_limit_satoshis = 0;

let mut monitors = HashMap::new();
let mut old_monitors = $old_monitors.latest_good_update.lock().unwrap();
for (outpoint, monitor_ser) in old_monitors.drain() {
monitors.insert(outpoint, <(Sha256d, ChannelMonitor)>::read(&mut Cursor::new(&monitor_ser), Arc::clone(&logger)).expect("Failed to read monitor").1);
monitor.latest_good_update.lock().unwrap().insert(outpoint, monitor_ser);
}
let mut monitor_refs = HashMap::new();
for (outpoint, monitor) in monitors.iter() {
monitor_refs.insert(*outpoint, monitor);
}

let read_args = ChannelManagerReadArgs {
keys_manager,
fee_estimator: fee_est.clone(),
monitor: monitor.clone(),
chain_monitor: watch,
tx_broadcaster: broadcast.clone(),
logger,
default_config: config,
channel_monitors: &monitor_refs,
};

let res = (<(Sha256d, ChannelManager)>::read(&mut Cursor::new(&$ser.0), read_args).expect("Failed to read manager").1, monitor);
for (_, was_good) in $old_monitors.latest_updates_good_at_last_ser.lock().unwrap().iter() {
if !was_good {
// If the last time we updated a monitor we didn't successfully apply the
// update (and we have since updated our serialized copy of the
// ChannelManager), we may force-close the channel on our counterparty
// because we know we're missing something. Thus, we just return here since
// we can't continue to test.
return;
}
}
res
} }
}
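// reload_node! backs the 0x1f/0x20/0x21 cases below: it rebuilds a node from
// its last-serialized ChannelManager plus the last-known-good monitors,
// simulating a restart from disk, and returns from the test entirely if the
// latest monitor state had not yet been captured in that serialization.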


let mut channel_txn = Vec::new();
macro_rules! make_channel {
($source: expr, $dest: expr, $chan_id: expr) => { {
Expand Down Expand Up @@ -264,11 +344,11 @@ pub fn do_test(data: &[u8]) {

// 3 nodes is enough to hit all the possible cases, notably unknown-source-unknown-dest
// forwarding.
let (node_a, monitor_a) = make_node!(0);
let (node_b, monitor_b) = make_node!(1);
let (node_c, monitor_c) = make_node!(2);
let (mut node_a, mut monitor_a) = make_node!(0);
let (mut node_b, mut monitor_b) = make_node!(1);
let (mut node_c, mut monitor_c) = make_node!(2);

let nodes = [node_a, node_b, node_c];
let mut nodes = [node_a, node_b, node_c];

make_channel!(nodes[0], nodes[1], 0);
make_channel!(nodes[1], nodes[2], 1);
@@ -286,8 +366,15 @@ pub fn do_test(data: &[u8]) {

let mut chan_a_disconnected = false;
let mut chan_b_disconnected = false;
let mut chan_a_reconnecting = false;
let mut chan_b_reconnecting = false;
let mut ba_events = Vec::new();
let mut bc_events = Vec::new();

let mut node_a_ser = VecWriter(Vec::new());
nodes[0].write(&mut node_a_ser).unwrap();
let mut node_b_ser = VecWriter(Vec::new());
nodes[1].write(&mut node_b_ser).unwrap();
let mut node_c_ser = VecWriter(Vec::new());
nodes[2].write(&mut node_c_ser).unwrap();

macro_rules! test_err {
($res: expr) => {
@@ -363,13 +450,18 @@ pub fn do_test(data: &[u8]) {

macro_rules! process_msg_events {
($node: expr, $corrupt_forward: expr) => { {
for event in nodes[$node].get_and_clear_pending_msg_events() {
let events = if $node == 1 {
Review comment:

Hmmm, so in fact we only test A <--> B and B <--> C, but never A <--> C. Would doing so let us hit new cases? Like maybe cyclic payments.

Collaborator Author:

Yea, seems like a reasonable next step to try to fuzz, but not 100% sure it'll add too much.
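
For concreteness, a cyclic payment across the three fuzzed nodes could be written with the Route/RouteHop types this file already imports. This is a hedged sketch only: no A <--> C channel exists in this harness, so the short_channel_ids and fee/CLTV values below are hypothetical.

// Hypothetical cyclic route A -> B -> C -> A (illustrative, not part of this
// change; assumes a third channel back to node A existed).
let cyclic_route = Route { hops: vec![
	RouteHop { pubkey: nodes[1].get_our_node_id(), short_channel_id: 1, fee_msat: 50_000, cltv_expiry_delta: 100 },
	RouteHop { pubkey: nodes[2].get_our_node_id(), short_channel_id: 2, fee_msat: 50_000, cltv_expiry_delta: 100 },
	RouteHop { pubkey: nodes[0].get_our_node_id(), short_channel_id: 3, fee_msat: 10_000_000, cltv_expiry_delta: 200 },
] };
// nodes[0].send_payment(cyclic_route, payment_hash) would then attempt the
// A -> B -> C -> A cycle.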

let mut new_events = Vec::new();
mem::swap(&mut new_events, &mut ba_events);
new_events.extend_from_slice(&bc_events[..]);
bc_events.clear();
new_events
} else { Vec::new() };
for event in events.iter().chain(nodes[$node].get_and_clear_pending_msg_events().iter()) {
match event {
events::MessageSendEvent::UpdateHTLCs { ref node_id, updates: CommitmentUpdate { ref update_add_htlcs, ref update_fail_htlcs, ref update_fulfill_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
for (idx, dest) in nodes.iter().enumerate() {
if dest.get_our_node_id() == *node_id &&
(($node != 0 && idx != 0) || !chan_a_disconnected) &&
(($node != 2 && idx != 2) || !chan_b_disconnected) {
for dest in nodes.iter() {
if dest.get_our_node_id() == *node_id {
assert!(update_fee.is_none());
for update_add in update_add_htlcs {
if !$corrupt_forward {
Expand Down Expand Up @@ -399,25 +491,16 @@ pub fn do_test(data: &[u8]) {
}
},
events::MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => {
for (idx, dest) in nodes.iter().enumerate() {
if dest.get_our_node_id() == *node_id &&
(($node != 0 && idx != 0) || !chan_a_disconnected) &&
(($node != 2 && idx != 2) || !chan_b_disconnected) {
for dest in nodes.iter() {
if dest.get_our_node_id() == *node_id {
test_err!(dest.handle_revoke_and_ack(&nodes[$node].get_our_node_id(), msg));
}
}
},
events::MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => {
for (idx, dest) in nodes.iter().enumerate() {
for dest in nodes.iter() {
if dest.get_our_node_id() == *node_id {
test_err!(dest.handle_channel_reestablish(&nodes[$node].get_our_node_id(), msg));
if $node == 0 || idx == 0 {
chan_a_reconnecting = false;
chan_a_disconnected = false;
} else {
chan_b_reconnecting = false;
chan_b_disconnected = false;
}
}
}
},
@@ -434,6 +517,56 @@ pub fn do_test(data: &[u8]) {
} }
}

macro_rules! drain_msg_events_on_disconnect {
($counterparty_id: expr) => { {
if $counterparty_id == 0 {
for event in nodes[0].get_and_clear_pending_msg_events() {
match event {
events::MessageSendEvent::UpdateHTLCs { .. } => {},
events::MessageSendEvent::SendRevokeAndACK { .. } => {},
events::MessageSendEvent::SendChannelReestablish { .. } => {},
events::MessageSendEvent::SendFundingLocked { .. } => {},
events::MessageSendEvent::PaymentFailureNetworkUpdate { .. } => {},
_ => panic!("Unhandled message event"),
}
}
ba_events.clear();
} else {
for event in nodes[2].get_and_clear_pending_msg_events() {
match event {
events::MessageSendEvent::UpdateHTLCs { .. } => {},
events::MessageSendEvent::SendRevokeAndACK { .. } => {},
events::MessageSendEvent::SendChannelReestablish { .. } => {},
events::MessageSendEvent::SendFundingLocked { .. } => {},
events::MessageSendEvent::PaymentFailureNetworkUpdate { .. } => {},
_ => panic!("Unhandled message event"),
}
}
bc_events.clear();
}
let mut events = nodes[1].get_and_clear_pending_msg_events();
let drop_node_id = if $counterparty_id == 0 { nodes[0].get_our_node_id() } else { nodes[2].get_our_node_id() };
let msg_sink = if $counterparty_id == 0 { &mut bc_events } else { &mut ba_events };
for event in events.drain(..) {
let push = match event {
events::MessageSendEvent::UpdateHTLCs { ref node_id, .. } => {
if *node_id != drop_node_id { true } else { false }
},
events::MessageSendEvent::SendRevokeAndACK { ref node_id, .. } => {
if *node_id != drop_node_id { true } else { false }
},
events::MessageSendEvent::SendChannelReestablish { ref node_id, .. } => {
if *node_id != drop_node_id { true } else { false }
},
events::MessageSendEvent::SendFundingLocked { .. } => false,
events::MessageSendEvent::PaymentFailureNetworkUpdate { .. } => false,
_ => panic!("Unhandled message event"),
};
if push { msg_sink.push(event); }
}
} }
}
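// drain_msg_events_on_disconnect! mirrors a real disconnect: everything
// queued at the disconnected peer is dropped (as is anything node B had in
// flight to it), while B's messages to its still-connected peer are parked
// in ba_events/bc_events and delivered later by process_msg_events!.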

macro_rules! process_events {
($node: expr, $fail: expr) => { {
// In case we get 256 payments we may have a hash collision, resulting in the
Expand Down Expand Up @@ -500,27 +633,29 @@ pub fn do_test(data: &[u8]) {
nodes[0].peer_disconnected(&nodes[1].get_our_node_id(), false);
nodes[1].peer_disconnected(&nodes[0].get_our_node_id(), false);
chan_a_disconnected = true;
drain_msg_events_on_disconnect!(0);
}
},
0x10 => {
if !chan_b_disconnected {
nodes[1].peer_disconnected(&nodes[2].get_our_node_id(), false);
nodes[2].peer_disconnected(&nodes[1].get_our_node_id(), false);
chan_b_disconnected = true;
drain_msg_events_on_disconnect!(2);
}
},
0x11 => {
if chan_a_disconnected && !chan_a_reconnecting {
if chan_a_disconnected {
nodes[0].peer_connected(&nodes[1].get_our_node_id());
nodes[1].peer_connected(&nodes[0].get_our_node_id());
chan_a_reconnecting = true;
chan_a_disconnected = false;
}
},
0x12 => {
if chan_b_disconnected && !chan_b_reconnecting {
if chan_b_disconnected {
nodes[1].peer_connected(&nodes[2].get_our_node_id());
nodes[2].peer_connected(&nodes[1].get_our_node_id());
chan_b_reconnecting = true;
chan_b_disconnected = false;
}
},
0x13 => process_msg_events!(0, true),
@@ -535,8 +670,67 @@ pub fn do_test(data: &[u8]) {
0x1c => process_msg_events!(2, false),
0x1d => process_events!(2, true),
0x1e => process_events!(2, false),
0x1f => {
if !chan_a_disconnected {
nodes[1].peer_disconnected(&nodes[0].get_our_node_id(), false);
chan_a_disconnected = true;
drain_msg_events_on_disconnect!(0);
}
let (new_node_a, new_monitor_a) = reload_node!(node_a_ser, 0, monitor_a);
node_a = Arc::new(new_node_a);
nodes[0] = node_a.clone();
monitor_a = new_monitor_a;
},
0x20 => {
if !chan_a_disconnected {
nodes[0].peer_disconnected(&nodes[1].get_our_node_id(), false);
chan_a_disconnected = true;
nodes[0].get_and_clear_pending_msg_events();
ba_events.clear();
}
if !chan_b_disconnected {
nodes[2].peer_disconnected(&nodes[1].get_our_node_id(), false);
chan_b_disconnected = true;
nodes[2].get_and_clear_pending_msg_events();
bc_events.clear();
}
let (new_node_b, new_monitor_b) = reload_node!(node_b_ser, 1, monitor_b);
node_b = Arc::new(new_node_b);
nodes[1] = node_b.clone();
monitor_b = new_monitor_b;
},
0x21 => {
if !chan_b_disconnected {
nodes[1].peer_disconnected(&nodes[2].get_our_node_id(), false);
chan_b_disconnected = true;
drain_msg_events_on_disconnect!(2);
}
let (new_node_c, new_monitor_c) = reload_node!(node_c_ser, 2, monitor_c);
node_c = Arc::new(new_node_c);
nodes[2] = node_c.clone();
monitor_c = new_monitor_c;
},
_ => test_return!(),
}
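
// After each fuzz byte, refresh any node's serialized ChannelManager whose
// monitor state changed, and snapshot which monitor updates were good at
// serialization time so reload_node! can tell whether that snapshot is safe
// to resume from.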

if monitor_a.should_update_manager.load(atomic::Ordering::Relaxed) {
node_a_ser.0.clear();
nodes[0].write(&mut node_a_ser).unwrap();
monitor_a.should_update_manager.store(false, atomic::Ordering::Relaxed);
*monitor_a.latest_updates_good_at_last_ser.lock().unwrap() = monitor_a.latest_update_good.lock().unwrap().clone();
}
if monitor_b.should_update_manager.load(atomic::Ordering::Relaxed) {
node_b_ser.0.clear();
nodes[1].write(&mut node_b_ser).unwrap();
monitor_b.should_update_manager.store(false, atomic::Ordering::Relaxed);
*monitor_b.latest_updates_good_at_last_ser.lock().unwrap() = monitor_b.latest_update_good.lock().unwrap().clone();
}
if monitor_c.should_update_manager.load(atomic::Ordering::Relaxed) {
node_c_ser.0.clear();
nodes[2].write(&mut node_c_ser).unwrap();
monitor_c.should_update_manager.store(false, atomic::Ordering::Relaxed);
*monitor_c.latest_updates_good_at_last_ser.lock().unwrap() = monitor_c.latest_update_good.lock().unwrap().clone();
}
}
}
