Skip to content

Commit 0954526

Browse files
committed
Lean on the holding cell when batch-forwarding/failing HTLCs
When we batch HTLC updates, we currently do the explicit queueing plus the commitment generation in the `ChannelManager`. This is a bit strange as its ultimately really a `Channel` responsibility to generate commitments at the correct time, with the abstraction leaking into `ChannelManager` with the `send_htlc` and `get_update_fail_htlc` method docs having clear comments about how `send_commitment` MUST be called prior to calling other `Channel` methods. Luckily `Channel` already has an update queue - the holding cell. Thus, we can trivially rewrite the batch update logic as inserting the desired updates into the holding cell and then asking all channels to clear their holding cells.
1 parent 087c0bd commit 0954526

File tree

2 files changed

+57
-88
lines changed

2 files changed

+57
-88
lines changed

lightning/src/ln/channel.rs

Lines changed: 48 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1948,7 +1948,20 @@ impl<Signer: Sign> Channel<Signer> {
19481948
/// before we fail backwards.
19491949
/// If we do fail twice, we debug_assert!(false) and return Ok(None). Thus, will always return
19501950
/// Ok(_) if debug assertions are turned on or preconditions are met.
1951-
pub fn get_update_fail_htlc<L: Deref>(&mut self, htlc_id_arg: u64, err_packet: msgs::OnionErrorPacket, logger: &L) -> Result<Option<msgs::UpdateFailHTLC>, ChannelError> where L::Target: Logger {
1951+
pub fn queue_fail_htlc<L: Deref>(&mut self, htlc_id_arg: u64, err_packet: msgs::OnionErrorPacket, logger: &L)
1952+
-> Result<(), ChannelError> where L::Target: Logger {
1953+
self.fail_htlc(htlc_id_arg, err_packet, true, logger)
1954+
.map(|msg_opt| assert!(msg_opt.is_none(), "We forced holding cell?"))
1955+
}
1956+
1957+
/// We can only have one resolution per HTLC. In some cases around reconnect, we may fulfill
1958+
/// an HTLC more than once or fulfill once and then attempt to fail after reconnect. We cannot,
1959+
/// however, fail more than once as we wait for an upstream failure to be irrevocably committed
1960+
/// before we fail backwards.
1961+
/// If we do fail twice, we debug_assert!(false) and return Ok(None). Thus, will always return
1962+
/// Ok(_) if debug assertions are turned on or preconditions are met.
1963+
fn fail_htlc<L: Deref>(&mut self, htlc_id_arg: u64, err_packet: msgs::OnionErrorPacket, mut force_holding_cell: bool, logger: &L)
1964+
-> Result<Option<msgs::UpdateFailHTLC>, ChannelError> where L::Target: Logger {
19521965
if (self.channel_state & (ChannelState::ChannelReady as u32)) != (ChannelState::ChannelReady as u32) {
19531966
panic!("Was asked to fail an HTLC when channel was not in an operational state");
19541967
}
@@ -1986,8 +1999,13 @@ impl<Signer: Sign> Channel<Signer> {
19861999
return Ok(None);
19872000
}
19882001

1989-
// Now update local state:
19902002
if (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32 | ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateInProgress as u32)) != 0 {
2003+
debug_assert!(force_holding_cell, "We don't expect to need to use the holding cell if we weren't trying to");
2004+
force_holding_cell = true;
2005+
}
2006+
2007+
// Now update local state:
2008+
if force_holding_cell {
19912009
for pending_update in self.holding_cell_htlc_updates.iter() {
19922010
match pending_update {
19932011
&HTLCUpdateAwaitingACK::ClaimHTLC { htlc_id, .. } => {
@@ -3173,7 +3191,7 @@ impl<Signer: Sign> Channel<Signer> {
31733191
// to rebalance channels.
31743192
match &htlc_update {
31753193
&HTLCUpdateAwaitingACK::AddHTLC {amount_msat, cltv_expiry, ref payment_hash, ref source, ref onion_routing_packet, ..} => {
3176-
match self.send_htlc(amount_msat, *payment_hash, cltv_expiry, source.clone(), onion_routing_packet.clone(), logger) {
3194+
match self.send_htlc(amount_msat, *payment_hash, cltv_expiry, source.clone(), onion_routing_packet.clone(), false, logger) {
31773195
Ok(update_add_msg_option) => update_add_htlcs.push(update_add_msg_option.unwrap()),
31783196
Err(e) => {
31793197
match e {
@@ -3209,13 +3227,13 @@ impl<Signer: Sign> Channel<Signer> {
32093227
monitor_update.updates.append(&mut additional_monitor_update.updates);
32103228
},
32113229
&HTLCUpdateAwaitingACK::FailHTLC { htlc_id, ref err_packet } => {
3212-
match self.get_update_fail_htlc(htlc_id, err_packet.clone(), logger) {
3230+
match self.fail_htlc(htlc_id, err_packet.clone(), false, logger) {
32133231
Ok(update_fail_msg_option) => {
32143232
// If an HTLC failure was previously added to the holding cell (via
3215-
// `get_update_fail_htlc`) then generating the fail message itself
3216-
// must not fail - we should never end up in a state where we
3217-
// double-fail an HTLC or fail-then-claim an HTLC as it indicates
3218-
// we didn't wait for a full revocation before failing.
3233+
// `fail_htlc`) then generating the fail message itself must not
3234+
// fail - we should never end up in a state where we double-fail an
3235+
// HTLC or fail-then-claim an HTLC as it indicates we didn't wait
3236+
// for a full revocation before failing.
32193237
update_fail_htlcs.push(update_fail_msg_option.unwrap())
32203238
},
32213239
Err(e) => {
@@ -5499,6 +5517,19 @@ impl<Signer: Sign> Channel<Signer> {
54995517

55005518
// Send stuff to our remote peers:
55015519

5520+
/// Queues up an outbound HTLC to send by placing it in the holding cell. You should call
5521+
/// `maybe_free_holding_cell_htlcs` in order to actually generate and send the commitment
5522+
/// update.
5523+
///
5524+
/// If an Err is returned, it's a ChannelError::Ignore!
5525+
pub fn queue_htlc<L: Deref>(&mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource,
5526+
onion_routing_packet: msgs::OnionPacket, logger: &L)
5527+
-> Result<(), ChannelError> where L::Target: Logger {
5528+
self.send_htlc(amount_msat, payment_hash, cltv_expiry, source, onion_routing_packet, true, logger)
5529+
.map(|msg_opt| assert!(msg_opt.is_none(), "We forced holding cell?"))
5530+
5531+
}
5532+
55025533
/// Adds a pending outbound HTLC to this channel, note that you probably want
55035534
/// send_htlc_and_commit instead cause you'll want both messages at once.
55045535
///
@@ -5514,7 +5545,9 @@ impl<Signer: Sign> Channel<Signer> {
55145545
/// You MUST call send_commitment prior to calling any other methods on this Channel!
55155546
///
55165547
/// If an Err is returned, it's a ChannelError::Ignore!
5517-
pub fn send_htlc<L: Deref>(&mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource, onion_routing_packet: msgs::OnionPacket, logger: &L) -> Result<Option<msgs::UpdateAddHTLC>, ChannelError> where L::Target: Logger {
5548+
fn send_htlc<L: Deref>(&mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource,
5549+
onion_routing_packet: msgs::OnionPacket, mut force_holding_cell: bool, logger: &L)
5550+
-> Result<Option<msgs::UpdateAddHTLC>, ChannelError> where L::Target: Logger {
55185551
if (self.channel_state & (ChannelState::ChannelReady as u32 | BOTH_SIDES_SHUTDOWN_MASK)) != (ChannelState::ChannelReady as u32) {
55195552
return Err(ChannelError::Ignore("Cannot send HTLC until channel is fully established and we haven't started shutting down".to_owned()));
55205553
}
@@ -5609,8 +5642,12 @@ impl<Signer: Sign> Channel<Signer> {
56095642
return Err(ChannelError::Ignore(format!("Cannot send value that would put our balance under counterparty-announced channel reserve value ({})", chan_reserve_msat)));
56105643
}
56115644

5612-
// Now update local state:
56135645
if (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32 | ChannelState::MonitorUpdateInProgress as u32)) != 0 {
5646+
force_holding_cell = true;
5647+
}
5648+
5649+
// Now update local state:
5650+
if force_holding_cell {
56145651
self.holding_cell_htlc_updates.push(HTLCUpdateAwaitingACK::AddHTLC {
56155652
amount_msat,
56165653
payment_hash,
@@ -5803,7 +5840,7 @@ impl<Signer: Sign> Channel<Signer> {
58035840
/// Shorthand for calling send_htlc() followed by send_commitment(), see docs on those for
58045841
/// more info.
58055842
pub fn send_htlc_and_commit<L: Deref>(&mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource, onion_routing_packet: msgs::OnionPacket, logger: &L) -> Result<Option<(msgs::UpdateAddHTLC, msgs::CommitmentSigned, ChannelMonitorUpdate)>, ChannelError> where L::Target: Logger {
5806-
match self.send_htlc(amount_msat, payment_hash, cltv_expiry, source, onion_routing_packet, logger)? {
5843+
match self.send_htlc(amount_msat, payment_hash, cltv_expiry, source, onion_routing_packet, false, logger)? {
58075844
Some(update_add_htlc) => {
58085845
let (commitment_signed, monitor_update) = self.send_commitment_no_status_check(logger)?;
58095846
Ok(Some((update_add_htlc, commitment_signed, monitor_update)))

lightning/src/ln/channelmanager.rs

Lines changed: 9 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -3148,7 +3148,6 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
31483148
let mut new_events = Vec::new();
31493149
let mut failed_forwards = Vec::new();
31503150
let mut phantom_receives: Vec<(u64, OutPoint, Vec<(PendingHTLCInfo, u64)>)> = Vec::new();
3151-
let mut handle_errors = Vec::new();
31523151
{
31533152
let mut forward_htlcs = HashMap::new();
31543153
mem::swap(&mut forward_htlcs, &mut self.forward_htlcs.lock().unwrap());
@@ -3264,8 +3263,6 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
32643263
continue;
32653264
},
32663265
hash_map::Entry::Occupied(mut chan) => {
3267-
let mut add_htlc_msgs = Vec::new();
3268-
let mut fail_htlc_msgs = Vec::new();
32693266
for forward_info in pending_forwards.drain(..) {
32703267
match forward_info {
32713268
HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
@@ -3284,7 +3281,7 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
32843281
// Phantom payments are only PendingHTLCRouting::Receive.
32853282
phantom_shared_secret: None,
32863283
});
3287-
match chan.get_mut().send_htlc(outgoing_amt_msat, payment_hash, outgoing_cltv_value, htlc_source.clone(), onion_packet, &self.logger) {
3284+
match chan.get_mut().queue_htlc(outgoing_amt_msat, payment_hash, outgoing_cltv_value, htlc_source.clone(), onion_packet, &self.logger) {
32883285
Err(e) => {
32893286
if let ChannelError::Ignore(msg) = e {
32903287
log_trace!(self.logger, "Failed to forward HTLC with payment_hash {}: {}", log_bytes!(payment_hash.0), msg);
@@ -3298,28 +3295,15 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
32983295
));
32993296
continue;
33003297
},
3301-
Ok(update_add) => {
3302-
match update_add {
3303-
Some(msg) => { add_htlc_msgs.push(msg); },
3304-
None => {
3305-
// Nothing to do here...we're waiting on a remote
3306-
// revoke_and_ack before we can add anymore HTLCs. The Channel
3307-
// will automatically handle building the update_add_htlc and
3308-
// commitment_signed messages when we can.
3309-
// TODO: Do some kind of timer to set the channel as !is_live()
3310-
// as we don't really want others relying on us relaying through
3311-
// this channel currently :/.
3312-
}
3313-
}
3314-
}
3298+
Ok(()) => { },
33153299
}
33163300
},
33173301
HTLCForwardInfo::AddHTLC { .. } => {
33183302
panic!("short_channel_id != 0 should imply any pending_forward entries are of type Forward");
33193303
},
33203304
HTLCForwardInfo::FailHTLC { htlc_id, err_packet } => {
33213305
log_trace!(self.logger, "Failing HTLC back to channel with short id {} (backward HTLC ID {}) after delay", short_chan_id, htlc_id);
3322-
match chan.get_mut().get_update_fail_htlc(htlc_id, err_packet, &self.logger) {
3306+
match chan.get_mut().queue_fail_htlc(htlc_id, err_packet, &self.logger) {
33233307
Err(e) => {
33243308
if let ChannelError::Ignore(msg) = e {
33253309
log_trace!(self.logger, "Failed to fail HTLC with ID {} backwards to short_id {}: {}", htlc_id, short_chan_id, msg);
@@ -3331,65 +3315,11 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
33313315
// the chain and sending the HTLC-Timeout is their problem.
33323316
continue;
33333317
},
3334-
Ok(Some(msg)) => { fail_htlc_msgs.push(msg); },
3335-
Ok(None) => {
3336-
// Nothing to do here...we're waiting on a remote
3337-
// revoke_and_ack before we can update the commitment
3338-
// transaction. The Channel will automatically handle
3339-
// building the update_fail_htlc and commitment_signed
3340-
// messages when we can.
3341-
// We don't need any kind of timer here as they should fail
3342-
// the channel onto the chain if they can't get our
3343-
// update_fail_htlc in time, it's not our problem.
3344-
}
3318+
Ok(()) => { },
33453319
}
33463320
},
33473321
}
33483322
}
3349-
3350-
if !add_htlc_msgs.is_empty() || !fail_htlc_msgs.is_empty() {
3351-
let (commitment_msg, monitor_update) = match chan.get_mut().send_commitment(&self.logger) {
3352-
Ok(res) => res,
3353-
Err(e) => {
3354-
// We surely failed send_commitment due to bad keys, in that case
3355-
// close channel and then send error message to peer.
3356-
let counterparty_node_id = chan.get().get_counterparty_node_id();
3357-
let err: Result<(), _> = match e {
3358-
ChannelError::Ignore(_) | ChannelError::Warn(_) => {
3359-
panic!("Stated return value requirements in send_commitment() were not met");
3360-
}
3361-
ChannelError::Close(msg) => {
3362-
log_trace!(self.logger, "Closing channel {} due to Close-required error: {}", log_bytes!(chan.key()[..]), msg);
3363-
let mut channel = remove_channel!(self, chan);
3364-
// ChannelClosed event is generated by handle_error for us.
3365-
Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel.channel_id(), channel.get_user_id(), channel.force_shutdown(true), self.get_channel_update_for_broadcast(&channel).ok()))
3366-
},
3367-
};
3368-
handle_errors.push((counterparty_node_id, err));
3369-
continue;
3370-
}
3371-
};
3372-
match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) {
3373-
ChannelMonitorUpdateStatus::Completed => {},
3374-
e => {
3375-
handle_errors.push((chan.get().get_counterparty_node_id(), handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, false, true)));
3376-
continue;
3377-
}
3378-
}
3379-
log_debug!(self.logger, "Forwarding HTLCs resulted in a commitment update with {} HTLCs added and {} HTLCs failed for channel {}",
3380-
add_htlc_msgs.len(), fail_htlc_msgs.len(), log_bytes!(chan.get().channel_id()));
3381-
channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
3382-
node_id: chan.get().get_counterparty_node_id(),
3383-
updates: msgs::CommitmentUpdate {
3384-
update_add_htlcs: add_htlc_msgs,
3385-
update_fulfill_htlcs: Vec::new(),
3386-
update_fail_htlcs: fail_htlc_msgs,
3387-
update_fail_malformed_htlcs: Vec::new(),
3388-
update_fee: None,
3389-
commitment_signed: commitment_msg,
3390-
},
3391-
});
3392-
}
33933323
}
33943324
}
33953325
} else {
@@ -3578,9 +3508,11 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
35783508
}
35793509
self.forward_htlcs(&mut phantom_receives);
35803510

3581-
for (counterparty_node_id, err) in handle_errors.drain(..) {
3582-
let _ = handle_error!(self, err, counterparty_node_id);
3583-
}
3511+
// Freeing the holding cell here is relatively redundant - in practice we'll do it when we
3512+
// next get a `get_and_clear_pending_msg_events` call, but some tests rely on it, and it's
3513+
// nice to do the work now if we can rather than while we're trying to get messages in the
3514+
// network stack.
3515+
self.check_free_holding_cells();
35843516

35853517
if new_events.is_empty() { return }
35863518
let mut events = self.pending_events.lock().unwrap();

0 commit comments

Comments
 (0)