Cleanup lock orders #1773

Closed
185 changes: 104 additions & 81 deletions lightning/src/ln/channelmanager.rs
@@ -401,13 +401,6 @@ pub(super) struct ChannelHolder<Signer: Sign> {
/// SCIDs being added once the funding transaction is confirmed at the channel's required
/// confirmation depth.
pub(super) short_to_chan_info: HashMap<u64, (PublicKey, [u8; 32])>,
/// Map from payment hash to the payment data and any HTLCs which are to us and can be
/// failed/claimed by the user.
///
/// Note that while this is held in the same mutex as the channels themselves, no consistency
/// guarantees are made about the channels given here actually existing anymore by the time you
/// go to read them!
claimable_htlcs: HashMap<PaymentHash, (events::PaymentPurpose, Vec<ClaimableHTLC>)>,
/// Messages to send to peers - pushed to in the same lock that they are generated in (except
/// for broadcast messages, where ordering isn't as strict).
pub(super) pending_msg_events: Vec<MessageSendEvent>,
@@ -681,23 +674,25 @@ pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, M, T, F, L> = ChannelManage
// |
// |__`forward_htlcs`
// |
// |__`channel_state`
// |__`pending_background_events`

ViktorTigerstrom (Contributor, Author) commented:

I'd just like to ask: can process_background_events, reached through timer_tick_occurred, be called by one thread while another thread holds other locks? If so, I'll drop the second commit that moves this.

ViktorTigerstrom (Contributor, Author) replied on Nov 2, 2022:

I've realized in hindsight that this question is rather stupid, as "background threads" acquiring pending_background_events is fine even if other threads hold other locks at the same time. We just need to ensure that every thread acquires locks according to the lock order within that thread; that alone is enough to avoid deadlocks (a minimal illustration follows the lock-order tree below).

// |
// |__`pending_inbound_payments`
// | |
// | |__`id_to_peer`
// | |__`claimable_htlcs`
// | |
// | |__`per_peer_state`
// | |__`pending_outbound_payments`
// | |
// | |__`outbound_scid_aliases`
// | |
// | |__`pending_inbound_payments`
// | |__`channel_state`
// | |
// | |__`id_to_peer`
// | |
// | |__`pending_events`
// | |
// | |__`pending_outbound_payments`
// | |__`per_peer_state`
// | |
// | |__`best_block`
// | |__`outbound_scid_aliases`
// | |
// | |__`pending_events`
// | |
// | |__`pending_background_events`
// | |__`best_block`
//
pub struct ChannelManager<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
where M::Target: chain::Watch<Signer>,
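
The deadlock reasoning behind the lock-order tree above is worth spelling out once in isolation. The following standalone Rust sketch is not code from this PR and the names are invented; it only demonstrates that when every thread takes the two locks in the same order, no thread can end up holding one lock while waiting forever for the other.

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    // Two independent pieces of shared state, loosely analogous to two ChannelManager fields.
    let funds = Arc::new(Mutex::new(0u64));
    let log = Arc::new(Mutex::new(Vec::<String>::new()));

    let mut handles = Vec::new();
    for i in 0..4 {
        let funds = Arc::clone(&funds);
        let log = Arc::clone(&log);
        handles.push(thread::spawn(move || {
            // Every thread acquires `funds` before `log`. If one thread instead took
            // `log` first, two threads could each hold one lock while blocking on the
            // other, and neither would ever make progress.
            let mut f = funds.lock().unwrap();
            let mut l = log.lock().unwrap();
            *f += 1;
            l.push(format!("thread {} saw {}", i, *f));
            // Both guards drop at the end of the closure, releasing the locks.
        }));
    }
    for h in handles {
        h.join().unwrap();
    }
    assert_eq!(*funds.lock().unwrap(), 4);
}
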
@@ -762,6 +757,15 @@ pub struct ChannelManager<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref,
#[cfg(not(test))]
forward_htlcs: Mutex<HashMap<u64, Vec<HTLCForwardInfo>>>,

/// Map from payment hash to the payment data and any HTLCs which are to us and can be
/// failed/claimed by the user.
///
/// Note that no consistency guarantees are made about the channels given here actually
/// existing anymore by the time you go to read them!
///
/// See `ChannelManager` struct-level documentation for lock order requirements.
claimable_htlcs: Mutex<HashMap<PaymentHash, (events::PaymentPurpose, Vec<ClaimableHTLC>)>>,

/// The set of outbound SCID aliases across all our channels, including unconfirmed channels
/// and some closed channels which reached a usable state prior to being closed. This is used
/// only to avoid duplicates, and is not persisted explicitly to disk, but rebuilt from the
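
As a rough sketch of what moving claimable_htlcs out of ChannelHolder and into its own Mutex buys, consider a simplified stand-in (invented types, not code from this PR): operations that only touch claimable HTLCs no longer need to take the broad channel-state lock at all.

use std::collections::HashMap;
use std::sync::Mutex;

// Simplified stand-in types; the real ChannelManager fields are far richer.
struct Manager {
    // Coarse lock covering per-channel state (stand-in for `channel_state`).
    channels: Mutex<HashMap<[u8; 32], u64>>,
    // Now a dedicated lock: payment hash to claimable amount (stand-in for `claimable_htlcs`).
    claimable: Mutex<HashMap<[u8; 32], u64>>,
}

impl Manager {
    // Failing a payment back only needs the `claimable` lock; when the map lived
    // inside the channel-state lock, callers had to take that broader lock too.
    fn fail_payment(&self, payment_hash: &[u8; 32]) -> Option<u64> {
        self.claimable.lock().unwrap().remove(payment_hash)
    }
}

fn main() {
    let mgr = Manager {
        channels: Mutex::new(HashMap::new()),
        claimable: Mutex::new(HashMap::from([([7u8; 32], 1_000)])),
    };
    assert_eq!(mgr.fail_payment(&[7u8; 32]), Some(1_000));
    assert!(mgr.channels.lock().unwrap().is_empty()); // channel lock never needed above
}

The changed fail_htlc_backwards and claim_funds paths further down follow exactly this shape: lock claimable_htlcs, remove the entry, and do the remaining work after that guard has already been dropped.
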
@@ -1626,13 +1630,13 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
channel_state: Mutex::new(ChannelHolder{
by_id: HashMap::new(),
short_to_chan_info: HashMap::new(),
claimable_htlcs: HashMap::new(),
pending_msg_events: Vec::new(),
}),
outbound_scid_aliases: Mutex::new(HashSet::new()),
pending_inbound_payments: Mutex::new(HashMap::new()),
pending_outbound_payments: Mutex::new(HashMap::new()),
forward_htlcs: Mutex::new(HashMap::new()),
claimable_htlcs: Mutex::new(HashMap::new()),
id_to_peer: Mutex::new(HashMap::new()),

our_network_key: keys_manager.get_node_secret(Recipient::Node).unwrap(),
@@ -2451,7 +2455,6 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);

let err: Result<(), _> = loop {
let mut channel_lock = self.channel_state.lock().unwrap();

let mut pending_outbounds = self.pending_outbound_payments.lock().unwrap();
let payment_entry = pending_outbounds.entry(payment_id);
@@ -2463,6 +2466,8 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
}
}

let mut channel_lock = self.channel_state.lock().unwrap();

let id = match channel_lock.short_to_chan_info.get(&path.first().unwrap().short_channel_id) {
None => return Err(APIError::ChannelUnavailable{err: "No channel available with first hop!".to_owned()}),
Some((_cp_id, chan_id)) => chan_id.clone(),
@@ -3046,9 +3051,9 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
mem::swap(&mut forward_htlcs, &mut self.forward_htlcs.lock().unwrap());

for (short_chan_id, mut pending_forwards) in forward_htlcs {
let mut channel_state_lock = self.channel_state.lock().unwrap();
let channel_state = &mut *channel_state_lock;
if short_chan_id != 0 {
let mut channel_state_lock = self.channel_state.lock().unwrap();
let channel_state = &mut *channel_state_lock;
let forward_chan_id = match channel_state.short_to_chan_info.get(&short_chan_id) {
Some((_cp_id, chan_id)) => chan_id.clone(),
None => {
@@ -3329,7 +3334,8 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
payment_secret: $payment_data.payment_secret,
}
};
let (_, htlcs) = channel_state.claimable_htlcs.entry(payment_hash)
let mut claimable_htlcs = self.claimable_htlcs.lock().unwrap();
let (_, htlcs) = claimable_htlcs.entry(payment_hash)
.or_insert_with(|| (purpose(), Vec::new()));
if htlcs.len() == 1 {
if let OnionPayload::Spontaneous(_) = htlcs[0].onion_payload {
@@ -3397,7 +3403,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
check_total_value!(payment_data, payment_preimage);
},
OnionPayload::Spontaneous(preimage) => {
match channel_state.claimable_htlcs.entry(payment_hash) {
match self.claimable_htlcs.lock().unwrap().entry(payment_hash) {
hash_map::Entry::Vacant(e) => {
let purpose = events::PaymentPurpose::SpontaneousPayment(preimage);
e.insert((purpose.clone(), vec![claimable_htlc]));
@@ -3650,7 +3656,9 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
true
});

channel_state.claimable_htlcs.retain(|payment_hash, (_, htlcs)| {
mem::drop(channel_state_lock);

self.claimable_htlcs.lock().unwrap().retain(|payment_hash, (_, htlcs)| {
if htlcs.is_empty() {
// This should be unreachable
debug_assert!(false);
@@ -3701,10 +3709,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
pub fn fail_htlc_backwards(&self, payment_hash: &PaymentHash) {
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);

let removed_source = {
let mut channel_state = self.channel_state.lock().unwrap();
channel_state.claimable_htlcs.remove(payment_hash)
};
let removed_source = self.claimable_htlcs.lock().unwrap().remove(payment_hash);
if let Some((_, mut sources)) = removed_source {
for htlc in sources.drain(..) {
let mut htlc_msat_height_data = byte_utils::be64_to_array(htlc.value).to_vec();
@@ -4006,7 +4011,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana

let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);

let removed_source = self.channel_state.lock().unwrap().claimable_htlcs.remove(&payment_hash);
let removed_source = self.claimable_htlcs.lock().unwrap().remove(&payment_hash);
if let Some((payment_purpose, mut sources)) = removed_source {
assert!(!sources.is_empty());

@@ -5355,6 +5360,16 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
/// pushing the channel monitor update (if any) to the background events queue and removing the
/// Channel object.
fn handle_init_event_channel_failures(&self, mut failed_channels: Vec<ShutdownResult>) {
#[cfg(debug_assertions)]
{
// Ensure that the different lock branches are not held when calling this function.
// This ensures that future code doesn't introduce a lock_order requirement for
// `pending_background_events`.
assert!(self.channel_state.try_lock().is_ok());
assert!(self.per_peer_state.read().is_ok());
assert!(self.pending_inbound_payments.try_lock().is_ok());
assert!(self.pending_events.try_lock().is_ok());
}
for mut failure in failed_channels.drain(..) {
// Either a commitment transactions has been confirmed on-chain or
// Channel::block_disconnected detected that the funding transaction has been
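
The try_lock checks added above rely on std::sync::Mutex::try_lock failing whenever any guard for that mutex is still alive, so a caller that reaches this function while holding one of those locks trips the assertion in debug builds. A tiny standalone demonstration of that property (not code from this PR):

use std::sync::Mutex;

fn main() {
    let m = Mutex::new(0u32);

    // Nothing holds the lock, so try_lock succeeds; this is the happy path the
    // assertions above expect. The temporary guard is dropped immediately.
    assert!(m.try_lock().is_ok());

    let _guard = m.lock().unwrap();
    // While `_guard` is alive, try_lock reports WouldBlock instead of blocking,
    // so an `assert!(m.try_lock().is_ok())` here would panic and flag the misuse.
    assert!(m.try_lock().is_err());
}
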
@@ -5883,28 +5898,28 @@ where
}
true
});
}

if let Some(height) = height_opt {
channel_state.claimable_htlcs.retain(|payment_hash, (_, htlcs)| {
htlcs.retain(|htlc| {
// If height is approaching the number of blocks we think it takes us to get
// our commitment transaction confirmed before the HTLC expires, plus the
// number of blocks we generally consider it to take to do a commitment update,
// just give up on it and fail the HTLC.
if height >= htlc.cltv_expiry - HTLC_FAIL_BACK_BUFFER {
let mut htlc_msat_height_data = byte_utils::be64_to_array(htlc.value).to_vec();
htlc_msat_height_data.extend_from_slice(&byte_utils::be32_to_array(height));

timed_out_htlcs.push((HTLCSource::PreviousHopData(htlc.prev_hop.clone()), payment_hash.clone(), HTLCFailReason::Reason {
failure_code: 0x4000 | 15,
data: htlc_msat_height_data
}, HTLCDestination::FailedPayment { payment_hash: payment_hash.clone() }));
false
} else { true }
});
!htlcs.is_empty() // Only retain this entry if htlcs has at least one entry.
if let Some(height) = height_opt {
self.claimable_htlcs.lock().unwrap().retain(|payment_hash, (_, htlcs)| {
htlcs.retain(|htlc| {
// If height is approaching the number of blocks we think it takes us to get
// our commitment transaction confirmed before the HTLC expires, plus the
// number of blocks we generally consider it to take to do a commitment update,
// just give up on it and fail the HTLC.
if height >= htlc.cltv_expiry - HTLC_FAIL_BACK_BUFFER {
let mut htlc_msat_height_data = byte_utils::be64_to_array(htlc.value).to_vec();
htlc_msat_height_data.extend_from_slice(&byte_utils::be32_to_array(height));

timed_out_htlcs.push((HTLCSource::PreviousHopData(htlc.prev_hop.clone()), payment_hash.clone(), HTLCFailReason::Reason {
failure_code: 0x4000 | 15,
data: htlc_msat_height_data
}, HTLCDestination::FailedPayment { payment_hash: payment_hash.clone() }));
false
} else { true }
});
}
!htlcs.is_empty() // Only retain this entry if htlcs has at least one entry.
});
}

self.handle_init_event_channel_failures(failed_channels);
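
The retain logic above gives up on an HTLC once the chain tip comes within HTLC_FAIL_BACK_BUFFER blocks of its CLTV expiry. A toy check of that comparison (the buffer value of 24 below is made up for illustration; the real constant is defined elsewhere in the crate):

// Stand-in value purely for illustration; not the crate's real constant.
const HTLC_FAIL_BACK_BUFFER: u32 = 24;

// Mirrors the `height >= htlc.cltv_expiry - HTLC_FAIL_BACK_BUFFER` test above.
fn should_fail_back(height: u32, cltv_expiry: u32) -> bool {
    height >= cltv_expiry - HTLC_FAIL_BACK_BUFFER
}

fn main() {
    // For an HTLC expiring at block 800_100, we keep waiting at height 800_075 ...
    assert!(!should_fail_back(800_075, 800_100));
    // ... and fail it back once the tip reaches 800_076 (24 blocks before expiry).
    assert!(should_fail_back(800_076, 800_100));
}
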
@@ -6639,43 +6654,49 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable f
}
}

let channel_state = self.channel_state.lock().unwrap();
let mut htlc_purposes: Vec<&events::PaymentPurpose> = Vec::new();
(channel_state.claimable_htlcs.len() as u64).write(writer)?;
for (payment_hash, (purpose, previous_hops)) in channel_state.claimable_htlcs.iter() {
payment_hash.write(writer)?;
(previous_hops.len() as u64).write(writer)?;
for htlc in previous_hops.iter() {
htlc.write(writer)?;
let mut htlc_purposes: Vec<events::PaymentPurpose> = Vec::new();
{
let claimable_htlcs = self.claimable_htlcs.lock().unwrap();
(claimable_htlcs.len() as u64).write(writer)?;
for (payment_hash, (purpose, previous_hops)) in claimable_htlcs.iter() {
payment_hash.write(writer)?;
(previous_hops.len() as u64).write(writer)?;
for htlc in previous_hops.iter() {
htlc.write(writer)?;
}
htlc_purposes.push(purpose.clone());
}
htlc_purposes.push(purpose);
}

let per_peer_state = self.per_peer_state.write().unwrap();
(per_peer_state.len() as u64).write(writer)?;
for (peer_pubkey, peer_state_mutex) in per_peer_state.iter() {
peer_pubkey.write(writer)?;
let peer_state = peer_state_mutex.lock().unwrap();
peer_state.latest_features.write(writer)?;
{
let per_peer_state = self.per_peer_state.write().unwrap();
(per_peer_state.len() as u64).write(writer)?;
for (peer_pubkey, peer_state_mutex) in per_peer_state.iter() {
peer_pubkey.write(writer)?;
let peer_state = peer_state_mutex.lock().unwrap();
peer_state.latest_features.write(writer)?;
}
}

let pending_inbound_payments = self.pending_inbound_payments.lock().unwrap();
let pending_outbound_payments = self.pending_outbound_payments.lock().unwrap();
let events = self.pending_events.lock().unwrap();
(events.len() as u64).write(writer)?;
for event in events.iter() {
event.write(writer)?;
{
let events = self.pending_events.lock().unwrap();
(events.len() as u64).write(writer)?;
for event in events.iter() {
event.write(writer)?;
}
}

let background_events = self.pending_background_events.lock().unwrap();
(background_events.len() as u64).write(writer)?;
for event in background_events.iter() {
match event {
BackgroundEvent::ClosingMonitorUpdate((funding_txo, monitor_update)) => {
0u8.write(writer)?;
funding_txo.write(writer)?;
monitor_update.write(writer)?;
},
{
let background_events = self.pending_background_events.lock().unwrap();
(background_events.len() as u64).write(writer)?;
for event in background_events.iter() {
match event {
BackgroundEvent::ClosingMonitorUpdate((funding_txo, monitor_update)) => {
0u8.write(writer)?;
funding_txo.write(writer)?;
monitor_update.write(writer)?;
},
}
}
}
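
The Writeable changes above wrap several lock acquisitions in their own scopes, so each guard is dropped as soon as the data it protects has been written and before later locks are acquired, keeping the serialization path inside the documented lock order. A minimal sketch of that scoping pattern (invented names, not code from this PR):

use std::sync::Mutex;

// Invented stand-in for a struct with several independently locked fields.
struct State {
    first: Mutex<Vec<u8>>,
    second: Mutex<Vec<u8>>,
}

fn serialize(state: &State) -> Vec<u8> {
    let mut out = Vec::new();
    {
        // The guard only lives inside this block ...
        let first = state.first.lock().unwrap();
        out.extend_from_slice(&first);
    } // ... and is dropped here, so no lock is held when the next one is taken.
    {
        let second = state.second.lock().unwrap();
        out.extend_from_slice(&second);
    }
    out
}

fn main() {
    let state = State {
        first: Mutex::new(vec![1, 2]),
        second: Mutex::new(vec![3]),
    };
    assert_eq!(serialize(&state), vec![1, 2, 3]);
}
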

@@ -6685,12 +6706,14 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable f
(self.highest_seen_timestamp.load(Ordering::Acquire) as u32).write(writer)?;
(self.highest_seen_timestamp.load(Ordering::Acquire) as u32).write(writer)?;

let pending_inbound_payments = self.pending_inbound_payments.lock().unwrap();
(pending_inbound_payments.len() as u64).write(writer)?;
for (hash, pending_payment) in pending_inbound_payments.iter() {
hash.write(writer)?;
pending_payment.write(writer)?;
}

let pending_outbound_payments = self.pending_outbound_payments.lock().unwrap();
// For backwards compat, write the session privs and their total length.
let mut num_pending_outbounds_compat: u64 = 0;
for (_, outbound) in pending_outbound_payments.iter() {
@@ -7244,14 +7267,14 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
channel_state: Mutex::new(ChannelHolder {
by_id,
short_to_chan_info,
claimable_htlcs,
pending_msg_events: Vec::new(),
}),
inbound_payment_key: expanded_inbound_key,
pending_inbound_payments: Mutex::new(pending_inbound_payments),
pending_outbound_payments: Mutex::new(pending_outbound_payments.unwrap()),

forward_htlcs: Mutex::new(forward_htlcs),
claimable_htlcs: Mutex::new(claimable_htlcs),
outbound_scid_aliases: Mutex::new(outbound_scid_aliases),
id_to_peer: Mutex::new(id_to_peer),
fake_scid_rand_bytes: fake_scid_rand_bytes.unwrap(),