Commit 5e3056e

Merge pull request #3113 from TheBlueMatt/2024-04-async-monitor-fuzz
Fuzz reloading with a stale monitor in chanmon_consistency
2 parents b8cdde8 + 920d96e commit 5e3056e
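
The invariant this change fuzzes: LDK may be handed an out-of-date `ChannelMonitor` on startup, but only if we never told LDK that a newer persist of it completed. Below is a minimal, self-contained sketch of that per-channel bookkeeping (a hypothetical `MonitorState` type for illustration only; the fuzzer's actual state is the `LatestMonitorState` struct in the diff below):

// Illustrative sketch, not LDK's API: tracks which serialized monitor we may
// safely hand back as "latest" on a reload, versus updates still in flight.
struct MonitorState {
	/// Highest update id we told LDK was durably persisted.
	persisted_id: u64,
	/// Serialized monitor matching `persisted_id`; safe to reload from.
	persisted_bytes: Vec<u8>,
	/// Updates handed to the persister but not yet acknowledged. On a
	/// simulated restart these are dropped; the `ChannelManager` replays them.
	pending: Vec<(u64, Vec<u8>)>,
}

impl MonitorState {
	/// Acknowledge the pending update at `idx` as persisted. Completions may
	/// land out of order, so the persisted state only ever moves forward.
	fn complete(&mut self, idx: usize) {
		let (id, bytes) = self.pending.remove(idx);
		if id > self.persisted_id {
			self.persisted_id = id;
			self.persisted_bytes = bytes;
		}
	}

	/// Simulate a crash-and-reload: in-flight updates are lost, and the last
	/// acknowledged monitor is what gets handed back to LDK on startup.
	fn reload(&mut self) -> &[u8] {
		self.pending.clear();
		&self.persisted_bytes
	}
}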

3 files changed: +204 -141 lines changed
fuzz/src/chanmon_consistency.rs

Lines changed: 116 additions & 140 deletions
@@ -142,17 +142,29 @@ impl Writer for VecWriter {
 	}
 }

+/// The LDK API requires that any time we tell it we're done persisting a `ChannelMonitor[Update]`
+/// we never pass it in as the "latest" `ChannelMonitor` on startup. However, we can pass
+/// out-of-date monitors as long as we never told LDK we finished persisting them, which we do by
+/// storing both old `ChannelMonitor`s and ones that are "being persisted" here.
+///
+/// Note that such "being persisted" `ChannelMonitor`s are stored in `ChannelManager` and will
+/// simply be replayed on startup.
+struct LatestMonitorState {
+	/// The latest monitor id which we told LDK we've persisted
+	persisted_monitor_id: u64,
+	/// The latest serialized `ChannelMonitor` that we told LDK we persisted.
+	persisted_monitor: Vec<u8>,
+	/// A set of (monitor id, serialized `ChannelMonitor`)s which we're currently "persisting",
+	/// from LDK's perspective.
+	pending_monitors: Vec<(u64, Vec<u8>)>,
+}
+
 struct TestChainMonitor {
 	pub logger: Arc<dyn Logger>,
 	pub keys: Arc<KeyProvider>,
 	pub persister: Arc<TestPersister>,
 	pub chain_monitor: Arc<chainmonitor::ChainMonitor<TestChannelSigner, Arc<dyn chain::Filter>, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>, Arc<TestPersister>>>,
-	// If we reload a node with an old copy of ChannelMonitors, the ChannelManager deserialization
-	// logic will automatically force-close our channels for us (as we don't have an up-to-date
-	// monitor implying we are not able to punish misbehaving counterparties). Because this test
-	// "fails" if we ever force-close a channel, we avoid doing so, always saving the latest
-	// fully-serialized monitor state here, as well as the corresponding update_id.
-	pub latest_monitors: Mutex<HashMap<OutPoint, (u64, Vec<u8>)>>,
+	pub latest_monitors: Mutex<HashMap<OutPoint, LatestMonitorState>>,
 }
 impl TestChainMonitor {
 	pub fn new(broadcaster: Arc<TestBroadcaster>, logger: Arc<dyn Logger>, feeest: Arc<FuzzEstimator>, persister: Arc<TestPersister>, keys: Arc<KeyProvider>) -> Self {
@@ -169,22 +181,47 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
 	fn watch_channel(&self, funding_txo: OutPoint, monitor: channelmonitor::ChannelMonitor<TestChannelSigner>) -> Result<chain::ChannelMonitorUpdateStatus, ()> {
 		let mut ser = VecWriter(Vec::new());
 		monitor.write(&mut ser).unwrap();
-		if let Some(_) = self.latest_monitors.lock().unwrap().insert(funding_txo, (monitor.get_latest_update_id(), ser.0)) {
+		let monitor_id = monitor.get_latest_update_id();
+		let res = self.chain_monitor.watch_channel(funding_txo, monitor);
+		let state = match res {
+			Ok(chain::ChannelMonitorUpdateStatus::Completed) => {
+				LatestMonitorState {
+					persisted_monitor_id: monitor_id, persisted_monitor: ser.0,
+					pending_monitors: Vec::new(),
+				}
+			},
+			Ok(chain::ChannelMonitorUpdateStatus::InProgress) =>
+				panic!("The test currently doesn't test initial-persistence via the async pipeline"),
+			Ok(chain::ChannelMonitorUpdateStatus::UnrecoverableError) => panic!(),
+			Err(()) => panic!(),
+		};
+		if self.latest_monitors.lock().unwrap().insert(funding_txo, state).is_some() {
 			panic!("Already had monitor pre-watch_channel");
 		}
-		self.chain_monitor.watch_channel(funding_txo, monitor)
+		res
 	}

 	fn update_channel(&self, funding_txo: OutPoint, update: &channelmonitor::ChannelMonitorUpdate) -> chain::ChannelMonitorUpdateStatus {
 		let mut map_lock = self.latest_monitors.lock().unwrap();
 		let map_entry = map_lock.get_mut(&funding_txo).expect("Didn't have monitor on update call");
+		let latest_monitor_data = map_entry.pending_monitors.last().as_ref().map(|(_, data)| data).unwrap_or(&map_entry.persisted_monitor);
 		let deserialized_monitor = <(BlockHash, channelmonitor::ChannelMonitor<TestChannelSigner>)>::
-			read(&mut Cursor::new(&map_entry.1), (&*self.keys, &*self.keys)).unwrap().1;
+			read(&mut Cursor::new(&latest_monitor_data), (&*self.keys, &*self.keys)).unwrap().1;
 		deserialized_monitor.update_monitor(update, &&TestBroadcaster{}, &&FuzzEstimator { ret_val: atomic::AtomicU32::new(253) }, &self.logger).unwrap();
 		let mut ser = VecWriter(Vec::new());
 		deserialized_monitor.write(&mut ser).unwrap();
-		*map_entry = (update.update_id, ser.0);
-		self.chain_monitor.update_channel(funding_txo, update)
+		let res = self.chain_monitor.update_channel(funding_txo, update);
+		match res {
+			chain::ChannelMonitorUpdateStatus::Completed => {
+				map_entry.persisted_monitor_id = update.update_id;
+				map_entry.persisted_monitor = ser.0;
+			},
+			chain::ChannelMonitorUpdateStatus::InProgress => {
+				map_entry.pending_monitors.push((update.update_id, ser.0));
+			},
+			chain::ChannelMonitorUpdateStatus::UnrecoverableError => panic!(),
+		}
+		res
 	}

 	fn release_pending_monitor_events(&self) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, Option<PublicKey>)> {
@@ -511,9 +548,15 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {

 			let mut monitors = new_hash_map();
 			let mut old_monitors = $old_monitors.latest_monitors.lock().unwrap();
-			for (outpoint, (update_id, monitor_ser)) in old_monitors.drain() {
-				monitors.insert(outpoint, <(BlockHash, ChannelMonitor<TestChannelSigner>)>::read(&mut Cursor::new(&monitor_ser), (&*$keys_manager, &*$keys_manager)).expect("Failed to read monitor").1);
-				chain_monitor.latest_monitors.lock().unwrap().insert(outpoint, (update_id, monitor_ser));
+			for (outpoint, mut prev_state) in old_monitors.drain() {
+				monitors.insert(outpoint, <(BlockHash, ChannelMonitor<TestChannelSigner>)>::read(
+					&mut Cursor::new(&prev_state.persisted_monitor), (&*$keys_manager, &*$keys_manager)
+				).expect("Failed to read monitor").1);
+				// Wipe any `ChannelMonitor`s which we never told LDK we finished persisting,
+				// considering them discarded. LDK should replay these for us as they're stored in
+				// the `ChannelManager`.
+				prev_state.pending_monitors.clear();
+				chain_monitor.latest_monitors.lock().unwrap().insert(outpoint, prev_state);
 			}
 			let mut monitor_refs = new_hash_map();
 			for (outpoint, monitor) in monitors.iter_mut() {
@@ -1040,6 +1083,43 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
 		} }
 	}

+	let complete_first = |v: &mut Vec<_>| if !v.is_empty() { Some(v.remove(0)) } else { None };
+	let complete_second = |v: &mut Vec<_>| if v.len() > 1 { Some(v.remove(1)) } else { None };
+	let complete_monitor_update = |
+		monitor: &Arc<TestChainMonitor>, chan_funding,
+		compl_selector: &dyn Fn(&mut Vec<(u64, Vec<u8>)>) -> Option<(u64, Vec<u8>)>,
+	| {
+		if let Some(state) = monitor.latest_monitors.lock().unwrap().get_mut(chan_funding) {
+			assert!(
+				state.pending_monitors.windows(2).all(|pair| pair[0].0 < pair[1].0),
+				"updates should be sorted by id"
+			);
+			if let Some((id, data)) = compl_selector(&mut state.pending_monitors) {
+				monitor.chain_monitor.channel_monitor_updated(*chan_funding, id).unwrap();
+				if id > state.persisted_monitor_id {
+					state.persisted_monitor_id = id;
+					state.persisted_monitor = data;
+				}
+			}
+		}
+	};
+
+	let complete_all_monitor_updates = |monitor: &Arc<TestChainMonitor>, chan_funding| {
+		if let Some(state) = monitor.latest_monitors.lock().unwrap().get_mut(chan_funding) {
+			assert!(
+				state.pending_monitors.windows(2).all(|pair| pair[0].0 < pair[1].0),
+				"updates should be sorted by id"
+			);
+			for (id, data) in state.pending_monitors.drain(..) {
+				monitor.chain_monitor.channel_monitor_updated(*chan_funding, id).unwrap();
+				if id > state.persisted_monitor_id {
+					state.persisted_monitor_id = id;
+					state.persisted_monitor = data;
+				}
+			}
+		}
+	};
+
 	let v = get_slice!(1)[0];
 	out.locked_write(format!("READ A BYTE! HANDLING INPUT {:x}...........\n", v).as_bytes());
 	match v {
@@ -1054,30 +1134,10 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
 		0x05 => *monitor_b.persister.update_ret.lock().unwrap() = ChannelMonitorUpdateStatus::Completed,
 		0x06 => *monitor_c.persister.update_ret.lock().unwrap() = ChannelMonitorUpdateStatus::Completed,

-		0x08 => {
-			if let Some((id, _)) = monitor_a.latest_monitors.lock().unwrap().get(&chan_1_funding) {
-				monitor_a.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id);
-				nodes[0].process_monitor_events();
-			}
-		},
-		0x09 => {
-			if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_1_funding) {
-				monitor_b.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id);
-				nodes[1].process_monitor_events();
-			}
-		},
-		0x0a => {
-			if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_2_funding) {
-				monitor_b.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id);
-				nodes[1].process_monitor_events();
-			}
-		},
-		0x0b => {
-			if let Some((id, _)) = monitor_c.latest_monitors.lock().unwrap().get(&chan_2_funding) {
-				monitor_c.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id);
-				nodes[2].process_monitor_events();
-			}
-		},
+		0x08 => complete_all_monitor_updates(&monitor_a, &chan_1_funding),
+		0x09 => complete_all_monitor_updates(&monitor_b, &chan_1_funding),
+		0x0a => complete_all_monitor_updates(&monitor_b, &chan_2_funding),
+		0x0b => complete_all_monitor_updates(&monitor_c, &chan_2_funding),

 		0x0c => {
 			if !chan_a_disconnected {
@@ -1285,119 +1345,35 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
 		},
 		0x89 => { fee_est_c.ret_val.store(253, atomic::Ordering::Release); nodes[2].maybe_update_chan_fees(); },

-		0xf0 => {
-			let pending_updates = monitor_a.chain_monitor.list_pending_monitor_updates().remove(&chan_1_funding).unwrap();
-			if let Some(id) = pending_updates.get(0) {
-				monitor_a.chain_monitor.channel_monitor_updated(chan_1_funding, *id).unwrap();
-			}
-			nodes[0].process_monitor_events();
-		}
-		0xf1 => {
-			let pending_updates = monitor_a.chain_monitor.list_pending_monitor_updates().remove(&chan_1_funding).unwrap();
-			if let Some(id) = pending_updates.get(1) {
-				monitor_a.chain_monitor.channel_monitor_updated(chan_1_funding, *id).unwrap();
-			}
-			nodes[0].process_monitor_events();
-		}
-		0xf2 => {
-			let pending_updates = monitor_a.chain_monitor.list_pending_monitor_updates().remove(&chan_1_funding).unwrap();
-			if let Some(id) = pending_updates.last() {
-				monitor_a.chain_monitor.channel_monitor_updated(chan_1_funding, *id).unwrap();
-			}
-			nodes[0].process_monitor_events();
-		}
+		0xf0 => complete_monitor_update(&monitor_a, &chan_1_funding, &complete_first),
+		0xf1 => complete_monitor_update(&monitor_a, &chan_1_funding, &complete_second),
+		0xf2 => complete_monitor_update(&monitor_a, &chan_1_funding, &Vec::pop),

-		0xf4 => {
-			let pending_updates = monitor_b.chain_monitor.list_pending_monitor_updates().remove(&chan_1_funding).unwrap();
-			if let Some(id) = pending_updates.get(0) {
-				monitor_b.chain_monitor.channel_monitor_updated(chan_1_funding, *id).unwrap();
-			}
-			nodes[1].process_monitor_events();
-		}
-		0xf5 => {
-			let pending_updates = monitor_b.chain_monitor.list_pending_monitor_updates().remove(&chan_1_funding).unwrap();
-			if let Some(id) = pending_updates.get(1) {
-				monitor_b.chain_monitor.channel_monitor_updated(chan_1_funding, *id).unwrap();
-			}
-			nodes[1].process_monitor_events();
-		}
-		0xf6 => {
-			let pending_updates = monitor_b.chain_monitor.list_pending_monitor_updates().remove(&chan_1_funding).unwrap();
-			if let Some(id) = pending_updates.last() {
-				monitor_b.chain_monitor.channel_monitor_updated(chan_1_funding, *id).unwrap();
-			}
-			nodes[1].process_monitor_events();
-		}
+		0xf4 => complete_monitor_update(&monitor_b, &chan_1_funding, &complete_first),
+		0xf5 => complete_monitor_update(&monitor_b, &chan_1_funding, &complete_second),
+		0xf6 => complete_monitor_update(&monitor_b, &chan_1_funding, &Vec::pop),

-		0xf8 => {
-			let pending_updates = monitor_b.chain_monitor.list_pending_monitor_updates().remove(&chan_2_funding).unwrap();
-			if let Some(id) = pending_updates.get(0) {
-				monitor_b.chain_monitor.channel_monitor_updated(chan_2_funding, *id).unwrap();
-			}
-			nodes[1].process_monitor_events();
-		}
-		0xf9 => {
-			let pending_updates = monitor_b.chain_monitor.list_pending_monitor_updates().remove(&chan_2_funding).unwrap();
-			if let Some(id) = pending_updates.get(1) {
-				monitor_b.chain_monitor.channel_monitor_updated(chan_2_funding, *id).unwrap();
-			}
-			nodes[1].process_monitor_events();
-		}
-		0xfa => {
-			let pending_updates = monitor_b.chain_monitor.list_pending_monitor_updates().remove(&chan_2_funding).unwrap();
-			if let Some(id) = pending_updates.last() {
-				monitor_b.chain_monitor.channel_monitor_updated(chan_2_funding, *id).unwrap();
-			}
-			nodes[1].process_monitor_events();
-		}
+		0xf8 => complete_monitor_update(&monitor_b, &chan_2_funding, &complete_first),
+		0xf9 => complete_monitor_update(&monitor_b, &chan_2_funding, &complete_second),
+		0xfa => complete_monitor_update(&monitor_b, &chan_2_funding, &Vec::pop),

-		0xfc => {
-			let pending_updates = monitor_c.chain_monitor.list_pending_monitor_updates().remove(&chan_2_funding).unwrap();
-			if let Some(id) = pending_updates.get(0) {
-				monitor_c.chain_monitor.channel_monitor_updated(chan_2_funding, *id).unwrap();
-			}
-			nodes[2].process_monitor_events();
-		}
-		0xfd => {
-			let pending_updates = monitor_c.chain_monitor.list_pending_monitor_updates().remove(&chan_2_funding).unwrap();
-			if let Some(id) = pending_updates.get(1) {
-				monitor_c.chain_monitor.channel_monitor_updated(chan_2_funding, *id).unwrap();
-			}
-			nodes[2].process_monitor_events();
-		}
-		0xfe => {
-			let pending_updates = monitor_c.chain_monitor.list_pending_monitor_updates().remove(&chan_2_funding).unwrap();
-			if let Some(id) = pending_updates.last() {
-				monitor_c.chain_monitor.channel_monitor_updated(chan_2_funding, *id).unwrap();
-			}
-			nodes[2].process_monitor_events();
-		}
+		0xfc => complete_monitor_update(&monitor_c, &chan_2_funding, &complete_first),
+		0xfd => complete_monitor_update(&monitor_c, &chan_2_funding, &complete_second),
+		0xfe => complete_monitor_update(&monitor_c, &chan_2_funding, &Vec::pop),

 		0xff => {
 			// Test that no channel is in a stuck state where neither party can send funds even
 			// after we resolve all pending events.
-			// First make sure there are no pending monitor updates, resetting the error state
-			// and calling force_channel_monitor_updated for each monitor.
+			// First make sure there are no pending monitor updates and further update
+			// operations complete.
 			*monitor_a.persister.update_ret.lock().unwrap() = ChannelMonitorUpdateStatus::Completed;
 			*monitor_b.persister.update_ret.lock().unwrap() = ChannelMonitorUpdateStatus::Completed;
 			*monitor_c.persister.update_ret.lock().unwrap() = ChannelMonitorUpdateStatus::Completed;

-			if let Some((id, _)) = monitor_a.latest_monitors.lock().unwrap().get(&chan_1_funding) {
-				monitor_a.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id);
-				nodes[0].process_monitor_events();
-			}
-			if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_1_funding) {
-				monitor_b.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id);
-				nodes[1].process_monitor_events();
-			}
-			if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_2_funding) {
-				monitor_b.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id);
-				nodes[1].process_monitor_events();
-			}
-			if let Some((id, _)) = monitor_c.latest_monitors.lock().unwrap().get(&chan_2_funding) {
-				monitor_c.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id);
-				nodes[2].process_monitor_events();
-			}
+			complete_all_monitor_updates(&monitor_a, &chan_1_funding);
+			complete_all_monitor_updates(&monitor_b, &chan_1_funding);
+			complete_all_monitor_updates(&monitor_b, &chan_2_funding);
+			complete_all_monitor_updates(&monitor_c, &chan_2_funding);

 			// Next, make sure peers are all connected to each other
 			if chan_a_disconnected {
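
As a sanity check on the completion logic above, here is a short usage example reusing the hypothetical `MonitorState` sketch from the top of this page (illustrative only, not the fuzzer's code); it shows that out-of-order completions never roll persisted state backwards and that a reload discards only unacknowledged updates:

fn main() {
	let mut state = MonitorState {
		persisted_id: 5,
		persisted_bytes: b"monitor-v5".to_vec(),
		pending: vec![(6, b"monitor-v6".to_vec()), (7, b"monitor-v7".to_vec())],
	};

	// Complete the *second* pending update first, as the 0xf1-style opcodes
	// may do: persisted state jumps ahead to id 7.
	state.complete(1);
	assert_eq!(state.persisted_id, 7);

	// Completing the older update afterwards must not roll anything back.
	state.complete(0);
	assert_eq!(state.persisted_id, 7);

	// A simulated restart hands back the last acknowledged monitor and
	// discards anything still in flight, mirroring the reload path above.
	assert_eq!(state.reload(), b"monitor-v7");
	assert!(state.pending.is_empty());
}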
