Commit 66c0f70

Add infra to block ChannelMonitorUpdates on forwarded claims
When we forward a payment and receive an `update_fulfill_htlc` message from the downstream channel, we immediately claim the HTLC on the upstream channel, before even doing a `commitment_signed` dance on the downstream channel. This implies that our `ChannelMonitorUpdate`s "go out" in the right order - first we ensure we'll get our money by writing the preimage down, then we write the update that resolves giving money on the downstream node. This is safe as long as `ChannelMonitorUpdate`s complete in the order in which they are generated, but of course looking forward we want to support asynchronous updates, which may complete in any order.

Here we add infrastructure to handle downstream `ChannelMonitorUpdate`s which are blocked on an upstream preimage-containing one. We don't yet actually do the blocking, which will come in a future commit.
1 parent 1256bba commit 66c0f70
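
The bookkeeping being introduced can be illustrated with a small standalone sketch (simplified types and hypothetical names such as `PeerStateModel` and `BlockingAction`; this is only a model of the idea, not LDK's actual API): when a forwarded HTLC is claimed, the downstream channel gets an entry noting that it is blocked on the upstream claim; the downstream channel's RAA `ChannelMonitorUpdate` counts as "held" while any such entry remains; once the upstream preimage-carrying monitor update completes, the entry is removed and the downstream update may proceed.

use std::collections::BTreeMap;

type ChannelId = [u8; 32];

/// Simplified stand-in for `RAAMonitorUpdateBlockingAction`.
#[derive(Clone, PartialEq, Eq, Debug)]
enum BlockingAction {
    /// Identified by the inbound (upstream) channel's ID and the HTLC being claimed there.
    ForwardedPaymentOtherChannelClaim { channel_id: ChannelId, htlc_id: u64 },
}

/// Simplified stand-in for the new `actions_blocking_raa_monitor_updates` map in `PeerState`.
#[derive(Default)]
struct PeerStateModel {
    actions_blocking_raa_monitor_updates: BTreeMap<ChannelId, Vec<BlockingAction>>,
}

impl PeerStateModel {
    /// Note that `blocked_channel` must not complete its RAA monitor update until `action` is done.
    fn block(&mut self, blocked_channel: ChannelId, action: BlockingAction) {
        self.actions_blocking_raa_monitor_updates
            .entry(blocked_channel)
            .or_insert_with(Vec::new)
            .push(action);
    }

    /// Analogue of `raa_monitor_updates_held`: the channel stays held while any blocker remains.
    fn raa_monitor_updates_held(&self, channel: &ChannelId) -> bool {
        self.actions_blocking_raa_monitor_updates
            .get(channel)
            .map(|v| !v.is_empty())
            .unwrap_or(false)
    }

    /// Analogue of the new `completed_blocker` handling in `handle_monitor_update_release`:
    /// drop the blocker that just finished and report whether the channel is now free.
    fn release(&mut self, channel: ChannelId, completed: &BlockingAction) -> bool {
        if let Some(blockers) = self.actions_blocking_raa_monitor_updates.get_mut(&channel) {
            blockers.retain(|b| b != completed);
        }
        !self.raa_monitor_updates_held(&channel)
    }
}

fn main() {
    let mut peer = PeerStateModel::default();
    let downstream_channel = [1u8; 32];
    let blocker = BlockingAction::ForwardedPaymentOtherChannelClaim {
        channel_id: [2u8; 32], // upstream channel the preimage must be written to first
        htlc_id: 0,
    };

    // Forwarded claim arrives: hold the downstream channel's RAA monitor update.
    peer.block(downstream_channel, blocker.clone());
    assert!(peer.raa_monitor_updates_held(&downstream_channel));

    // Upstream preimage-carrying monitor update completes: the hold is released.
    assert!(peer.release(downstream_channel, &blocker));
    assert!(!peer.raa_monitor_updates_held(&downstream_channel));
}

The diff below adds this bookkeeping to `PeerState` (`actions_blocking_raa_monitor_updates`), generates the blocker in `claim_funds_internal`, and strips completed blockers in `handle_monitor_update_release`; actually inserting the blockers and holding updates is left for a follow-up commit, as noted above.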

lightning/src/ln/channelmanager.rs

Lines changed: 122 additions & 26 deletions
@@ -535,12 +535,24 @@ pub(crate) enum MonitorUpdateCompletionAction {
 	/// event can be generated.
 	PaymentClaimed { payment_hash: PaymentHash },
 	/// Indicates an [`events::Event`] should be surfaced to the user.
-	EmitEvent { event: events::Event },
+	EmitEventAndFreeOtherChannel {
+		event: events::Event,
+		downstream_counterparty_and_funding_outpoint: Option<(PublicKey, OutPoint, RAAMonitorUpdateBlockingAction)>,
+	},
 }
 
 impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction,
 	(0, PaymentClaimed) => { (0, payment_hash, required) },
-	(2, EmitEvent) => { (0, event, upgradable_required) },
+	(2, EmitEventAndFreeOtherChannel) => {
+		(0, event, upgradable_required),
+		// LDK prior to 0.0.115 did not have this field as the monitor update application order was
+		// required by clients. If we downgrade to something prior to 0.0.115 this may result in
+		// monitor updates which aren't properly blocked or resumed, however that's fine - we don't
+		// support async monitor updates even in LDK 0.0.115 and once we do we'll require no
+		// downgrades to prior versions. Thus, while this would break on downgrade, we don't
+		// support it even without downgrade, so if it breaks its not on us ¯\_(ツ)_/¯.
+		(1, downstream_counterparty_and_funding_outpoint, option),
+	},
 );
 
 #[derive(Clone, Debug, PartialEq, Eq)]
@@ -557,6 +569,29 @@ impl_writeable_tlv_based_enum!(EventCompletionAction,
 	};
 );
 
+#[derive(Clone, PartialEq, Eq, Debug)]
+pub(crate) enum RAAMonitorUpdateBlockingAction {
+	/// The inbound channel's channel_id
+	ForwardedPaymentOtherChannelClaim {
+		channel_id: [u8; 32],
+		htlc_id: u64,
+	},
+}
+
+impl RAAMonitorUpdateBlockingAction {
+	fn from_prev_hop_data(prev_hop: &HTLCPreviousHopData) -> Self {
+		Self::ForwardedPaymentOtherChannelClaim {
+			channel_id: prev_hop.outpoint.to_channel_id(),
+			htlc_id: prev_hop.htlc_id,
+		}
+	}
+}
+
+impl_writeable_tlv_based_enum!(RAAMonitorUpdateBlockingAction,
+	(0, ForwardedPaymentOtherChannelClaim) => { (0, channel_id, required), (2, htlc_id, required) }
+;);
+
+
 /// State we hold per-peer.
 pub(super) struct PeerState<Signer: ChannelSigner> {
 	/// `temporary_channel_id` or `channel_id` -> `channel`.
@@ -585,6 +620,11 @@ pub(super) struct PeerState<Signer: ChannelSigner> {
 	/// to funding appearing on-chain), the downstream `ChannelMonitor` set is required to ensure
 	/// duplicates do not occur, so such channels should fail without a monitor update completing.
 	monitor_update_blocked_actions: BTreeMap<[u8; 32], Vec<MonitorUpdateCompletionAction>>,
+	/// If another channel's [`ChannelMonitorUpdate`] needs to complete before a channel we have
+	/// with this peer can complete an RAA [`ChannelMonitorUpdate`] (e.g. because the RAA update
+	/// will remove a preimage that needs to be durably in an upstream channel first), we put an
+	/// entry here to note that the channel with the key's ID is blocked on a set of actions.
+	actions_blocking_raa_monitor_updates: BTreeMap<[u8; 32], Vec<RAAMonitorUpdateBlockingAction>>,
 	/// The peer is currently connected (i.e. we've seen a
 	/// [`ChannelMessageHandler::peer_connected`] and no corresponding
 	/// [`ChannelMessageHandler::peer_disconnected`].
@@ -4484,23 +4524,24 @@ where
 			},
 			HTLCSource::PreviousHopData(hop_data) => {
 				let prev_outpoint = hop_data.outpoint;
+				let completed_blocker = RAAMonitorUpdateBlockingAction::from_prev_hop_data(&hop_data);
 				let res = self.claim_funds_from_hop(hop_data, payment_preimage,
 					|htlc_claim_value_msat| {
 						if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat {
 							let fee_earned_msat = if let Some(claimed_htlc_value) = htlc_claim_value_msat {
 								Some(claimed_htlc_value - forwarded_htlc_value)
 							} else { None };
 
-							let prev_channel_id = Some(prev_outpoint.to_channel_id());
-							let next_channel_id = Some(next_channel_id);
-
-							Some(MonitorUpdateCompletionAction::EmitEvent { event: events::Event::PaymentForwarded {
-								fee_earned_msat,
-								claim_from_onchain_tx: from_onchain,
-								prev_channel_id,
-								next_channel_id,
-								outbound_amount_forwarded_msat: forwarded_htlc_value_msat,
-							}})
+							Some(MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
+								event: events::Event::PaymentForwarded {
+									fee_earned_msat,
+									claim_from_onchain_tx: from_onchain,
+									prev_channel_id: Some(prev_outpoint.to_channel_id()),
+									next_channel_id: Some(next_channel_id),
+									outbound_amount_forwarded_msat: forwarded_htlc_value_msat,
+								},
+								downstream_counterparty_and_funding_outpoint: None,
+							})
 						} else { None }
 					});
 				if let Err((pk, err)) = res {
@@ -4527,8 +4568,13 @@ where
 					}, None));
 				}
 			},
-			MonitorUpdateCompletionAction::EmitEvent { event } => {
+			MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
+				event, downstream_counterparty_and_funding_outpoint
+			} => {
 				self.pending_events.lock().unwrap().push_back((event, None));
+				if let Some((node_id, funding_outpoint, blocker)) = downstream_counterparty_and_funding_outpoint {
+					self.handle_monitor_update_release(node_id, funding_outpoint, Some(blocker));
+				}
 			},
 		}
 	}
@@ -5375,6 +5421,36 @@ where
 		}
 	}
 
+	fn raa_monitor_updates_held(&self,
+		actions_blocking_raa_monitor_updates: &BTreeMap<[u8; 32], Vec<RAAMonitorUpdateBlockingAction>>,
+		channel_funding_outpoint: OutPoint, counterparty_node_id: PublicKey
+	) -> bool {
+		actions_blocking_raa_monitor_updates
+			.get(&channel_funding_outpoint.to_channel_id()).map(|v| !v.is_empty()).unwrap_or(false)
+		|| self.pending_events.lock().unwrap().iter().any(|(_, action)| {
+			action == &Some(EventCompletionAction::ReleaseRAAChannelMonitorUpdate {
+				channel_funding_outpoint,
+				counterparty_node_id,
+			})
+		})
+	}
+
+	pub(crate) fn test_raa_monitor_updates_held(&self, counterparty_node_id: PublicKey,
+		channel_id: [u8; 32])
+	-> bool {
+		let per_peer_state = self.per_peer_state.read().unwrap();
+		if let Some(peer_state_mtx) = per_peer_state.get(&counterparty_node_id) {
+			let mut peer_state_lck = peer_state_mtx.lock().unwrap();
+			let peer_state = &mut *peer_state_lck;
+
+			if let Some(chan) = peer_state.channel_by_id.get(&channel_id) {
+				return self.raa_monitor_updates_held(&peer_state.actions_blocking_raa_monitor_updates,
+					chan.get_funding_txo().unwrap(), counterparty_node_id);
+			}
+		}
+		false
+	}
+
 	fn internal_revoke_and_ack(&self, counterparty_node_id: &PublicKey, msg: &msgs::RevokeAndACK) -> Result<(), MsgHandleErrInternal> {
 		let (htlcs_to_fail, res) = {
 			let per_peer_state = self.per_peer_state.read().unwrap();
@@ -6039,25 +6115,29 @@ where
 		self.pending_outbound_payments.clear_pending_payments()
 	}
 
-	fn handle_monitor_update_release(&self, counterparty_node_id: PublicKey, channel_funding_outpoint: OutPoint) {
+	fn handle_monitor_update_release(&self, counterparty_node_id: PublicKey, channel_funding_outpoint: OutPoint, completed_blocker: Option<RAAMonitorUpdateBlockingAction>) {
		let mut errors = Vec::new();
 		loop {
 			let per_peer_state = self.per_peer_state.read().unwrap();
 			if let Some(peer_state_mtx) = per_peer_state.get(&counterparty_node_id) {
 				let mut peer_state_lck = peer_state_mtx.lock().unwrap();
 				let peer_state = &mut *peer_state_lck;
-				if self.pending_events.lock().unwrap().iter()
-					.any(|(_ev, action_opt)| action_opt == &Some(EventCompletionAction::ReleaseRAAChannelMonitorUpdate {
-						channel_funding_outpoint, counterparty_node_id
-					}))
-				{
-					// Check that, while holding the peer lock, we don't have another event
-					// blocking any monitor updates for this channel. If we do, let those
-					// events be the ones that ultimately release the monitor update(s).
-					log_trace!(self.logger, "Delaying monitor unlock for channel {} as another event is pending",
+
+				if let Some(blocker) = &completed_blocker {
+					if let Some(blockers) = peer_state.actions_blocking_raa_monitor_updates
+						.get_mut(&channel_funding_outpoint.to_channel_id())
+					{
+						blockers.retain(|iter| iter != blocker);
+					}
+				}
+
+				if self.raa_monitor_updates_held(&peer_state.actions_blocking_raa_monitor_updates,
+					channel_funding_outpoint, counterparty_node_id) {
+					log_trace!(self.logger, "Delaying monitor unlock for channel {} as another channel's mon update needs to complete first",
 						log_bytes!(&channel_funding_outpoint.to_channel_id()[..]));
 					break;
 				}
+
 				if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(channel_funding_outpoint.to_channel_id()) {
 					debug_assert_eq!(chan.get().get_funding_txo().unwrap(), channel_funding_outpoint);
 					if let Some((monitor_update, further_update_exists)) = chan.get_mut().unblock_next_blocked_monitor_update() {
@@ -6099,7 +6179,7 @@ where
 				EventCompletionAction::ReleaseRAAChannelMonitorUpdate {
 					channel_funding_outpoint, counterparty_node_id
 				} => {
-					self.handle_monitor_update_release(counterparty_node_id, channel_funding_outpoint);
+					self.handle_monitor_update_release(counterparty_node_id, channel_funding_outpoint, None);
 				}
 			}
 		}
@@ -6775,6 +6855,7 @@ where
 					latest_features: init_msg.features.clone(),
 					pending_msg_events: Vec::new(),
 					monitor_update_blocked_actions: BTreeMap::new(),
+					actions_blocking_raa_monitor_updates: BTreeMap::new(),
 					is_connected: true,
 				}));
 			},
@@ -7971,6 +8052,7 @@ where
 				latest_features: Readable::read(reader)?,
 				pending_msg_events: Vec::new(),
 				monitor_update_blocked_actions: BTreeMap::new(),
+				actions_blocking_raa_monitor_updates: BTreeMap::new(),
 				is_connected: false,
 			};
 			per_peer_state.insert(peer_pubkey, Mutex::new(peer_state));
@@ -8052,7 +8134,7 @@ where
 	let mut claimable_htlc_purposes = None;
 	let mut claimable_htlc_onion_fields = None;
 	let mut pending_claiming_payments = Some(HashMap::new());
-	let mut monitor_update_blocked_actions_per_peer = Some(Vec::new());
+	let mut monitor_update_blocked_actions_per_peer: Option<Vec<(_, BTreeMap<_, Vec<_>>)>> = Some(Vec::new());
 	let mut events_override = None;
 	read_tlv_fields!(reader, {
 		(1, pending_outbound_payments_no_retry, option),
@@ -8377,7 +8459,21 @@ where
 		}
 
 		for (node_id, monitor_update_blocked_actions) in monitor_update_blocked_actions_per_peer.unwrap() {
-			if let Some(peer_state) = per_peer_state.get_mut(&node_id) {
+			if let Some(peer_state) = per_peer_state.get(&node_id) {
+				for (_, actions) in monitor_update_blocked_actions.iter() {
+					for action in actions.iter() {
+						if let MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
+							downstream_counterparty_and_funding_outpoint:
+								Some((blocked_node_id, blocked_channel_outpoint, blocking_action)), ..
+						} = action {
+							if let Some(blocked_peer_state) = per_peer_state.get(&blocked_node_id) {
+								blocked_peer_state.lock().unwrap().actions_blocking_raa_monitor_updates
+									.entry(blocked_channel_outpoint.to_channel_id())
+									.or_insert_with(Vec::new).push(blocking_action.clone());
+							}
+						}
+					}
+				}
 				peer_state.lock().unwrap().monitor_update_blocked_actions = monitor_update_blocked_actions;
 			} else {
 				log_error!(args.logger, "Got blocked actions without a per-peer-state for {}", node_id);
