Skip to content

Commit f62b0e7

Browse files
committed
Move pending-HTLC-updated ChannelMonitor from ManyChannelMonitor
This is important for a number of reasons: * Firstly, I hit this trying to implement rescan in the demo bitcoinrpc client - if individual ChannelMonitors are out of sync with each other, we cannot add them all into a ManyChannelMonitor together and then rescan, but need to rescan them individually without having to do a bunch of manual work. Of the three return values in ChannelMonitor::block_connected, only the HTLCsource stuff that is moved here makes no sense to be exposed to the user. * Secondly, the logic currently in ManyChannelMonitor cannot be reproduced by the user! HTLCSource is deliberately an opaque type but we use its data to decide which things to keep when inserting into the HashMap. This would prevent a user from properly implementing a replacement ManyChannelMonitor, which is unacceptable. * Finally, by moving the tracking into ChannelMonitor, we can serialize them out, which prevents us from forgetting them when loading from disk, though there are still other races which need to be handled to make this fully safe (see TODOs in ChannelManager). This is safe as no two entries can have the same HTLCSource across different channels (or, if they did, it would be a rather serious bug), though note that, IIRC, when this code was added, the HTLCSource field in the values was not present. We also take this opportunity to rename the fetch function to match our other event interfaces, makaing it clear that by calling the function the set of HTLCUpdates will also be cleared.
1 parent 2b7a343 commit f62b0e7

File tree

4 files changed

+96
-60
lines changed

4 files changed

+96
-60
lines changed

fuzz/src/chanmon_consistency.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,8 @@ impl channelmonitor::ManyChannelMonitor<EnforcingChannelKeys> for TestChannelMon
121121
ret
122122
}
123123

124-
fn fetch_pending_htlc_updated(&self) -> Vec<HTLCUpdate> {
125-
return self.simple_monitor.fetch_pending_htlc_updated();
124+
fn get_and_clear_pending_htlcs_updated(&self) -> Vec<HTLCUpdate> {
125+
return self.simple_monitor.get_and_clear_pending_htlcs_updated();
126126
}
127127
}
128128

lightning/src/ln/channelmanager.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2767,7 +2767,7 @@ impl<ChanSigner: ChannelKeys, M: Deref> events::MessageSendEventsProvider for Ch
27672767
// restart. This is doubly true for the fail/fulfill-backs from monitor events!
27682768
{
27692769
//TODO: This behavior should be documented.
2770-
for htlc_update in self.monitor.fetch_pending_htlc_updated() {
2770+
for htlc_update in self.monitor.get_and_clear_pending_htlcs_updated() {
27712771
if let Some(preimage) = htlc_update.payment_preimage {
27722772
log_trace!(self, "Claiming HTLC with preimage {} from our monitor", log_bytes!(preimage.0));
27732773
self.claim_funds_internal(self.channel_state.lock().unwrap(), htlc_update.source, preimage);
@@ -2792,7 +2792,7 @@ impl<ChanSigner: ChannelKeys, M: Deref> events::EventsProvider for ChannelManage
27922792
// restart. This is doubly true for the fail/fulfill-backs from monitor events!
27932793
{
27942794
//TODO: This behavior should be documented.
2795-
for htlc_update in self.monitor.fetch_pending_htlc_updated() {
2795+
for htlc_update in self.monitor.get_and_clear_pending_htlcs_updated() {
27962796
if let Some(preimage) = htlc_update.payment_preimage {
27972797
log_trace!(self, "Claiming HTLC with preimage {} from our monitor", log_bytes!(preimage.0));
27982798
self.claim_funds_internal(self.channel_state.lock().unwrap(), htlc_update.source, preimage);

lightning/src/ln/channelmonitor.rs

Lines changed: 90 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,12 @@ pub trait ManyChannelMonitor<ChanSigner: ChannelKeys>: Send + Sync {
127127
fn add_update_monitor(&self, funding_txo: OutPoint, monitor: ChannelMonitor<ChanSigner>) -> Result<(), ChannelMonitorUpdateErr>;
128128

129129
/// Used by ChannelManager to get list of HTLC resolved onchain and which needed to be updated
130-
/// with success or failure backward
131-
fn fetch_pending_htlc_updated(&self) -> Vec<HTLCUpdate>;
130+
/// with success or failure.
131+
///
132+
/// You should probably just call through to
133+
/// ChannelMonitor::get_and_clear_pending_htlcs_updated() for each ChannelMonitor and return
134+
/// the full list.
135+
fn get_and_clear_pending_htlcs_updated(&self) -> Vec<HTLCUpdate>;
132136
}
133137

134138
/// A simple implementation of a ManyChannelMonitor and ChainListener. Can be used to create a
@@ -150,7 +154,6 @@ pub struct SimpleManyChannelMonitor<Key, ChanSigner: ChannelKeys> {
150154
chain_monitor: Arc<ChainWatchInterface>,
151155
broadcaster: Arc<BroadcasterInterface>,
152156
pending_events: Mutex<Vec<events::Event>>,
153-
pending_htlc_updated: Mutex<HashMap<PaymentHash, Vec<(HTLCSource, Option<PaymentPreimage>)>>>,
154157
logger: Arc<Logger>,
155158
fee_estimator: Arc<FeeEstimator>
156159
}
@@ -159,11 +162,10 @@ impl<'a, Key : Send + cmp::Eq + hash::Hash, ChanSigner: ChannelKeys> ChainListen
159162
fn block_connected(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], _indexes_of_txn_matched: &[u32]) {
160163
let block_hash = header.bitcoin_hash();
161164
let mut new_events: Vec<events::Event> = Vec::with_capacity(0);
162-
let mut htlc_updated_infos = Vec::new();
163165
{
164166
let mut monitors = self.monitors.lock().unwrap();
165167
for monitor in monitors.values_mut() {
166-
let (txn_outputs, spendable_outputs, mut htlc_updated) = monitor.block_connected(txn_matched, height, &block_hash, &*self.broadcaster, &*self.fee_estimator);
168+
let (txn_outputs, spendable_outputs) = monitor.block_connected(txn_matched, height, &block_hash, &*self.broadcaster, &*self.fee_estimator);
167169
if spendable_outputs.len() > 0 {
168170
new_events.push(events::Event::SpendableOutputs {
169171
outputs: spendable_outputs,
@@ -175,35 +177,6 @@ impl<'a, Key : Send + cmp::Eq + hash::Hash, ChanSigner: ChannelKeys> ChainListen
175177
self.chain_monitor.install_watch_outpoint((txid.clone(), idx as u32), &output.script_pubkey);
176178
}
177179
}
178-
htlc_updated_infos.append(&mut htlc_updated);
179-
}
180-
}
181-
{
182-
// ChannelManager will just need to fetch pending_htlc_updated and pass state backward
183-
let mut pending_htlc_updated = self.pending_htlc_updated.lock().unwrap();
184-
for htlc in htlc_updated_infos.drain(..) {
185-
match pending_htlc_updated.entry(htlc.2) {
186-
hash_map::Entry::Occupied(mut e) => {
187-
// In case of reorg we may have htlc outputs solved in a different way so
188-
// we prefer to keep claims but don't store duplicate updates for a given
189-
// (payment_hash, HTLCSource) pair.
190-
let mut existing_claim = false;
191-
e.get_mut().retain(|htlc_data| {
192-
if htlc.0 == htlc_data.0 {
193-
if htlc_data.1.is_some() {
194-
existing_claim = true;
195-
true
196-
} else { false }
197-
} else { true }
198-
});
199-
if !existing_claim {
200-
e.get_mut().push((htlc.0, htlc.1));
201-
}
202-
}
203-
hash_map::Entry::Vacant(e) => {
204-
e.insert(vec![(htlc.0, htlc.1)]);
205-
}
206-
}
207180
}
208181
}
209182
let mut pending_events = self.pending_events.lock().unwrap();
@@ -228,7 +201,6 @@ impl<Key : Send + cmp::Eq + hash::Hash + 'static, ChanSigner: ChannelKeys> Simpl
228201
chain_monitor,
229202
broadcaster,
230203
pending_events: Mutex::new(Vec::new()),
231-
pending_htlc_updated: Mutex::new(HashMap::new()),
232204
logger,
233205
fee_estimator: feeest,
234206
};
@@ -281,17 +253,10 @@ impl<ChanSigner: ChannelKeys> ManyChannelMonitor<ChanSigner> for SimpleManyChann
281253
}
282254
}
283255

284-
fn fetch_pending_htlc_updated(&self) -> Vec<HTLCUpdate> {
285-
let mut updated = self.pending_htlc_updated.lock().unwrap();
286-
let mut pending_htlcs_updated = Vec::with_capacity(updated.len());
287-
for (k, v) in updated.drain() {
288-
for htlc_data in v {
289-
pending_htlcs_updated.push(HTLCUpdate {
290-
payment_hash: k,
291-
payment_preimage: htlc_data.1,
292-
source: htlc_data.0,
293-
});
294-
}
256+
fn get_and_clear_pending_htlcs_updated(&self) -> Vec<HTLCUpdate> {
257+
let mut pending_htlcs_updated = Vec::new();
258+
for chan in self.monitors.lock().unwrap().values_mut() {
259+
pending_htlcs_updated.append(&mut chan.get_and_clear_pending_htlcs_updated());
295260
}
296261
pending_htlcs_updated
297262
}
@@ -637,6 +602,8 @@ pub struct ChannelMonitor<ChanSigner: ChannelKeys> {
637602

638603
payment_preimages: HashMap<PaymentHash, PaymentPreimage>,
639604

605+
pending_htlcs_updated: HashMap<PaymentHash, Vec<(HTLCSource, Option<PaymentPreimage>)>>,
606+
640607
destination_script: Script,
641608
// Thanks to data loss protection, we may be able to claim our non-htlc funds
642609
// back, this is the script we have to spend from but we need to
@@ -747,6 +714,7 @@ impl<ChanSigner: ChannelKeys> PartialEq for ChannelMonitor<ChanSigner> {
747714
self.current_remote_commitment_number != other.current_remote_commitment_number ||
748715
self.current_local_signed_commitment_tx != other.current_local_signed_commitment_tx ||
749716
self.payment_preimages != other.payment_preimages ||
717+
self.pending_htlcs_updated != other.pending_htlcs_updated ||
750718
self.destination_script != other.destination_script ||
751719
self.to_remote_rescue != other.to_remote_rescue ||
752720
self.pending_claim_requests != other.pending_claim_requests ||
@@ -935,6 +903,16 @@ impl<ChanSigner: ChannelKeys + Writeable> ChannelMonitor<ChanSigner> {
935903
writer.write_all(&payment_preimage.0[..])?;
936904
}
937905

906+
writer.write_all(&byte_utils::be64_to_array(self.pending_htlcs_updated.len() as u64))?;
907+
for (payment_hash, data) in self.pending_htlcs_updated.iter() {
908+
writer.write_all(&payment_hash.0[..])?;
909+
writer.write_all(&byte_utils::be64_to_array(data.len() as u64))?;
910+
for &(ref source, ref payment_preimage) in data.iter() {
911+
source.write(writer)?;
912+
write_option!(payment_preimage);
913+
}
914+
}
915+
938916
self.last_block_hash.write(writer)?;
939917
self.destination_script.write(writer)?;
940918
if let Some((ref to_remote_script, ref local_key)) = self.to_remote_rescue {
@@ -1053,6 +1031,8 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
10531031
current_remote_commitment_number: 1 << 48,
10541032

10551033
payment_preimages: HashMap::new(),
1034+
pending_htlcs_updated: HashMap::new(),
1035+
10561036
destination_script: destination_script,
10571037
to_remote_rescue: None,
10581038

@@ -1416,6 +1396,22 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
14161396
res
14171397
}
14181398

1399+
/// Get the list of HTLCs who's status has been updated on chain. This should be called by
1400+
/// ChannelManager via ManyChannelMonitor::get_and_clear_pending_htlcs_updated().
1401+
pub fn get_and_clear_pending_htlcs_updated(&mut self) -> Vec<HTLCUpdate> {
1402+
let mut pending_htlcs_updated = Vec::with_capacity(self.pending_htlcs_updated.len());
1403+
for (k, v) in self.pending_htlcs_updated.drain() {
1404+
for htlc_data in v {
1405+
pending_htlcs_updated.push(HTLCUpdate {
1406+
payment_hash: k,
1407+
payment_preimage: htlc_data.1,
1408+
source: htlc_data.0,
1409+
});
1410+
}
1411+
}
1412+
pending_htlcs_updated
1413+
}
1414+
14191415
/// Can only fail if idx is < get_min_seen_secret
14201416
pub(super) fn get_secret(&self, idx: u64) -> Option<[u8; 32]> {
14211417
for i in 0..self.old_secrets.len() {
@@ -2394,11 +2390,39 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
23942390
}
23952391
}
23962392

2393+
fn append_htlc_updated(&mut self, mut htlc_updated_infos: Vec<(HTLCSource, Option<PaymentPreimage>, PaymentHash)>) {
2394+
// ChannelManager will just need to fetch pending_htlcs_updated and pass state backward
2395+
for htlc in htlc_updated_infos.drain(..) {
2396+
match self.pending_htlcs_updated.entry(htlc.2) {
2397+
hash_map::Entry::Occupied(mut e) => {
2398+
// In case of reorg we may have htlc outputs solved in a different way so
2399+
// we prefer to keep claims but don't store duplicate updates for a given
2400+
// (payment_hash, HTLCSource) pair.
2401+
let mut existing_claim = false;
2402+
e.get_mut().retain(|htlc_data| {
2403+
if htlc.0 == htlc_data.0 {
2404+
if htlc_data.1.is_some() {
2405+
existing_claim = true;
2406+
true
2407+
} else { false }
2408+
} else { true }
2409+
});
2410+
if !existing_claim {
2411+
e.get_mut().push((htlc.0, htlc.1));
2412+
}
2413+
}
2414+
hash_map::Entry::Vacant(e) => {
2415+
e.insert(vec![(htlc.0, htlc.1)]);
2416+
}
2417+
}
2418+
}
2419+
}
2420+
23972421
/// Called by ChannelMonitor::block_connected, which implements ChainListener::block_connected.
23982422
/// Eventually this should be pub and, roughly, implement ChainListener, however this requires
23992423
/// &mut self, as well as returns new spendable outputs and outpoints to watch for spending of
24002424
/// on-chain.
2401-
fn block_connected(&mut self, txn_matched: &[&Transaction], height: u32, block_hash: &Sha256dHash, broadcaster: &BroadcasterInterface, fee_estimator: &FeeEstimator)-> (Vec<(Sha256dHash, Vec<TxOut>)>, Vec<SpendableOutputDescriptor>, Vec<(HTLCSource, Option<PaymentPreimage>, PaymentHash)>) {
2425+
fn block_connected(&mut self, txn_matched: &[&Transaction], height: u32, block_hash: &Sha256dHash, broadcaster: &BroadcasterInterface, fee_estimator: &FeeEstimator)-> (Vec<(Sha256dHash, Vec<TxOut>)>, Vec<SpendableOutputDescriptor>) {
24022426
for tx in txn_matched {
24032427
let mut output_val = 0;
24042428
for out in tx.output.iter() {
@@ -2411,7 +2435,6 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
24112435
log_trace!(self, "Block {} at height {} connected with {} txn matched", block_hash, height, txn_matched.len());
24122436
let mut watch_outputs = Vec::new();
24132437
let mut spendable_outputs = Vec::new();
2414-
let mut htlc_updated = Vec::new();
24152438
let mut bump_candidates = HashSet::new();
24162439
for tx in txn_matched {
24172440
if tx.input.len() == 1 {
@@ -2470,10 +2493,8 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
24702493
// While all commitment/HTLC-Success/HTLC-Timeout transactions have one input, HTLCs
24712494
// can also be resolved in a few other ways which can have more than one output. Thus,
24722495
// we call is_resolving_htlc_output here outside of the tx.input.len() == 1 check.
2473-
let mut updated = self.is_resolving_htlc_output(&tx, height);
2474-
if updated.len() > 0 {
2475-
htlc_updated.append(&mut updated);
2476-
}
2496+
let htlcs_updated = self.is_resolving_htlc_output(&tx, height);
2497+
self.append_htlc_updated(htlcs_updated);
24772498

24782499
// Scan all input to verify is one of the outpoint spent is of interest for us
24792500
let mut claimed_outputs_material = Vec::new();
@@ -2596,7 +2617,7 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
25962617
},
25972618
OnchainEvent::HTLCUpdate { htlc_update } => {
25982619
log_trace!(self, "HTLC {} failure update has got enough confirmations to be passed upstream", log_bytes!((htlc_update.1).0));
2599-
htlc_updated.push((htlc_update.0, None, htlc_update.1));
2620+
self.append_htlc_updated(vec![(htlc_update.0, None, htlc_update.1)]);
26002621
},
26012622
OnchainEvent::ContentiousOutpoint { outpoint, .. } => {
26022623
self.claimable_outpoints.remove(&outpoint);
@@ -2628,7 +2649,7 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
26282649
for &(ref txid, ref output_scripts) in watch_outputs.iter() {
26292650
self.outputs_to_watch.insert(txid.clone(), output_scripts.iter().map(|o| o.script_pubkey.clone()).collect());
26302651
}
2631-
(watch_outputs, spendable_outputs, htlc_updated)
2652+
(watch_outputs, spendable_outputs)
26322653
}
26332654

26342655
fn block_disconnected(&mut self, height: u32, block_hash: &Sha256dHash, broadcaster: &BroadcasterInterface, fee_estimator: &FeeEstimator) {
@@ -3217,6 +3238,20 @@ impl<R: ::std::io::Read, ChanSigner: ChannelKeys + Readable<R>> ReadableArgs<R,
32173238
}
32183239
}
32193240

3241+
let pending_htlcs_updated_len: u64 = Readable::read(reader)?;
3242+
let mut pending_htlcs_updated = HashMap::with_capacity(cmp::min(pending_htlcs_updated_len as usize, MAX_ALLOC_SIZE / (32 + 8*3)));
3243+
for _ in 0..pending_htlcs_updated_len {
3244+
let payment_hash: PaymentHash = Readable::read(reader)?;
3245+
let htlcs_len: u64 = Readable::read(reader)?;
3246+
let mut htlcs = Vec::with_capacity(cmp::min(htlcs_len as usize, MAX_ALLOC_SIZE / 64));
3247+
for _ in 0..htlcs_len {
3248+
htlcs.push((Readable::read(reader)?, Readable::read(reader)?));
3249+
}
3250+
if let Some(_) = pending_htlcs_updated.insert(payment_hash, htlcs) {
3251+
return Err(DecodeError::InvalidValue);
3252+
}
3253+
}
3254+
32203255
let last_block_hash: Sha256dHash = Readable::read(reader)?;
32213256
let destination_script = Readable::read(reader)?;
32223257
let to_remote_rescue = match <u8 as Readable<R>>::read(reader)? {
@@ -3317,6 +3352,7 @@ impl<R: ::std::io::Read, ChanSigner: ChannelKeys + Readable<R>> ReadableArgs<R,
33173352
current_remote_commitment_number,
33183353

33193354
payment_preimages,
3355+
pending_htlcs_updated,
33203356

33213357
destination_script,
33223358
to_remote_rescue,

lightning/src/util/test_utils.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,8 @@ impl channelmonitor::ManyChannelMonitor<EnforcingChannelKeys> for TestChannelMon
7474
self.update_ret.lock().unwrap().clone()
7575
}
7676

77-
fn fetch_pending_htlc_updated(&self) -> Vec<HTLCUpdate> {
78-
return self.simple_monitor.fetch_pending_htlc_updated();
77+
fn get_and_clear_pending_htlcs_updated(&self) -> Vec<HTLCUpdate> {
78+
return self.simple_monitor.get_and_clear_pending_htlcs_updated();
7979
}
8080
}
8181

0 commit comments

Comments
 (0)