Skip to content

Commit d3794ea

Browse files
Holding cell: if we fail to free an HTLC, fail it backwards
Plus add a test.
1 parent bb369b5 commit d3794ea

File tree

3 files changed

+287
-27
lines changed

3 files changed

+287
-27
lines changed

lightning/src/ln/channel.rs

Lines changed: 31 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -2116,7 +2116,7 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
21162116

21172117
/// Used to fulfill holding_cell_htlcs when we get a remote ack (or implicitly get it by them
21182118
/// fulfilling or failing the last pending HTLC)
2119-
fn free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> Result<Option<(msgs::CommitmentUpdate, ChannelMonitorUpdate)>, ChannelError> where L::Target: Logger {
2119+
fn free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> Result<(Option<(msgs::CommitmentUpdate, ChannelMonitorUpdate)>, Vec<(HTLCSource, PaymentHash)>), ChannelError> where L::Target: Logger {
21202120
assert_eq!(self.channel_state & ChannelState::MonitorUpdateFailed as u32, 0);
21212121
if self.holding_cell_htlc_updates.len() != 0 || self.holding_cell_update_fee.is_some() {
21222122
log_trace!(logger, "Freeing holding cell with {} HTLC updates{}", self.holding_cell_htlc_updates.len(), if self.holding_cell_update_fee.is_some() { " and a fee update" } else { "" });
@@ -2131,6 +2131,7 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
21312131
let mut update_add_htlcs = Vec::with_capacity(htlc_updates.len());
21322132
let mut update_fulfill_htlcs = Vec::with_capacity(htlc_updates.len());
21332133
let mut update_fail_htlcs = Vec::with_capacity(htlc_updates.len());
2134+
let mut htlcs_to_fail = Vec::new();
21342135
let mut err = None;
21352136
for htlc_update in htlc_updates.drain(..) {
21362137
// Note that this *can* fail, though it should be due to rather-rare conditions on
@@ -2149,6 +2150,13 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
21492150
match e {
21502151
ChannelError::Ignore(ref msg) => {
21512152
log_info!(logger, "Failed to send HTLC with payment_hash {} due to {}", log_bytes!(payment_hash.0), msg);
2153+
// If we fail to send here, then this HTLC should
2154+
// be failed backwards. Failing to send here
2155+
// indicates that this HTLC may keep being put back
2156+
// into the holding cell without ever being
2157+
// successfully forwarded/failed/fulfilled, causing
2158+
// our counterparty to eventually close on us.
2159+
htlcs_to_fail.push((source.clone(), *payment_hash));
21522160
},
21532161
_ => {
21542162
log_info!(logger, "Failed to send HTLC with payment_hash {} resulting in a channel closure during holding_cell freeing", log_bytes!(payment_hash.0));
@@ -2201,10 +2209,10 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
22012209
match err {
22022210
None => {
22032211
if update_add_htlcs.is_empty() && update_fulfill_htlcs.is_empty() && update_fail_htlcs.is_empty() && self.holding_cell_update_fee.is_none() {
2204-
// This should never actually happen and indicates we got some Errs back
2205-
// from update_fulfill_htlc/update_fail_htlc, but we handle it anyway in
2206-
// case there is some strange way to hit duplicate HTLC removes.
2207-
return Ok(None);
2212+
// Hitting this case indicates that we got some Errs back from update_fulfill_htlc
2213+
// or update_fail_htlc.
2214+
log_warn!(logger, "Attempted to fulfill or fail an HTLC that was already removed");
2215+
return Ok((None, htlcs_to_fail));
22082216
}
22092217
let update_fee = if let Some(feerate) = self.holding_cell_update_fee {
22102218
self.pending_update_fee = self.holding_cell_update_fee.take();
@@ -2222,19 +2230,19 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
22222230
self.latest_monitor_update_id = monitor_update.update_id;
22232231
monitor_update.updates.append(&mut additional_update.updates);
22242232

2225-
Ok(Some((msgs::CommitmentUpdate {
2233+
Ok((Some((msgs::CommitmentUpdate {
22262234
update_add_htlcs,
22272235
update_fulfill_htlcs,
22282236
update_fail_htlcs,
22292237
update_fail_malformed_htlcs: Vec::new(),
22302238
update_fee: update_fee,
22312239
commitment_signed,
2232-
}, monitor_update)))
2240+
}, monitor_update)), htlcs_to_fail))
22332241
},
22342242
Some(e) => Err(e)
22352243
}
22362244
} else {
2237-
Ok(None)
2245+
Ok((None, Vec::new()))
22382246
}
22392247
}
22402248

@@ -2243,7 +2251,7 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
22432251
/// waiting on this revoke_and_ack. The generation of this new commitment_signed may also fail,
22442252
/// generating an appropriate error *after* the channel state has been updated based on the
22452253
/// revoke_and_ack message.
2246-
pub fn revoke_and_ack<F: Deref, L: Deref>(&mut self, msg: &msgs::RevokeAndACK, fee_estimator: &F, logger: &L) -> Result<(Option<msgs::CommitmentUpdate>, Vec<(PendingHTLCInfo, u64)>, Vec<(HTLCSource, PaymentHash, HTLCFailReason)>, Option<msgs::ClosingSigned>, ChannelMonitorUpdate), ChannelError>
2254+
pub fn revoke_and_ack<F: Deref, L: Deref>(&mut self, msg: &msgs::RevokeAndACK, fee_estimator: &F, logger: &L) -> Result<(Option<msgs::CommitmentUpdate>, Vec<(PendingHTLCInfo, u64)>, Vec<(HTLCSource, PaymentHash, HTLCFailReason)>, Option<msgs::ClosingSigned>, ChannelMonitorUpdate, Vec<(HTLCSource, PaymentHash)>), ChannelError>
22472255
where F::Target: FeeEstimator,
22482256
L::Target: Logger,
22492257
{
@@ -2418,11 +2426,11 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
24182426
}
24192427
self.monitor_pending_forwards.append(&mut to_forward_infos);
24202428
self.monitor_pending_failures.append(&mut revoked_htlcs);
2421-
return Ok((None, Vec::new(), Vec::new(), None, monitor_update))
2429+
return Ok((None, Vec::new(), Vec::new(), None, monitor_update, Vec::new()))
24222430
}
24232431

24242432
match self.free_holding_cell_htlcs(logger)? {
2425-
Some((mut commitment_update, mut additional_update)) => {
2433+
(Some((mut commitment_update, mut additional_update)), htlcs_to_fail) => {
24262434
commitment_update.update_fail_htlcs.reserve(update_fail_htlcs.len());
24272435
for fail_msg in update_fail_htlcs.drain(..) {
24282436
commitment_update.update_fail_htlcs.push(fail_msg);
@@ -2437,9 +2445,9 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
24372445
self.latest_monitor_update_id = monitor_update.update_id;
24382446
monitor_update.updates.append(&mut additional_update.updates);
24392447

2440-
Ok((Some(commitment_update), to_forward_infos, revoked_htlcs, None, monitor_update))
2448+
Ok((Some(commitment_update), to_forward_infos, revoked_htlcs, None, monitor_update, htlcs_to_fail))
24412449
},
2442-
None => {
2450+
(None, htlcs_to_fail) => {
24432451
if require_commitment {
24442452
let (commitment_signed, mut additional_update) = self.send_commitment_no_status_check(logger)?;
24452453

@@ -2455,9 +2463,9 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
24552463
update_fail_malformed_htlcs,
24562464
update_fee: None,
24572465
commitment_signed
2458-
}), to_forward_infos, revoked_htlcs, None, monitor_update))
2466+
}), to_forward_infos, revoked_htlcs, None, monitor_update, htlcs_to_fail))
24592467
} else {
2460-
Ok((None, to_forward_infos, revoked_htlcs, self.maybe_propose_first_closing_signed(fee_estimator), monitor_update))
2468+
Ok((None, to_forward_infos, revoked_htlcs, self.maybe_propose_first_closing_signed(fee_estimator), monitor_update, htlcs_to_fail))
24612469
}
24622470
}
24632471
}
@@ -2727,7 +2735,7 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
27272735

27282736
/// May panic if some calls other than message-handling calls (which will all Err immediately)
27292737
/// have been called between remove_uncommitted_htlcs_and_mark_paused and this call.
2730-
pub fn channel_reestablish<L: Deref>(&mut self, msg: &msgs::ChannelReestablish, logger: &L) -> Result<(Option<msgs::FundingLocked>, Option<msgs::RevokeAndACK>, Option<msgs::CommitmentUpdate>, Option<ChannelMonitorUpdate>, RAACommitmentOrder, Option<msgs::Shutdown>), ChannelError> where L::Target: Logger {
2738+
pub fn channel_reestablish<L: Deref>(&mut self, msg: &msgs::ChannelReestablish, logger: &L) -> Result<(Option<msgs::FundingLocked>, Option<msgs::RevokeAndACK>, Option<msgs::CommitmentUpdate>, Option<ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>, RAACommitmentOrder, Option<msgs::Shutdown>), ChannelError> where L::Target: Logger {
27312739
if self.channel_state & (ChannelState::PeerDisconnected as u32) == 0 {
27322740
// While BOLT 2 doesn't indicate explicitly we should error this channel here, it
27332741
// almost certainly indicates we are going to end up out-of-sync in some way, so we
@@ -2775,7 +2783,7 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
27752783
return Err(ChannelError::Close("Peer claimed they saw a revoke_and_ack but we haven't sent funding_locked yet"));
27762784
}
27772785
// Short circuit the whole handler as there is nothing we can resend them
2778-
return Ok((None, None, None, None, RAACommitmentOrder::CommitmentFirst, shutdown_msg));
2786+
return Ok((None, None, None, None, Vec::new(), RAACommitmentOrder::CommitmentFirst, shutdown_msg));
27792787
}
27802788

27812789
// We have OurFundingLocked set!
@@ -2784,7 +2792,7 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
27842792
return Ok((Some(msgs::FundingLocked {
27852793
channel_id: self.channel_id(),
27862794
next_per_commitment_point: next_per_commitment_point,
2787-
}), None, None, None, RAACommitmentOrder::CommitmentFirst, shutdown_msg));
2795+
}), None, None, None, Vec::new(), RAACommitmentOrder::CommitmentFirst, shutdown_msg));
27882796
}
27892797

27902798
let required_revoke = if msg.next_remote_commitment_number + 1 == INITIAL_COMMITMENT_NUMBER - self.cur_local_commitment_transaction_number {
@@ -2833,11 +2841,11 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
28332841
match self.free_holding_cell_htlcs(logger) {
28342842
Err(ChannelError::Close(msg)) => return Err(ChannelError::Close(msg)),
28352843
Err(ChannelError::Ignore(_)) | Err(ChannelError::CloseDelayBroadcast(_)) => panic!("Got non-channel-failing result from free_holding_cell_htlcs"),
2836-
Ok(Some((commitment_update, monitor_update))) => return Ok((resend_funding_locked, required_revoke, Some(commitment_update), Some(monitor_update), self.resend_order.clone(), shutdown_msg)),
2837-
Ok(None) => return Ok((resend_funding_locked, required_revoke, None, None, self.resend_order.clone(), shutdown_msg)),
2844+
Ok((Some((commitment_update, monitor_update)), htlcs_to_fail)) => return Ok((resend_funding_locked, required_revoke, Some(commitment_update), Some(monitor_update), htlcs_to_fail, self.resend_order.clone(), shutdown_msg)),
2845+
Ok((None, htlcs_to_fail)) => return Ok((resend_funding_locked, required_revoke, None, None, htlcs_to_fail, self.resend_order.clone(), shutdown_msg)),
28382846
}
28392847
} else {
2840-
return Ok((resend_funding_locked, required_revoke, None, None, self.resend_order.clone(), shutdown_msg));
2848+
return Ok((resend_funding_locked, required_revoke, None, None, Vec::new(), self.resend_order.clone(), shutdown_msg));
28412849
}
28422850
} else if msg.next_local_commitment_number == our_next_remote_commitment_number - 1 {
28432851
if required_revoke.is_some() {
@@ -2848,10 +2856,10 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
28482856

28492857
if self.channel_state & (ChannelState::MonitorUpdateFailed as u32) != 0 {
28502858
self.monitor_pending_commitment_signed = true;
2851-
return Ok((resend_funding_locked, None, None, None, self.resend_order.clone(), shutdown_msg));
2859+
return Ok((resend_funding_locked, None, None, None, Vec::new(), self.resend_order.clone(), shutdown_msg));
28522860
}
28532861

2854-
return Ok((resend_funding_locked, required_revoke, Some(self.get_last_commitment_update(logger)), None, self.resend_order.clone(), shutdown_msg));
2862+
return Ok((resend_funding_locked, required_revoke, Some(self.get_last_commitment_update(logger)), None, Vec::new(), self.resend_order.clone(), shutdown_msg));
28552863
} else {
28562864
return Err(ChannelError::Close("Peer attempted to reestablish channel with a very old remote commitment transaction"));
28572865
}

lightning/src/ln/channelmanager.rs

Lines changed: 49 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -1819,6 +1819,49 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
18191819
} else { false }
18201820
}
18211821

1822+
// Fail a list of HTLCs. Only called when one or more HTLCs fail to be freed
1823+
// from the holding cell. In this case, the HTLCs need to be failed
1824+
// backwards or, if they were one of our outgoing HTLCs, then their failure
1825+
// needs to be surfaced to the user.
1826+
fn fail_holding_cell_htlcs(&self, mut htlcs_to_fail: Vec<(HTLCSource, PaymentHash)>, channel_id: [u8; 32]) -> Result<(), MsgHandleErrInternal> {
1827+
for (htlc_src, payment_hash) in htlcs_to_fail.drain(..) {
1828+
match htlc_src {
1829+
HTLCSource::PreviousHopData(HTLCPreviousHopData { .. }) => {
1830+
let onion_failure_data =
1831+
match self.channel_state.lock().unwrap().by_id.entry(channel_id) {
1832+
hash_map::Entry::Occupied(chan_entry) => {
1833+
if let Ok(upd) = self.get_channel_update(&chan_entry.get()) {
1834+
let mut res = Vec::with_capacity(8 + 128);
1835+
res.extend_from_slice(&byte_utils::be16_to_array(upd.contents.flags));
1836+
res.extend_from_slice(&upd.encode_with_len()[..]);
1837+
res
1838+
} else {
1839+
return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", channel_id));
1840+
}
1841+
},
1842+
hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", channel_id))
1843+
};
1844+
let channel_state = self.channel_state.lock().unwrap();
1845+
self.fail_htlc_backwards_internal(channel_state,
1846+
htlc_src, &payment_hash, HTLCFailReason::Reason { failure_code: 0x1000|7, data: onion_failure_data});
1847+
},
1848+
HTLCSource::OutboundRoute { .. } => {
1849+
self.pending_events.lock().unwrap().push(
1850+
events::Event::PaymentFailed {
1851+
payment_hash,
1852+
rejected_by_dest: false,
1853+
#[cfg(test)]
1854+
error_code: None,
1855+
#[cfg(test)]
1856+
error_data: None,
1857+
}
1858+
)
1859+
},
1860+
};
1861+
}
1862+
Ok(())
1863+
}
1864+
18221865
/// Fails an HTLC backwards to the sender of it to us.
18231866
/// Note that while we take a channel_state lock as input, we do *not* assume consistency here.
18241867
/// There are several callsites that do stupid things like loop over a list of payment_hashes
@@ -2666,7 +2709,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
26662709
}
26672710

26682711
fn internal_revoke_and_ack(&self, their_node_id: &PublicKey, msg: &msgs::RevokeAndACK) -> Result<(), MsgHandleErrInternal> {
2669-
let (pending_forwards, mut pending_failures, short_channel_id) = {
2712+
let (pending_forwards, mut pending_failures, short_channel_id, htlcs_to_fail) = {
26702713
let mut channel_state_lock = self.channel_state.lock().unwrap();
26712714
let channel_state = &mut *channel_state_lock;
26722715
match channel_state.by_id.entry(msg.channel_id) {
@@ -2675,7 +2718,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
26752718
return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!", msg.channel_id));
26762719
}
26772720
let was_frozen_for_monitor = chan.get().is_awaiting_monitor_update();
2678-
let (commitment_update, pending_forwards, pending_failures, closing_signed, monitor_update) =
2721+
let (commitment_update, pending_forwards, pending_failures, closing_signed, monitor_update, htlcs_to_fail) =
26792722
try_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.fee_estimator, &self.logger), channel_state, chan);
26802723
if let Err(e) = self.monitor.update_monitor(chan.get().get_funding_txo().unwrap(), monitor_update) {
26812724
if was_frozen_for_monitor {
@@ -2697,14 +2740,15 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
26972740
msg,
26982741
});
26992742
}
2700-
(pending_forwards, pending_failures, chan.get().get_short_channel_id().expect("RAA should only work on a short-id-available channel"))
2743+
(pending_forwards, pending_failures, chan.get().get_short_channel_id().expect("RAA should only work on a short-id-available channel"), htlcs_to_fail)
27012744
},
27022745
hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.channel_id))
27032746
}
27042747
};
27052748
for failure in pending_failures.drain(..) {
27062749
self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), failure.0, &failure.1, failure.2);
27072750
}
2751+
if let Err(e) = self.fail_holding_cell_htlcs(htlcs_to_fail, msg.channel_id) { return Err(e) };
27082752
self.forward_htlcs(&mut [(short_channel_id, pending_forwards)]);
27092753

27102754
Ok(())
@@ -2777,7 +2821,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
27772821
if chan.get().get_their_node_id() != *their_node_id {
27782822
return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!", msg.channel_id));
27792823
}
2780-
let (funding_locked, revoke_and_ack, commitment_update, monitor_update_opt, mut order, shutdown) =
2824+
let (funding_locked, revoke_and_ack, commitment_update, monitor_update_opt, htlcs_to_fail, mut order, shutdown) =
27812825
try_chan_entry!(self, chan.get_mut().channel_reestablish(msg, &self.logger), channel_state, chan);
27822826
if let Some(monitor_update) = monitor_update_opt {
27832827
if let Err(e) = self.monitor.update_monitor(chan.get().get_funding_txo().unwrap(), monitor_update) {
@@ -2794,6 +2838,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
27942838
//TODO: Resend the funding_locked if needed once we get the monitor running again
27952839
}
27962840
}
2841+
if let Err(e) = self.fail_holding_cell_htlcs(htlcs_to_fail, msg.channel_id) { return Err(e) };
27972842
if let Some(msg) = funding_locked {
27982843
channel_state.pending_msg_events.push(events::MessageSendEvent::SendFundingLocked {
27992844
node_id: their_node_id.clone(),

0 commit comments

Comments
 (0)