Skip to content

Commit b0187f5

Browse files
committed
Lean on the holding cell when batch-forwarding/failing HTLCs
When we batch HTLC updates, we currently do the explicit queueing plus the commitment generation in the `ChannelManager`. This is a bit strange, as it's ultimately a `Channel` responsibility to generate commitments at the correct time. The abstraction leaks into `ChannelManager`: the `send_htlc` and `get_update_fail_htlc` method docs carry clear comments about how `send_commitment` MUST be called prior to calling other `Channel` methods. Luckily, `Channel` already has an update queue - the holding cell. Thus, we can trivially rewrite the batch update logic as inserting the desired updates into the holding cell and then asking all channels to clear their holding cells.
1 parent 52edb35 commit b0187f5

File tree

2 files changed

+83
-118
lines changed

2 files changed

+83
-118
lines changed

lightning/src/ln/channel.rs

Lines changed: 53 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1942,13 +1942,27 @@ impl<Signer: Sign> Channel<Signer> {
19421942
}
19431943
}
19441944

1945+
/// We can only have one resolution per HTLC. In some cases around reconnect, we may fulfill
1946+
/// an HTLC more than once or fulfill once and then attempt to fail after reconnect. We cannot,
1947+
/// however, fail more than once as we wait for an upstream failure to be irrevocably committed
1948+
/// before we fail backwards.
1949+
///
1950+
/// If we do fail twice, we debug_assert!(false) and return Ok(()). Thus, will always return
1951+
/// Ok(()) if debug assertions are turned on or preconditions are met.
1952+
pub fn queue_fail_htlc<L: Deref>(&mut self, htlc_id_arg: u64, err_packet: msgs::OnionErrorPacket, logger: &L)
1953+
-> Result<(), ChannelError> where L::Target: Logger {
1954+
self.fail_htlc(htlc_id_arg, err_packet, true, logger)
1955+
.map(|msg_opt| assert!(msg_opt.is_none(), "We forced holding cell?"))
1956+
}
1957+
19451958
/// We can only have one resolution per HTLC. In some cases around reconnect, we may fulfill
19461959
/// an HTLC more than once or fulfill once and then attempt to fail after reconnect. We cannot,
19471960
/// however, fail more than once as we wait for an upstream failure to be irrevocably committed
19481961
/// before we fail backwards.
19491962
/// If we do fail twice, we debug_assert!(false) and return Ok(None). Thus, will always return
19501963
/// Ok(_) if debug assertions are turned on or preconditions are met.
1951-
pub fn get_update_fail_htlc<L: Deref>(&mut self, htlc_id_arg: u64, err_packet: msgs::OnionErrorPacket, logger: &L) -> Result<Option<msgs::UpdateFailHTLC>, ChannelError> where L::Target: Logger {
1964+
fn fail_htlc<L: Deref>(&mut self, htlc_id_arg: u64, err_packet: msgs::OnionErrorPacket, mut force_holding_cell: bool, logger: &L)
1965+
-> Result<Option<msgs::UpdateFailHTLC>, ChannelError> where L::Target: Logger {
19521966
if (self.channel_state & (ChannelState::ChannelReady as u32)) != (ChannelState::ChannelReady as u32) {
19531967
panic!("Was asked to fail an HTLC when channel was not in an operational state");
19541968
}
@@ -1986,8 +2000,13 @@ impl<Signer: Sign> Channel<Signer> {
19862000
return Ok(None);
19872001
}
19882002

1989-
// Now update local state:
19902003
if (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32 | ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateInProgress as u32)) != 0 {
2004+
debug_assert!(force_holding_cell, "We don't expect to need to use the holding cell if we weren't trying to");
2005+
force_holding_cell = true;
2006+
}
2007+
2008+
// Now update local state:
2009+
if force_holding_cell {
19912010
for pending_update in self.holding_cell_htlc_updates.iter() {
19922011
match pending_update {
19932012
&HTLCUpdateAwaitingACK::ClaimHTLC { htlc_id, .. } => {
@@ -3146,8 +3165,8 @@ impl<Signer: Sign> Channel<Signer> {
31463165
} else { Ok((None, Vec::new())) }
31473166
}
31483167

3149-
/// Used to fulfill holding_cell_htlcs when we get a remote ack (or implicitly get it by them
3150-
/// fulfilling or failing the last pending HTLC)
3168+
/// Frees any pending commitment updates in the holding cell, generating the relevant messages
3169+
/// for our counterparty.
31513170
fn free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> Result<(Option<(msgs::CommitmentUpdate, ChannelMonitorUpdate)>, Vec<(HTLCSource, PaymentHash)>), ChannelError> where L::Target: Logger {
31523171
assert_eq!(self.channel_state & ChannelState::MonitorUpdateInProgress as u32, 0);
31533172
if self.holding_cell_htlc_updates.len() != 0 || self.holding_cell_update_fee.is_some() {
@@ -3173,7 +3192,7 @@ impl<Signer: Sign> Channel<Signer> {
31733192
// to rebalance channels.
31743193
match &htlc_update {
31753194
&HTLCUpdateAwaitingACK::AddHTLC {amount_msat, cltv_expiry, ref payment_hash, ref source, ref onion_routing_packet, ..} => {
3176-
match self.send_htlc(amount_msat, *payment_hash, cltv_expiry, source.clone(), onion_routing_packet.clone(), logger) {
3195+
match self.send_htlc(amount_msat, *payment_hash, cltv_expiry, source.clone(), onion_routing_packet.clone(), false, logger) {
31773196
Ok(update_add_msg_option) => update_add_htlcs.push(update_add_msg_option.unwrap()),
31783197
Err(e) => {
31793198
match e {
@@ -3209,13 +3228,13 @@ impl<Signer: Sign> Channel<Signer> {
32093228
monitor_update.updates.append(&mut additional_monitor_update.updates);
32103229
},
32113230
&HTLCUpdateAwaitingACK::FailHTLC { htlc_id, ref err_packet } => {
3212-
match self.get_update_fail_htlc(htlc_id, err_packet.clone(), logger) {
3231+
match self.fail_htlc(htlc_id, err_packet.clone(), false, logger) {
32133232
Ok(update_fail_msg_option) => {
32143233
// If an HTLC failure was previously added to the holding cell (via
3215-
// `get_update_fail_htlc`) then generating the fail message itself
3216-
// must not fail - we should never end up in a state where we
3217-
// double-fail an HTLC or fail-then-claim an HTLC as it indicates
3218-
// we didn't wait for a full revocation before failing.
3234+
// `queue_fail_htlc`) then generating the fail message itself must
3235+
// not fail - we should never end up in a state where we double-fail
3236+
// an HTLC or fail-then-claim an HTLC as it indicates we didn't wait
3237+
// for a full revocation before failing.
32193238
update_fail_htlcs.push(update_fail_msg_option.unwrap())
32203239
},
32213240
Err(e) => {
@@ -5470,6 +5489,19 @@ impl<Signer: Sign> Channel<Signer> {
54705489

54715490
// Send stuff to our remote peers:
54725491

5492+
/// Queues up an outbound HTLC to send by placing it in the holding cell. You should call
5493+
/// `maybe_free_holding_cell_htlcs` in order to actually generate and send the commitment
5494+
/// update.
5495+
///
5496+
/// If an Err is returned, it's a ChannelError::Ignore!
5497+
pub fn queue_htlc<L: Deref>(&mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource,
5498+
onion_routing_packet: msgs::OnionPacket, logger: &L)
5499+
-> Result<(), ChannelError> where L::Target: Logger {
5500+
self.send_htlc(amount_msat, payment_hash, cltv_expiry, source, onion_routing_packet, true, logger)
5501+
.map(|msg_opt| assert!(msg_opt.is_none(), "We forced holding cell?"))
5502+
5503+
}
5504+
54735505
/// Adds a pending outbound HTLC to this channel, note that you probably want
54745506
/// send_htlc_and_commit instead cause you'll want both messages at once.
54755507
///
@@ -5482,10 +5514,13 @@ impl<Signer: Sign> Channel<Signer> {
54825514
/// we may not yet have sent the previous commitment update messages and will need to
54835515
/// regenerate them.
54845516
///
5485-
/// You MUST call send_commitment prior to calling any other methods on this Channel!
5517+
/// You MUST call send_commitment prior to calling any other methods on this Channel if
5518+
/// `force_holding_cell` is false.
54865519
///
54875520
/// If an Err is returned, it's a ChannelError::Ignore!
5488-
pub fn send_htlc<L: Deref>(&mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource, onion_routing_packet: msgs::OnionPacket, logger: &L) -> Result<Option<msgs::UpdateAddHTLC>, ChannelError> where L::Target: Logger {
5521+
fn send_htlc<L: Deref>(&mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource,
5522+
onion_routing_packet: msgs::OnionPacket, mut force_holding_cell: bool, logger: &L)
5523+
-> Result<Option<msgs::UpdateAddHTLC>, ChannelError> where L::Target: Logger {
54895524
if (self.channel_state & (ChannelState::ChannelReady as u32 | BOTH_SIDES_SHUTDOWN_MASK)) != (ChannelState::ChannelReady as u32) {
54905525
return Err(ChannelError::Ignore("Cannot send HTLC until channel is fully established and we haven't started shutting down".to_owned()));
54915526
}
@@ -5580,8 +5615,12 @@ impl<Signer: Sign> Channel<Signer> {
55805615
return Err(ChannelError::Ignore(format!("Cannot send value that would put our balance under counterparty-announced channel reserve value ({})", chan_reserve_msat)));
55815616
}
55825617

5583-
// Now update local state:
55845618
if (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32 | ChannelState::MonitorUpdateInProgress as u32)) != 0 {
5619+
force_holding_cell = true;
5620+
}
5621+
5622+
// Now update local state:
5623+
if force_holding_cell {
55855624
self.holding_cell_htlc_updates.push(HTLCUpdateAwaitingACK::AddHTLC {
55865625
amount_msat,
55875626
payment_hash,
@@ -5774,7 +5813,7 @@ impl<Signer: Sign> Channel<Signer> {
57745813
/// Shorthand for calling send_htlc() followed by send_commitment(), see docs on those for
57755814
/// more info.
57765815
pub fn send_htlc_and_commit<L: Deref>(&mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource, onion_routing_packet: msgs::OnionPacket, logger: &L) -> Result<Option<(msgs::UpdateAddHTLC, msgs::CommitmentSigned, ChannelMonitorUpdate)>, ChannelError> where L::Target: Logger {
5777-
match self.send_htlc(amount_msat, payment_hash, cltv_expiry, source, onion_routing_packet, logger)? {
5816+
match self.send_htlc(amount_msat, payment_hash, cltv_expiry, source, onion_routing_packet, false, logger)? {
57785817
Some(update_add_htlc) => {
57795818
let (commitment_signed, monitor_update) = self.send_commitment_no_status_check(logger)?;
57805819
Ok(Some((update_add_htlc, commitment_signed, monitor_update)))

lightning/src/ln/channelmanager.rs

Lines changed: 30 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -3170,7 +3170,6 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
31703170
let mut new_events = Vec::new();
31713171
let mut failed_forwards = Vec::new();
31723172
let mut phantom_receives: Vec<(u64, OutPoint, u128, Vec<(PendingHTLCInfo, u64)>)> = Vec::new();
3173-
let mut handle_errors = Vec::new();
31743173
{
31753174
let mut forward_htlcs = HashMap::new();
31763175
mem::swap(&mut forward_htlcs, &mut self.forward_htlcs.lock().unwrap());
@@ -3286,8 +3285,6 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
32863285
continue;
32873286
},
32883287
hash_map::Entry::Occupied(mut chan) => {
3289-
let mut add_htlc_msgs = Vec::new();
3290-
let mut fail_htlc_msgs = Vec::new();
32913288
for forward_info in pending_forwards.drain(..) {
32923289
match forward_info {
32933290
HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
@@ -3306,112 +3303,44 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
33063303
// Phantom payments are only PendingHTLCRouting::Receive.
33073304
phantom_shared_secret: None,
33083305
});
3309-
match chan.get_mut().send_htlc(outgoing_amt_msat, payment_hash, outgoing_cltv_value, htlc_source.clone(), onion_packet, &self.logger) {
3310-
Err(e) => {
3311-
if let ChannelError::Ignore(msg) = e {
3312-
log_trace!(self.logger, "Failed to forward HTLC with payment_hash {}: {}", log_bytes!(payment_hash.0), msg);
3313-
} else {
3314-
panic!("Stated return value requirements in send_htlc() were not met");
3315-
}
3316-
let (failure_code, data) = self.get_htlc_temp_fail_err_and_data(0x1000|7, short_chan_id, chan.get());
3317-
failed_forwards.push((htlc_source, payment_hash,
3318-
HTLCFailReason::reason(failure_code, data),
3319-
HTLCDestination::NextHopChannel { node_id: Some(chan.get().get_counterparty_node_id()), channel_id: forward_chan_id }
3320-
));
3321-
continue;
3322-
},
3323-
Ok(update_add) => {
3324-
match update_add {
3325-
Some(msg) => { add_htlc_msgs.push(msg); },
3326-
None => {
3327-
// Nothing to do here...we're waiting on a remote
3328-
// revoke_and_ack before we can add anymore HTLCs. The Channel
3329-
// will automatically handle building the update_add_htlc and
3330-
// commitment_signed messages when we can.
3331-
// TODO: Do some kind of timer to set the channel as !is_live()
3332-
// as we don't really want others relying on us relaying through
3333-
// this channel currently :/.
3334-
}
3335-
}
3306+
if let Err(e) = chan.get_mut().queue_htlc(outgoing_amt_msat,
3307+
payment_hash, outgoing_cltv_value, htlc_source.clone(),
3308+
onion_packet, &self.logger)
3309+
{
3310+
if let ChannelError::Ignore(msg) = e {
3311+
log_trace!(self.logger, "Failed to forward HTLC with payment_hash {}: {}", log_bytes!(payment_hash.0), msg);
3312+
} else {
3313+
panic!("Stated return value requirements in send_htlc() were not met");
33363314
}
3315+
let (failure_code, data) = self.get_htlc_temp_fail_err_and_data(0x1000|7, short_chan_id, chan.get());
3316+
failed_forwards.push((htlc_source, payment_hash,
3317+
HTLCFailReason::reason(failure_code, data),
3318+
HTLCDestination::NextHopChannel { node_id: Some(chan.get().get_counterparty_node_id()), channel_id: forward_chan_id }
3319+
));
3320+
continue;
33373321
}
33383322
},
33393323
HTLCForwardInfo::AddHTLC { .. } => {
33403324
panic!("short_channel_id != 0 should imply any pending_forward entries are of type Forward");
33413325
},
33423326
HTLCForwardInfo::FailHTLC { htlc_id, err_packet } => {
33433327
log_trace!(self.logger, "Failing HTLC back to channel with short id {} (backward HTLC ID {}) after delay", short_chan_id, htlc_id);
3344-
match chan.get_mut().get_update_fail_htlc(htlc_id, err_packet, &self.logger) {
3345-
Err(e) => {
3346-
if let ChannelError::Ignore(msg) = e {
3347-
log_trace!(self.logger, "Failed to fail HTLC with ID {} backwards to short_id {}: {}", htlc_id, short_chan_id, msg);
3348-
} else {
3349-
panic!("Stated return value requirements in get_update_fail_htlc() were not met");
3350-
}
3351-
// fail-backs are best-effort, we probably already have one
3352-
// pending, and if not that's OK, if not, the channel is on
3353-
// the chain and sending the HTLC-Timeout is their problem.
3354-
continue;
3355-
},
3356-
Ok(Some(msg)) => { fail_htlc_msgs.push(msg); },
3357-
Ok(None) => {
3358-
// Nothing to do here...we're waiting on a remote
3359-
// revoke_and_ack before we can update the commitment
3360-
// transaction. The Channel will automatically handle
3361-
// building the update_fail_htlc and commitment_signed
3362-
// messages when we can.
3363-
// We don't need any kind of timer here as they should fail
3364-
// the channel onto the chain if they can't get our
3365-
// update_fail_htlc in time, it's not our problem.
3328+
if let Err(e) = chan.get_mut().queue_fail_htlc(
3329+
htlc_id, err_packet, &self.logger
3330+
) {
3331+
if let ChannelError::Ignore(msg) = e {
3332+
log_trace!(self.logger, "Failed to fail HTLC with ID {} backwards to short_id {}: {}", htlc_id, short_chan_id, msg);
3333+
} else {
3334+
panic!("Stated return value requirements in queue_fail_htlc() were not met");
33663335
}
3336+
// fail-backs are best-effort, we probably already have one
3337+
// pending, and if not that's OK, if not, the channel is on
3338+
// the chain and sending the HTLC-Timeout is their problem.
3339+
continue;
33673340
}
33683341
},
33693342
}
33703343
}
3371-
3372-
if !add_htlc_msgs.is_empty() || !fail_htlc_msgs.is_empty() {
3373-
let (commitment_msg, monitor_update) = match chan.get_mut().send_commitment(&self.logger) {
3374-
Ok(res) => res,
3375-
Err(e) => {
3376-
// We surely failed send_commitment due to bad keys, in that case
3377-
// close channel and then send error message to peer.
3378-
let counterparty_node_id = chan.get().get_counterparty_node_id();
3379-
let err: Result<(), _> = match e {
3380-
ChannelError::Ignore(_) | ChannelError::Warn(_) => {
3381-
panic!("Stated return value requirements in send_commitment() were not met");
3382-
}
3383-
ChannelError::Close(msg) => {
3384-
log_trace!(self.logger, "Closing channel {} due to Close-required error: {}", log_bytes!(chan.key()[..]), msg);
3385-
let mut channel = remove_channel!(self, chan);
3386-
// ChannelClosed event is generated by handle_error for us.
3387-
Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel.channel_id(), channel.get_user_id(), channel.force_shutdown(true), self.get_channel_update_for_broadcast(&channel).ok()))
3388-
},
3389-
};
3390-
handle_errors.push((counterparty_node_id, err));
3391-
continue;
3392-
}
3393-
};
3394-
match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) {
3395-
ChannelMonitorUpdateStatus::Completed => {},
3396-
e => {
3397-
handle_errors.push((chan.get().get_counterparty_node_id(), handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, false, true)));
3398-
continue;
3399-
}
3400-
}
3401-
log_debug!(self.logger, "Forwarding HTLCs resulted in a commitment update with {} HTLCs added and {} HTLCs failed for channel {}",
3402-
add_htlc_msgs.len(), fail_htlc_msgs.len(), log_bytes!(chan.get().channel_id()));
3403-
channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
3404-
node_id: chan.get().get_counterparty_node_id(),
3405-
updates: msgs::CommitmentUpdate {
3406-
update_add_htlcs: add_htlc_msgs,
3407-
update_fulfill_htlcs: Vec::new(),
3408-
update_fail_htlcs: fail_htlc_msgs,
3409-
update_fail_malformed_htlcs: Vec::new(),
3410-
update_fee: None,
3411-
commitment_signed: commitment_msg,
3412-
},
3413-
});
3414-
}
34153344
}
34163345
}
34173346
} else {
@@ -3615,9 +3544,11 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
36153544
}
36163545
self.forward_htlcs(&mut phantom_receives);
36173546

3618-
for (counterparty_node_id, err) in handle_errors.drain(..) {
3619-
let _ = handle_error!(self, err, counterparty_node_id);
3620-
}
3547+
// Freeing the holding cell here is relatively redundant - in practice we'll do it when we
3548+
// next get a `get_and_clear_pending_msg_events` call, but some tests rely on it, and it's
3549+
// nice to do the work now if we can rather than while we're trying to get messages in the
3550+
// network stack.
3551+
self.check_free_holding_cells();
36213552

36223553
if new_events.is_empty() { return }
36233554
let mut events = self.pending_events.lock().unwrap();
@@ -5568,11 +5499,6 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
55685499
/// Check the holding cell in each channel and free any pending HTLCs in them if possible.
55695500
/// Returns whether there were any updates such as if pending HTLCs were freed or a monitor
55705501
/// update was applied.
5571-
///
5572-
/// This should only apply to HTLCs which were added to the holding cell because we were
5573-
/// waiting on a monitor update to finish. In that case, we don't want to free the holding cell
5574-
/// directly in `channel_monitor_updated` as it may introduce deadlocks calling back into user
5575-
/// code to inform them of a channel monitor update.
55765502
fn check_free_holding_cells(&self) -> bool {
55775503
let mut has_monitor_update = false;
55785504
let mut failed_htlcs = Vec::new();

0 commit comments

Comments
 (0)