Skip to content

Commit 43f5506

Browse files
committed
Move the bucketed history tracking logic into a scoring submodule
1 parent 85b0cd4 commit 43f5506

File tree

1 file changed

+160
-155
lines changed

1 file changed

+160
-155
lines changed

lightning/src/routing/scoring.rs

Lines changed: 160 additions & 155 deletions
Original file line numberDiff line numberDiff line change
@@ -540,161 +540,6 @@ pub struct ProbabilisticScoringParameters {
540540
pub considered_impossible_penalty_msat: u64,
541541
}
542542

543-
/// Tracks the historical state of a distribution as a weighted average of how much time was spent
544-
/// in each of 8 buckets.
545-
#[derive(Clone, Copy)]
546-
struct HistoricalBucketRangeTracker {
547-
buckets: [u16; 8],
548-
}
549-
550-
impl HistoricalBucketRangeTracker {
551-
fn new() -> Self { Self { buckets: [0; 8] } }
552-
fn track_datapoint(&mut self, liquidity_offset_msat: u64, capacity_msat: u64) {
553-
// We have 8 leaky buckets for min and max liquidity. Each bucket tracks the amount of time
554-
// we spend in each bucket as a 16-bit fixed-point number with a 5 bit fractional part.
555-
//
556-
// Each time we update our liquidity estimate, we add 32 (1.0 in our fixed-point system) to
557-
// the buckets for the current min and max liquidity offset positions.
558-
//
559-
// We then decay each bucket by multiplying by 2047/2048 (avoiding dividing by a
560-
// non-power-of-two). This ensures we can't actually overflow the u16 - when we get to
561-
// 63,457 adding 32 and decaying by 2047/2048 leaves us back at 63,457.
562-
//
563-
// In total, this allows us to track data for the last 8,000 or so payments across a given
564-
// channel.
565-
//
566-
// These constants are a balance - we try to fit in 2 bytes per bucket to reduce overhead,
567-
// and need to balance having more bits in the decimal part (to ensure decay isn't too
568-
// non-linear) with having too few bits in the mantissa, causing us to not store very many
569-
// datapoints.
570-
//
571-
// The constants were picked experimentally, selecting a decay amount that restricts us
572-
// from overflowing buckets without having to cap them manually.
573-
574-
// Ensure the bucket index is in the range [0, 7], even if the liquidity offset is zero or
575-
// the channel's capacity, though the second should generally never happen.
576-
debug_assert!(liquidity_offset_msat <= capacity_msat);
577-
let bucket_idx: u8 = (liquidity_offset_msat * 8 / capacity_msat.saturating_add(1))
578-
.try_into().unwrap_or(32); // 32 is bogus for 8 buckets, and will be ignored
579-
debug_assert!(bucket_idx < 8);
580-
if bucket_idx < 8 {
581-
for e in self.buckets.iter_mut() {
582-
*e = ((*e as u32) * 2047 / 2048) as u16;
583-
}
584-
self.buckets[bucket_idx as usize] = self.buckets[bucket_idx as usize].saturating_add(32);
585-
}
586-
}
587-
/// Decay all buckets by the given number of half-lives. Used to more aggressively remove old
588-
/// datapoints as we receive newer information.
589-
fn time_decay_data(&mut self, half_lives: u32) {
590-
for e in self.buckets.iter_mut() {
591-
*e = e.checked_shr(half_lives).unwrap_or(0);
592-
}
593-
}
594-
}
595-
596-
impl_writeable_tlv_based!(HistoricalBucketRangeTracker, { (0, buckets, required) });
597-
598-
struct HistoricalMinMaxBuckets<'a> {
599-
min_liquidity_offset_history: &'a HistoricalBucketRangeTracker,
600-
max_liquidity_offset_history: &'a HistoricalBucketRangeTracker,
601-
}
602-
603-
impl HistoricalMinMaxBuckets<'_> {
604-
#[inline]
605-
fn get_decayed_buckets<T: Time>(&self, now: T, last_updated: T, half_life: Duration)
606-
-> ([u16; 8], [u16; 8], u32) {
607-
let required_decays = now.duration_since(last_updated).as_secs()
608-
.checked_div(half_life.as_secs())
609-
.map_or(u32::max_value(), |decays| cmp::min(decays, u32::max_value() as u64) as u32);
610-
let mut min_buckets = *self.min_liquidity_offset_history;
611-
min_buckets.time_decay_data(required_decays);
612-
let mut max_buckets = *self.max_liquidity_offset_history;
613-
max_buckets.time_decay_data(required_decays);
614-
(min_buckets.buckets, max_buckets.buckets, required_decays)
615-
}
616-
617-
#[inline]
618-
fn calculate_success_probability_times_billion<T: Time>(
619-
&self, now: T, last_updated: T, half_life: Duration, amount_msat: u64, capacity_msat: u64)
620-
-> Option<u64> {
621-
// If historical penalties are enabled, calculate the penalty by walking the set of
622-
// historical liquidity bucket (min, max) combinations (where min_idx < max_idx) and, for
623-
// each, calculate the probability of success given our payment amount, then total the
624-
// weighted average probability of success.
625-
//
626-
// We use a sliding scale to decide which point within a given bucket will be compared to
627-
// the amount being sent - for lower-bounds, the amount being sent is compared to the lower
628-
// edge of the first bucket (i.e. zero), but compared to the upper 7/8ths of the last
629-
// bucket (i.e. 9 times the index, or 63), with each bucket in between increasing the
630-
// comparison point by 1/64th. For upper-bounds, the same applies, however with an offset
631-
// of 1/64th (i.e. starting at one and ending at 64). This avoids failing to assign
632-
// penalties to channels at the edges.
633-
//
634-
// If we used the bottom edge of buckets, we'd end up never assigning any penalty at all to
635-
// such a channel when sending less than ~0.19% of the channel's capacity (e.g. ~200k sats
636-
// for a 1 BTC channel!).
637-
//
638-
// If we used the middle of each bucket we'd never assign any penalty at all when sending
639-
// less than 1/16th of a channel's capacity, or 1/8th if we used the top of the bucket.
640-
let mut total_valid_points_tracked = 0;
641-
642-
let payment_amt_64th_bucket: u8 = if amount_msat < u64::max_value() / 64 {
643-
(amount_msat * 64 / capacity_msat.saturating_add(1))
644-
.try_into().unwrap_or(65)
645-
} else {
646-
// Only use 128-bit arithmetic when multiplication will overflow to avoid 128-bit
647-
// division. This branch should only be hit in fuzz testing since the amount would
648-
// need to be over 2.88 million BTC in practice.
649-
((amount_msat as u128) * 64 / (capacity_msat as u128).saturating_add(1))
650-
.try_into().unwrap_or(65)
651-
};
652-
#[cfg(not(fuzzing))]
653-
debug_assert!(payment_amt_64th_bucket <= 64);
654-
if payment_amt_64th_bucket >= 64 { return None; }
655-
656-
// Check if all our buckets are zero, once decayed and treat it as if we had no data. We
657-
// don't actually use the decayed buckets, though, as that would lose precision.
658-
let (decayed_min_buckets, decayed_max_buckets, required_decays) =
659-
self.get_decayed_buckets(now, last_updated, half_life);
660-
if decayed_min_buckets.iter().all(|v| *v == 0) || decayed_max_buckets.iter().all(|v| *v == 0) {
661-
return None;
662-
}
663-
664-
for (min_idx, min_bucket) in self.min_liquidity_offset_history.buckets.iter().enumerate() {
665-
for max_bucket in self.max_liquidity_offset_history.buckets.iter().take(8 - min_idx) {
666-
total_valid_points_tracked += (*min_bucket as u64) * (*max_bucket as u64);
667-
}
668-
}
669-
// If the total valid points is smaller than 1.0 (i.e. 32 in our fixed-point scheme), treat
670-
// it as if we were fully decayed.
671-
if total_valid_points_tracked.checked_shr(required_decays).unwrap_or(0) < 32*32 {
672-
return None;
673-
}
674-
675-
let mut cumulative_success_prob_times_billion = 0;
676-
for (min_idx, min_bucket) in self.min_liquidity_offset_history.buckets.iter().enumerate() {
677-
for (max_idx, max_bucket) in self.max_liquidity_offset_history.buckets.iter().enumerate().take(8 - min_idx) {
678-
let bucket_prob_times_million = (*min_bucket as u64) * (*max_bucket as u64)
679-
* 1024 * 1024 / total_valid_points_tracked;
680-
let min_64th_bucket = min_idx as u8 * 9;
681-
let max_64th_bucket = (7 - max_idx as u8) * 9 + 1;
682-
if payment_amt_64th_bucket > max_64th_bucket {
683-
// Success probability 0, the payment amount is above the max liquidity
684-
} else if payment_amt_64th_bucket <= min_64th_bucket {
685-
cumulative_success_prob_times_billion += bucket_prob_times_million * 1024;
686-
} else {
687-
cumulative_success_prob_times_billion += bucket_prob_times_million *
688-
((max_64th_bucket - payment_amt_64th_bucket) as u64) * 1024 /
689-
((max_64th_bucket - min_64th_bucket) as u64);
690-
}
691-
}
692-
}
693-
694-
Some(cumulative_success_prob_times_billion)
695-
}
696-
}
697-
698543
/// Accounting for channel liquidity balance uncertainty.
699544
///
700545
/// Direction is defined in terms of [`NodeId`] partial ordering, where the source node is the
@@ -1666,6 +1511,166 @@ mod approx {
16661511
}
16671512
}
16681513

1514+
mod bucketed_history {
1515+
use super::*;
1516+
1517+
/// Tracks the historical state of a distribution as a weighted average of how much time was spent
1518+
/// in each of 8 buckets.
1519+
#[derive(Clone, Copy)]
1520+
pub(super) struct HistoricalBucketRangeTracker {
1521+
buckets: [u16; 8],
1522+
}
1523+
1524+
impl HistoricalBucketRangeTracker {
1525+
pub(super) fn new() -> Self { Self { buckets: [0; 8] } }
1526+
pub(super) fn track_datapoint(&mut self, liquidity_offset_msat: u64, capacity_msat: u64) {
1527+
// We have 8 leaky buckets for min and max liquidity. Each bucket tracks the amount of time
1528+
// we spend in each bucket as a 16-bit fixed-point number with a 5 bit fractional part.
1529+
//
1530+
// Each time we update our liquidity estimate, we add 32 (1.0 in our fixed-point system) to
1531+
// the buckets for the current min and max liquidity offset positions.
1532+
//
1533+
// We then decay each bucket by multiplying by 2047/2048 (avoiding dividing by a
1534+
// non-power-of-two). This ensures we can't actually overflow the u16 - when we get to
1535+
// 63,457 adding 32 and decaying by 2047/2048 leaves us back at 63,457.
1536+
//
1537+
// In total, this allows us to track data for the last 8,000 or so payments across a given
1538+
// channel.
1539+
//
1540+
// These constants are a balance - we try to fit in 2 bytes per bucket to reduce overhead,
1541+
// and need to balance having more bits in the decimal part (to ensure decay isn't too
1542+
// non-linear) with having too few bits in the mantissa, causing us to not store very many
1543+
// datapoints.
1544+
//
1545+
// The constants were picked experimentally, selecting a decay amount that restricts us
1546+
// from overflowing buckets without having to cap them manually.
1547+
1548+
// Ensure the bucket index is in the range [0, 7], even if the liquidity offset is zero or
1549+
// the channel's capacity, though the second should generally never happen.
1550+
debug_assert!(liquidity_offset_msat <= capacity_msat);
1551+
let bucket_idx: u8 = (liquidity_offset_msat * 32 / capacity_msat.saturating_add(1))
1552+
.try_into().unwrap_or(32); // 32 is bogus for 32 buckets, and will be ignored
1553+
debug_assert!(bucket_idx < 8);
1554+
if bucket_idx < 8 {
1555+
for e in self.buckets.iter_mut() {
1556+
*e = ((*e as u32) * 2047 / 2048) as u16;
1557+
}
1558+
self.buckets[bucket_idx as usize] = self.buckets[bucket_idx as usize].saturating_add(32);
1559+
}
1560+
}
1561+
/// Decay all buckets by the given number of half-lives. Used to more aggressively remove old
1562+
/// datapoints as we receive newer information.
1563+
pub(super) fn time_decay_data(&mut self, half_lives: u32) {
1564+
for e in self.buckets.iter_mut() {
1565+
*e = e.checked_shr(half_lives).unwrap_or(0);
1566+
}
1567+
}
1568+
}
1569+
1570+
impl_writeable_tlv_based!(HistoricalBucketRangeTracker, { (0, buckets, required) });
1571+
1572+
pub(super) struct HistoricalMinMaxBuckets<'a> {
1573+
pub(super) min_liquidity_offset_history: &'a HistoricalBucketRangeTracker,
1574+
pub(super) max_liquidity_offset_history: &'a HistoricalBucketRangeTracker,
1575+
}
1576+
1577+
impl HistoricalMinMaxBuckets<'_> {
1578+
#[inline]
1579+
pub(super) fn get_decayed_buckets<T: Time>(&self, now: T, last_updated: T, half_life: Duration)
1580+
-> ([u16; 8], [u16; 8], u32) {
1581+
let required_decays = now.duration_since(last_updated).as_secs()
1582+
.checked_div(half_life.as_secs())
1583+
.map_or(u32::max_value(), |decays| cmp::min(decays, u32::max_value() as u64) as u32);
1584+
let mut min_buckets = *self.min_liquidity_offset_history;
1585+
min_buckets.time_decay_data(required_decays);
1586+
let mut max_buckets = *self.max_liquidity_offset_history;
1587+
max_buckets.time_decay_data(required_decays);
1588+
(min_buckets.buckets, max_buckets.buckets, required_decays)
1589+
}
1590+
1591+
#[inline]
1592+
pub(super) fn calculate_success_probability_times_billion<T: Time>(
1593+
&self, now: T, last_updated: T, half_life: Duration, amount_msat: u64, capacity_msat: u64)
1594+
-> Option<u64> {
1595+
// If historical penalties are enabled, calculate the penalty by walking the set of
1596+
// historical liquidity bucket (min, max) combinations (where min_idx < max_idx) and, for
1597+
// each, calculate the probability of success given our payment amount, then total the
1598+
// weighted average probability of success.
1599+
//
1600+
// We use a sliding scale to decide which point within a given bucket will be compared to
1601+
// the amount being sent - for lower-bounds, the amount being sent is compared to the lower
1602+
// edge of the first bucket (i.e. zero), but compared to the upper 7/8ths of the last
1603+
// bucket (i.e. 9 times the index, or 63), with each bucket in between increasing the
1604+
// comparison point by 1/64th. For upper-bounds, the same applies, however with an offset
1605+
// of 1/64th (i.e. starting at one and ending at 64). This avoids failing to assign
1606+
// penalties to channels at the edges.
1607+
//
1608+
// If we used the bottom edge of buckets, we'd end up never assigning any penalty at all to
1609+
// such a channel when sending less than ~0.19% of the channel's capacity (e.g. ~200k sats
1610+
// for a 1 BTC channel!).
1611+
//
1612+
// If we used the middle of each bucket we'd never assign any penalty at all when sending
1613+
// less than 1/16th of a channel's capacity, or 1/8th if we used the top of the bucket.
1614+
let mut total_valid_points_tracked = 0;
1615+
1616+
let payment_amt_64th_bucket: u16 = if amount_msat < u64::max_value() / 1024 {
1617+
(amount_msat * 1024 / capacity_msat.saturating_add(1))
1618+
.try_into().unwrap_or(65)
1619+
} else {
1620+
// Only use 128-bit arithmetic when multiplication will overflow to avoid 128-bit
1621+
// division. This branch should only be hit in fuzz testing since the amount would
1622+
// need to be over 2.88 million BTC in practice.
1623+
((amount_msat as u128) * 1024 / (capacity_msat as u128).saturating_add(1))
1624+
.try_into().unwrap_or(65)
1625+
};
1626+
#[cfg(not(fuzzing))]
1627+
debug_assert!(payment_amt_64th_bucket <= 64);
1628+
if payment_amt_64th_bucket >= 64 { return None; }
1629+
1630+
// Check if all our buckets are zero, once decayed and treat it as if we had no data. We
1631+
// don't actually use the decayed buckets, though, as that would lose precision.
1632+
let (decayed_min_buckets, decayed_max_buckets, required_decays) =
1633+
self.get_decayed_buckets(now, last_updated, half_life);
1634+
if decayed_min_buckets.iter().all(|v| *v == 0) || decayed_max_buckets.iter().all(|v| *v == 0) {
1635+
return None;
1636+
}
1637+
1638+
for (min_idx, min_bucket) in self.min_liquidity_offset_history.buckets.iter().enumerate() {
1639+
for max_bucket in self.max_liquidity_offset_history.buckets.iter().take(8 - min_idx) {
1640+
total_valid_points_tracked += (*min_bucket as u64) * (*max_bucket as u64);
1641+
}
1642+
}
1643+
// If the total valid points is smaller than 1.0 (i.e. 32 in our fixed-point scheme), treat
1644+
// it as if we were fully decayed.
1645+
if total_valid_points_tracked.checked_shr(required_decays).unwrap_or(0) < 32*32 {
1646+
return None;
1647+
}
1648+
1649+
let mut cumulative_success_prob_times_billion = 0;
1650+
for (min_idx, min_bucket) in self.min_liquidity_offset_history.buckets.iter().enumerate() {
1651+
for (max_idx, max_bucket) in self.max_liquidity_offset_history.buckets.iter().enumerate().take(8 - min_idx) {
1652+
let min_64th_bucket = min_idx as u16 * 33;
1653+
let max_64th_bucket = (31 - max_idx as u16) * 33 + 1;
1654+
let bucket_prob_times_million = (*min_bucket as u64) * (*max_bucket as u64)
1655+
* 1024 * 1024 / total_valid_points_tracked;
1656+
if payment_amt_64th_bucket > max_64th_bucket {
1657+
// Success probability 0, the payment amount is above the max liquidity
1658+
} else if payment_amt_64th_bucket <= min_64th_bucket {
1659+
cumulative_success_prob_times_billion += bucket_prob_times_million * 1024;
1660+
} else {
1661+
cumulative_success_prob_times_billion += bucket_prob_times_million *
1662+
((max_64th_bucket - payment_amt_64th_bucket) as u64) * 1024 /
1663+
((max_64th_bucket - min_64th_bucket) as u64);
1664+
}
1665+
}
1666+
}
1667+
1668+
Some(cumulative_success_prob_times_billion)
1669+
}
1670+
}
1671+
}
1672+
use bucketed_history::{HistoricalBucketRangeTracker, HistoricalMinMaxBuckets};
1673+
16691674
impl<G: Deref<Target = NetworkGraph<L>>, L: Deref, T: Time> Writeable for ProbabilisticScorerUsingTime<G, L, T> where L::Target: Logger {
16701675
#[inline]
16711676
fn write<W: Writer>(&self, w: &mut W) -> Result<(), io::Error> {

0 commit comments

Comments
 (0)