Skip to content

Commit 8a84f9a

Browse files
committed
Flatten onchain_events_waiting_threshold_conf
Rather than mapping height to a vector of events, use a single vector for all events. This allows for easily processing events by either height or transaction. The latter will be used for an interface suitable for Electrum.
1 parent e23c270 commit 8a84f9a

File tree

2 files changed

+213
-198
lines changed

2 files changed

+213
-198
lines changed

lightning/src/chain/channelmonitor.rs

Lines changed: 132 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ use util::ser::{Readable, ReadableArgs, MaybeReadable, Writer, Writeable, U48};
5050
use util::byte_utils;
5151
use util::events::Event;
5252

53-
use std::collections::{HashMap, HashSet, hash_map};
53+
use std::collections::{HashMap, HashSet};
5454
use std::{cmp, mem};
5555
use std::io::Error;
5656
use std::ops::Deref;
@@ -465,9 +465,28 @@ pub(crate) struct ClaimRequest {
465465
pub(crate) witness_data: InputMaterial
466466
}
467467

468+
/// An entry for an [`OnchainEvent`], stating the block height when the event was observed.
469+
///
470+
/// Used to determine when the on-chain event can be considered safe from a chain reorganization.
471+
#[derive(PartialEq)]
472+
struct OnchainEventEntry {
473+
height: u32,
474+
event: OnchainEvent,
475+
}
476+
477+
impl OnchainEventEntry {
478+
fn confirmation_threshold(&self) -> u32 {
479+
self.height + ANTI_REORG_DELAY - 1
480+
}
481+
482+
fn has_reached_confirmation_threshold(&self, height: u32) -> bool {
483+
self.confirmation_threshold() == height
484+
}
485+
}
486+
468487
/// Upon discovering of some classes of onchain tx by ChannelMonitor, we may have to take actions on it
469488
/// once they mature to enough confirmations (ANTI_REORG_DELAY)
470-
#[derive(Clone, PartialEq)]
489+
#[derive(PartialEq)]
471490
enum OnchainEvent {
472491
/// HTLC output getting solved by a timeout, at maturation we pass upstream payment source information to solve
473492
/// inbound HTLC in backward channel. Note, in case of preimage, we pass info to upstream without delay as we can
@@ -687,7 +706,9 @@ pub(crate) struct ChannelMonitorImpl<Signer: Sign> {
687706
// Used to track onchain events, i.e transactions parts of channels confirmed on chain, on which
688707
// we have to take actions once they reach enough confs. Key is a block height timer, i.e we enforce
689708
// actions when we receive a block with given height. Actions depend on OnchainEvent type.
690-
onchain_events_waiting_threshold_conf: HashMap<u32, Vec<OnchainEvent>>,
709+
//
710+
// TODO: Rewrite docs
711+
onchain_events_waiting_threshold_conf: Vec<OnchainEventEntry>,
691712

692713
// If we get serialized out and re-read, we need to make sure that the chain monitoring
693714
// interface knows about the TXOs that we want to be notified of spends of. We could probably
@@ -934,21 +955,18 @@ impl<Signer: Sign> Writeable for ChannelMonitorImpl<Signer> {
934955
self.last_block_hash.write(writer)?;
935956

936957
writer.write_all(&byte_utils::be64_to_array(self.onchain_events_waiting_threshold_conf.len() as u64))?;
937-
for (ref target, ref events) in self.onchain_events_waiting_threshold_conf.iter() {
938-
writer.write_all(&byte_utils::be32_to_array(**target))?;
939-
writer.write_all(&byte_utils::be64_to_array(events.len() as u64))?;
940-
for ev in events.iter() {
941-
match *ev {
942-
OnchainEvent::HTLCUpdate { ref htlc_update } => {
943-
0u8.write(writer)?;
944-
htlc_update.0.write(writer)?;
945-
htlc_update.1.write(writer)?;
946-
},
947-
OnchainEvent::MaturingOutput { ref descriptor } => {
948-
1u8.write(writer)?;
949-
descriptor.write(writer)?;
950-
},
951-
}
958+
for ref entry in self.onchain_events_waiting_threshold_conf.iter() {
959+
writer.write_all(&byte_utils::be32_to_array(entry.height))?;
960+
match entry.event {
961+
OnchainEvent::HTLCUpdate { ref htlc_update } => {
962+
0u8.write(writer)?;
963+
htlc_update.0.write(writer)?;
964+
htlc_update.1.write(writer)?;
965+
},
966+
OnchainEvent::MaturingOutput { ref descriptor } => {
967+
1u8.write(writer)?;
968+
descriptor.write(writer)?;
969+
},
952970
}
953971
}
954972

@@ -1056,7 +1074,7 @@ impl<Signer: Sign> ChannelMonitor<Signer> {
10561074
pending_monitor_events: Vec::new(),
10571075
pending_events: Vec::new(),
10581076

1059-
onchain_events_waiting_threshold_conf: HashMap::new(),
1077+
onchain_events_waiting_threshold_conf: Vec::new(),
10601078
outputs_to_watch,
10611079

10621080
onchain_tx_handler,
@@ -1639,24 +1657,23 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
16391657
if let Some(ref outpoints) = self.counterparty_claimable_outpoints.get($txid) {
16401658
for &(ref htlc, ref source_option) in outpoints.iter() {
16411659
if let &Some(ref source) = source_option {
1642-
log_info!(logger, "Failing HTLC with payment_hash {} from {} counterparty commitment tx due to broadcast of revoked counterparty commitment transaction, waiting for confirmation (at height {})", log_bytes!(htlc.payment_hash.0), $commitment_tx, height + ANTI_REORG_DELAY - 1);
1643-
match self.onchain_events_waiting_threshold_conf.entry(height + ANTI_REORG_DELAY - 1) {
1644-
hash_map::Entry::Occupied(mut entry) => {
1645-
let e = entry.get_mut();
1646-
e.retain(|ref event| {
1647-
match **event {
1648-
OnchainEvent::HTLCUpdate { ref htlc_update } => {
1649-
return htlc_update.0 != **source
1650-
},
1651-
_ => true
1652-
}
1653-
});
1654-
e.push(OnchainEvent::HTLCUpdate { htlc_update: ((**source).clone(), htlc.payment_hash.clone())});
1655-
}
1656-
hash_map::Entry::Vacant(entry) => {
1657-
entry.insert(vec![OnchainEvent::HTLCUpdate { htlc_update: ((**source).clone(), htlc.payment_hash.clone())}]);
1660+
self.onchain_events_waiting_threshold_conf.retain(|ref entry| {
1661+
if entry.height != height { return true; }
1662+
match entry.event {
1663+
OnchainEvent::HTLCUpdate { ref htlc_update } => {
1664+
htlc_update.0 != **source
1665+
},
1666+
_ => true,
16581667
}
1659-
}
1668+
});
1669+
let entry = OnchainEventEntry {
1670+
height,
1671+
event: OnchainEvent::HTLCUpdate {
1672+
htlc_update: ((**source).clone(), htlc.payment_hash.clone())
1673+
},
1674+
};
1675+
log_info!(logger, "Failing HTLC with payment_hash {} from {} counterparty commitment tx due to broadcast of revoked counterparty commitment transaction, waiting for confirmation (at height {})", log_bytes!(htlc.payment_hash.0), $commitment_tx, entry.confirmation_threshold());
1676+
self.onchain_events_waiting_threshold_conf.push(entry);
16601677
}
16611678
}
16621679
}
@@ -1705,23 +1722,21 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
17051722
}
17061723
}
17071724
log_trace!(logger, "Failing HTLC with payment_hash {} from {} counterparty commitment tx due to broadcast of counterparty commitment transaction", log_bytes!(htlc.payment_hash.0), $commitment_tx);
1708-
match self.onchain_events_waiting_threshold_conf.entry(height + ANTI_REORG_DELAY - 1) {
1709-
hash_map::Entry::Occupied(mut entry) => {
1710-
let e = entry.get_mut();
1711-
e.retain(|ref event| {
1712-
match **event {
1713-
OnchainEvent::HTLCUpdate { ref htlc_update } => {
1714-
return htlc_update.0 != **source
1715-
},
1716-
_ => true
1717-
}
1718-
});
1719-
e.push(OnchainEvent::HTLCUpdate { htlc_update: ((**source).clone(), htlc.payment_hash.clone())});
1720-
}
1721-
hash_map::Entry::Vacant(entry) => {
1722-
entry.insert(vec![OnchainEvent::HTLCUpdate { htlc_update: ((**source).clone(), htlc.payment_hash.clone())}]);
1725+
self.onchain_events_waiting_threshold_conf.retain(|ref entry| {
1726+
if entry.height != height { return true; }
1727+
match entry.event {
1728+
OnchainEvent::HTLCUpdate { ref htlc_update } => {
1729+
htlc_update.0 != **source
1730+
},
1731+
_ => true,
17231732
}
1724-
}
1733+
});
1734+
self.onchain_events_waiting_threshold_conf.push(OnchainEventEntry {
1735+
height,
1736+
event: OnchainEvent::HTLCUpdate {
1737+
htlc_update: ((**source).clone(), htlc.payment_hash.clone())
1738+
},
1739+
});
17251740
}
17261741
}
17271742
}
@@ -1862,24 +1877,21 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
18621877

18631878
macro_rules! wait_threshold_conf {
18641879
($height: expr, $source: expr, $commitment_tx: expr, $payment_hash: expr) => {
1865-
log_trace!(logger, "Failing HTLC with payment_hash {} from {} holder commitment tx due to broadcast of transaction, waiting confirmation (at height{})", log_bytes!($payment_hash.0), $commitment_tx, height + ANTI_REORG_DELAY - 1);
1866-
match self.onchain_events_waiting_threshold_conf.entry($height + ANTI_REORG_DELAY - 1) {
1867-
hash_map::Entry::Occupied(mut entry) => {
1868-
let e = entry.get_mut();
1869-
e.retain(|ref event| {
1870-
match **event {
1871-
OnchainEvent::HTLCUpdate { ref htlc_update } => {
1872-
return htlc_update.0 != $source
1873-
},
1874-
_ => true
1875-
}
1876-
});
1877-
e.push(OnchainEvent::HTLCUpdate { htlc_update: ($source, $payment_hash)});
1880+
self.onchain_events_waiting_threshold_conf.retain(|ref entry| {
1881+
if entry.height != $height { return true; }
1882+
match entry.event {
1883+
OnchainEvent::HTLCUpdate { ref htlc_update } => {
1884+
htlc_update.0 != $source
1885+
},
1886+
_ => true,
18781887
}
1879-
hash_map::Entry::Vacant(entry) => {
1880-
entry.insert(vec![OnchainEvent::HTLCUpdate { htlc_update: ($source, $payment_hash)}]);
1881-
}
1882-
}
1888+
});
1889+
let entry = OnchainEventEntry {
1890+
height,
1891+
event: OnchainEvent::HTLCUpdate { htlc_update: ($source, $payment_hash) },
1892+
};
1893+
log_trace!(logger, "Failing HTLC with payment_hash {} from {} holder commitment tx due to broadcast of transaction, waiting confirmation (at height{})", log_bytes!($payment_hash.0), $commitment_tx, entry.confirmation_threshold());
1894+
self.onchain_events_waiting_threshold_conf.push(entry);
18831895
}
18841896
}
18851897

@@ -2054,9 +2066,12 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
20542066
}
20552067
claimable_outpoints.append(&mut new_outpoints);
20562068
}
2057-
if let Some(events) = self.onchain_events_waiting_threshold_conf.remove(&height) {
2058-
for ev in events {
2059-
match ev {
2069+
2070+
let onchain_events_waiting_threshold_conf =
2071+
self.onchain_events_waiting_threshold_conf.drain(..).collect::<Vec<_>>();
2072+
for entry in onchain_events_waiting_threshold_conf {
2073+
if entry.has_reached_confirmation_threshold(height) {
2074+
match entry.event {
20602075
OnchainEvent::HTLCUpdate { htlc_update } => {
20612076
log_trace!(logger, "HTLC {} failure update has got enough confirmations to be passed upstream", log_bytes!((htlc_update.1).0));
20622077
self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate {
@@ -2072,6 +2087,8 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
20722087
});
20732088
}
20742089
}
2090+
} else {
2091+
self.onchain_events_waiting_threshold_conf.push(entry);
20752092
}
20762093
}
20772094

@@ -2108,11 +2125,10 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
21082125
{
21092126
log_trace!(logger, "Block {} at height {} disconnected", header.block_hash(), height);
21102127

2111-
if let Some(_) = self.onchain_events_waiting_threshold_conf.remove(&(height + ANTI_REORG_DELAY - 1)) {
2112-
//We may discard:
2113-
//- htlc update there as failure-trigger tx (revoked commitment tx, non-revoked commitment tx, HTLC-timeout tx) has been disconnected
2114-
//- maturing spendable output has transaction paying us has been disconnected
2115-
}
2128+
//We may discard:
2129+
//- htlc update there as failure-trigger tx (revoked commitment tx, non-revoked commitment tx, HTLC-timeout tx) has been disconnected
2130+
//- maturing spendable output has transaction paying us has been disconnected
2131+
self.onchain_events_waiting_threshold_conf.retain(|ref entry| entry.height != height);
21162132

21172133
self.onchain_tx_handler.block_disconnected(height, broadcaster, fee_estimator, logger);
21182134

@@ -2344,24 +2360,21 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
23442360
}));
23452361
}
23462362
} else {
2347-
log_info!(logger, "Failing HTLC with payment_hash {} timeout by a spend tx, waiting for confirmation (at height{})", log_bytes!(payment_hash.0), height + ANTI_REORG_DELAY - 1);
2348-
match self.onchain_events_waiting_threshold_conf.entry(height + ANTI_REORG_DELAY - 1) {
2349-
hash_map::Entry::Occupied(mut entry) => {
2350-
let e = entry.get_mut();
2351-
e.retain(|ref event| {
2352-
match **event {
2353-
OnchainEvent::HTLCUpdate { ref htlc_update } => {
2354-
return htlc_update.0 != source
2355-
},
2356-
_ => true
2357-
}
2358-
});
2359-
e.push(OnchainEvent::HTLCUpdate { htlc_update: (source, payment_hash)});
2360-
}
2361-
hash_map::Entry::Vacant(entry) => {
2362-
entry.insert(vec![OnchainEvent::HTLCUpdate { htlc_update: (source, payment_hash)}]);
2363+
self.onchain_events_waiting_threshold_conf.retain(|ref entry| {
2364+
if entry.height != height { return true; }
2365+
match entry.event {
2366+
OnchainEvent::HTLCUpdate { ref htlc_update } => {
2367+
htlc_update.0 != source
2368+
},
2369+
_ => true,
23632370
}
2364-
}
2371+
});
2372+
let entry = OnchainEventEntry {
2373+
height,
2374+
event: OnchainEvent::HTLCUpdate { htlc_update: (source, payment_hash) },
2375+
};
2376+
log_info!(logger, "Failing HTLC with payment_hash {} timeout by a spend tx, waiting for confirmation (at height{})", log_bytes!(payment_hash.0), entry.confirmation_threshold());
2377+
self.onchain_events_waiting_threshold_conf.push(entry);
23652378
}
23662379
}
23672380
}
@@ -2420,16 +2433,12 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
24202433
}
24212434
}
24222435
if let Some(spendable_output) = spendable_output {
2423-
log_trace!(logger, "Maturing {} until {}", log_spendable!(spendable_output), height + ANTI_REORG_DELAY - 1);
2424-
match self.onchain_events_waiting_threshold_conf.entry(height + ANTI_REORG_DELAY - 1) {
2425-
hash_map::Entry::Occupied(mut entry) => {
2426-
let e = entry.get_mut();
2427-
e.push(OnchainEvent::MaturingOutput { descriptor: spendable_output });
2428-
}
2429-
hash_map::Entry::Vacant(entry) => {
2430-
entry.insert(vec![OnchainEvent::MaturingOutput { descriptor: spendable_output }]);
2431-
}
2432-
}
2436+
let entry = OnchainEventEntry {
2437+
height: height,
2438+
event: OnchainEvent::MaturingOutput { descriptor: spendable_output.clone() },
2439+
};
2440+
log_trace!(logger, "Maturing {} until {}", log_spendable!(spendable_output), entry.confirmation_threshold());
2441+
self.onchain_events_waiting_threshold_conf.push(entry);
24332442
}
24342443
}
24352444
}
@@ -2695,31 +2704,26 @@ impl<'a, Signer: Sign, K: KeysInterface<Signer = Signer>> ReadableArgs<&'a K>
26952704
let last_block_hash: BlockHash = Readable::read(reader)?;
26962705

26972706
let waiting_threshold_conf_len: u64 = Readable::read(reader)?;
2698-
let mut onchain_events_waiting_threshold_conf = HashMap::with_capacity(cmp::min(waiting_threshold_conf_len as usize, MAX_ALLOC_SIZE / 128));
2707+
let mut onchain_events_waiting_threshold_conf = Vec::with_capacity(cmp::min(waiting_threshold_conf_len as usize, MAX_ALLOC_SIZE / 128));
26992708
for _ in 0..waiting_threshold_conf_len {
2700-
let height_target = Readable::read(reader)?;
2701-
let events_len: u64 = Readable::read(reader)?;
2702-
let mut events = Vec::with_capacity(cmp::min(events_len as usize, MAX_ALLOC_SIZE / 128));
2703-
for _ in 0..events_len {
2704-
let ev = match <u8 as Readable>::read(reader)? {
2705-
0 => {
2706-
let htlc_source = Readable::read(reader)?;
2707-
let hash = Readable::read(reader)?;
2708-
OnchainEvent::HTLCUpdate {
2709-
htlc_update: (htlc_source, hash)
2710-
}
2711-
},
2712-
1 => {
2713-
let descriptor = Readable::read(reader)?;
2714-
OnchainEvent::MaturingOutput {
2715-
descriptor
2716-
}
2717-
},
2718-
_ => return Err(DecodeError::InvalidValue),
2719-
};
2720-
events.push(ev);
2721-
}
2722-
onchain_events_waiting_threshold_conf.insert(height_target, events);
2709+
let height = Readable::read(reader)?;
2710+
let event = match <u8 as Readable>::read(reader)? {
2711+
0 => {
2712+
let htlc_source = Readable::read(reader)?;
2713+
let hash = Readable::read(reader)?;
2714+
OnchainEvent::HTLCUpdate {
2715+
htlc_update: (htlc_source, hash)
2716+
}
2717+
},
2718+
1 => {
2719+
let descriptor = Readable::read(reader)?;
2720+
OnchainEvent::MaturingOutput {
2721+
descriptor
2722+
}
2723+
},
2724+
_ => return Err(DecodeError::InvalidValue),
2725+
};
2726+
onchain_events_waiting_threshold_conf.push(OnchainEventEntry { height, event });
27232727
}
27242728

27252729
let outputs_to_watch_len: u64 = Readable::read(reader)?;

0 commit comments

Comments
 (0)