Skip to content

Commit 6cf1a5d

Browse files
Holding cell: if we fail to free an HTLC, fail it backwards
Plus add a test.
1 parent 8fae0c0 commit 6cf1a5d

File tree

3 files changed

+287
-27
lines changed

3 files changed

+287
-27
lines changed

lightning/src/ln/channel.rs

Lines changed: 31 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2115,7 +2115,7 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
21152115

21162116
/// Used to fulfill holding_cell_htlcs when we get a remote ack (or implicitly get it by them
21172117
/// fulfilling or failing the last pending HTLC)
2118-
fn free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> Result<Option<(msgs::CommitmentUpdate, ChannelMonitorUpdate)>, ChannelError> where L::Target: Logger {
2118+
fn free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> Result<(Option<(msgs::CommitmentUpdate, ChannelMonitorUpdate)>, Vec<(HTLCSource, PaymentHash)>), ChannelError> where L::Target: Logger {
21192119
assert_eq!(self.channel_state & ChannelState::MonitorUpdateFailed as u32, 0);
21202120
if self.holding_cell_htlc_updates.len() != 0 || self.holding_cell_update_fee.is_some() {
21212121
log_trace!(logger, "Freeing holding cell with {} HTLC updates{}", self.holding_cell_htlc_updates.len(), if self.holding_cell_update_fee.is_some() { " and a fee update" } else { "" });
@@ -2130,6 +2130,7 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
21302130
let mut update_add_htlcs = Vec::with_capacity(htlc_updates.len());
21312131
let mut update_fulfill_htlcs = Vec::with_capacity(htlc_updates.len());
21322132
let mut update_fail_htlcs = Vec::with_capacity(htlc_updates.len());
2133+
let mut htlcs_to_fail = Vec::new();
21332134
let mut err = None;
21342135
for htlc_update in htlc_updates.drain(..) {
21352136
// Note that this *can* fail, though it should be due to rather-rare conditions on
@@ -2148,6 +2149,13 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
21482149
match e {
21492150
ChannelError::Ignore(ref msg) => {
21502151
log_info!(logger, "Failed to send HTLC with payment_hash {} due to {}", log_bytes!(payment_hash.0), msg);
2152+
// If we fail to send here, then this HTLC should
2153+
// be failed backwards. Failing to send here
2154+
// indicates that this HTLC may keep being put back
2155+
// into the holding cell without ever being
2156+
// successfully forwarded/failed/fulfilled, causing
2157+
// our counterparty to eventually close on us.
2158+
htlcs_to_fail.push((source.clone(), *payment_hash));
21512159
},
21522160
_ => {
21532161
log_info!(logger, "Failed to send HTLC with payment_hash {} resulting in a channel closure during holding_cell freeing", log_bytes!(payment_hash.0));
@@ -2200,10 +2208,10 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
22002208
match err {
22012209
None => {
22022210
if update_add_htlcs.is_empty() && update_fulfill_htlcs.is_empty() && update_fail_htlcs.is_empty() && self.holding_cell_update_fee.is_none() {
2203-
// This should never actually happen and indicates we got some Errs back
2204-
// from update_fulfill_htlc/update_fail_htlc, but we handle it anyway in
2205-
// case there is some strange way to hit duplicate HTLC removes.
2206-
return Ok(None);
2211+
// Hitting this case indicates that we got some Errs back from update_fulfill_htlc
2212+
// or update_fail_htlc.
2213+
log_warn!(logger, "Attempted to fulfill or fail an HTLC that was already removed");
2214+
return Ok((None, htlcs_to_fail));
22072215
}
22082216
let update_fee = if let Some(feerate) = self.holding_cell_update_fee {
22092217
self.pending_update_fee = self.holding_cell_update_fee.take();
@@ -2221,19 +2229,19 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
22212229
self.latest_monitor_update_id = monitor_update.update_id;
22222230
monitor_update.updates.append(&mut additional_update.updates);
22232231

2224-
Ok(Some((msgs::CommitmentUpdate {
2232+
Ok((Some((msgs::CommitmentUpdate {
22252233
update_add_htlcs,
22262234
update_fulfill_htlcs,
22272235
update_fail_htlcs,
22282236
update_fail_malformed_htlcs: Vec::new(),
22292237
update_fee: update_fee,
22302238
commitment_signed,
2231-
}, monitor_update)))
2239+
}, monitor_update)), htlcs_to_fail))
22322240
},
22332241
Some(e) => Err(e)
22342242
}
22352243
} else {
2236-
Ok(None)
2244+
Ok((None, Vec::new()))
22372245
}
22382246
}
22392247

@@ -2242,7 +2250,7 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
22422250
/// waiting on this revoke_and_ack. The generation of this new commitment_signed may also fail,
22432251
/// generating an appropriate error *after* the channel state has been updated based on the
22442252
/// revoke_and_ack message.
2245-
pub fn revoke_and_ack<F: Deref, L: Deref>(&mut self, msg: &msgs::RevokeAndACK, fee_estimator: &F, logger: &L) -> Result<(Option<msgs::CommitmentUpdate>, Vec<(PendingHTLCInfo, u64)>, Vec<(HTLCSource, PaymentHash, HTLCFailReason)>, Option<msgs::ClosingSigned>, ChannelMonitorUpdate), ChannelError>
2253+
pub fn revoke_and_ack<F: Deref, L: Deref>(&mut self, msg: &msgs::RevokeAndACK, fee_estimator: &F, logger: &L) -> Result<(Option<msgs::CommitmentUpdate>, Vec<(PendingHTLCInfo, u64)>, Vec<(HTLCSource, PaymentHash, HTLCFailReason)>, Option<msgs::ClosingSigned>, ChannelMonitorUpdate, Vec<(HTLCSource, PaymentHash)>), ChannelError>
22462254
where F::Target: FeeEstimator,
22472255
L::Target: Logger,
22482256
{
@@ -2417,11 +2425,11 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
24172425
}
24182426
self.monitor_pending_forwards.append(&mut to_forward_infos);
24192427
self.monitor_pending_failures.append(&mut revoked_htlcs);
2420-
return Ok((None, Vec::new(), Vec::new(), None, monitor_update))
2428+
return Ok((None, Vec::new(), Vec::new(), None, monitor_update, Vec::new()))
24212429
}
24222430

24232431
match self.free_holding_cell_htlcs(logger)? {
2424-
Some((mut commitment_update, mut additional_update)) => {
2432+
(Some((mut commitment_update, mut additional_update)), htlcs_to_fail) => {
24252433
commitment_update.update_fail_htlcs.reserve(update_fail_htlcs.len());
24262434
for fail_msg in update_fail_htlcs.drain(..) {
24272435
commitment_update.update_fail_htlcs.push(fail_msg);
@@ -2436,9 +2444,9 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
24362444
self.latest_monitor_update_id = monitor_update.update_id;
24372445
monitor_update.updates.append(&mut additional_update.updates);
24382446

2439-
Ok((Some(commitment_update), to_forward_infos, revoked_htlcs, None, monitor_update))
2447+
Ok((Some(commitment_update), to_forward_infos, revoked_htlcs, None, monitor_update, htlcs_to_fail))
24402448
},
2441-
None => {
2449+
(None, htlcs_to_fail) => {
24422450
if require_commitment {
24432451
let (commitment_signed, mut additional_update) = self.send_commitment_no_status_check(logger)?;
24442452

@@ -2454,9 +2462,9 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
24542462
update_fail_malformed_htlcs,
24552463
update_fee: None,
24562464
commitment_signed
2457-
}), to_forward_infos, revoked_htlcs, None, monitor_update))
2465+
}), to_forward_infos, revoked_htlcs, None, monitor_update, htlcs_to_fail))
24582466
} else {
2459-
Ok((None, to_forward_infos, revoked_htlcs, self.maybe_propose_first_closing_signed(fee_estimator), monitor_update))
2467+
Ok((None, to_forward_infos, revoked_htlcs, self.maybe_propose_first_closing_signed(fee_estimator), monitor_update, htlcs_to_fail))
24602468
}
24612469
}
24622470
}
@@ -2726,7 +2734,7 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
27262734

27272735
/// May panic if some calls other than message-handling calls (which will all Err immediately)
27282736
/// have been called between remove_uncommitted_htlcs_and_mark_paused and this call.
2729-
pub fn channel_reestablish<L: Deref>(&mut self, msg: &msgs::ChannelReestablish, logger: &L) -> Result<(Option<msgs::FundingLocked>, Option<msgs::RevokeAndACK>, Option<msgs::CommitmentUpdate>, Option<ChannelMonitorUpdate>, RAACommitmentOrder, Option<msgs::Shutdown>), ChannelError> where L::Target: Logger {
2737+
pub fn channel_reestablish<L: Deref>(&mut self, msg: &msgs::ChannelReestablish, logger: &L) -> Result<(Option<msgs::FundingLocked>, Option<msgs::RevokeAndACK>, Option<msgs::CommitmentUpdate>, Option<ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>, RAACommitmentOrder, Option<msgs::Shutdown>), ChannelError> where L::Target: Logger {
27302738
if self.channel_state & (ChannelState::PeerDisconnected as u32) == 0 {
27312739
// While BOLT 2 doesn't indicate explicitly we should error this channel here, it
27322740
// almost certainly indicates we are going to end up out-of-sync in some way, so we
@@ -2774,7 +2782,7 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
27742782
return Err(ChannelError::Close("Peer claimed they saw a revoke_and_ack but we haven't sent funding_locked yet"));
27752783
}
27762784
// Short circuit the whole handler as there is nothing we can resend them
2777-
return Ok((None, None, None, None, RAACommitmentOrder::CommitmentFirst, shutdown_msg));
2785+
return Ok((None, None, None, None, Vec::new(), RAACommitmentOrder::CommitmentFirst, shutdown_msg));
27782786
}
27792787

27802788
// We have OurFundingLocked set!
@@ -2783,7 +2791,7 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
27832791
return Ok((Some(msgs::FundingLocked {
27842792
channel_id: self.channel_id(),
27852793
next_per_commitment_point: next_per_commitment_point,
2786-
}), None, None, None, RAACommitmentOrder::CommitmentFirst, shutdown_msg));
2794+
}), None, None, None, Vec::new(), RAACommitmentOrder::CommitmentFirst, shutdown_msg));
27872795
}
27882796

27892797
let required_revoke = if msg.next_remote_commitment_number + 1 == INITIAL_COMMITMENT_NUMBER - self.cur_local_commitment_transaction_number {
@@ -2832,11 +2840,11 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
28322840
match self.free_holding_cell_htlcs(logger) {
28332841
Err(ChannelError::Close(msg)) => return Err(ChannelError::Close(msg)),
28342842
Err(ChannelError::Ignore(_)) | Err(ChannelError::CloseDelayBroadcast(_)) => panic!("Got non-channel-failing result from free_holding_cell_htlcs"),
2835-
Ok(Some((commitment_update, monitor_update))) => return Ok((resend_funding_locked, required_revoke, Some(commitment_update), Some(monitor_update), self.resend_order.clone(), shutdown_msg)),
2836-
Ok(None) => return Ok((resend_funding_locked, required_revoke, None, None, self.resend_order.clone(), shutdown_msg)),
2843+
Ok((Some((commitment_update, monitor_update)), htlcs_to_fail)) => return Ok((resend_funding_locked, required_revoke, Some(commitment_update), Some(monitor_update), htlcs_to_fail, self.resend_order.clone(), shutdown_msg)),
2844+
Ok((None, htlcs_to_fail)) => return Ok((resend_funding_locked, required_revoke, None, None, htlcs_to_fail, self.resend_order.clone(), shutdown_msg)),
28372845
}
28382846
} else {
2839-
return Ok((resend_funding_locked, required_revoke, None, None, self.resend_order.clone(), shutdown_msg));
2847+
return Ok((resend_funding_locked, required_revoke, None, None, Vec::new(), self.resend_order.clone(), shutdown_msg));
28402848
}
28412849
} else if msg.next_local_commitment_number == our_next_remote_commitment_number - 1 {
28422850
if required_revoke.is_some() {
@@ -2847,10 +2855,10 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
28472855

28482856
if self.channel_state & (ChannelState::MonitorUpdateFailed as u32) != 0 {
28492857
self.monitor_pending_commitment_signed = true;
2850-
return Ok((resend_funding_locked, None, None, None, self.resend_order.clone(), shutdown_msg));
2858+
return Ok((resend_funding_locked, None, None, None, Vec::new(), self.resend_order.clone(), shutdown_msg));
28512859
}
28522860

2853-
return Ok((resend_funding_locked, required_revoke, Some(self.get_last_commitment_update(logger)), None, self.resend_order.clone(), shutdown_msg));
2861+
return Ok((resend_funding_locked, required_revoke, Some(self.get_last_commitment_update(logger)), None, Vec::new(), self.resend_order.clone(), shutdown_msg));
28542862
} else {
28552863
return Err(ChannelError::Close("Peer attempted to reestablish channel with a very old remote commitment transaction"));
28562864
}

lightning/src/ln/channelmanager.rs

Lines changed: 49 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1819,6 +1819,49 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
18191819
} else { false }
18201820
}
18211821

1822+
// Fail a list of HTLCs. Only called when 1 or more HTLCs fails to be freed
1823+
// from the holding cell. In this case, the HTLCs need to be failed
1824+
// backwards or, if they were one of our outgoing HTLCs, then their failure
1825+
// needs to be surfaced to the user.
1826+
fn fail_holding_cell_htlcs(&self, mut htlcs_to_fail: Vec<(HTLCSource, PaymentHash)>, channel_id: [u8; 32]) -> Result<(), MsgHandleErrInternal> {
1827+
for (htlc_src, payment_hash) in htlcs_to_fail.drain(..) {
1828+
match htlc_src {
1829+
HTLCSource::PreviousHopData(HTLCPreviousHopData { .. }) => {
1830+
let onion_failure_data =
1831+
match self.channel_state.lock().unwrap().by_id.entry(channel_id) {
1832+
hash_map::Entry::Occupied(chan_entry) => {
1833+
if let Ok(upd) = self.get_channel_update(&chan_entry.get()) {
1834+
let mut res = Vec::with_capacity(8 + 128);
1835+
res.extend_from_slice(&byte_utils::be16_to_array(upd.contents.flags));
1836+
res.extend_from_slice(&upd.encode_with_len()[..]);
1837+
res
1838+
} else {
1839+
return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", channel_id));
1840+
}
1841+
},
1842+
hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", channel_id))
1843+
};
1844+
let channel_state = self.channel_state.lock().unwrap();
1845+
self.fail_htlc_backwards_internal(channel_state,
1846+
htlc_src, &payment_hash, HTLCFailReason::Reason { failure_code: 0x1000|7, data: onion_failure_data});
1847+
},
1848+
HTLCSource::OutboundRoute { .. } => {
1849+
self.pending_events.lock().unwrap().push(
1850+
events::Event::PaymentFailed {
1851+
payment_hash,
1852+
rejected_by_dest: false,
1853+
#[cfg(test)]
1854+
error_code: None,
1855+
#[cfg(test)]
1856+
error_data: None,
1857+
}
1858+
)
1859+
},
1860+
};
1861+
}
1862+
Ok(())
1863+
}
1864+
18221865
/// Fails an HTLC backwards to the sender of it to us.
18231866
/// Note that while we take a channel_state lock as input, we do *not* assume consistency here.
18241867
/// There are several callsites that do stupid things like loop over a list of payment_hashes
@@ -2666,7 +2709,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
26662709
}
26672710

26682711
fn internal_revoke_and_ack(&self, their_node_id: &PublicKey, msg: &msgs::RevokeAndACK) -> Result<(), MsgHandleErrInternal> {
2669-
let (pending_forwards, mut pending_failures, short_channel_id) = {
2712+
let (pending_forwards, mut pending_failures, short_channel_id, htlcs_to_fail) = {
26702713
let mut channel_state_lock = self.channel_state.lock().unwrap();
26712714
let channel_state = &mut *channel_state_lock;
26722715
match channel_state.by_id.entry(msg.channel_id) {
@@ -2675,7 +2718,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
26752718
return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!", msg.channel_id));
26762719
}
26772720
let was_frozen_for_monitor = chan.get().is_awaiting_monitor_update();
2678-
let (commitment_update, pending_forwards, pending_failures, closing_signed, monitor_update) =
2721+
let (commitment_update, pending_forwards, pending_failures, closing_signed, monitor_update, htlcs_to_fail) =
26792722
try_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.fee_estimator, &self.logger), channel_state, chan);
26802723
if let Err(e) = self.monitor.update_monitor(chan.get().get_funding_txo().unwrap(), monitor_update) {
26812724
if was_frozen_for_monitor {
@@ -2697,14 +2740,15 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
26972740
msg,
26982741
});
26992742
}
2700-
(pending_forwards, pending_failures, chan.get().get_short_channel_id().expect("RAA should only work on a short-id-available channel"))
2743+
(pending_forwards, pending_failures, chan.get().get_short_channel_id().expect("RAA should only work on a short-id-available channel"), htlcs_to_fail)
27012744
},
27022745
hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.channel_id))
27032746
}
27042747
};
27052748
for failure in pending_failures.drain(..) {
27062749
self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), failure.0, &failure.1, failure.2);
27072750
}
2751+
if let Err(e) = self.fail_holding_cell_htlcs(htlcs_to_fail, msg.channel_id) { return Err(e) };
27082752
self.forward_htlcs(&mut [(short_channel_id, pending_forwards)]);
27092753

27102754
Ok(())
@@ -2777,7 +2821,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
27772821
if chan.get().get_their_node_id() != *their_node_id {
27782822
return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!", msg.channel_id));
27792823
}
2780-
let (funding_locked, revoke_and_ack, commitment_update, monitor_update_opt, mut order, shutdown) =
2824+
let (funding_locked, revoke_and_ack, commitment_update, monitor_update_opt, htlcs_to_fail, mut order, shutdown) =
27812825
try_chan_entry!(self, chan.get_mut().channel_reestablish(msg, &self.logger), channel_state, chan);
27822826
if let Some(monitor_update) = monitor_update_opt {
27832827
if let Err(e) = self.monitor.update_monitor(chan.get().get_funding_txo().unwrap(), monitor_update) {
@@ -2794,6 +2838,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
27942838
//TODO: Resend the funding_locked if needed once we get the monitor running again
27952839
}
27962840
}
2841+
if let Err(e) = self.fail_holding_cell_htlcs(htlcs_to_fail, msg.channel_id) { return Err(e) };
27972842
if let Some(msg) = funding_locked {
27982843
channel_state.pending_msg_events.push(events::MessageSendEvent::SendFundingLocked {
27992844
node_id: their_node_id.clone(),

0 commit comments

Comments
 (0)