Skip to content

Commit 3395a94

Browse files
committed
Lean on the holding cell when batch-forwarding/failing HTLCs
When we batch HTLC updates, we currently do the explicit queueing plus the commitment generation in the `ChannelManager`. This is a bit strange as it's ultimately really a `Channel` responsibility to generate commitments at the correct time, with the abstraction leaking into `ChannelManager` with the `send_htlc` and `get_update_fail_htlc` method docs having clear comments about how `send_commitment` MUST be called prior to calling other `Channel` methods. Luckily `Channel` already has an update queue - the holding cell. Thus, we can trivially rewrite the batch update logic as inserting the desired updates into the holding cell and then asking all channels to clear their holding cells.
1 parent 2c57878 commit 3395a94

File tree

2 files changed

+57
-88
lines changed

2 files changed

+57
-88
lines changed

lightning/src/ln/channel.rs

Lines changed: 48 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1948,7 +1948,20 @@ impl<Signer: Sign> Channel<Signer> {
19481948
/// before we fail backwards.
19491949
/// If we do fail twice, we debug_assert!(false) and return Ok(None). Thus, will always return
19501950
/// Ok(_) if debug assertions are turned on or preconditions are met.
1951-
pub fn get_update_fail_htlc<L: Deref>(&mut self, htlc_id_arg: u64, err_packet: msgs::OnionErrorPacket, logger: &L) -> Result<Option<msgs::UpdateFailHTLC>, ChannelError> where L::Target: Logger {
1951+
pub fn queue_fail_htlc<L: Deref>(&mut self, htlc_id_arg: u64, err_packet: msgs::OnionErrorPacket, logger: &L)
1952+
-> Result<(), ChannelError> where L::Target: Logger {
1953+
self.fail_htlc(htlc_id_arg, err_packet, true, logger)
1954+
.map(|msg_opt| assert!(msg_opt.is_none(), "We forced holding cell?"))
1955+
}
1956+
1957+
/// We can only have one resolution per HTLC. In some cases around reconnect, we may fulfill
1958+
/// an HTLC more than once or fulfill once and then attempt to fail after reconnect. We cannot,
1959+
/// however, fail more than once as we wait for an upstream failure to be irrevocably committed
1960+
/// before we fail backwards.
1961+
/// If we do fail twice, we debug_assert!(false) and return Ok(None). Thus, will always return
1962+
/// Ok(_) if debug assertions are turned on or preconditions are met.
1963+
fn fail_htlc<L: Deref>(&mut self, htlc_id_arg: u64, err_packet: msgs::OnionErrorPacket, mut force_holding_cell: bool, logger: &L)
1964+
-> Result<Option<msgs::UpdateFailHTLC>, ChannelError> where L::Target: Logger {
19521965
if (self.channel_state & (ChannelState::ChannelReady as u32)) != (ChannelState::ChannelReady as u32) {
19531966
panic!("Was asked to fail an HTLC when channel was not in an operational state");
19541967
}
@@ -1986,8 +1999,13 @@ impl<Signer: Sign> Channel<Signer> {
19861999
return Ok(None);
19872000
}
19882001

1989-
// Now update local state:
19902002
if (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32 | ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateInProgress as u32)) != 0 {
2003+
debug_assert!(force_holding_cell, "We don't expect to need to use the holding cell if we weren't trying to");
2004+
force_holding_cell = true;
2005+
}
2006+
2007+
// Now update local state:
2008+
if force_holding_cell {
19912009
for pending_update in self.holding_cell_htlc_updates.iter() {
19922010
match pending_update {
19932011
&HTLCUpdateAwaitingACK::ClaimHTLC { htlc_id, .. } => {
@@ -3173,7 +3191,7 @@ impl<Signer: Sign> Channel<Signer> {
31733191
// to rebalance channels.
31743192
match &htlc_update {
31753193
&HTLCUpdateAwaitingACK::AddHTLC {amount_msat, cltv_expiry, ref payment_hash, ref source, ref onion_routing_packet, ..} => {
3176-
match self.send_htlc(amount_msat, *payment_hash, cltv_expiry, source.clone(), onion_routing_packet.clone(), logger) {
3194+
match self.send_htlc(amount_msat, *payment_hash, cltv_expiry, source.clone(), onion_routing_packet.clone(), false, logger) {
31773195
Ok(update_add_msg_option) => update_add_htlcs.push(update_add_msg_option.unwrap()),
31783196
Err(e) => {
31793197
match e {
@@ -3209,13 +3227,13 @@ impl<Signer: Sign> Channel<Signer> {
32093227
monitor_update.updates.append(&mut additional_monitor_update.updates);
32103228
},
32113229
&HTLCUpdateAwaitingACK::FailHTLC { htlc_id, ref err_packet } => {
3212-
match self.get_update_fail_htlc(htlc_id, err_packet.clone(), logger) {
3230+
match self.fail_htlc(htlc_id, err_packet.clone(), false, logger) {
32133231
Ok(update_fail_msg_option) => {
32143232
// If an HTLC failure was previously added to the holding cell (via
3215-
// `get_update_fail_htlc`) then generating the fail message itself
3216-
// must not fail - we should never end up in a state where we
3217-
// double-fail an HTLC or fail-then-claim an HTLC as it indicates
3218-
// we didn't wait for a full revocation before failing.
3233+
// `fail_htlc`) then generating the fail message itself must not
3234+
// fail - we should never end up in a state where we double-fail an
3235+
// HTLC or fail-then-claim an HTLC as it indicates we didn't wait
3236+
// for a full revocation before failing.
32193237
update_fail_htlcs.push(update_fail_msg_option.unwrap())
32203238
},
32213239
Err(e) => {
@@ -5470,6 +5488,19 @@ impl<Signer: Sign> Channel<Signer> {
54705488

54715489
// Send stuff to our remote peers:
54725490

5491+
/// Queues up an outbound HTLC to send by placing it in the holding cell. You should call
5492+
/// `maybe_free_holding_cell_htlcs` in order to actually generate and send the commitment
5493+
/// update.
5494+
///
5495+
/// If an Err is returned, it's a ChannelError::Ignore!
5496+
pub fn queue_htlc<L: Deref>(&mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource,
5497+
onion_routing_packet: msgs::OnionPacket, logger: &L)
5498+
-> Result<(), ChannelError> where L::Target: Logger {
5499+
self.send_htlc(amount_msat, payment_hash, cltv_expiry, source, onion_routing_packet, true, logger)
5500+
.map(|msg_opt| assert!(msg_opt.is_none(), "We forced holding cell?"))
5501+
5502+
}
5503+
54735504
/// Adds a pending outbound HTLC to this channel, note that you probably want
54745505
/// send_htlc_and_commit instead because you'll want both messages at once.
54755506
///
@@ -5485,7 +5516,9 @@ impl<Signer: Sign> Channel<Signer> {
54855516
/// You MUST call send_commitment prior to calling any other methods on this Channel!
54865517
///
54875518
/// If an Err is returned, it's a ChannelError::Ignore!
5488-
pub fn send_htlc<L: Deref>(&mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource, onion_routing_packet: msgs::OnionPacket, logger: &L) -> Result<Option<msgs::UpdateAddHTLC>, ChannelError> where L::Target: Logger {
5519+
fn send_htlc<L: Deref>(&mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource,
5520+
onion_routing_packet: msgs::OnionPacket, mut force_holding_cell: bool, logger: &L)
5521+
-> Result<Option<msgs::UpdateAddHTLC>, ChannelError> where L::Target: Logger {
54895522
if (self.channel_state & (ChannelState::ChannelReady as u32 | BOTH_SIDES_SHUTDOWN_MASK)) != (ChannelState::ChannelReady as u32) {
54905523
return Err(ChannelError::Ignore("Cannot send HTLC until channel is fully established and we haven't started shutting down".to_owned()));
54915524
}
@@ -5580,8 +5613,12 @@ impl<Signer: Sign> Channel<Signer> {
55805613
return Err(ChannelError::Ignore(format!("Cannot send value that would put our balance under counterparty-announced channel reserve value ({})", chan_reserve_msat)));
55815614
}
55825615

5583-
// Now update local state:
55845616
if (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32 | ChannelState::MonitorUpdateInProgress as u32)) != 0 {
5617+
force_holding_cell = true;
5618+
}
5619+
5620+
// Now update local state:
5621+
if force_holding_cell {
55855622
self.holding_cell_htlc_updates.push(HTLCUpdateAwaitingACK::AddHTLC {
55865623
amount_msat,
55875624
payment_hash,
@@ -5774,7 +5811,7 @@ impl<Signer: Sign> Channel<Signer> {
57745811
/// Shorthand for calling send_htlc() followed by send_commitment(), see docs on those for
57755812
/// more info.
57765813
pub fn send_htlc_and_commit<L: Deref>(&mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource, onion_routing_packet: msgs::OnionPacket, logger: &L) -> Result<Option<(msgs::UpdateAddHTLC, msgs::CommitmentSigned, ChannelMonitorUpdate)>, ChannelError> where L::Target: Logger {
5777-
match self.send_htlc(amount_msat, payment_hash, cltv_expiry, source, onion_routing_packet, logger)? {
5814+
match self.send_htlc(amount_msat, payment_hash, cltv_expiry, source, onion_routing_packet, false, logger)? {
57785815
Some(update_add_htlc) => {
57795816
let (commitment_signed, monitor_update) = self.send_commitment_no_status_check(logger)?;
57805817
Ok(Some((update_add_htlc, commitment_signed, monitor_update)))

lightning/src/ln/channelmanager.rs

Lines changed: 9 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -3033,7 +3033,6 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
30333033
let mut new_events = Vec::new();
30343034
let mut failed_forwards = Vec::new();
30353035
let mut phantom_receives: Vec<(u64, OutPoint, u128, Vec<(PendingHTLCInfo, u64)>)> = Vec::new();
3036-
let mut handle_errors = Vec::new();
30373036
{
30383037
let mut forward_htlcs = HashMap::new();
30393038
mem::swap(&mut forward_htlcs, &mut self.forward_htlcs.lock().unwrap());
@@ -3149,8 +3148,6 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
31493148
continue;
31503149
},
31513150
hash_map::Entry::Occupied(mut chan) => {
3152-
let mut add_htlc_msgs = Vec::new();
3153-
let mut fail_htlc_msgs = Vec::new();
31543151
for forward_info in pending_forwards.drain(..) {
31553152
match forward_info {
31563153
HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
@@ -3169,7 +3166,7 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
31693166
// Phantom payments are only PendingHTLCRouting::Receive.
31703167
phantom_shared_secret: None,
31713168
});
3172-
match chan.get_mut().send_htlc(outgoing_amt_msat, payment_hash, outgoing_cltv_value, htlc_source.clone(), onion_packet, &self.logger) {
3169+
match chan.get_mut().queue_htlc(outgoing_amt_msat, payment_hash, outgoing_cltv_value, htlc_source.clone(), onion_packet, &self.logger) {
31733170
Err(e) => {
31743171
if let ChannelError::Ignore(msg) = e {
31753172
log_trace!(self.logger, "Failed to forward HTLC with payment_hash {}: {}", log_bytes!(payment_hash.0), msg);
@@ -3183,28 +3180,15 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
31833180
));
31843181
continue;
31853182
},
3186-
Ok(update_add) => {
3187-
match update_add {
3188-
Some(msg) => { add_htlc_msgs.push(msg); },
3189-
None => {
3190-
// Nothing to do here...we're waiting on a remote
3191-
// revoke_and_ack before we can add anymore HTLCs. The Channel
3192-
// will automatically handle building the update_add_htlc and
3193-
// commitment_signed messages when we can.
3194-
// TODO: Do some kind of timer to set the channel as !is_live()
3195-
// as we don't really want others relying on us relaying through
3196-
// this channel currently :/.
3197-
}
3198-
}
3199-
}
3183+
Ok(()) => { },
32003184
}
32013185
},
32023186
HTLCForwardInfo::AddHTLC { .. } => {
32033187
panic!("short_channel_id != 0 should imply any pending_forward entries are of type Forward");
32043188
},
32053189
HTLCForwardInfo::FailHTLC { htlc_id, err_packet } => {
32063190
log_trace!(self.logger, "Failing HTLC back to channel with short id {} (backward HTLC ID {}) after delay", short_chan_id, htlc_id);
3207-
match chan.get_mut().get_update_fail_htlc(htlc_id, err_packet, &self.logger) {
3191+
match chan.get_mut().queue_fail_htlc(htlc_id, err_packet, &self.logger) {
32083192
Err(e) => {
32093193
if let ChannelError::Ignore(msg) = e {
32103194
log_trace!(self.logger, "Failed to fail HTLC with ID {} backwards to short_id {}: {}", htlc_id, short_chan_id, msg);
@@ -3216,65 +3200,11 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
32163200
// the chain and sending the HTLC-Timeout is their problem.
32173201
continue;
32183202
},
3219-
Ok(Some(msg)) => { fail_htlc_msgs.push(msg); },
3220-
Ok(None) => {
3221-
// Nothing to do here...we're waiting on a remote
3222-
// revoke_and_ack before we can update the commitment
3223-
// transaction. The Channel will automatically handle
3224-
// building the update_fail_htlc and commitment_signed
3225-
// messages when we can.
3226-
// We don't need any kind of timer here as they should fail
3227-
// the channel onto the chain if they can't get our
3228-
// update_fail_htlc in time, it's not our problem.
3229-
}
3203+
Ok(()) => { },
32303204
}
32313205
},
32323206
}
32333207
}
3234-
3235-
if !add_htlc_msgs.is_empty() || !fail_htlc_msgs.is_empty() {
3236-
let (commitment_msg, monitor_update) = match chan.get_mut().send_commitment(&self.logger) {
3237-
Ok(res) => res,
3238-
Err(e) => {
3239-
// We surely failed send_commitment due to bad keys, in that case
3240-
// close channel and then send error message to peer.
3241-
let counterparty_node_id = chan.get().get_counterparty_node_id();
3242-
let err: Result<(), _> = match e {
3243-
ChannelError::Ignore(_) | ChannelError::Warn(_) => {
3244-
panic!("Stated return value requirements in send_commitment() were not met");
3245-
}
3246-
ChannelError::Close(msg) => {
3247-
log_trace!(self.logger, "Closing channel {} due to Close-required error: {}", log_bytes!(chan.key()[..]), msg);
3248-
let mut channel = remove_channel!(self, chan);
3249-
// ChannelClosed event is generated by handle_error for us.
3250-
Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel.channel_id(), channel.get_user_id(), channel.force_shutdown(true), self.get_channel_update_for_broadcast(&channel).ok()))
3251-
},
3252-
};
3253-
handle_errors.push((counterparty_node_id, err));
3254-
continue;
3255-
}
3256-
};
3257-
match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) {
3258-
ChannelMonitorUpdateStatus::Completed => {},
3259-
e => {
3260-
handle_errors.push((chan.get().get_counterparty_node_id(), handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, false, true)));
3261-
continue;
3262-
}
3263-
}
3264-
log_debug!(self.logger, "Forwarding HTLCs resulted in a commitment update with {} HTLCs added and {} HTLCs failed for channel {}",
3265-
add_htlc_msgs.len(), fail_htlc_msgs.len(), log_bytes!(chan.get().channel_id()));
3266-
channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
3267-
node_id: chan.get().get_counterparty_node_id(),
3268-
updates: msgs::CommitmentUpdate {
3269-
update_add_htlcs: add_htlc_msgs,
3270-
update_fulfill_htlcs: Vec::new(),
3271-
update_fail_htlcs: fail_htlc_msgs,
3272-
update_fail_malformed_htlcs: Vec::new(),
3273-
update_fee: None,
3274-
commitment_signed: commitment_msg,
3275-
},
3276-
});
3277-
}
32783208
}
32793209
}
32803210
} else {
@@ -3478,9 +3408,11 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
34783408
}
34793409
self.forward_htlcs(&mut phantom_receives);
34803410

3481-
for (counterparty_node_id, err) in handle_errors.drain(..) {
3482-
let _ = handle_error!(self, err, counterparty_node_id);
3483-
}
3411+
// Freeing the holding cell here is relatively redundant - in practice we'll do it when we
3412+
// next get a `get_and_clear_pending_msg_events` call, but some tests rely on it, and it's
3413+
// nice to do the work now if we can rather than while we're trying to get messages in the
3414+
// network stack.
3415+
self.check_free_holding_cells();
34843416

34853417
if new_events.is_empty() { return }
34863418
let mut events = self.pending_events.lock().unwrap();

0 commit comments

Comments
 (0)