@@ -1130,6 +1130,24 @@ impl_writeable_tlv_based_enum!(EventCompletionAction,
}
);

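+ /// Identifies the channel an HTLC being claimed was received over: the counterparty (if
+ /// known), the channel's funding outpoint, its channel ID, and the ID of the HTLC within it.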
+ struct HTLCClaimSource {
+ counterparty_node_id: Option<PublicKey>,
+ funding_txo: OutPoint,
+ channel_id: ChannelId,
+ htlc_id: u64,
+ }
+
+ impl From<&MPPClaimHTLCSource> for HTLCClaimSource {
+ fn from(o: &MPPClaimHTLCSource) -> HTLCClaimSource {
+ HTLCClaimSource {
+ counterparty_node_id: Some(o.counterparty_node_id),
+ funding_txo: o.funding_txo,
+ channel_id: o.channel_id,
+ htlc_id: o.htlc_id,
+ }
+ }
+ }
+
#[derive(Clone, Debug, PartialEq, Eq)]
/// The source of an HTLC which is being claimed as a part of an incoming payment. Each part is
/// tracked in [`PendingMPPClaim`].
@@ -6896,6 +6914,27 @@ where
>(
&self, prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage,
payment_info: Option<PaymentClaimDetails>, completion_action: ComplFunc,
+ ) {
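+ // `HTLCPreviousHopData` only identifies the inbound channel by SCID, so look up the
+ // counterparty's node id here; the lookup returns `None` if the channel is no longer open.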
+ let counterparty_node_id =
+ match self.short_to_chan_info.read().unwrap().get(&prev_hop.short_channel_id) {
+ Some((cp_id, _dup_chan_id)) => Some(cp_id.clone()),
+ None => None
+ };
+
+ let htlc_source = HTLCClaimSource {
+ counterparty_node_id,
+ funding_txo: prev_hop.outpoint,
+ channel_id: prev_hop.channel_id,
+ htlc_id: prev_hop.htlc_id,
+ };
+ self.claim_mpp_part(htlc_source, payment_preimage, payment_info, completion_action)
+ }
+
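+ // Claims a single HTLC, which may be one part of an MPP payment, identified by the given
+ // `HTLCClaimSource`. The preimage is applied via the live channel when one exists, or
+ // directly via a `ChannelMonitorUpdate` against the (closed) channel's monitor otherwise.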
+ fn claim_mpp_part<
+ ComplFunc: FnOnce(Option<u64>, bool) -> (Option<MonitorUpdateCompletionAction>, Option<RAAMonitorUpdateBlockingAction>)
+ >(
+ &self, prev_hop: HTLCClaimSource, payment_preimage: PaymentPreimage,
+ payment_info: Option<PaymentClaimDetails>, completion_action: ComplFunc,
) {
//TODO: Delay the claimed_funds relaying just like we do outbound relay!

@@ -6912,12 +6951,8 @@ where
{
let per_peer_state = self.per_peer_state.read().unwrap();
let chan_id = prev_hop.channel_id;
- let counterparty_node_id_opt = match self.short_to_chan_info.read().unwrap().get(&prev_hop.short_channel_id) {
- Some((cp_id, _dup_chan_id)) => Some(cp_id.clone()),
- None => None
- };

- let peer_state_opt = counterparty_node_id_opt.as_ref().map(
+ let peer_state_opt = prev_hop.counterparty_node_id.as_ref().map(
|counterparty_node_id| per_peer_state.get(counterparty_node_id)
.map(|peer_mutex| peer_mutex.lock().unwrap())
).unwrap_or(None);
@@ -6944,7 +6979,7 @@ where
peer_state.actions_blocking_raa_monitor_updates.entry(chan_id).or_insert_with(Vec::new).push(raa_blocker);
}
if !during_init {
- handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
+ handle_new_monitor_update!(self, prev_hop.funding_txo, monitor_update, peer_state_lock,
peer_state, per_peer_state, chan);
} else {
// If we're running during init we cannot update a monitor directly -
@@ -6953,7 +6988,7 @@ where
self.pending_background_events.lock().unwrap().push(
BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
counterparty_node_id,
- funding_txo: prev_hop.outpoint,
+ funding_txo: prev_hop.funding_txo,
channel_id: prev_hop.channel_id,
update: monitor_update.clone(),
});
@@ -7027,7 +7062,7 @@ where
}
let preimage_update = ChannelMonitorUpdate {
update_id: CLOSED_CHANNEL_UPDATE_ID,
- counterparty_node_id: None,
+ counterparty_node_id: prev_hop.counterparty_node_id,
updates: vec![ChannelMonitorUpdateStep::PaymentPreimage {
payment_preimage,
payment_info,
@@ -7038,7 +7073,7 @@ where
if !during_init {
// We update the ChannelMonitor on the backward link, after
// receiving an `update_fulfill_htlc` from the forward link.
- let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, &preimage_update);
+ let update_res = self.chain_monitor.update_channel(prev_hop.funding_txo, &preimage_update);
if update_res != ChannelMonitorUpdateStatus::Completed {
// TODO: This needs to be handled somehow - if we receive a monitor update
// with a preimage we *must* somehow manage to propagate it to the upstream
@@ -7061,7 +7096,7 @@ where
// complete the monitor update completion action from `completion_action`.
self.pending_background_events.lock().unwrap().push(
BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((
- prev_hop.outpoint, prev_hop.channel_id, preimage_update,
+ prev_hop.funding_txo, prev_hop.channel_id, preimage_update,
)));
}
// Note that we do process the completion action here. This totally could be a
@@ -7312,7 +7347,7 @@ where
onion_fields,
payment_id,
}) = payment {
- self.pending_events.lock().unwrap().push_back((events::Event::PaymentClaimed {
+ let event = events::Event::PaymentClaimed {
payment_hash,
purpose,
amount_msat,
@@ -7321,7 +7356,16 @@ where
sender_intended_total_msat,
onion_fields,
payment_id,
- }, None));
+ };
+ let event_action = (event, None);
+ let mut pending_events = self.pending_events.lock().unwrap();
+ // If we're replaying a claim on startup we may end up duplicating an event
+ // that's already in our queue, so check before we push another one. The
+ // `payment_id` should suffice to ensure we never spuriously drop a second
+ // event for a duplicate payment.
+ if !pending_events.contains(&event_action) {
+ pending_events.push_back(event_action);
+ }
}
},
MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
@@ -13130,67 +13174,126 @@ where
};

for (_, monitor) in args.channel_monitors.iter() {
- for (payment_hash, (payment_preimage, _)) in monitor.get_stored_preimages() {
- let per_peer_state = channel_manager.per_peer_state.read().unwrap();
- let mut claimable_payments = channel_manager.claimable_payments.lock().unwrap();
- let payment = claimable_payments.claimable_payments.remove(&payment_hash);
- mem::drop(claimable_payments);
- if let Some(payment) = payment {
- log_info!(channel_manager.logger, "Re-claiming HTLCs with payment hash {} as we've released the preimage to a ChannelMonitor!", &payment_hash);
- let mut claimable_amt_msat = 0;
- let mut receiver_node_id = Some(our_network_pubkey);
- let phantom_shared_secret = payment.htlcs[0].prev_hop.phantom_shared_secret;
- if phantom_shared_secret.is_some() {
- let phantom_pubkey = channel_manager.node_signer.get_node_id(Recipient::PhantomNode)
- .expect("Failed to get node_id for phantom node recipient");
- receiver_node_id = Some(phantom_pubkey)
- }
- for claimable_htlc in &payment.htlcs {
- claimable_amt_msat += claimable_htlc.value;
-
- // Add a holding-cell claim of the payment to the Channel, which should be
- // applied ~immediately on peer reconnection. Because it won't generate a
- // new commitment transaction we can just provide the payment preimage to
- // the corresponding ChannelMonitor and nothing else.
- //
- // We do so directly instead of via the normal ChannelMonitor update
- // procedure as the ChainMonitor hasn't yet been initialized, implying
- // we're not allowed to call it directly yet. Further, we do the update
- // without incrementing the ChannelMonitor update ID as there isn't any
- // reason to.
- // If we were to generate a new ChannelMonitor update ID here and then
- // crash before the user finishes block connect we'd end up force-closing
- // this channel as well. On the flip side, there's no harm in restarting
- // without the new monitor persisted - we'll end up right back here on
- // restart.
- let previous_channel_id = claimable_htlc.prev_hop.channel_id;
- let peer_node_id_opt = channel_manager.outpoint_to_peer.lock().unwrap()
- .get(&claimable_htlc.prev_hop.outpoint).cloned();
- if let Some(peer_node_id) = peer_node_id_opt {
- let peer_state_mutex = per_peer_state.get(&peer_node_id).unwrap();
- let mut peer_state_lock = peer_state_mutex.lock().unwrap();
- let peer_state = &mut *peer_state_lock;
- if let Some(ChannelPhase::Funded(channel)) = peer_state.channel_by_id.get_mut(&previous_channel_id) {
- let logger = WithChannelContext::from(&channel_manager.logger, &channel.context, Some(payment_hash));
- channel.claim_htlc_while_disconnected_dropping_mon_update(claimable_htlc.prev_hop.htlc_id, payment_preimage, &&logger);
+ for (payment_hash, (payment_preimage, payment_claims)) in monitor.get_stored_preimages() {
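+ // If the monitor tracked which MPP parts made up this claim, replay each part through
+ // `claim_mpp_part` below; otherwise fall back to the pre-existing re-claim logic in the
+ // `else` branch.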
+ if !payment_claims.is_empty() {
+ for payment_claim in payment_claims {
+ if payment_claim.mpp_parts.is_empty() {
+ return Err(DecodeError::InvalidValue);
+ }
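+ // Build the shared MPP-claim state up front; each part claimed below holds a pointer
+ // to it so progress can be tracked across all of the payment's channels.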
+ let pending_claims = PendingMPPClaim {
+ channels_without_preimage: payment_claim.mpp_parts.clone(),
+ channels_with_preimage: Vec::new(),
+ };
+ let pending_claim_ptr_opt = Some(Arc::new(Mutex::new(pending_claims)));
+
+ // While it may be duplicative to generate a PaymentClaimed here, trying to
+ // figure out if the user definitely saw it before shutdown would require some
+ // nontrivial logic and may break as we move away from regularly persisting
+ // ChannelManager. Instead, we rely on the user's event handler being
+ // idempotent and just blindly generate one no matter what, letting the
+ // preimages eventually time out from ChannelMonitors to prevent us from
+ // doing so forever.
+
+ let claim_found =
+ channel_manager.claimable_payments.lock().unwrap().begin_claiming_payment(
+ payment_hash, &channel_manager.node_signer, &channel_manager.logger,
+ &channel_manager.inbound_payment_id_secret, |_| Ok(()),
+ );
+ if claim_found.is_err() {
+ let mut claimable_payments = channel_manager.claimable_payments.lock().unwrap();
+ match claimable_payments.pending_claiming_payments.entry(payment_hash) {
+ hash_map::Entry::Occupied(_) => {
+ debug_assert!(false, "Entry was added in begin_claiming_payment");
+ return Err(DecodeError::InvalidValue);
+ },
+ hash_map::Entry::Vacant(entry) => {
+ entry.insert(payment_claim.claiming_payment);
+ },
}
}
- if let Some(previous_hop_monitor) = args.channel_monitors.get(&claimable_htlc.prev_hop.outpoint) {
- previous_hop_monitor.provide_payment_preimage(&payment_hash, &payment_preimage, &channel_manager.tx_broadcaster, &channel_manager.fee_estimator, &channel_manager.logger);
+
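+ // Replay the claim for each MPP part, blocking RAA-driven monitor updates on each
+ // part's channel until the full claim has been handled.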
+ for part in payment_claim.mpp_parts.iter() {
+ let pending_mpp_claim = pending_claim_ptr_opt.as_ref().map(|ptr| (
+ part.counterparty_node_id, part.channel_id, part.htlc_id,
+ PendingMPPClaimPointer(Arc::clone(&ptr))
+ ));
+ let pending_claim_ptr = pending_claim_ptr_opt.as_ref().map(|ptr|
+ RAAMonitorUpdateBlockingAction::ClaimedMPPPayment {
+ pending_claim: PendingMPPClaimPointer(Arc::clone(&ptr)),
+ }
+ );
+ // Note that we don't need to pass the `payment_info` here - it's
+ // already (clearly) durably on disk in the `ChannelMonitor` so there's
+ // no need to worry about getting it into others.
+ channel_manager.claim_mpp_part(
+ part.into(), payment_preimage, None,
+ |_, _|
+ (Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash, pending_mpp_claim }), pending_claim_ptr)
+ );
}
}
- let mut pending_events = channel_manager.pending_events.lock().unwrap();
- let payment_id = payment.inbound_payment_id(&inbound_payment_id_secret.unwrap());
- pending_events.push_back((events::Event::PaymentClaimed {
- receiver_node_id,
- payment_hash,
- purpose: payment.purpose,
- amount_msat: claimable_amt_msat,
- htlcs: payment.htlcs.iter().map(events::ClaimedHTLC::from).collect(),
- sender_intended_total_msat: payment.htlcs.first().map(|htlc| htlc.total_msat),
- onion_fields: payment.onion_fields,
- payment_id: Some(payment_id),
- }, None));
+ } else {
+ let per_peer_state = channel_manager.per_peer_state.read().unwrap();
+ let mut claimable_payments = channel_manager.claimable_payments.lock().unwrap();
+ let payment = claimable_payments.claimable_payments.remove(&payment_hash);
+ mem::drop(claimable_payments);
+ if let Some(payment) = payment {
+ log_info!(channel_manager.logger, "Re-claiming HTLCs with payment hash {} as we've released the preimage to a ChannelMonitor!", &payment_hash);
+ let mut claimable_amt_msat = 0;
+ let mut receiver_node_id = Some(our_network_pubkey);
+ let phantom_shared_secret = payment.htlcs[0].prev_hop.phantom_shared_secret;
+ if phantom_shared_secret.is_some() {
+ let phantom_pubkey = channel_manager.node_signer.get_node_id(Recipient::PhantomNode)
+ .expect("Failed to get node_id for phantom node recipient");
+ receiver_node_id = Some(phantom_pubkey)
+ }
+ for claimable_htlc in &payment.htlcs {
+ claimable_amt_msat += claimable_htlc.value;
+
+ // Add a holding-cell claim of the payment to the Channel, which should be
+ // applied ~immediately on peer reconnection. Because it won't generate a
+ // new commitment transaction we can just provide the payment preimage to
+ // the corresponding ChannelMonitor and nothing else.
+ //
+ // We do so directly instead of via the normal ChannelMonitor update
+ // procedure as the ChainMonitor hasn't yet been initialized, implying
+ // we're not allowed to call it directly yet. Further, we do the update
+ // without incrementing the ChannelMonitor update ID as there isn't any
+ // reason to.
+ // If we were to generate a new ChannelMonitor update ID here and then
+ // crash before the user finishes block connect we'd end up force-closing
+ // this channel as well. On the flip side, there's no harm in restarting
+ // without the new monitor persisted - we'll end up right back here on
+ // restart.
+ let previous_channel_id = claimable_htlc.prev_hop.channel_id;
+ let peer_node_id_opt = channel_manager.outpoint_to_peer.lock().unwrap()
+ .get(&claimable_htlc.prev_hop.outpoint).cloned();
+ if let Some(peer_node_id) = peer_node_id_opt {
+ let peer_state_mutex = per_peer_state.get(&peer_node_id).unwrap();
+ let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+ let peer_state = &mut *peer_state_lock;
+ if let Some(ChannelPhase::Funded(channel)) = peer_state.channel_by_id.get_mut(&previous_channel_id) {
+ let logger = WithChannelContext::from(&channel_manager.logger, &channel.context, Some(payment_hash));
+ channel.claim_htlc_while_disconnected_dropping_mon_update(claimable_htlc.prev_hop.htlc_id, payment_preimage, &&logger);
+ }
+ }
+ if let Some(previous_hop_monitor) = args.channel_monitors.get(&claimable_htlc.prev_hop.outpoint) {
+ previous_hop_monitor.provide_payment_preimage(&payment_hash, &payment_preimage, &channel_manager.tx_broadcaster, &channel_manager.fee_estimator, &channel_manager.logger);
+ }
+ }
+ let mut pending_events = channel_manager.pending_events.lock().unwrap();
+ let payment_id = payment.inbound_payment_id(&inbound_payment_id_secret.unwrap());
+ pending_events.push_back((events::Event::PaymentClaimed {
+ receiver_node_id,
+ payment_hash,
+ purpose: payment.purpose,
+ amount_msat: claimable_amt_msat,
+ htlcs: payment.htlcs.iter().map(events::ClaimedHTLC::from).collect(),
+ sender_intended_total_msat: payment.htlcs.first().map(|htlc| htlc.total_msat),
+ onion_fields: payment.onion_fields,
+ payment_id: Some(payment_id),
+ }, None));
+ }
}
}
}