Skip to content

Commit 7446a79

Browse files
committed
Fuzz reloading with a stale monitor in chanmon_consistency
Now that we are gearing up to support fully async monitor storage, we really need to fuzz monitor updates not completing before a reload, which we do here in the `chanmon_consistency` fuzzer. While there are more parts to async monitor updating that we need to fuzz, this at least gets us started by having basic async restart cases handled. In the future, we should extend this to make sure some basic properties (eg claim/balance consistency) remain true through `chanmon_consistency` runs.
1 parent 1d0c6c6 commit 7446a79

File tree

1 file changed

+132
-138
lines changed

1 file changed

+132
-138
lines changed

fuzz/src/chanmon_consistency.rs

Lines changed: 132 additions & 138 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,12 @@ impl Writer for VecWriter {
142142
}
143143
}
144144

145+
struct LatestMonitorState {
146+
persisted_monitor_id: u64,
147+
persisted_monitor: Vec<u8>,
148+
pending_monitor_updates: Vec<(u64, Vec<u8>)>,
149+
}
150+
145151
struct TestChainMonitor {
146152
pub logger: Arc<dyn Logger>,
147153
pub keys: Arc<KeyProvider>,
@@ -152,7 +158,10 @@ struct TestChainMonitor {
152158
// monitor implying we are not able to punish misbehaving counterparties). Because this test
153159
// "fails" if we ever force-close a channel, we avoid doing so, always saving the latest
154160
// fully-serialized monitor state here, as well as the corresponding update_id.
155-
pub latest_monitors: Mutex<HashMap<OutPoint, (u64, Vec<u8>)>>,
161+
//
162+
// Note that this doesn't apply to monitors which are pending persistence, so we store the
163+
// latest pending monitor separately.
164+
pub latest_monitors: Mutex<HashMap<OutPoint, LatestMonitorState>>,
156165
}
157166
impl TestChainMonitor {
158167
pub fn new(broadcaster: Arc<TestBroadcaster>, logger: Arc<dyn Logger>, feeest: Arc<FuzzEstimator>, persister: Arc<TestPersister>, keys: Arc<KeyProvider>) -> Self {
@@ -169,22 +178,47 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
169178
fn watch_channel(&self, funding_txo: OutPoint, monitor: channelmonitor::ChannelMonitor<TestChannelSigner>) -> Result<chain::ChannelMonitorUpdateStatus, ()> {
170179
let mut ser = VecWriter(Vec::new());
171180
monitor.write(&mut ser).unwrap();
172-
if let Some(_) = self.latest_monitors.lock().unwrap().insert(funding_txo, (monitor.get_latest_update_id(), ser.0)) {
181+
let monitor_id = monitor.get_latest_update_id();
182+
let res = self.chain_monitor.watch_channel(funding_txo, monitor);
183+
let state = match res {
184+
Ok(chain::ChannelMonitorUpdateStatus::Completed) => {
185+
LatestMonitorState {
186+
persisted_monitor_id: monitor_id, persisted_monitor: ser.0,
187+
pending_monitor_updates: Vec::new(),
188+
}
189+
},
190+
Ok(chain::ChannelMonitorUpdateStatus::InProgress) =>
191+
panic!("The test currently doesn't test initial-persistence via the async pipeline"),
192+
Ok(chain::ChannelMonitorUpdateStatus::UnrecoverableError) => panic!(),
193+
Err(()) => panic!(),
194+
};
195+
if self.latest_monitors.lock().unwrap().insert(funding_txo, state).is_some() {
173196
panic!("Already had monitor pre-watch_channel");
174197
}
175-
self.chain_monitor.watch_channel(funding_txo, monitor)
198+
res
176199
}
177200

178201
fn update_channel(&self, funding_txo: OutPoint, update: &channelmonitor::ChannelMonitorUpdate) -> chain::ChannelMonitorUpdateStatus {
179202
let mut map_lock = self.latest_monitors.lock().unwrap();
180203
let map_entry = map_lock.get_mut(&funding_txo).expect("Didn't have monitor on update call");
204+
let latest_monitor_data = map_entry.pending_monitor_updates.last().as_ref().map(|(_, data)| data).unwrap_or(&map_entry.persisted_monitor);
181205
let deserialized_monitor = <(BlockHash, channelmonitor::ChannelMonitor<TestChannelSigner>)>::
182-
read(&mut Cursor::new(&map_entry.1), (&*self.keys, &*self.keys)).unwrap().1;
206+
read(&mut Cursor::new(&latest_monitor_data), (&*self.keys, &*self.keys)).unwrap().1;
183207
deserialized_monitor.update_monitor(update, &&TestBroadcaster{}, &&FuzzEstimator { ret_val: atomic::AtomicU32::new(253) }, &self.logger).unwrap();
184208
let mut ser = VecWriter(Vec::new());
185209
deserialized_monitor.write(&mut ser).unwrap();
186-
*map_entry = (update.update_id, ser.0);
187-
self.chain_monitor.update_channel(funding_txo, update)
210+
let res = self.chain_monitor.update_channel(funding_txo, update);
211+
match res {
212+
chain::ChannelMonitorUpdateStatus::Completed => {
213+
map_entry.persisted_monitor_id = update.update_id;
214+
map_entry.persisted_monitor = ser.0;
215+
},
216+
chain::ChannelMonitorUpdateStatus::InProgress => {
217+
map_entry.pending_monitor_updates.push((update.update_id, ser.0));
218+
},
219+
chain::ChannelMonitorUpdateStatus::UnrecoverableError => panic!(),
220+
}
221+
res
188222
}
189223

190224
fn release_pending_monitor_events(&self) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, Option<PublicKey>)> {
@@ -511,9 +545,12 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
511545

512546
let mut monitors = new_hash_map();
513547
let mut old_monitors = $old_monitors.latest_monitors.lock().unwrap();
514-
for (outpoint, (update_id, monitor_ser)) in old_monitors.drain() {
515-
monitors.insert(outpoint, <(BlockHash, ChannelMonitor<TestChannelSigner>)>::read(&mut Cursor::new(&monitor_ser), (&*$keys_manager, &*$keys_manager)).expect("Failed to read monitor").1);
516-
chain_monitor.latest_monitors.lock().unwrap().insert(outpoint, (update_id, monitor_ser));
548+
for (outpoint, mut prev_state) in old_monitors.drain() {
549+
monitors.insert(outpoint, <(BlockHash, ChannelMonitor<TestChannelSigner>)>::read(
550+
&mut Cursor::new(&prev_state.persisted_monitor), (&*$keys_manager, &*$keys_manager)
551+
).expect("Failed to read monitor").1);
552+
prev_state.pending_monitor_updates.clear();
553+
chain_monitor.latest_monitors.lock().unwrap().insert(outpoint, prev_state);
517554
}
518555
let mut monitor_refs = new_hash_map();
519556
for (outpoint, monitor) in monitors.iter_mut() {
@@ -1040,6 +1077,43 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
10401077
} }
10411078
}
10421079

1080+
let complete_monitor_update = |
1081+
node: &ChannelManager<_, _, _, _, _, _, _, _>, monitor: &Arc<TestChainMonitor>,
1082+
chan_funding, compl_selector: &dyn Fn(&mut Vec<(u64, Vec<u8>)>) -> Option<(u64, Vec<u8>)>,
1083+
| {
1084+
if let Some(state) = monitor.latest_monitors.lock().unwrap().get_mut(chan_funding) {
1085+
assert!(
1086+
state.pending_monitor_updates.windows(2).all(|pair| pair[0].0 < pair[1].0),
1087+
"updates should be sorted by id"
1088+
);
1089+
if let Some((id, data)) = compl_selector(&mut state.pending_monitor_updates) {
1090+
monitor.chain_monitor.channel_monitor_updated(*chan_funding, id).unwrap();
1091+
if id > state.persisted_monitor_id {
1092+
state.persisted_monitor_id = id;
1093+
state.persisted_monitor = data;
1094+
}
1095+
node.process_monitor_events();
1096+
}
1097+
}
1098+
};
1099+
1100+
let complete_all_monitor_updates = |node: &ChannelManager<_, _, _, _, _, _, _, _>, monitor: &Arc<TestChainMonitor>, chan_funding| {
1101+
if let Some(state) = monitor.latest_monitors.lock().unwrap().get_mut(chan_funding) {
1102+
assert!(
1103+
state.pending_monitor_updates.windows(2).all(|pair| pair[0].0 < pair[1].0),
1104+
"updates should be sorted by id"
1105+
);
1106+
for (id, data) in state.pending_monitor_updates.drain(..) {
1107+
monitor.chain_monitor.channel_monitor_updated(*chan_funding, id).unwrap();
1108+
if id > state.persisted_monitor_id {
1109+
state.persisted_monitor_id = id;
1110+
state.persisted_monitor = data;
1111+
}
1112+
}
1113+
node.process_monitor_events();
1114+
}
1115+
};
1116+
10431117
let v = get_slice!(1)[0];
10441118
out.locked_write(format!("READ A BYTE! HANDLING INPUT {:x}...........\n", v).as_bytes());
10451119
match v {
@@ -1054,30 +1128,10 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
10541128
0x05 => *monitor_b.persister.update_ret.lock().unwrap() = ChannelMonitorUpdateStatus::Completed,
10551129
0x06 => *monitor_c.persister.update_ret.lock().unwrap() = ChannelMonitorUpdateStatus::Completed,
10561130

1057-
0x08 => {
1058-
if let Some((id, _)) = monitor_a.latest_monitors.lock().unwrap().get(&chan_1_funding) {
1059-
monitor_a.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id);
1060-
nodes[0].process_monitor_events();
1061-
}
1062-
},
1063-
0x09 => {
1064-
if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_1_funding) {
1065-
monitor_b.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id);
1066-
nodes[1].process_monitor_events();
1067-
}
1068-
},
1069-
0x0a => {
1070-
if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_2_funding) {
1071-
monitor_b.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id);
1072-
nodes[1].process_monitor_events();
1073-
}
1074-
},
1075-
0x0b => {
1076-
if let Some((id, _)) = monitor_c.latest_monitors.lock().unwrap().get(&chan_2_funding) {
1077-
monitor_c.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id);
1078-
nodes[2].process_monitor_events();
1079-
}
1080-
},
1131+
0x08 => complete_all_monitor_updates(&nodes[0], &monitor_a, &chan_1_funding),
1132+
0x09 => complete_all_monitor_updates(&nodes[1], &monitor_b, &chan_1_funding),
1133+
0x0a => complete_all_monitor_updates(&nodes[1], &monitor_b, &chan_2_funding),
1134+
0x0b => complete_all_monitor_updates(&nodes[2], &monitor_c, &chan_2_funding),
10811135

10821136
0x0c => {
10831137
if !chan_a_disconnected {
@@ -1285,119 +1339,59 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
12851339
},
12861340
0x89 => { fee_est_c.ret_val.store(253, atomic::Ordering::Release); nodes[2].maybe_update_chan_fees(); },
12871341

1288-
0xf0 => {
1289-
let pending_updates = monitor_a.chain_monitor.list_pending_monitor_updates().remove(&chan_1_funding).unwrap();
1290-
if let Some(id) = pending_updates.get(0) {
1291-
monitor_a.chain_monitor.channel_monitor_updated(chan_1_funding, *id).unwrap();
1292-
}
1293-
nodes[0].process_monitor_events();
1294-
}
1295-
0xf1 => {
1296-
let pending_updates = monitor_a.chain_monitor.list_pending_monitor_updates().remove(&chan_1_funding).unwrap();
1297-
if let Some(id) = pending_updates.get(1) {
1298-
monitor_a.chain_monitor.channel_monitor_updated(chan_1_funding, *id).unwrap();
1299-
}
1300-
nodes[0].process_monitor_events();
1301-
}
1302-
0xf2 => {
1303-
let pending_updates = monitor_a.chain_monitor.list_pending_monitor_updates().remove(&chan_1_funding).unwrap();
1304-
if let Some(id) = pending_updates.last() {
1305-
monitor_a.chain_monitor.channel_monitor_updated(chan_1_funding, *id).unwrap();
1306-
}
1307-
nodes[0].process_monitor_events();
1308-
}
1309-
1310-
0xf4 => {
1311-
let pending_updates = monitor_b.chain_monitor.list_pending_monitor_updates().remove(&chan_1_funding).unwrap();
1312-
if let Some(id) = pending_updates.get(0) {
1313-
monitor_b.chain_monitor.channel_monitor_updated(chan_1_funding, *id).unwrap();
1314-
}
1315-
nodes[1].process_monitor_events();
1316-
}
1317-
0xf5 => {
1318-
let pending_updates = monitor_b.chain_monitor.list_pending_monitor_updates().remove(&chan_1_funding).unwrap();
1319-
if let Some(id) = pending_updates.get(1) {
1320-
monitor_b.chain_monitor.channel_monitor_updated(chan_1_funding, *id).unwrap();
1321-
}
1322-
nodes[1].process_monitor_events();
1323-
}
1324-
0xf6 => {
1325-
let pending_updates = monitor_b.chain_monitor.list_pending_monitor_updates().remove(&chan_1_funding).unwrap();
1326-
if let Some(id) = pending_updates.last() {
1327-
monitor_b.chain_monitor.channel_monitor_updated(chan_1_funding, *id).unwrap();
1328-
}
1329-
nodes[1].process_monitor_events();
1330-
}
1331-
1332-
0xf8 => {
1333-
let pending_updates = monitor_b.chain_monitor.list_pending_monitor_updates().remove(&chan_2_funding).unwrap();
1334-
if let Some(id) = pending_updates.get(0) {
1335-
monitor_b.chain_monitor.channel_monitor_updated(chan_2_funding, *id).unwrap();
1336-
}
1337-
nodes[1].process_monitor_events();
1338-
}
1339-
0xf9 => {
1340-
let pending_updates = monitor_b.chain_monitor.list_pending_monitor_updates().remove(&chan_2_funding).unwrap();
1341-
if let Some(id) = pending_updates.get(1) {
1342-
monitor_b.chain_monitor.channel_monitor_updated(chan_2_funding, *id).unwrap();
1343-
}
1344-
nodes[1].process_monitor_events();
1345-
}
1346-
0xfa => {
1347-
let pending_updates = monitor_b.chain_monitor.list_pending_monitor_updates().remove(&chan_2_funding).unwrap();
1348-
if let Some(id) = pending_updates.last() {
1349-
monitor_b.chain_monitor.channel_monitor_updated(chan_2_funding, *id).unwrap();
1350-
}
1351-
nodes[1].process_monitor_events();
1352-
}
1353-
1354-
0xfc => {
1355-
let pending_updates = monitor_c.chain_monitor.list_pending_monitor_updates().remove(&chan_2_funding).unwrap();
1356-
if let Some(id) = pending_updates.get(0) {
1357-
monitor_c.chain_monitor.channel_monitor_updated(chan_2_funding, *id).unwrap();
1358-
}
1359-
nodes[2].process_monitor_events();
1360-
}
1361-
0xfd => {
1362-
let pending_updates = monitor_c.chain_monitor.list_pending_monitor_updates().remove(&chan_2_funding).unwrap();
1363-
if let Some(id) = pending_updates.get(1) {
1364-
monitor_c.chain_monitor.channel_monitor_updated(chan_2_funding, *id).unwrap();
1365-
}
1366-
nodes[2].process_monitor_events();
1367-
}
1368-
0xfe => {
1369-
let pending_updates = monitor_c.chain_monitor.list_pending_monitor_updates().remove(&chan_2_funding).unwrap();
1370-
if let Some(id) = pending_updates.last() {
1371-
monitor_c.chain_monitor.channel_monitor_updated(chan_2_funding, *id).unwrap();
1372-
}
1373-
nodes[2].process_monitor_events();
1374-
}
1342+
0xf0 =>
1343+
complete_monitor_update(&nodes[0], &monitor_a, &chan_1_funding,
1344+
&|v: &mut Vec<_>| if !v.is_empty() { Some(v.remove(0)) } else { None }),
1345+
0xf1 =>
1346+
complete_monitor_update(&nodes[0], &monitor_a, &chan_1_funding,
1347+
&|v: &mut Vec<_>| if v.len() > 1 { Some(v.remove(1)) } else { None }),
1348+
0xf2 =>
1349+
complete_monitor_update(&nodes[0], &monitor_a, &chan_1_funding,
1350+
&|v: &mut Vec<_>| v.pop()),
1351+
1352+
0xf4 =>
1353+
complete_monitor_update(&nodes[1], &monitor_b, &chan_1_funding,
1354+
&|v: &mut Vec<_>| if !v.is_empty() { Some(v.remove(0)) } else { None }),
1355+
0xf5 =>
1356+
complete_monitor_update(&nodes[1], &monitor_b, &chan_1_funding,
1357+
&|v: &mut Vec<_>| if v.len() > 1 { Some(v.remove(1)) } else { None }),
1358+
0xf6 =>
1359+
complete_monitor_update(&nodes[1], &monitor_b, &chan_1_funding,
1360+
&|v: &mut Vec<_>| v.pop()),
1361+
1362+
0xf8 =>
1363+
complete_monitor_update(&nodes[1], &monitor_b, &chan_2_funding,
1364+
&|v: &mut Vec<_>| if !v.is_empty() { Some(v.remove(0)) } else { None }),
1365+
0xf9 =>
1366+
complete_monitor_update(&nodes[1], &monitor_b, &chan_2_funding,
1367+
&|v: &mut Vec<_>| if v.len() > 1 { Some(v.remove(1)) } else { None }),
1368+
0xfa =>
1369+
complete_monitor_update(&nodes[1], &monitor_b, &chan_2_funding,
1370+
&|v: &mut Vec<_>| v.pop()),
1371+
1372+
0xfc =>
1373+
complete_monitor_update(&nodes[2], &monitor_c, &chan_2_funding,
1374+
&|v: &mut Vec<_>| if !v.is_empty() { Some(v.remove(0)) } else { None }),
1375+
0xfd =>
1376+
complete_monitor_update(&nodes[2], &monitor_c, &chan_2_funding,
1377+
&|v: &mut Vec<_>| if v.len() > 1 { Some(v.remove(1)) } else { None }),
1378+
0xfe =>
1379+
complete_monitor_update(&nodes[2], &monitor_c, &chan_2_funding,
1380+
&|v: &mut Vec<_>| v.pop()),
13751381

13761382
0xff => {
13771383
// Test that no channel is in a stuck state where neither party can send funds even
13781384
// after we resolve all pending events.
1379-
// First make sure there are no pending monitor updates, resetting the error state
1380-
// and calling force_channel_monitor_updated for each monitor.
1385+
// First make sure there are no pending monitor updates and further update
1386+
// operations complete.
13811387
*monitor_a.persister.update_ret.lock().unwrap() = ChannelMonitorUpdateStatus::Completed;
13821388
*monitor_b.persister.update_ret.lock().unwrap() = ChannelMonitorUpdateStatus::Completed;
13831389
*monitor_c.persister.update_ret.lock().unwrap() = ChannelMonitorUpdateStatus::Completed;
13841390

1385-
if let Some((id, _)) = monitor_a.latest_monitors.lock().unwrap().get(&chan_1_funding) {
1386-
monitor_a.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id);
1387-
nodes[0].process_monitor_events();
1388-
}
1389-
if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_1_funding) {
1390-
monitor_b.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id);
1391-
nodes[1].process_monitor_events();
1392-
}
1393-
if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_2_funding) {
1394-
monitor_b.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id);
1395-
nodes[1].process_monitor_events();
1396-
}
1397-
if let Some((id, _)) = monitor_c.latest_monitors.lock().unwrap().get(&chan_2_funding) {
1398-
monitor_c.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id);
1399-
nodes[2].process_monitor_events();
1400-
}
1391+
complete_all_monitor_updates(&nodes[0], &monitor_a, &chan_1_funding);
1392+
complete_all_monitor_updates(&nodes[1], &monitor_b, &chan_1_funding);
1393+
complete_all_monitor_updates(&nodes[1], &monitor_b, &chan_2_funding);
1394+
complete_all_monitor_updates(&nodes[2], &monitor_c, &chan_2_funding);
14011395

14021396
// Next, make sure peers are all connected to each other
14031397
if chan_a_disconnected {

0 commit comments

Comments
 (0)