Commit 1de16c3
Add infra to block ChannelMonitorUpdates on forwarded claims
When we forward a payment and receive an `update_fulfill_htlc` message from the downstream channel, we immediately claim the HTLC on the upstream channel, before even doing a `commitment_signed` dance on the downstream channel. This implies that our `ChannelMonitorUpdate`s "go out" in the right order - first we ensure we'll get our money by writing the preimage down, then we write the update that resolves giving money on the downstream node. This is safe as long as `ChannelMonitorUpdate`s complete in the order in which they are generated, but of course looking forward we want to support asynchronous updates, which may complete in any order.

Here we add infrastructure to handle downstream `ChannelMonitorUpdate`s which are blocked on an upstream preimage-containing one. We don't yet actually do the blocking which will come in a future commit.
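The pattern this adds can be pictured at a data-structure level roughly as follows. This is a minimal, self-contained sketch with hypothetical names (`Blocker`, `PeerBlockers`), not the actual `ChannelManager` code: the channel whose RAA monitor update must wait records a blocking action when the upstream preimage-carrying `ChannelMonitorUpdate` is generated, and only once that upstream update durably completes is the action removed and the blocked update allowed to proceed.

use std::collections::BTreeMap;

/// Hypothetical stand-in for this commit's `RAAMonitorUpdateBlockingAction`.
#[derive(Clone, PartialEq, Eq, Debug)]
struct Blocker { upstream_channel_id: [u8; 32], htlc_id: u64 }

/// Hypothetical per-peer index: channel ID -> actions that must complete before
/// that channel's next RAA `ChannelMonitorUpdate` may be released.
#[derive(Default)]
struct PeerBlockers(BTreeMap<[u8; 32], Vec<Blocker>>);

impl PeerBlockers {
	/// Record a blocker when the upstream (preimage-containing) update is generated.
	fn block(&mut self, blocked_channel_id: [u8; 32], blocker: Blocker) {
		self.0.entry(blocked_channel_id).or_insert_with(Vec::new).push(blocker);
	}

	/// Drop the blocker once the upstream update has completed; returns true if
	/// the blocked channel is now free to apply its held RAA monitor update.
	fn release(&mut self, blocked_channel_id: [u8; 32], blocker: &Blocker) -> bool {
		if let Some(blockers) = self.0.get_mut(&blocked_channel_id) {
			blockers.retain(|b| b != blocker);
		}
		self.0.get(&blocked_channel_id).map(|v| v.is_empty()).unwrap_or(true)
	}
}

In the diff below, the map lives in `PeerState::actions_blocking_raa_monitor_updates`, the removal happens in `handle_monitor_update_release`, and, per the message above, actually inserting the blockers is deferred to a later commit.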
1 parent 59f8add commit 1de16c3

1 file changed: lightning/src/ln/channelmanager.rs (122 additions, 26 deletions)
@@ -533,12 +533,24 @@ pub(crate) enum MonitorUpdateCompletionAction {
 	/// event can be generated.
 	PaymentClaimed { payment_hash: PaymentHash },
 	/// Indicates an [`events::Event`] should be surfaced to the user.
-	EmitEvent { event: events::Event },
+	EmitEventAndFreeOtherChannel {
+		event: events::Event,
+		downstream_counterparty_and_funding_outpoint: Option<(PublicKey, OutPoint, RAAMonitorUpdateBlockingAction)>,
+	},
 }
 
 impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction,
 	(0, PaymentClaimed) => { (0, payment_hash, required) },
-	(2, EmitEvent) => { (0, event, upgradable_required) },
+	(2, EmitEventAndFreeOtherChannel) => {
+		(0, event, upgradable_required),
+		// LDK prior to 0.0.115 did not have this field as the monitor update application order was
+		// required by clients. If we downgrade to something prior to 0.0.115 this may result in
+		// monitor updates which aren't properly blocked or resumed, however that's fine - we don't
+		// support async monitor updates even in LDK 0.0.115 and once we do we'll require no
+		// downgrades to prior versions. Thus, while this would break on downgrade, we don't
+		// support it even without downgrade, so if it breaks its not on us ¯\_(ツ)_/¯.
+		(1, downstream_counterparty_and_funding_outpoint, option),
+	},
 );
 
 #[derive(Clone, Debug, PartialEq, Eq)]
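One note on the serialization above: the new `downstream_counterparty_and_funding_outpoint` field is written under TLV type 1, an odd type. LDK's TLV readers follow the BOLT-style "it's OK to be odd" convention, so a pre-0.0.115 reader that doesn't recognize type 1 skips it instead of failing to deserialize, which is the silent-drop-on-downgrade behavior the in-diff comment describes. A minimal sketch of that convention (not LDK's macro internals), using a hypothetical `handle_unknown_tlv_type` helper:

/// Unknown odd TLV types may be ignored; unknown even types are a read error.
fn handle_unknown_tlv_type(typ: u64) -> Result<(), &'static str> {
	if typ % 2 == 1 {
		Ok(()) // odd: optional, safe for an older reader to skip
	} else {
		Err("unknown required (even) TLV type")
	}
}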
@@ -555,6 +567,29 @@ impl_writeable_tlv_based_enum!(EventCompletionAction,
 	};
 );
 
+#[derive(Clone, PartialEq, Eq, Debug)]
+pub(crate) enum RAAMonitorUpdateBlockingAction {
+	/// The inbound channel's channel_id
+	ForwardedPaymentOtherChannelClaim {
+		channel_id: [u8; 32],
+		htlc_id: u64,
+	},
+}
+
+impl RAAMonitorUpdateBlockingAction {
+	fn from_prev_hop_data(prev_hop: &HTLCPreviousHopData) -> Self {
+		Self::ForwardedPaymentOtherChannelClaim {
+			channel_id: prev_hop.outpoint.to_channel_id(),
+			htlc_id: prev_hop.htlc_id,
+		}
+	}
+}
+
+impl_writeable_tlv_based_enum!(RAAMonitorUpdateBlockingAction,
+	(0, ForwardedPaymentOtherChannelClaim) => { (0, channel_id, required), (2, htlc_id, required) }
+;);
+
+
 /// State we hold per-peer.
 pub(super) struct PeerState<Signer: ChannelSigner> {
 	/// `temporary_channel_id` or `channel_id` -> `channel`.
@@ -583,6 +618,11 @@ pub(super) struct PeerState<Signer: ChannelSigner> {
 	/// to funding appearing on-chain), the downstream `ChannelMonitor` set is required to ensure
 	/// duplicates do not occur, so such channels should fail without a monitor update completing.
 	monitor_update_blocked_actions: BTreeMap<[u8; 32], Vec<MonitorUpdateCompletionAction>>,
+	/// If another channel's [`ChannelMonitorUpdate`] needs to complete before a channel we have
+	/// with this peer can complete an RAA [`ChannelMonitorUpdate`] (e.g. because the RAA update
+	/// will remove a preimage that needs to be durably in an upstream channel first), we put an
+	/// entry here to note that the channel with the key's ID is blocked on a set of actions.
+	actions_blocking_raa_monitor_updates: BTreeMap<[u8; 32], Vec<RAAMonitorUpdateBlockingAction>>,
 	/// The peer is currently connected (i.e. we've seen a
 	/// [`ChannelMessageHandler::peer_connected`] and no corresponding
 	/// [`ChannelMessageHandler::peer_disconnected`].
@@ -4481,23 +4521,24 @@ where
 			},
 			HTLCSource::PreviousHopData(hop_data) => {
 				let prev_outpoint = hop_data.outpoint;
+				let completed_blocker = RAAMonitorUpdateBlockingAction::from_prev_hop_data(&hop_data);
 				let res = self.claim_funds_from_hop(hop_data, payment_preimage,
 					|htlc_claim_value_msat| {
 						if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat {
 							let fee_earned_msat = if let Some(claimed_htlc_value) = htlc_claim_value_msat {
 								Some(claimed_htlc_value - forwarded_htlc_value)
 							} else { None };
 
-							let prev_channel_id = Some(prev_outpoint.to_channel_id());
-							let next_channel_id = Some(next_channel_id);
-
-							Some(MonitorUpdateCompletionAction::EmitEvent { event: events::Event::PaymentForwarded {
-								fee_earned_msat,
-								claim_from_onchain_tx: from_onchain,
-								prev_channel_id,
-								next_channel_id,
-								outbound_amount_forwarded_msat: forwarded_htlc_value_msat,
-							}})
+							Some(MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
+								event: events::Event::PaymentForwarded {
+									fee_earned_msat,
+									claim_from_onchain_tx: from_onchain,
+									prev_channel_id: Some(prev_outpoint.to_channel_id()),
+									next_channel_id: Some(next_channel_id),
+									outbound_amount_forwarded_msat: forwarded_htlc_value_msat,
+								},
+								downstream_counterparty_and_funding_outpoint: None,
+							})
 						} else { None }
 					});
 				if let Err((pk, err)) = res {
@@ -4524,8 +4565,13 @@ where
 						}, None));
 					}
 				},
-				MonitorUpdateCompletionAction::EmitEvent { event } => {
+				MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
+					event, downstream_counterparty_and_funding_outpoint
+				} => {
 					self.pending_events.lock().unwrap().push_back((event, None));
+					if let Some((node_id, funding_outpoint, blocker)) = downstream_counterparty_and_funding_outpoint {
+						self.handle_monitor_update_release(node_id, funding_outpoint, Some(blocker));
+					}
 				},
 			}
 		}
@@ -5372,6 +5418,36 @@ where
 		}
 	}
 
+	fn raa_monitor_updates_held(&self,
+		actions_blocking_raa_monitor_updates: &BTreeMap<[u8; 32], Vec<RAAMonitorUpdateBlockingAction>>,
+		channel_funding_outpoint: OutPoint, counterparty_node_id: PublicKey
+	) -> bool {
+		actions_blocking_raa_monitor_updates
+			.get(&channel_funding_outpoint.to_channel_id()).map(|v| !v.is_empty()).unwrap_or(false)
+		|| self.pending_events.lock().unwrap().iter().any(|(_, action)| {
+			action == &Some(EventCompletionAction::ReleaseRAAChannelMonitorUpdate {
+				channel_funding_outpoint,
+				counterparty_node_id,
+			})
+		})
+	}
+
+	pub(crate) fn test_raa_monitor_updates_held(&self, counterparty_node_id: PublicKey,
+		channel_id: [u8; 32])
+	-> bool {
+		let per_peer_state = self.per_peer_state.read().unwrap();
+		if let Some(peer_state_mtx) = per_peer_state.get(&counterparty_node_id) {
+			let mut peer_state_lck = peer_state_mtx.lock().unwrap();
+			let peer_state = &mut *peer_state_lck;
+
+			if let Some(chan) = peer_state.channel_by_id.get(&channel_id) {
+				return self.raa_monitor_updates_held(&peer_state.actions_blocking_raa_monitor_updates,
+					chan.get_funding_txo().unwrap(), counterparty_node_id);
+			}
+		}
+		false
+	}
+
 	fn internal_revoke_and_ack(&self, counterparty_node_id: &PublicKey, msg: &msgs::RevokeAndACK) -> Result<(), MsgHandleErrInternal> {
 		let (htlcs_to_fail, res) = {
 			let per_peer_state = self.per_peer_state.read().unwrap();
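The `raa_monitor_updates_held` helper added above treats a channel's RAA monitor update as held if either explicit blocking actions remain recorded for the channel or a still-pending event carries a `ReleaseRAAChannelMonitorUpdate` completion action for it. A standalone sketch of that disjunction with simplified types (the real code consults `PeerState` and `pending_events` under their locks):

use std::collections::BTreeMap;

/// `blockers`: blocking actions keyed by channel ID; `pending_event_releases`:
/// channel IDs referenced by ReleaseRAAChannelMonitorUpdate actions attached to
/// events that have not yet been handed to the user.
fn raa_monitor_updates_held_sketch(
	blockers: &BTreeMap<[u8; 32], Vec<u64>>,
	pending_event_releases: &[[u8; 32]],
	channel_id: [u8; 32],
) -> bool {
	blockers.get(&channel_id).map(|v| !v.is_empty()).unwrap_or(false)
		|| pending_event_releases.iter().any(|id| *id == channel_id)
}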
@@ -6036,25 +6112,29 @@ where
 		self.pending_outbound_payments.clear_pending_payments()
 	}
 
-	fn handle_monitor_update_release(&self, counterparty_node_id: PublicKey, channel_funding_outpoint: OutPoint) {
+	fn handle_monitor_update_release(&self, counterparty_node_id: PublicKey, channel_funding_outpoint: OutPoint, completed_blocker: Option<RAAMonitorUpdateBlockingAction>) {
 		let mut errors = Vec::new();
 		loop {
 			let per_peer_state = self.per_peer_state.read().unwrap();
 			if let Some(peer_state_mtx) = per_peer_state.get(&counterparty_node_id) {
 				let mut peer_state_lck = peer_state_mtx.lock().unwrap();
 				let peer_state = &mut *peer_state_lck;
-				if self.pending_events.lock().unwrap().iter()
-					.any(|(_ev, action_opt)| action_opt == &Some(EventCompletionAction::ReleaseRAAChannelMonitorUpdate {
-						channel_funding_outpoint, counterparty_node_id
-					}))
-				{
-					// Check that, while holding the peer lock, we don't have another event
-					// blocking any monitor updates for this channel. If we do, let those
-					// events be the ones that ultimately release the monitor update(s).
-					log_trace!(self.logger, "Delaying monitor unlock for channel {} as another event is pending",
+
+				if let Some(blocker) = &completed_blocker {
+					if let Some(blockers) = peer_state.actions_blocking_raa_monitor_updates
+						.get_mut(&channel_funding_outpoint.to_channel_id())
+					{
+						blockers.retain(|iter| iter != blocker);
+					}
+				}
+
+				if self.raa_monitor_updates_held(&peer_state.actions_blocking_raa_monitor_updates,
+					channel_funding_outpoint, counterparty_node_id) {
+					log_trace!(self.logger, "Delaying monitor unlock for channel {} as another channel's mon update needs to complete first",
 						log_bytes!(&channel_funding_outpoint.to_channel_id()[..]));
 					break;
 				}
+
 				if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(channel_funding_outpoint.to_channel_id()) {
 					debug_assert_eq!(chan.get().get_funding_txo().unwrap(), channel_funding_outpoint);
 					if let Some((monitor_update, further_update_exists)) = chan.get_mut().unblock_next_blocked_monitor_update() {
@@ -6096,7 +6176,7 @@ where
 			EventCompletionAction::ReleaseRAAChannelMonitorUpdate {
 				channel_funding_outpoint, counterparty_node_id
 			} => {
-				self.handle_monitor_update_release(counterparty_node_id, channel_funding_outpoint);
+				self.handle_monitor_update_release(counterparty_node_id, channel_funding_outpoint, None);
 			}
 		}
 	}
@@ -6772,6 +6852,7 @@ where
 					latest_features: init_msg.features.clone(),
 					pending_msg_events: Vec::new(),
 					monitor_update_blocked_actions: BTreeMap::new(),
+					actions_blocking_raa_monitor_updates: BTreeMap::new(),
 					is_connected: true,
 				}));
 			},
@@ -7968,6 +8049,7 @@ where
 				latest_features: Readable::read(reader)?,
 				pending_msg_events: Vec::new(),
 				monitor_update_blocked_actions: BTreeMap::new(),
+				actions_blocking_raa_monitor_updates: BTreeMap::new(),
 				is_connected: false,
 			};
 			per_peer_state.insert(peer_pubkey, Mutex::new(peer_state));
@@ -8049,7 +8131,7 @@ where
 		let mut claimable_htlc_purposes = None;
 		let mut claimable_htlc_onion_fields = None;
 		let mut pending_claiming_payments = Some(HashMap::new());
-		let mut monitor_update_blocked_actions_per_peer = Some(Vec::new());
+		let mut monitor_update_blocked_actions_per_peer: Option<Vec<(_, BTreeMap<_, Vec<_>>)>> = Some(Vec::new());
 		let mut events_override = None;
 		read_tlv_fields!(reader, {
 			(1, pending_outbound_payments_no_retry, option),
@@ -8374,7 +8456,21 @@ where
 		}
 
 		for (node_id, monitor_update_blocked_actions) in monitor_update_blocked_actions_per_peer.unwrap() {
-			if let Some(peer_state) = per_peer_state.get_mut(&node_id) {
+			if let Some(peer_state) = per_peer_state.get(&node_id) {
+				for (_, actions) in monitor_update_blocked_actions.iter() {
+					for action in actions.iter() {
+						if let MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
+							downstream_counterparty_and_funding_outpoint:
+								Some((blocked_node_id, blocked_channel_outpoint, blocking_action)), ..
+						} = action {
+							if let Some(blocked_peer_state) = per_peer_state.get(&blocked_node_id) {
+								blocked_peer_state.lock().unwrap().actions_blocking_raa_monitor_updates
+									.entry(blocked_channel_outpoint.to_channel_id())
+									.or_insert_with(Vec::new).push(blocking_action.clone());
+							}
+						}
+					}
+				}
 				peer_state.lock().unwrap().monitor_update_blocked_actions = monitor_update_blocked_actions;
 			} else {
 				log_error!(args.logger, "Got blocked actions without a per-peer-state for {}", node_id);
