Skip to content

Commit 28df626

Browse files
committed
Lean on the holding cell when batch-forwarding/failing HTLCs
When we batch HTLC updates, we currently do the explicit queueing plus the commitment generation in the `ChannelManager`. This is a bit strange as its ultimately really a `Channel` responsibility to generate commitments at the correct time, with the abstraction leaking into `ChannelManager` with the `send_htlc` and `get_update_fail_htlc` method docs having clear comments about how `send_commitment` MUST be called prior to calling other `Channel` methods. Luckily `Channel` already has an update queue - the holding cell. Thus, we can trivially rewrite the batch update logic as inserting the desired updates into the holding cell and then asking all channels to clear their holding cells.
1 parent 8245128 commit 28df626

File tree

2 files changed

+57
-88
lines changed

2 files changed

+57
-88
lines changed

lightning/src/ln/channel.rs

Lines changed: 48 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1946,7 +1946,20 @@ impl<Signer: Sign> Channel<Signer> {
19461946
/// before we fail backwards.
19471947
/// If we do fail twice, we debug_assert!(false) and return Ok(None). Thus, will always return
19481948
/// Ok(_) if debug assertions are turned on or preconditions are met.
1949-
pub fn get_update_fail_htlc<L: Deref>(&mut self, htlc_id_arg: u64, err_packet: msgs::OnionErrorPacket, logger: &L) -> Result<Option<msgs::UpdateFailHTLC>, ChannelError> where L::Target: Logger {
1949+
pub fn queue_fail_htlc<L: Deref>(&mut self, htlc_id_arg: u64, err_packet: msgs::OnionErrorPacket, logger: &L)
1950+
-> Result<(), ChannelError> where L::Target: Logger {
1951+
self.fail_htlc(htlc_id_arg, err_packet, true, logger)
1952+
.map(|msg_opt| assert!(msg_opt.is_none(), "We forced holding cell?"))
1953+
}
1954+
1955+
/// We can only have one resolution per HTLC. In some cases around reconnect, we may fulfill
1956+
/// an HTLC more than once or fulfill once and then attempt to fail after reconnect. We cannot,
1957+
/// however, fail more than once as we wait for an upstream failure to be irrevocably committed
1958+
/// before we fail backwards.
1959+
/// If we do fail twice, we debug_assert!(false) and return Ok(None). Thus, will always return
1960+
/// Ok(_) if debug assertions are turned on or preconditions are met.
1961+
fn fail_htlc<L: Deref>(&mut self, htlc_id_arg: u64, err_packet: msgs::OnionErrorPacket, mut force_holding_cell: bool, logger: &L)
1962+
-> Result<Option<msgs::UpdateFailHTLC>, ChannelError> where L::Target: Logger {
19501963
if (self.channel_state & (ChannelState::ChannelReady as u32)) != (ChannelState::ChannelReady as u32) {
19511964
panic!("Was asked to fail an HTLC when channel was not in an operational state");
19521965
}
@@ -1984,8 +1997,13 @@ impl<Signer: Sign> Channel<Signer> {
19841997
return Ok(None);
19851998
}
19861999

1987-
// Now update local state:
19882000
if (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32 | ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateInProgress as u32)) != 0 {
2001+
debug_assert!(force_holding_cell, "We don't expect to need to use the holding cell if we weren't trying to");
2002+
force_holding_cell = true;
2003+
}
2004+
2005+
// Now update local state:
2006+
if force_holding_cell {
19892007
for pending_update in self.holding_cell_htlc_updates.iter() {
19902008
match pending_update {
19912009
&HTLCUpdateAwaitingACK::ClaimHTLC { htlc_id, .. } => {
@@ -3171,7 +3189,7 @@ impl<Signer: Sign> Channel<Signer> {
31713189
// to rebalance channels.
31723190
match &htlc_update {
31733191
&HTLCUpdateAwaitingACK::AddHTLC {amount_msat, cltv_expiry, ref payment_hash, ref source, ref onion_routing_packet, ..} => {
3174-
match self.send_htlc(amount_msat, *payment_hash, cltv_expiry, source.clone(), onion_routing_packet.clone(), logger) {
3192+
match self.send_htlc(amount_msat, *payment_hash, cltv_expiry, source.clone(), onion_routing_packet.clone(), false, logger) {
31753193
Ok(update_add_msg_option) => update_add_htlcs.push(update_add_msg_option.unwrap()),
31763194
Err(e) => {
31773195
match e {
@@ -3207,13 +3225,13 @@ impl<Signer: Sign> Channel<Signer> {
32073225
monitor_update.updates.append(&mut additional_monitor_update.updates);
32083226
},
32093227
&HTLCUpdateAwaitingACK::FailHTLC { htlc_id, ref err_packet } => {
3210-
match self.get_update_fail_htlc(htlc_id, err_packet.clone(), logger) {
3228+
match self.fail_htlc(htlc_id, err_packet.clone(), false, logger) {
32113229
Ok(update_fail_msg_option) => {
32123230
// If an HTLC failure was previously added to the holding cell (via
3213-
// `get_update_fail_htlc`) then generating the fail message itself
3214-
// must not fail - we should never end up in a state where we
3215-
// double-fail an HTLC or fail-then-claim an HTLC as it indicates
3216-
// we didn't wait for a full revocation before failing.
3231+
// `fail_htlc`) then generating the fail message itself must not
3232+
// fail - we should never end up in a state where we double-fail an
3233+
// HTLC or fail-then-claim an HTLC as it indicates we didn't wait
3234+
// for a full revocation before failing.
32173235
update_fail_htlcs.push(update_fail_msg_option.unwrap())
32183236
},
32193237
Err(e) => {
@@ -5458,6 +5476,19 @@ impl<Signer: Sign> Channel<Signer> {
54585476

54595477
// Send stuff to our remote peers:
54605478

5479+
/// Queues up an outbound HTLC to send by placing it in the holding cell. You should call
5480+
/// `maybe_free_holding_cell_htlcs` in order to actually generate and send the commitment
5481+
/// update.
5482+
///
5483+
/// If an Err is returned, it's a ChannelError::Ignore!
5484+
pub fn queue_htlc<L: Deref>(&mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource,
5485+
onion_routing_packet: msgs::OnionPacket, logger: &L)
5486+
-> Result<(), ChannelError> where L::Target: Logger {
5487+
self.send_htlc(amount_msat, payment_hash, cltv_expiry, source, onion_routing_packet, true, logger)
5488+
.map(|msg_opt| assert!(msg_opt.is_none(), "We forced holding cell?"))
5489+
5490+
}
5491+
54615492
/// Adds a pending outbound HTLC to this channel, note that you probably want
54625493
/// send_htlc_and_commit instead cause you'll want both messages at once.
54635494
///
@@ -5473,7 +5504,9 @@ impl<Signer: Sign> Channel<Signer> {
54735504
/// You MUST call send_commitment prior to calling any other methods on this Channel!
54745505
///
54755506
/// If an Err is returned, it's a ChannelError::Ignore!
5476-
pub fn send_htlc<L: Deref>(&mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource, onion_routing_packet: msgs::OnionPacket, logger: &L) -> Result<Option<msgs::UpdateAddHTLC>, ChannelError> where L::Target: Logger {
5507+
fn send_htlc<L: Deref>(&mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource,
5508+
onion_routing_packet: msgs::OnionPacket, mut force_holding_cell: bool, logger: &L)
5509+
-> Result<Option<msgs::UpdateAddHTLC>, ChannelError> where L::Target: Logger {
54775510
if (self.channel_state & (ChannelState::ChannelReady as u32 | BOTH_SIDES_SHUTDOWN_MASK)) != (ChannelState::ChannelReady as u32) {
54785511
return Err(ChannelError::Ignore("Cannot send HTLC until channel is fully established and we haven't started shutting down".to_owned()));
54795512
}
@@ -5568,8 +5601,12 @@ impl<Signer: Sign> Channel<Signer> {
55685601
return Err(ChannelError::Ignore(format!("Cannot send value that would put our balance under counterparty-announced channel reserve value ({})", chan_reserve_msat)));
55695602
}
55705603

5571-
// Now update local state:
55725604
if (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32 | ChannelState::MonitorUpdateInProgress as u32)) != 0 {
5605+
force_holding_cell = true;
5606+
}
5607+
5608+
// Now update local state:
5609+
if force_holding_cell {
55735610
self.holding_cell_htlc_updates.push(HTLCUpdateAwaitingACK::AddHTLC {
55745611
amount_msat,
55755612
payment_hash,
@@ -5762,7 +5799,7 @@ impl<Signer: Sign> Channel<Signer> {
57625799
/// Shorthand for calling send_htlc() followed by send_commitment(), see docs on those for
57635800
/// more info.
57645801
pub fn send_htlc_and_commit<L: Deref>(&mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource, onion_routing_packet: msgs::OnionPacket, logger: &L) -> Result<Option<(msgs::UpdateAddHTLC, msgs::CommitmentSigned, ChannelMonitorUpdate)>, ChannelError> where L::Target: Logger {
5765-
match self.send_htlc(amount_msat, payment_hash, cltv_expiry, source, onion_routing_packet, logger)? {
5802+
match self.send_htlc(amount_msat, payment_hash, cltv_expiry, source, onion_routing_packet, false, logger)? {
57665803
Some(update_add_htlc) => {
57675804
let (commitment_signed, monitor_update) = self.send_commitment_no_status_check(logger)?;
57685805
Ok(Some((update_add_htlc, commitment_signed, monitor_update)))

lightning/src/ln/channelmanager.rs

Lines changed: 9 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -3026,7 +3026,6 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
30263026
let mut new_events = Vec::new();
30273027
let mut failed_forwards = Vec::new();
30283028
let mut phantom_receives: Vec<(u64, OutPoint, Vec<(PendingHTLCInfo, u64)>)> = Vec::new();
3029-
let mut handle_errors = Vec::new();
30303029
{
30313030
let mut forward_htlcs = HashMap::new();
30323031
mem::swap(&mut forward_htlcs, &mut self.forward_htlcs.lock().unwrap());
@@ -3142,8 +3141,6 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
31423141
continue;
31433142
},
31443143
hash_map::Entry::Occupied(mut chan) => {
3145-
let mut add_htlc_msgs = Vec::new();
3146-
let mut fail_htlc_msgs = Vec::new();
31473144
for forward_info in pending_forwards.drain(..) {
31483145
match forward_info {
31493146
HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
@@ -3162,7 +3159,7 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
31623159
// Phantom payments are only PendingHTLCRouting::Receive.
31633160
phantom_shared_secret: None,
31643161
});
3165-
match chan.get_mut().send_htlc(outgoing_amt_msat, payment_hash, outgoing_cltv_value, htlc_source.clone(), onion_packet, &self.logger) {
3162+
match chan.get_mut().queue_htlc(outgoing_amt_msat, payment_hash, outgoing_cltv_value, htlc_source.clone(), onion_packet, &self.logger) {
31663163
Err(e) => {
31673164
if let ChannelError::Ignore(msg) = e {
31683165
log_trace!(self.logger, "Failed to forward HTLC with payment_hash {}: {}", log_bytes!(payment_hash.0), msg);
@@ -3176,28 +3173,15 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
31763173
));
31773174
continue;
31783175
},
3179-
Ok(update_add) => {
3180-
match update_add {
3181-
Some(msg) => { add_htlc_msgs.push(msg); },
3182-
None => {
3183-
// Nothing to do here...we're waiting on a remote
3184-
// revoke_and_ack before we can add anymore HTLCs. The Channel
3185-
// will automatically handle building the update_add_htlc and
3186-
// commitment_signed messages when we can.
3187-
// TODO: Do some kind of timer to set the channel as !is_live()
3188-
// as we don't really want others relying on us relaying through
3189-
// this channel currently :/.
3190-
}
3191-
}
3192-
}
3176+
Ok(()) => { },
31933177
}
31943178
},
31953179
HTLCForwardInfo::AddHTLC { .. } => {
31963180
panic!("short_channel_id != 0 should imply any pending_forward entries are of type Forward");
31973181
},
31983182
HTLCForwardInfo::FailHTLC { htlc_id, err_packet } => {
31993183
log_trace!(self.logger, "Failing HTLC back to channel with short id {} (backward HTLC ID {}) after delay", short_chan_id, htlc_id);
3200-
match chan.get_mut().get_update_fail_htlc(htlc_id, err_packet, &self.logger) {
3184+
match chan.get_mut().queue_fail_htlc(htlc_id, err_packet, &self.logger) {
32013185
Err(e) => {
32023186
if let ChannelError::Ignore(msg) = e {
32033187
log_trace!(self.logger, "Failed to fail HTLC with ID {} backwards to short_id {}: {}", htlc_id, short_chan_id, msg);
@@ -3209,65 +3193,11 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
32093193
// the chain and sending the HTLC-Timeout is their problem.
32103194
continue;
32113195
},
3212-
Ok(Some(msg)) => { fail_htlc_msgs.push(msg); },
3213-
Ok(None) => {
3214-
// Nothing to do here...we're waiting on a remote
3215-
// revoke_and_ack before we can update the commitment
3216-
// transaction. The Channel will automatically handle
3217-
// building the update_fail_htlc and commitment_signed
3218-
// messages when we can.
3219-
// We don't need any kind of timer here as they should fail
3220-
// the channel onto the chain if they can't get our
3221-
// update_fail_htlc in time, it's not our problem.
3222-
}
3196+
Ok(()) => { },
32233197
}
32243198
},
32253199
}
32263200
}
3227-
3228-
if !add_htlc_msgs.is_empty() || !fail_htlc_msgs.is_empty() {
3229-
let (commitment_msg, monitor_update) = match chan.get_mut().send_commitment(&self.logger) {
3230-
Ok(res) => res,
3231-
Err(e) => {
3232-
// We surely failed send_commitment due to bad keys, in that case
3233-
// close channel and then send error message to peer.
3234-
let counterparty_node_id = chan.get().get_counterparty_node_id();
3235-
let err: Result<(), _> = match e {
3236-
ChannelError::Ignore(_) | ChannelError::Warn(_) => {
3237-
panic!("Stated return value requirements in send_commitment() were not met");
3238-
}
3239-
ChannelError::Close(msg) => {
3240-
log_trace!(self.logger, "Closing channel {} due to Close-required error: {}", log_bytes!(chan.key()[..]), msg);
3241-
let mut channel = remove_channel!(self, chan);
3242-
// ChannelClosed event is generated by handle_error for us.
3243-
Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel.channel_id(), channel.get_user_id(), channel.force_shutdown(true), self.get_channel_update_for_broadcast(&channel).ok()))
3244-
},
3245-
};
3246-
handle_errors.push((counterparty_node_id, err));
3247-
continue;
3248-
}
3249-
};
3250-
match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) {
3251-
ChannelMonitorUpdateStatus::Completed => {},
3252-
e => {
3253-
handle_errors.push((chan.get().get_counterparty_node_id(), handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, false, true)));
3254-
continue;
3255-
}
3256-
}
3257-
log_debug!(self.logger, "Forwarding HTLCs resulted in a commitment update with {} HTLCs added and {} HTLCs failed for channel {}",
3258-
add_htlc_msgs.len(), fail_htlc_msgs.len(), log_bytes!(chan.get().channel_id()));
3259-
channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
3260-
node_id: chan.get().get_counterparty_node_id(),
3261-
updates: msgs::CommitmentUpdate {
3262-
update_add_htlcs: add_htlc_msgs,
3263-
update_fulfill_htlcs: Vec::new(),
3264-
update_fail_htlcs: fail_htlc_msgs,
3265-
update_fail_malformed_htlcs: Vec::new(),
3266-
update_fee: None,
3267-
commitment_signed: commitment_msg,
3268-
},
3269-
});
3270-
}
32713201
}
32723202
}
32733203
} else {
@@ -3457,9 +3387,11 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
34573387
}
34583388
self.forward_htlcs(&mut phantom_receives);
34593389

3460-
for (counterparty_node_id, err) in handle_errors.drain(..) {
3461-
let _ = handle_error!(self, err, counterparty_node_id);
3462-
}
3390+
// Freeing the holding cell here is relatively redundant - in practice we'll do it when we
3391+
// next get a `get_and_clear_pending_msg_events` call, but some tests rely on it, and it's
3392+
// nice to do the work now if we can rather than while we're trying to get messages in the
3393+
// network stack.
3394+
self.check_free_holding_cells();
34633395

34643396
if new_events.is_empty() { return }
34653397
let mut events = self.pending_events.lock().unwrap();

0 commit comments

Comments
 (0)