Refactor ShutdownResult type and construction #2613

Merged: 3 commits, Oct 29, 2023

79 changes: 50 additions & 29 deletions lightning/src/ln/channel.rs
@@ -543,18 +543,17 @@ pub(super) struct ReestablishResponses {
pub shutdown_msg: Option<msgs::Shutdown>,
}

/// The return type of `force_shutdown`
///
/// Contains a tuple with the following:
/// - An optional (counterparty_node_id, funding_txo, [`ChannelMonitorUpdate`]) tuple
/// - A list of HTLCs to fail back in the form of the (source, payment hash, and this channel's
/// counterparty_node_id and channel_id).
/// - An optional transaction id identifying a corresponding batch funding transaction.
pub(crate) type ShutdownResult = (
Option<(PublicKey, OutPoint, ChannelMonitorUpdate)>,
Vec<(HTLCSource, PaymentHash, PublicKey, ChannelId)>,
Option<Txid>
);
/// The result of a shutdown that should be handled.
#[must_use]
pub(crate) struct ShutdownResult {
Collaborator:

Suggested change
pub(crate) struct ShutdownResult {
#[must_use]
pub(crate) struct ShutdownResult {

Contributor Author:

Done.
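For context: #[must_use] on the type makes rustc warn whenever a returned ShutdownResult is silently dropped, which is the point of the suggestion. A minimal stand-alone sketch, illustration only (not code from this PR):

    // Stand-in type so the snippet is self-contained; the real struct is defined in the diff.
    #[must_use]
    struct ShutdownResult;

    fn force_shutdown_stub() -> ShutdownResult { ShutdownResult }

    fn main() {
        // Dropping the value triggers the `unused_must_use` lint, roughly:
        // warning: unused `ShutdownResult` that must be used
        force_shutdown_stub();
    }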

/// A channel monitor update to apply.
pub(crate) monitor_update: Option<(PublicKey, OutPoint, ChannelMonitorUpdate)>,
/// A list of dropped outbound HTLCs that can safely be failed backwards immediately.
pub(crate) dropped_outbound_htlcs: Vec<(HTLCSource, PaymentHash, PublicKey, ChannelId)>,
/// An unbroadcasted batch funding transaction id. The closure of this channel should be
/// propagated to the remainder of the batch.
pub(crate) unbroadcasted_batch_funding_txid: Option<Txid>,
}

/// If the majority of the channels funds are to the fundee and the initiator holds only just
/// enough funds to cover their reserve value, channels are at risk of getting "stuck". Because the
@@ -2064,7 +2063,11 @@ impl<SP: Deref> ChannelContext<SP> where SP::Target: SignerProvider {

self.channel_state = ChannelState::ShutdownComplete as u32;
self.update_time_counter += 1;
(monitor_update, dropped_outbound_htlcs, unbroadcasted_batch_funding_txid)
ShutdownResult {
monitor_update,
dropped_outbound_htlcs,
unbroadcasted_batch_funding_txid,
}
}
}
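The construction above returns the named struct in place of the old positional tuple; consumers then read the parts by field name. A rough, hypothetical sketch of the consuming side (the real handling lives in finish_close_channel in channelmanager.rs below):

    // Hypothetical consumer, for illustration only; not code from this PR.
    fn handle_shutdown(mut res: ShutdownResult) {
        for (_source, _payment_hash, _cp_node_id, _channel_id) in res.dropped_outbound_htlcs.drain(..) {
            // Fail each dropped outbound HTLC backwards.
        }
        if let Some((_cp_node_id, _funding_txo, _update)) = res.monitor_update {
            // Hand the closing ChannelMonitorUpdate to the chain monitor.
        }
        if let Some(_txid) = res.unbroadcasted_batch_funding_txid {
            // Propagate the closure to the rest of the funding batch.
        }
    }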

@@ -4219,18 +4222,18 @@ impl<SP: Deref> Channel<SP> where

pub fn maybe_propose_closing_signed<F: Deref, L: Deref>(
&mut self, fee_estimator: &LowerBoundedFeeEstimator<F>, logger: &L)
-> Result<(Option<msgs::ClosingSigned>, Option<Transaction>), ChannelError>
-> Result<(Option<msgs::ClosingSigned>, Option<Transaction>, Option<ShutdownResult>), ChannelError>
where F::Target: FeeEstimator, L::Target: Logger
{
if self.context.last_sent_closing_fee.is_some() || !self.closing_negotiation_ready() {
return Ok((None, None));
return Ok((None, None, None));
}

if !self.context.is_outbound() {
if let Some(msg) = &self.context.pending_counterparty_closing_signed.take() {
return self.closing_signed(fee_estimator, &msg);
}
return Ok((None, None));
return Ok((None, None, None));
}

let (our_min_fee, our_max_fee) = self.calculate_closing_fee_limits(fee_estimator);
@@ -4255,7 +4258,7 @@ impl<SP: Deref> Channel<SP> where
min_fee_satoshis: our_min_fee,
max_fee_satoshis: our_max_fee,
}),
}), None))
}), None, None))
}
}
}
@@ -4404,7 +4407,7 @@ impl<SP: Deref> Channel<SP> where

pub fn closing_signed<F: Deref>(
&mut self, fee_estimator: &LowerBoundedFeeEstimator<F>, msg: &msgs::ClosingSigned)
-> Result<(Option<msgs::ClosingSigned>, Option<Transaction>), ChannelError>
-> Result<(Option<msgs::ClosingSigned>, Option<Transaction>, Option<ShutdownResult>), ChannelError>
where F::Target: FeeEstimator
{
if self.context.channel_state & BOTH_SIDES_SHUTDOWN_MASK != BOTH_SIDES_SHUTDOWN_MASK {
@@ -4426,7 +4429,7 @@ impl<SP: Deref> Channel<SP> where

if self.context.channel_state & ChannelState::MonitorUpdateInProgress as u32 != 0 {
self.context.pending_counterparty_closing_signed = Some(msg.clone());
return Ok((None, None));
return Ok((None, None, None));
}

let funding_redeemscript = self.context.get_funding_redeemscript();
@@ -4456,10 +4459,15 @@ impl<SP: Deref> Channel<SP> where
assert!(self.context.shutdown_scriptpubkey.is_some());
if let Some((last_fee, sig)) = self.context.last_sent_closing_fee {
if last_fee == msg.fee_satoshis {
let shutdown_result = ShutdownResult {
monitor_update: None,
dropped_outbound_htlcs: Vec::new(),
unbroadcasted_batch_funding_txid: self.context.unbroadcasted_batch_funding_txid(),
};
let tx = self.build_signed_closing_transaction(&mut closing_tx, &msg.signature, &sig);
self.context.channel_state = ChannelState::ShutdownComplete as u32;
self.context.update_time_counter += 1;
return Ok((None, Some(tx)));
return Ok((None, Some(tx), Some(shutdown_result)));
}
}

@@ -4478,13 +4486,19 @@ impl<SP: Deref> Channel<SP> where
let sig = ecdsa
.sign_closing_transaction(&closing_tx, &self.context.secp_ctx)
.map_err(|_| ChannelError::Close("External signer refused to sign closing transaction".to_owned()))?;

let signed_tx = if $new_fee == msg.fee_satoshis {
let (signed_tx, shutdown_result) = if $new_fee == msg.fee_satoshis {
let shutdown_result = ShutdownResult {
monitor_update: None,
dropped_outbound_htlcs: Vec::new(),
unbroadcasted_batch_funding_txid: self.context.unbroadcasted_batch_funding_txid(),
};
self.context.channel_state = ChannelState::ShutdownComplete as u32;
self.context.update_time_counter += 1;
let tx = self.build_signed_closing_transaction(&closing_tx, &msg.signature, &sig);
Some(tx)
} else { None };
(Some(tx), Some(shutdown_result))
} else {
(None, None)
};

self.context.last_sent_closing_fee = Some((used_fee, sig.clone()));
Ok((Some(msgs::ClosingSigned {
@@ -4495,7 +4509,7 @@ impl<SP: Deref> Channel<SP> where
min_fee_satoshis: our_min_fee,
max_fee_satoshis: our_max_fee,
}),
}), signed_tx))
}), signed_tx, shutdown_result))
}
}
}
@@ -5573,7 +5587,7 @@ impl<SP: Deref> Channel<SP> where
/// [`ChannelMonitorUpdate`] will be returned).
pub fn get_shutdown(&mut self, signer_provider: &SP, their_features: &InitFeatures,
target_feerate_sats_per_kw: Option<u32>, override_shutdown_script: Option<ShutdownScript>)
-> Result<(msgs::Shutdown, Option<ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), APIError>
-> Result<(msgs::Shutdown, Option<ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>, Option<ShutdownResult>), APIError>
Contributor Author:

I wonder if these Option<ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)> can be collapsed into Option<ShutdownResult> although the use cases are slightly different.

Contributor:

Maybe? I think I'd rather keep them separate though. We'd want to handle the monitor update result on the non-shutdown case.

Contributor Author:

Right, that was my main hesitation. It seems possible to branch on the update_id of the ChannelMonitorUpdate being equal to CLOSED_CHANNEL_UPDATE_ID, but might not be something we want to unify.
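For concreteness, the branch alluded to here would key off the update_id of the contained ChannelMonitorUpdate. A hypothetical sketch only, not something this PR implements (it assumes the existing CLOSED_CHANNEL_UPDATE_ID constant from chain::channelmonitor):

    // Hypothetical helper illustrating the idea discussed above.
    fn is_closing_update(res: &ShutdownResult) -> bool {
        match &res.monitor_update {
            Some((_node_id, _funding_txo, update)) => update.update_id == CLOSED_CHANNEL_UPDATE_ID,
            // Classifying a result with no monitor update is exactly the ambiguity
            // that argues for keeping the types separate.
            None => false,
        }
    }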

{
for htlc in self.context.pending_outbound_htlcs.iter() {
if let OutboundHTLCState::LocalAnnounced(_) = htlc.state {
@@ -5628,11 +5642,18 @@ impl<SP: Deref> Channel<SP> where

// From here on out, we may not fail!
self.context.target_closing_feerate_sats_per_kw = target_feerate_sats_per_kw;
if self.context.channel_state & !STATE_FLAGS < ChannelState::FundingSent as u32 {
let shutdown_result = if self.context.channel_state & !STATE_FLAGS < ChannelState::FundingSent as u32 {
let shutdown_result = ShutdownResult {
monitor_update: None,
dropped_outbound_htlcs: Vec::new(),
unbroadcasted_batch_funding_txid: self.context.unbroadcasted_batch_funding_txid(),
};
self.context.channel_state = ChannelState::ShutdownComplete as u32;
Some(shutdown_result)
} else {
self.context.channel_state |= ChannelState::LocalShutdownSent as u32;
}
None
};
self.context.update_time_counter += 1;

let monitor_update = if update_shutdown_script {
@@ -5668,7 +5689,7 @@ impl<SP: Deref> Channel<SP> where
debug_assert!(!self.is_shutdown() || monitor_update.is_none(),
"we can't both complete shutdown and return a monitor update");

Ok((shutdown, monitor_update, dropped_outbound_htlcs))
Ok((shutdown, monitor_update, dropped_outbound_htlcs, shutdown_result))
}

pub fn inflight_htlc_sources(&self) -> impl Iterator<Item=(&HTLCSource, &PaymentHash)> {
56 changes: 27 additions & 29 deletions lightning/src/ln/channelmanager.rs
@@ -452,7 +452,7 @@ impl MsgHandleErrInternal {
#[inline]
fn from_finish_shutdown(err: String, channel_id: ChannelId, user_channel_id: u128, shutdown_res: ShutdownResult, channel_update: Option<msgs::ChannelUpdate>, channel_capacity: u64) -> Self {
let err_msg = msgs::ErrorMessage { channel_id, data: err.clone() };
let action = if let (Some(_), ..) = &shutdown_res {
let action = if shutdown_res.monitor_update.is_some() {
// We have a closing `ChannelMonitorUpdate`, which means the channel was funded and we
// should disconnect our peer such that we force them to broadcast their latest
// commitment upon reconnecting.
@@ -2564,7 +2564,7 @@ where
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);

let mut failed_htlcs: Vec<(HTLCSource, PaymentHash)>;
let mut shutdown_result = None;
let shutdown_result;
loop {
let per_peer_state = self.per_peer_state.read().unwrap();

@@ -2579,10 +2579,11 @@ where
if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() {
let funding_txo_opt = chan.context.get_funding_txo();
let their_features = &peer_state.latest_features;
let unbroadcasted_batch_funding_txid = chan.context.unbroadcasted_batch_funding_txid();
let (shutdown_msg, mut monitor_update_opt, htlcs) =
let (shutdown_msg, mut monitor_update_opt, htlcs, local_shutdown_result) =
chan.get_shutdown(&self.signer_provider, their_features, target_feerate_sats_per_1000_weight, override_shutdown_script)?;
failed_htlcs = htlcs;
shutdown_result = local_shutdown_result;
Collaborator:

Let's debug_assert our assumptions.

Suggested change
shutdown_result = local_shutdown_result;
shutdown_result = local_shutdown_result;
debug_assert_eq!(shutdown_result.is_some(), chan.is_shutdown());

Contributor Author:

Added this assertion to all the cooperative closing paths.

debug_assert_eq!(shutdown_result.is_some(), chan.is_shutdown());

// We can send the `shutdown` message before updating the `ChannelMonitor`
// here as we don't need the monitor update to complete until we send a
@@ -2610,7 +2611,6 @@ where
});
}
self.issue_channel_close_events(&chan.context, ClosureReason::HolderForceClosed);
shutdown_result = Some((None, Vec::new(), unbroadcasted_batch_funding_txid));
}
}
break;
@@ -2702,30 +2702,29 @@ where
self.close_channel_internal(channel_id, counterparty_node_id, target_feerate_sats_per_1000_weight, shutdown_script)
}

fn finish_close_channel(&self, shutdown_res: ShutdownResult) {
fn finish_close_channel(&self, mut shutdown_res: ShutdownResult) {
debug_assert_ne!(self.per_peer_state.held_by_thread(), LockHeldState::HeldByThread);
#[cfg(debug_assertions)]
for (_, peer) in self.per_peer_state.read().unwrap().iter() {
debug_assert_ne!(peer.held_by_thread(), LockHeldState::HeldByThread);
}

let (monitor_update_option, mut failed_htlcs, unbroadcasted_batch_funding_txid) = shutdown_res;
log_debug!(self.logger, "Finishing force-closure of channel with {} HTLCs to fail", failed_htlcs.len());
for htlc_source in failed_htlcs.drain(..) {
log_debug!(self.logger, "Finishing closure of channel with {} HTLCs to fail", shutdown_res.dropped_outbound_htlcs.len());
for htlc_source in shutdown_res.dropped_outbound_htlcs.drain(..) {
let (source, payment_hash, counterparty_node_id, channel_id) = htlc_source;
let reason = HTLCFailReason::from_failure_code(0x4000 | 8);
let receiver = HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id), channel_id };
self.fail_htlc_backwards_internal(&source, &payment_hash, &reason, receiver);
}
if let Some((_, funding_txo, monitor_update)) = monitor_update_option {
if let Some((_, funding_txo, monitor_update)) = shutdown_res.monitor_update {
// There isn't anything we can do if we get an update failure - we're already
// force-closing. The monitor update on the required in-memory copy should broadcast
// the latest local state, which is the best we can do anyway. Thus, it is safe to
// ignore the result here.
let _ = self.chain_monitor.update_channel(funding_txo, &monitor_update);
}
let mut shutdown_results = Vec::new();
if let Some(txid) = unbroadcasted_batch_funding_txid {
if let Some(txid) = shutdown_res.unbroadcasted_batch_funding_txid {
let mut funding_batch_states = self.funding_batch_states.lock().unwrap();
let affected_channels = funding_batch_states.remove(&txid).into_iter().flatten();
let per_peer_state = self.per_peer_state.read().unwrap();
@@ -3849,7 +3848,7 @@ where
/// Return values are identical to [`Self::funding_transaction_generated`], respective to
/// each individual channel and transaction output.
///
/// Do NOT broadcast the funding transaction yourself. This batch funding transcaction
/// Do NOT broadcast the funding transaction yourself. This batch funding transaction
/// will only be broadcast when we have safely received and persisted the counterparty's
/// signature for each channel.
///
@@ -3903,7 +3902,7 @@ where
btree_map::Entry::Vacant(vacant) => Some(vacant.insert(Vec::new())),
}
});
for &(temporary_channel_id, counterparty_node_id) in temporary_channels.iter() {
for &(temporary_channel_id, counterparty_node_id) in temporary_channels {
result = result.and_then(|_| self.funding_transaction_generated_intern(
temporary_channel_id,
counterparty_node_id,
@@ -6268,22 +6267,20 @@ where
}

fn internal_closing_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::ClosingSigned) -> Result<(), MsgHandleErrInternal> {
let mut shutdown_result = None;
let unbroadcasted_batch_funding_txid;
let per_peer_state = self.per_peer_state.read().unwrap();
let peer_state_mutex = per_peer_state.get(counterparty_node_id)
.ok_or_else(|| {
debug_assert!(false);
MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)
})?;
let (tx, chan_option) = {
let (tx, chan_option, shutdown_result) = {
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
let peer_state = &mut *peer_state_lock;
match peer_state.channel_by_id.entry(msg.channel_id.clone()) {
hash_map::Entry::Occupied(mut chan_phase_entry) => {
if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() {
unbroadcasted_batch_funding_txid = chan.context.unbroadcasted_batch_funding_txid();
let (closing_signed, tx) = try_chan_phase_entry!(self, chan.closing_signed(&self.fee_estimator, &msg), chan_phase_entry);
let (closing_signed, tx, shutdown_result) = try_chan_phase_entry!(self, chan.closing_signed(&self.fee_estimator, &msg), chan_phase_entry);
debug_assert_eq!(shutdown_result.is_some(), chan.is_shutdown());
if let Some(msg) = closing_signed {
peer_state.pending_msg_events.push(events::MessageSendEvent::SendClosingSigned {
node_id: counterparty_node_id.clone(),
Expand All @@ -6296,8 +6293,8 @@ where
// also implies there are no pending HTLCs left on the channel, so we can
// fully delete it from tracking (the channel monitor is still around to
// watch for old state broadcasts)!
(tx, Some(remove_channel_phase!(self, chan_phase_entry)))
} else { (tx, None) }
(tx, Some(remove_channel_phase!(self, chan_phase_entry)), shutdown_result)
} else { (tx, None, shutdown_result) }
} else {
return try_chan_phase_entry!(self, Err(ChannelError::Close(
"Got a closing_signed message for an unfunded channel!".into())), chan_phase_entry);
Expand All @@ -6319,7 +6316,6 @@ where
});
}
self.issue_channel_close_events(&chan.context, ClosureReason::CooperativeClosure);
shutdown_result = Some((None, Vec::new(), unbroadcasted_batch_funding_txid));
}
mem::drop(per_peer_state);
if let Some(shutdown_result) = shutdown_result {
@@ -7049,15 +7045,18 @@ where
peer_state.channel_by_id.retain(|channel_id, phase| {
match phase {
ChannelPhase::Funded(chan) => {
let unbroadcasted_batch_funding_txid = chan.context.unbroadcasted_batch_funding_txid();
match chan.maybe_propose_closing_signed(&self.fee_estimator, &self.logger) {
Ok((msg_opt, tx_opt)) => {
Ok((msg_opt, tx_opt, shutdown_result_opt)) => {
if let Some(msg) = msg_opt {
has_update = true;
pending_msg_events.push(events::MessageSendEvent::SendClosingSigned {
node_id: chan.context.get_counterparty_node_id(), msg,
});
}
debug_assert_eq!(shutdown_result_opt.is_some(), chan.is_shutdown());
if let Some(shutdown_result) = shutdown_result_opt {
shutdown_results.push(shutdown_result);
}
if let Some(tx) = tx_opt {
// We're done with this channel. We got a closing_signed and sent back
// a closing_signed with a closing transaction to broadcast.
@@ -7072,7 +7071,6 @@ where
log_info!(self.logger, "Broadcasting {}", log_tx!(tx));
self.tx_broadcaster.broadcast_transactions(&[&tx]);
update_maps_on_chan_removal!(self, &chan.context);
shutdown_results.push((None, Vec::new(), unbroadcasted_batch_funding_txid));
false
} else { true }
},
@@ -7113,7 +7111,7 @@ where
// Channel::force_shutdown tries to make us do) as we may still be in initialization,
// so we track the update internally and handle it when the user next calls
// timer_tick_occurred, guaranteeing we're running normally.
if let Some((counterparty_node_id, funding_txo, update)) = failure.0.take() {
if let Some((counterparty_node_id, funding_txo, update)) = failure.monitor_update.take() {
assert_eq!(update.updates.len(), 1);
if let ChannelMonitorUpdateStep::ChannelForceClosed { should_broadcast } = update.updates[0] {
assert!(should_broadcast);
@@ -9405,16 +9403,16 @@ where
log_error!(args.logger, " The ChannelMonitor for channel {} is at counterparty commitment transaction number {} but the ChannelManager is at counterparty commitment transaction number {}.",
&channel.context.channel_id(), monitor.get_cur_counterparty_commitment_number(), channel.get_cur_counterparty_commitment_transaction_number());
}
let (monitor_update, mut new_failed_htlcs, batch_funding_txid) = channel.context.force_shutdown(true);
if batch_funding_txid.is_some() {
let mut shutdown_result = channel.context.force_shutdown(true);
if shutdown_result.unbroadcasted_batch_funding_txid.is_some() {
return Err(DecodeError::InvalidValue);
}
if let Some((counterparty_node_id, funding_txo, update)) = monitor_update {
if let Some((counterparty_node_id, funding_txo, update)) = shutdown_result.monitor_update {
close_background_events.push(BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
counterparty_node_id, funding_txo, update
});
}
failed_htlcs.append(&mut new_failed_htlcs);
failed_htlcs.append(&mut shutdown_result.dropped_outbound_htlcs);
channel_closures.push_back((events::Event::ChannelClosed {
channel_id: channel.context.channel_id(),
user_channel_id: channel.context.get_user_id(),