Skip to content

Commit f2d7a09

Browse files
committed
Flatten onchain_events_waiting_threshold_conf
Rather than mapping height to a vector of events, use a single vector for all events. This allows for easily processing events by either height or transaction. The latter will be used for an interface suitable for Electrum.
1 parent 6fcac8b commit f2d7a09

File tree

2 files changed

+209
-198
lines changed

2 files changed

+209
-198
lines changed

lightning/src/chain/channelmonitor.rs

Lines changed: 130 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ use util::ser::{Readable, ReadableArgs, MaybeReadable, Writer, Writeable, U48};
5050
use util::byte_utils;
5151
use util::events::Event;
5252

53-
use std::collections::{HashMap, HashSet, hash_map};
53+
use std::collections::{HashMap, HashSet};
5454
use std::{cmp, mem};
5555
use std::io::Error;
5656
use std::ops::Deref;
@@ -465,9 +465,26 @@ pub(crate) struct ClaimRequest {
465465
pub(crate) witness_data: InputMaterial
466466
}
467467

468+
/// TODO: Write docs
469+
#[derive(PartialEq)]
470+
struct OnchainEventEntry {
471+
height: u32,
472+
event: OnchainEvent,
473+
}
474+
475+
impl OnchainEventEntry {
476+
fn confirmation_threshold(&self) -> u32 {
477+
self.height + ANTI_REORG_DELAY - 1
478+
}
479+
480+
fn has_reached_confirmation_threshold(&self, height: u32) -> bool {
481+
self.confirmation_threshold() == height
482+
}
483+
}
484+
468485
/// Upon discovering of some classes of onchain tx by ChannelMonitor, we may have to take actions on it
469486
/// once they mature to enough confirmations (ANTI_REORG_DELAY)
470-
#[derive(Clone, PartialEq)]
487+
#[derive(PartialEq)]
471488
enum OnchainEvent {
472489
/// HTLC output getting solved by a timeout, at maturation we pass upstream payment source information to solve
473490
/// inbound HTLC in backward channel. Note, in case of preimage, we pass info to upstream without delay as we can
@@ -687,7 +704,9 @@ pub(crate) struct ChannelMonitorImpl<Signer: Sign> {
687704
// Used to track onchain events, i.e transactions parts of channels confirmed on chain, on which
688705
// we have to take actions once they reach enough confs. Key is a block height timer, i.e we enforce
689706
// actions when we receive a block with given height. Actions depend on OnchainEvent type.
690-
onchain_events_waiting_threshold_conf: HashMap<u32, Vec<OnchainEvent>>,
707+
//
708+
// TODO: Rewrite docs
709+
onchain_events_waiting_threshold_conf: Vec<OnchainEventEntry>,
691710

692711
// If we get serialized out and re-read, we need to make sure that the chain monitoring
693712
// interface knows about the TXOs that we want to be notified of spends of. We could probably
@@ -934,21 +953,18 @@ impl<Signer: Sign> Writeable for ChannelMonitorImpl<Signer> {
934953
self.last_block_hash.write(writer)?;
935954

936955
writer.write_all(&byte_utils::be64_to_array(self.onchain_events_waiting_threshold_conf.len() as u64))?;
937-
for (ref target, ref events) in self.onchain_events_waiting_threshold_conf.iter() {
938-
writer.write_all(&byte_utils::be32_to_array(**target))?;
939-
writer.write_all(&byte_utils::be64_to_array(events.len() as u64))?;
940-
for ev in events.iter() {
941-
match *ev {
942-
OnchainEvent::HTLCUpdate { ref htlc_update } => {
943-
0u8.write(writer)?;
944-
htlc_update.0.write(writer)?;
945-
htlc_update.1.write(writer)?;
946-
},
947-
OnchainEvent::MaturingOutput { ref descriptor } => {
948-
1u8.write(writer)?;
949-
descriptor.write(writer)?;
950-
},
951-
}
956+
for ref entry in self.onchain_events_waiting_threshold_conf.iter() {
957+
writer.write_all(&byte_utils::be32_to_array(entry.height))?;
958+
match entry.event {
959+
OnchainEvent::HTLCUpdate { ref htlc_update } => {
960+
0u8.write(writer)?;
961+
htlc_update.0.write(writer)?;
962+
htlc_update.1.write(writer)?;
963+
},
964+
OnchainEvent::MaturingOutput { ref descriptor } => {
965+
1u8.write(writer)?;
966+
descriptor.write(writer)?;
967+
},
952968
}
953969
}
954970

@@ -1056,7 +1072,7 @@ impl<Signer: Sign> ChannelMonitor<Signer> {
10561072
pending_monitor_events: Vec::new(),
10571073
pending_events: Vec::new(),
10581074

1059-
onchain_events_waiting_threshold_conf: HashMap::new(),
1075+
onchain_events_waiting_threshold_conf: Vec::new(),
10601076
outputs_to_watch,
10611077

10621078
onchain_tx_handler,
@@ -1639,24 +1655,23 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
16391655
if let Some(ref outpoints) = self.counterparty_claimable_outpoints.get($txid) {
16401656
for &(ref htlc, ref source_option) in outpoints.iter() {
16411657
if let &Some(ref source) = source_option {
1642-
log_info!(logger, "Failing HTLC with payment_hash {} from {} counterparty commitment tx due to broadcast of revoked counterparty commitment transaction, waiting for confirmation (at height {})", log_bytes!(htlc.payment_hash.0), $commitment_tx, height + ANTI_REORG_DELAY - 1);
1643-
match self.onchain_events_waiting_threshold_conf.entry(height + ANTI_REORG_DELAY - 1) {
1644-
hash_map::Entry::Occupied(mut entry) => {
1645-
let e = entry.get_mut();
1646-
e.retain(|ref event| {
1647-
match **event {
1648-
OnchainEvent::HTLCUpdate { ref htlc_update } => {
1649-
return htlc_update.0 != **source
1650-
},
1651-
_ => true
1652-
}
1653-
});
1654-
e.push(OnchainEvent::HTLCUpdate { htlc_update: ((**source).clone(), htlc.payment_hash.clone())});
1655-
}
1656-
hash_map::Entry::Vacant(entry) => {
1657-
entry.insert(vec![OnchainEvent::HTLCUpdate { htlc_update: ((**source).clone(), htlc.payment_hash.clone())}]);
1658+
self.onchain_events_waiting_threshold_conf.retain(|ref entry| {
1659+
if entry.height != height { return true; }
1660+
match entry.event {
1661+
OnchainEvent::HTLCUpdate { ref htlc_update } => {
1662+
htlc_update.0 != **source
1663+
},
1664+
_ => true,
16581665
}
1659-
}
1666+
});
1667+
let entry = OnchainEventEntry {
1668+
height,
1669+
event: OnchainEvent::HTLCUpdate {
1670+
htlc_update: ((**source).clone(), htlc.payment_hash.clone())
1671+
},
1672+
};
1673+
log_info!(logger, "Failing HTLC with payment_hash {} from {} counterparty commitment tx due to broadcast of revoked counterparty commitment transaction, waiting for confirmation (at height {})", log_bytes!(htlc.payment_hash.0), $commitment_tx, entry.confirmation_threshold());
1674+
self.onchain_events_waiting_threshold_conf.push(entry);
16601675
}
16611676
}
16621677
}
@@ -1705,23 +1720,21 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
17051720
}
17061721
}
17071722
log_trace!(logger, "Failing HTLC with payment_hash {} from {} counterparty commitment tx due to broadcast of counterparty commitment transaction", log_bytes!(htlc.payment_hash.0), $commitment_tx);
1708-
match self.onchain_events_waiting_threshold_conf.entry(height + ANTI_REORG_DELAY - 1) {
1709-
hash_map::Entry::Occupied(mut entry) => {
1710-
let e = entry.get_mut();
1711-
e.retain(|ref event| {
1712-
match **event {
1713-
OnchainEvent::HTLCUpdate { ref htlc_update } => {
1714-
return htlc_update.0 != **source
1715-
},
1716-
_ => true
1717-
}
1718-
});
1719-
e.push(OnchainEvent::HTLCUpdate { htlc_update: ((**source).clone(), htlc.payment_hash.clone())});
1720-
}
1721-
hash_map::Entry::Vacant(entry) => {
1722-
entry.insert(vec![OnchainEvent::HTLCUpdate { htlc_update: ((**source).clone(), htlc.payment_hash.clone())}]);
1723+
self.onchain_events_waiting_threshold_conf.retain(|ref entry| {
1724+
if entry.height != height { return true; }
1725+
match entry.event {
1726+
OnchainEvent::HTLCUpdate { ref htlc_update } => {
1727+
htlc_update.0 != **source
1728+
},
1729+
_ => true,
17231730
}
1724-
}
1731+
});
1732+
self.onchain_events_waiting_threshold_conf.push(OnchainEventEntry {
1733+
height,
1734+
event: OnchainEvent::HTLCUpdate {
1735+
htlc_update: ((**source).clone(), htlc.payment_hash.clone())
1736+
},
1737+
});
17251738
}
17261739
}
17271740
}
@@ -1862,24 +1875,21 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
18621875

18631876
macro_rules! wait_threshold_conf {
18641877
($height: expr, $source: expr, $commitment_tx: expr, $payment_hash: expr) => {
1865-
log_trace!(logger, "Failing HTLC with payment_hash {} from {} holder commitment tx due to broadcast of transaction, waiting confirmation (at height{})", log_bytes!($payment_hash.0), $commitment_tx, height + ANTI_REORG_DELAY - 1);
1866-
match self.onchain_events_waiting_threshold_conf.entry($height + ANTI_REORG_DELAY - 1) {
1867-
hash_map::Entry::Occupied(mut entry) => {
1868-
let e = entry.get_mut();
1869-
e.retain(|ref event| {
1870-
match **event {
1871-
OnchainEvent::HTLCUpdate { ref htlc_update } => {
1872-
return htlc_update.0 != $source
1873-
},
1874-
_ => true
1875-
}
1876-
});
1877-
e.push(OnchainEvent::HTLCUpdate { htlc_update: ($source, $payment_hash)});
1878+
self.onchain_events_waiting_threshold_conf.retain(|ref entry| {
1879+
if entry.height != $height { return true; }
1880+
match entry.event {
1881+
OnchainEvent::HTLCUpdate { ref htlc_update } => {
1882+
htlc_update.0 != $source
1883+
},
1884+
_ => true,
18781885
}
1879-
hash_map::Entry::Vacant(entry) => {
1880-
entry.insert(vec![OnchainEvent::HTLCUpdate { htlc_update: ($source, $payment_hash)}]);
1881-
}
1882-
}
1886+
});
1887+
let entry = OnchainEventEntry {
1888+
height,
1889+
event: OnchainEvent::HTLCUpdate { htlc_update: ($source, $payment_hash) },
1890+
};
1891+
log_trace!(logger, "Failing HTLC with payment_hash {} from {} holder commitment tx due to broadcast of transaction, waiting confirmation (at height{})", log_bytes!($payment_hash.0), $commitment_tx, entry.confirmation_threshold());
1892+
self.onchain_events_waiting_threshold_conf.push(entry);
18831893
}
18841894
}
18851895

@@ -2054,9 +2064,12 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
20542064
}
20552065
claimable_outpoints.append(&mut new_outpoints);
20562066
}
2057-
if let Some(events) = self.onchain_events_waiting_threshold_conf.remove(&height) {
2058-
for ev in events {
2059-
match ev {
2067+
2068+
let onchain_events_waiting_threshold_conf =
2069+
self.onchain_events_waiting_threshold_conf.drain(..).collect::<Vec<_>>();
2070+
for entry in onchain_events_waiting_threshold_conf {
2071+
if entry.has_reached_confirmation_threshold(height) {
2072+
match entry.event {
20602073
OnchainEvent::HTLCUpdate { htlc_update } => {
20612074
log_trace!(logger, "HTLC {} failure update has got enough confirmations to be passed upstream", log_bytes!((htlc_update.1).0));
20622075
self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate {
@@ -2072,6 +2085,8 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
20722085
});
20732086
}
20742087
}
2088+
} else {
2089+
self.onchain_events_waiting_threshold_conf.push(entry);
20752090
}
20762091
}
20772092

@@ -2108,11 +2123,10 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
21082123
{
21092124
log_trace!(logger, "Block {} at height {} disconnected", header.block_hash(), height);
21102125

2111-
if let Some(_) = self.onchain_events_waiting_threshold_conf.remove(&(height + ANTI_REORG_DELAY - 1)) {
2112-
//We may discard:
2113-
//- htlc update there as failure-trigger tx (revoked commitment tx, non-revoked commitment tx, HTLC-timeout tx) has been disconnected
2114-
//- maturing spendable output has transaction paying us has been disconnected
2115-
}
2126+
//We may discard:
2127+
//- htlc update there as failure-trigger tx (revoked commitment tx, non-revoked commitment tx, HTLC-timeout tx) has been disconnected
2128+
//- maturing spendable output has transaction paying us has been disconnected
2129+
self.onchain_events_waiting_threshold_conf.retain(|ref entry| entry.height != height);
21162130

21172131
self.onchain_tx_handler.block_disconnected(height, broadcaster, fee_estimator, logger);
21182132

@@ -2344,24 +2358,21 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
23442358
}));
23452359
}
23462360
} else {
2347-
log_info!(logger, "Failing HTLC with payment_hash {} timeout by a spend tx, waiting for confirmation (at height{})", log_bytes!(payment_hash.0), height + ANTI_REORG_DELAY - 1);
2348-
match self.onchain_events_waiting_threshold_conf.entry(height + ANTI_REORG_DELAY - 1) {
2349-
hash_map::Entry::Occupied(mut entry) => {
2350-
let e = entry.get_mut();
2351-
e.retain(|ref event| {
2352-
match **event {
2353-
OnchainEvent::HTLCUpdate { ref htlc_update } => {
2354-
return htlc_update.0 != source
2355-
},
2356-
_ => true
2357-
}
2358-
});
2359-
e.push(OnchainEvent::HTLCUpdate { htlc_update: (source, payment_hash)});
2360-
}
2361-
hash_map::Entry::Vacant(entry) => {
2362-
entry.insert(vec![OnchainEvent::HTLCUpdate { htlc_update: (source, payment_hash)}]);
2361+
self.onchain_events_waiting_threshold_conf.retain(|ref entry| {
2362+
if entry.height != height { return true; }
2363+
match entry.event {
2364+
OnchainEvent::HTLCUpdate { ref htlc_update } => {
2365+
htlc_update.0 != source
2366+
},
2367+
_ => true,
23632368
}
2364-
}
2369+
});
2370+
let entry = OnchainEventEntry {
2371+
height,
2372+
event: OnchainEvent::HTLCUpdate { htlc_update: (source, payment_hash) },
2373+
};
2374+
log_info!(logger, "Failing HTLC with payment_hash {} timeout by a spend tx, waiting for confirmation (at height{})", log_bytes!(payment_hash.0), entry.confirmation_threshold());
2375+
self.onchain_events_waiting_threshold_conf.push(entry);
23652376
}
23662377
}
23672378
}
@@ -2420,16 +2431,12 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
24202431
}
24212432
}
24222433
if let Some(spendable_output) = spendable_output {
2423-
log_trace!(logger, "Maturing {} until {}", log_spendable!(spendable_output), height + ANTI_REORG_DELAY - 1);
2424-
match self.onchain_events_waiting_threshold_conf.entry(height + ANTI_REORG_DELAY - 1) {
2425-
hash_map::Entry::Occupied(mut entry) => {
2426-
let e = entry.get_mut();
2427-
e.push(OnchainEvent::MaturingOutput { descriptor: spendable_output });
2428-
}
2429-
hash_map::Entry::Vacant(entry) => {
2430-
entry.insert(vec![OnchainEvent::MaturingOutput { descriptor: spendable_output }]);
2431-
}
2432-
}
2434+
let entry = OnchainEventEntry {
2435+
height: height,
2436+
event: OnchainEvent::MaturingOutput { descriptor: spendable_output.clone() },
2437+
};
2438+
log_trace!(logger, "Maturing {} until {}", log_spendable!(spendable_output), entry.confirmation_threshold());
2439+
self.onchain_events_waiting_threshold_conf.push(entry);
24332440
}
24342441
}
24352442
}
@@ -2695,31 +2702,26 @@ impl<'a, Signer: Sign, K: KeysInterface<Signer = Signer>> ReadableArgs<&'a K>
26952702
let last_block_hash: BlockHash = Readable::read(reader)?;
26962703

26972704
let waiting_threshold_conf_len: u64 = Readable::read(reader)?;
2698-
let mut onchain_events_waiting_threshold_conf = HashMap::with_capacity(cmp::min(waiting_threshold_conf_len as usize, MAX_ALLOC_SIZE / 128));
2705+
let mut onchain_events_waiting_threshold_conf = Vec::with_capacity(cmp::min(waiting_threshold_conf_len as usize, MAX_ALLOC_SIZE / 128));
26992706
for _ in 0..waiting_threshold_conf_len {
2700-
let height_target = Readable::read(reader)?;
2701-
let events_len: u64 = Readable::read(reader)?;
2702-
let mut events = Vec::with_capacity(cmp::min(events_len as usize, MAX_ALLOC_SIZE / 128));
2703-
for _ in 0..events_len {
2704-
let ev = match <u8 as Readable>::read(reader)? {
2705-
0 => {
2706-
let htlc_source = Readable::read(reader)?;
2707-
let hash = Readable::read(reader)?;
2708-
OnchainEvent::HTLCUpdate {
2709-
htlc_update: (htlc_source, hash)
2710-
}
2711-
},
2712-
1 => {
2713-
let descriptor = Readable::read(reader)?;
2714-
OnchainEvent::MaturingOutput {
2715-
descriptor
2716-
}
2717-
},
2718-
_ => return Err(DecodeError::InvalidValue),
2719-
};
2720-
events.push(ev);
2721-
}
2722-
onchain_events_waiting_threshold_conf.insert(height_target, events);
2707+
let height = Readable::read(reader)?;
2708+
let event = match <u8 as Readable>::read(reader)? {
2709+
0 => {
2710+
let htlc_source = Readable::read(reader)?;
2711+
let hash = Readable::read(reader)?;
2712+
OnchainEvent::HTLCUpdate {
2713+
htlc_update: (htlc_source, hash)
2714+
}
2715+
},
2716+
1 => {
2717+
let descriptor = Readable::read(reader)?;
2718+
OnchainEvent::MaturingOutput {
2719+
descriptor
2720+
}
2721+
},
2722+
_ => return Err(DecodeError::InvalidValue),
2723+
};
2724+
onchain_events_waiting_threshold_conf.push(OnchainEventEntry { height, event });
27232725
}
27242726

27252727
let outputs_to_watch_len: u64 = Readable::read(reader)?;

0 commit comments

Comments
 (0)