Skip to content

Commit b2b8714

Browse files
committed
Add infra to block ChannelMonitorUpdates on forwarded claims
When we forward a payment and receive an `update_fulfill_htlc` message from the downstream channel, we immediately claim the HTLC on the upstream channel, before even doing a `commitment_signed` dance on the downstream channel. This implies that our `ChannelMonitorUpdate`s "go out" in the right order - first we ensure we'll get our money by writing the preimage down, then we write the update that resolves giving money on the downstream node. This is safe as long as `ChannelMonitorUpdate`s complete in the order in which they are generated, but of course looking forward we want to support asynchronous updates, which may complete in any order. Here we add infrastructure to handle downstream `ChannelMonitorUpdate`s which are blocked on an upstream preimage-containing one. We don't yet actually do the blocking which will come in a future commit.
1 parent 4262e75 commit b2b8714

File tree

1 file changed

+122
-26
lines changed

1 file changed

+122
-26
lines changed

lightning/src/ln/channelmanager.rs

Lines changed: 122 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -528,12 +528,24 @@ pub(crate) enum MonitorUpdateCompletionAction {
528528
/// event can be generated.
529529
PaymentClaimed { payment_hash: PaymentHash },
530530
/// Indicates an [`events::Event`] should be surfaced to the user.
531-
EmitEvent { event: events::Event },
531+
EmitEventAndFreeOtherChannel {
532+
event: events::Event,
533+
downstream_counterparty_and_funding_outpoint: Option<(PublicKey, OutPoint, RAAMonitorUpdateBlockingAction)>,
534+
},
532535
}
533536

534537
impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction,
535538
(0, PaymentClaimed) => { (0, payment_hash, required) },
536-
(2, EmitEvent) => { (0, event, upgradable_required) },
539+
(2, EmitEventAndFreeOtherChannel) => {
540+
(0, event, upgradable_required),
541+
// LDK prior to 0.0.115 did not have this field as the monitor update application order was
542+
// required by clients. If we downgrade to something prior to 0.0.115 this may result in
543+
// monitor updates which aren't properly blocked or resumed, however that's fine - we don't
544+
// support async monitor updates even in LDK 0.0.115 and once we do we'll require no
545+
// downgrades to prior versions. Thus, while this would break on downgrade, we don't
546+
// support it even without downgrade, so if it breaks its not on us ¯\_(ツ)_/¯.
547+
(1, downstream_counterparty_and_funding_outpoint, option),
548+
},
537549
);
538550

539551
#[derive(Clone, Debug, PartialEq, Eq)]
@@ -550,6 +562,29 @@ impl_writeable_tlv_based_enum!(EventCompletionAction,
550562
};
551563
);
552564

565+
#[derive(Clone, PartialEq, Eq, Debug)]
566+
pub(crate) enum RAAMonitorUpdateBlockingAction {
567+
/// The inbound channel's channel_id
568+
ForwardedPaymentOtherChannelClaim {
569+
channel_id: [u8; 32],
570+
htlc_id: u64,
571+
},
572+
}
573+
574+
impl RAAMonitorUpdateBlockingAction {
575+
fn from_prev_hop_data(prev_hop: &HTLCPreviousHopData) -> Self {
576+
Self::ForwardedPaymentOtherChannelClaim {
577+
channel_id: prev_hop.outpoint.to_channel_id(),
578+
htlc_id: prev_hop.htlc_id,
579+
}
580+
}
581+
}
582+
583+
impl_writeable_tlv_based_enum!(RAAMonitorUpdateBlockingAction,
584+
(0, ForwardedPaymentOtherChannelClaim) => { (0, channel_id, required), (2, htlc_id, required) }
585+
;);
586+
587+
553588
/// State we hold per-peer.
554589
pub(super) struct PeerState<Signer: ChannelSigner> {
555590
/// `temporary_channel_id` or `channel_id` -> `channel`.
@@ -578,6 +613,11 @@ pub(super) struct PeerState<Signer: ChannelSigner> {
578613
/// to funding appearing on-chain), the downstream `ChannelMonitor` set is required to ensure
579614
/// duplicates do not occur, so such channels should fail without a monitor update completing.
580615
monitor_update_blocked_actions: BTreeMap<[u8; 32], Vec<MonitorUpdateCompletionAction>>,
616+
/// If another channel's [`ChannelMonitorUpdate`] needs to complete before a channel we have
617+
/// with this peer can complete an RAA [`ChannelMonitorUpdate`] (e.g. because the RAA update
618+
/// will remove a preimage that needs to be durably in an upstream channel first), we put an
619+
/// entry here to note that the channel with the key's ID is blocked on a set of actions.
620+
actions_blocking_raa_monitor_updates: BTreeMap<[u8; 32], Vec<RAAMonitorUpdateBlockingAction>>,
581621
/// The peer is currently connected (i.e. we've seen a
582622
/// [`ChannelMessageHandler::peer_connected`] and no corresponding
583623
/// [`ChannelMessageHandler::peer_disconnected`].
@@ -4462,23 +4502,24 @@ where
44624502
},
44634503
HTLCSource::PreviousHopData(hop_data) => {
44644504
let prev_outpoint = hop_data.outpoint;
4505+
let completed_blocker = RAAMonitorUpdateBlockingAction::from_prev_hop_data(&hop_data);
44654506
let res = self.claim_funds_from_hop(hop_data, payment_preimage,
44664507
|htlc_claim_value_msat| {
44674508
if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat {
44684509
let fee_earned_msat = if let Some(claimed_htlc_value) = htlc_claim_value_msat {
44694510
Some(claimed_htlc_value - forwarded_htlc_value)
44704511
} else { None };
44714512

4472-
let prev_channel_id = Some(prev_outpoint.to_channel_id());
4473-
let next_channel_id = Some(next_channel_id);
4474-
4475-
Some(MonitorUpdateCompletionAction::EmitEvent { event: events::Event::PaymentForwarded {
4476-
fee_earned_msat,
4477-
claim_from_onchain_tx: from_onchain,
4478-
prev_channel_id,
4479-
next_channel_id,
4480-
outbound_amount_forwarded_msat: forwarded_htlc_value_msat,
4481-
}})
4513+
Some(MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
4514+
event: events::Event::PaymentForwarded {
4515+
fee_earned_msat,
4516+
claim_from_onchain_tx: from_onchain,
4517+
prev_channel_id: Some(prev_outpoint.to_channel_id()),
4518+
next_channel_id: Some(next_channel_id),
4519+
outbound_amount_forwarded_msat: forwarded_htlc_value_msat,
4520+
},
4521+
downstream_counterparty_and_funding_outpoint: None,
4522+
})
44824523
} else { None }
44834524
});
44844525
if let Err((pk, err)) = res {
@@ -4505,8 +4546,13 @@ where
45054546
}, None));
45064547
}
45074548
},
4508-
MonitorUpdateCompletionAction::EmitEvent { event } => {
4549+
MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
4550+
event, downstream_counterparty_and_funding_outpoint
4551+
} => {
45094552
self.pending_events.lock().unwrap().push_back((event, None));
4553+
if let Some((node_id, funding_outpoint, blocker)) = downstream_counterparty_and_funding_outpoint {
4554+
self.handle_monitor_update_release(node_id, funding_outpoint, Some(blocker));
4555+
}
45104556
},
45114557
}
45124558
}
@@ -5353,6 +5399,36 @@ where
53535399
}
53545400
}
53555401

5402+
fn raa_monitor_updates_held(&self,
5403+
actions_blocking_raa_monitor_updates: &BTreeMap<[u8; 32], Vec<RAAMonitorUpdateBlockingAction>>,
5404+
channel_funding_outpoint: OutPoint, counterparty_node_id: PublicKey
5405+
) -> bool {
5406+
actions_blocking_raa_monitor_updates
5407+
.get(&channel_funding_outpoint.to_channel_id()).map(|v| !v.is_empty()).unwrap_or(false)
5408+
|| self.pending_events.lock().unwrap().iter().any(|(_, action)| {
5409+
action == &Some(EventCompletionAction::ReleaseRAAChannelMonitorUpdate {
5410+
channel_funding_outpoint,
5411+
counterparty_node_id,
5412+
})
5413+
})
5414+
}
5415+
5416+
pub(crate) fn test_raa_monitor_updates_held(&self, counterparty_node_id: PublicKey,
5417+
channel_id: [u8; 32])
5418+
-> bool {
5419+
let per_peer_state = self.per_peer_state.read().unwrap();
5420+
if let Some(peer_state_mtx) = per_peer_state.get(&counterparty_node_id) {
5421+
let mut peer_state_lck = peer_state_mtx.lock().unwrap();
5422+
let peer_state = &mut *peer_state_lck;
5423+
5424+
if let Some(chan) = peer_state.channel_by_id.get(&channel_id) {
5425+
return self.raa_monitor_updates_held(&peer_state.actions_blocking_raa_monitor_updates,
5426+
chan.get_funding_txo().unwrap(), counterparty_node_id);
5427+
}
5428+
}
5429+
false
5430+
}
5431+
53565432
fn internal_revoke_and_ack(&self, counterparty_node_id: &PublicKey, msg: &msgs::RevokeAndACK) -> Result<(), MsgHandleErrInternal> {
53575433
let (htlcs_to_fail, res) = {
53585434
let per_peer_state = self.per_peer_state.read().unwrap();
@@ -6016,25 +6092,29 @@ where
60166092
self.pending_outbound_payments.clear_pending_payments()
60176093
}
60186094

6019-
fn handle_monitor_update_release(&self, counterparty_node_id: PublicKey, channel_funding_outpoint: OutPoint) {
6095+
fn handle_monitor_update_release(&self, counterparty_node_id: PublicKey, channel_funding_outpoint: OutPoint, completed_blocker: Option<RAAMonitorUpdateBlockingAction>) {
60206096
let mut errors = Vec::new();
60216097
loop {
60226098
let per_peer_state = self.per_peer_state.read().unwrap();
60236099
if let Some(peer_state_mtx) = per_peer_state.get(&counterparty_node_id) {
60246100
let mut peer_state_lck = peer_state_mtx.lock().unwrap();
60256101
let peer_state = &mut *peer_state_lck;
6026-
if self.pending_events.lock().unwrap().iter()
6027-
.any(|(_ev, action_opt)| action_opt == &Some(EventCompletionAction::ReleaseRAAChannelMonitorUpdate {
6028-
channel_funding_outpoint, counterparty_node_id
6029-
}))
6030-
{
6031-
// Check that, while holding the peer lock, we don't have another event
6032-
// blocking any monitor updates for this channel. If we do, let those
6033-
// events be the ones that ultimately release the monitor update(s).
6034-
log_trace!(self.logger, "Delaying monitor unlock for channel {} as another event is pending",
6102+
6103+
if let Some(blocker) = &completed_blocker {
6104+
if let Some(blockers) = peer_state.actions_blocking_raa_monitor_updates
6105+
.get_mut(&channel_funding_outpoint.to_channel_id())
6106+
{
6107+
blockers.retain(|iter| iter != blocker);
6108+
}
6109+
}
6110+
6111+
if self.raa_monitor_updates_held(&peer_state.actions_blocking_raa_monitor_updates,
6112+
channel_funding_outpoint, counterparty_node_id) {
6113+
log_trace!(self.logger, "Delaying monitor unlock for channel {} as another channel's mon update needs to complete first",
60356114
log_bytes!(&channel_funding_outpoint.to_channel_id()[..]));
60366115
break;
60376116
}
6117+
60386118
if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(channel_funding_outpoint.to_channel_id()) {
60396119
debug_assert_eq!(chan.get().get_funding_txo().unwrap(), channel_funding_outpoint);
60406120
if let Some((monitor_update, further_update_exists)) = chan.get_mut().unblock_next_blocked_monitor_update() {
@@ -6076,7 +6156,7 @@ where
60766156
EventCompletionAction::ReleaseRAAChannelMonitorUpdate {
60776157
channel_funding_outpoint, counterparty_node_id
60786158
} => {
6079-
self.handle_monitor_update_release(counterparty_node_id, channel_funding_outpoint);
6159+
self.handle_monitor_update_release(counterparty_node_id, channel_funding_outpoint, None);
60806160
}
60816161
}
60826162
}
@@ -6752,6 +6832,7 @@ where
67526832
latest_features: init_msg.features.clone(),
67536833
pending_msg_events: Vec::new(),
67546834
monitor_update_blocked_actions: BTreeMap::new(),
6835+
actions_blocking_raa_monitor_updates: BTreeMap::new(),
67556836
is_connected: true,
67566837
}));
67576838
},
@@ -7946,6 +8027,7 @@ where
79468027
latest_features: Readable::read(reader)?,
79478028
pending_msg_events: Vec::new(),
79488029
monitor_update_blocked_actions: BTreeMap::new(),
8030+
actions_blocking_raa_monitor_updates: BTreeMap::new(),
79498031
is_connected: false,
79508032
};
79518033
per_peer_state.insert(peer_pubkey, Mutex::new(peer_state));
@@ -8026,7 +8108,7 @@ where
80268108
let mut claimable_htlc_purposes = None;
80278109
let mut claimable_htlc_onion_fields = None;
80288110
let mut pending_claiming_payments = Some(HashMap::new());
8029-
let mut monitor_update_blocked_actions_per_peer = Some(Vec::new());
8111+
let mut monitor_update_blocked_actions_per_peer: Option<Vec<(_, BTreeMap<_, Vec<_>>)>> = Some(Vec::new());
80308112
let mut events_override = None;
80318113
read_tlv_fields!(reader, {
80328114
(1, pending_outbound_payments_no_retry, option),
@@ -8351,7 +8433,21 @@ where
83518433
}
83528434

83538435
for (node_id, monitor_update_blocked_actions) in monitor_update_blocked_actions_per_peer.unwrap() {
8354-
if let Some(peer_state) = per_peer_state.get_mut(&node_id) {
8436+
if let Some(peer_state) = per_peer_state.get(&node_id) {
8437+
for (_, actions) in monitor_update_blocked_actions.iter() {
8438+
for action in actions.iter() {
8439+
if let MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
8440+
downstream_counterparty_and_funding_outpoint:
8441+
Some((blocked_node_id, blocked_channel_outpoint, blocking_action)), ..
8442+
} = action {
8443+
if let Some(blocked_peer_state) = per_peer_state.get(&blocked_node_id) {
8444+
blocked_peer_state.lock().unwrap().actions_blocking_raa_monitor_updates
8445+
.entry(blocked_channel_outpoint.to_channel_id())
8446+
.or_insert_with(Vec::new).push(blocking_action.clone());
8447+
}
8448+
}
8449+
}
8450+
}
83558451
peer_state.lock().unwrap().monitor_update_blocked_actions = monitor_update_blocked_actions;
83568452
} else {
83578453
log_error!(args.logger, "Got blocked actions without a per-peer-state for {}", node_id);

0 commit comments

Comments
 (0)