Skip to content

Commit b21da64

Browse files
committed
Send shutdown/closing_signed msgs out-of-band for ordered delivery
1 parent 9db3159 commit b21da64

File tree

5 files changed

+120
-69
lines changed

5 files changed

+120
-69
lines changed

src/ln/channelmanager.rs

Lines changed: 95 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1723,8 +1723,8 @@ impl ChannelManager {
17231723
}
17241724
}
17251725

1726-
fn internal_shutdown(&self, their_node_id: &PublicKey, msg: &msgs::Shutdown) -> Result<(Option<msgs::Shutdown>, Option<msgs::ClosingSigned>), MsgHandleErrInternal> {
1727-
let (mut res, chan_option) = {
1726+
fn internal_shutdown(&self, their_node_id: &PublicKey, msg: &msgs::Shutdown) -> Result<(), MsgHandleErrInternal> {
1727+
let (mut dropped_htlcs, chan_option) = {
17281728
let mut channel_state_lock = self.channel_state.lock().unwrap();
17291729
let channel_state = channel_state_lock.borrow_parts();
17301730

@@ -1734,18 +1734,30 @@ impl ChannelManager {
17341734
//TODO: here and below MsgHandleErrInternal, #153 case
17351735
return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!", msg.channel_id));
17361736
}
1737-
let res = chan_entry.get_mut().shutdown(&*self.fee_estimator, &msg).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))?;
1737+
let (shutdown, closing_signed, dropped_htlcs) = chan_entry.get_mut().shutdown(&*self.fee_estimator, &msg).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))?;
1738+
if let Some(msg) = shutdown {
1739+
channel_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
1740+
node_id: their_node_id.clone(),
1741+
msg,
1742+
});
1743+
}
1744+
if let Some(msg) = closing_signed {
1745+
channel_state.pending_msg_events.push(events::MessageSendEvent::SendClosingSigned {
1746+
node_id: their_node_id.clone(),
1747+
msg,
1748+
});
1749+
}
17381750
if chan_entry.get().is_shutdown() {
17391751
if let Some(short_id) = chan_entry.get().get_short_channel_id() {
17401752
channel_state.short_to_id.remove(&short_id);
17411753
}
1742-
(res, Some(chan_entry.remove_entry().1))
1743-
} else { (res, None) }
1754+
(dropped_htlcs, Some(chan_entry.remove_entry().1))
1755+
} else { (dropped_htlcs, None) }
17441756
},
17451757
hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.channel_id))
17461758
}
17471759
};
1748-
for htlc_source in res.2.drain(..) {
1760+
for htlc_source in dropped_htlcs.drain(..) {
17491761
// unknown_next_peer...I dunno who that is anymore....
17501762
self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_source.0, &htlc_source.1, HTLCFailReason::Reason { failure_code: 0x4000 | 10, data: Vec::new() });
17511763
}
@@ -1757,11 +1769,11 @@ impl ChannelManager {
17571769
});
17581770
}
17591771
}
1760-
Ok((res.0, res.1))
1772+
Ok(())
17611773
}
17621774

1763-
fn internal_closing_signed(&self, their_node_id: &PublicKey, msg: &msgs::ClosingSigned) -> Result<Option<msgs::ClosingSigned>, MsgHandleErrInternal> {
1764-
let (res, chan_option) = {
1775+
fn internal_closing_signed(&self, their_node_id: &PublicKey, msg: &msgs::ClosingSigned) -> Result<(), MsgHandleErrInternal> {
1776+
let (tx, chan_option) = {
17651777
let mut channel_state_lock = self.channel_state.lock().unwrap();
17661778
let channel_state = channel_state_lock.borrow_parts();
17671779
match channel_state.by_id.entry(msg.channel_id.clone()) {
@@ -1770,8 +1782,14 @@ impl ChannelManager {
17701782
//TODO: here and below MsgHandleErrInternal, #153 case
17711783
return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!", msg.channel_id));
17721784
}
1773-
let res = chan_entry.get_mut().closing_signed(&*self.fee_estimator, &msg).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))?;
1774-
if res.1.is_some() {
1785+
let (closing_signed, tx) = chan_entry.get_mut().closing_signed(&*self.fee_estimator, &msg).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))?;
1786+
if let Some(msg) = closing_signed {
1787+
channel_state.pending_msg_events.push(events::MessageSendEvent::SendClosingSigned {
1788+
node_id: their_node_id.clone(),
1789+
msg,
1790+
});
1791+
}
1792+
if tx.is_some() {
17751793
// We're done with this channel, we've got a signed closing transaction and
17761794
// will send the closing_signed back to the remote peer upon return. This
17771795
// also implies there are no pending HTLCs left on the channel, so we can
@@ -1780,13 +1798,13 @@ impl ChannelManager {
17801798
if let Some(short_id) = chan_entry.get().get_short_channel_id() {
17811799
channel_state.short_to_id.remove(&short_id);
17821800
}
1783-
(res, Some(chan_entry.remove_entry().1))
1784-
} else { (res, None) }
1801+
(tx, Some(chan_entry.remove_entry().1))
1802+
} else { (tx, None) }
17851803
},
17861804
hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.channel_id))
17871805
}
17881806
};
1789-
if let Some(broadcast_tx) = res.1 {
1807+
if let Some(broadcast_tx) = tx {
17901808
self.tx_broadcaster.broadcast_transaction(&broadcast_tx);
17911809
}
17921810
if let Some(chan) = chan_option {
@@ -1797,7 +1815,7 @@ impl ChannelManager {
17971815
});
17981816
}
17991817
}
1800-
Ok(res.0)
1818+
Ok(())
18011819
}
18021820

18031821
fn internal_update_add_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) -> Result<(), MsgHandleErrInternal> {
@@ -2320,11 +2338,11 @@ impl ChannelMessageHandler for ChannelManager {
23202338
handle_error!(self, self.internal_funding_locked(their_node_id, msg), their_node_id)
23212339
}
23222340

2323-
fn handle_shutdown(&self, their_node_id: &PublicKey, msg: &msgs::Shutdown) -> Result<(Option<msgs::Shutdown>, Option<msgs::ClosingSigned>), HandleError> {
2341+
fn handle_shutdown(&self, their_node_id: &PublicKey, msg: &msgs::Shutdown) -> Result<(), HandleError> {
23242342
handle_error!(self, self.internal_shutdown(their_node_id, msg), their_node_id)
23252343
}
23262344

2327-
fn handle_closing_signed(&self, their_node_id: &PublicKey, msg: &msgs::ClosingSigned) -> Result<Option<msgs::ClosingSigned>, HandleError> {
2345+
fn handle_closing_signed(&self, their_node_id: &PublicKey, msg: &msgs::ClosingSigned) -> Result<(), HandleError> {
23282346
handle_error!(self, self.internal_closing_signed(their_node_id, msg), their_node_id)
23292347
}
23302348

@@ -2847,68 +2865,91 @@ mod tests {
28472865
}
28482866

28492867
fn close_channel(outbound_node: &Node, inbound_node: &Node, channel_id: &[u8; 32], funding_tx: Transaction, close_inbound_first: bool) -> (msgs::ChannelUpdate, msgs::ChannelUpdate) {
2850-
let (node_a, broadcaster_a) = if close_inbound_first { (&inbound_node.node, &inbound_node.tx_broadcaster) } else { (&outbound_node.node, &outbound_node.tx_broadcaster) };
2868+
let (node_a, broadcaster_a, struct_a) = if close_inbound_first { (&inbound_node.node, &inbound_node.tx_broadcaster, inbound_node) } else { (&outbound_node.node, &outbound_node.tx_broadcaster, outbound_node) };
28512869
let (node_b, broadcaster_b) = if close_inbound_first { (&outbound_node.node, &outbound_node.tx_broadcaster) } else { (&inbound_node.node, &inbound_node.tx_broadcaster) };
28522870
let (tx_a, tx_b);
28532871

28542872
node_a.close_channel(channel_id).unwrap();
2855-
let events_1 = node_a.get_and_clear_pending_msg_events();
2856-
assert_eq!(events_1.len(), 1);
2857-
let shutdown_a = match events_1[0] {
2873+
node_b.handle_shutdown(&node_a.get_our_node_id(), &get_event_msg!(struct_a, MessageSendEvent::SendShutdown, node_b.get_our_node_id())).unwrap();
2874+
2875+
let events_1 = node_b.get_and_clear_pending_msg_events();
2876+
assert!(events_1.len() >= 1);
2877+
let shutdown_b = match events_1[0] {
28582878
MessageSendEvent::SendShutdown { ref node_id, ref msg } => {
2859-
assert_eq!(node_id, &node_b.get_our_node_id());
2879+
assert_eq!(node_id, &node_a.get_our_node_id());
28602880
msg.clone()
28612881
},
28622882
_ => panic!("Unexpected event"),
28632883
};
28642884

2865-
let (shutdown_b, mut closing_signed_b) = node_b.handle_shutdown(&node_a.get_our_node_id(), &shutdown_a).unwrap();
2866-
if !close_inbound_first {
2867-
assert!(closing_signed_b.is_none());
2885+
let closing_signed_b = if !close_inbound_first {
2886+
assert_eq!(events_1.len(), 1);
2887+
None
2888+
} else {
2889+
Some(match events_1[1] {
2890+
MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => {
2891+
assert_eq!(node_id, &node_a.get_our_node_id());
2892+
msg.clone()
2893+
},
2894+
_ => panic!("Unexpected event"),
2895+
})
2896+
};
2897+
2898+
macro_rules! get_closing_signed_broadcast {
2899+
($node: expr, $dest_pubkey: expr) => {
2900+
{
2901+
let events = $node.get_and_clear_pending_msg_events();
2902+
assert!(events.len() == 1 || events.len() == 2);
2903+
(match events[events.len() - 1] {
2904+
MessageSendEvent::BroadcastChannelUpdate { ref msg } => {
2905+
msg.clone()
2906+
},
2907+
_ => panic!("Unexpected event"),
2908+
}, if events.len() == 2 {
2909+
match events[0] {
2910+
MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => {
2911+
assert_eq!(*node_id, $dest_pubkey);
2912+
Some(msg.clone())
2913+
},
2914+
_ => panic!("Unexpected event"),
2915+
}
2916+
} else { None })
2917+
}
2918+
}
28682919
}
2869-
let (empty_a, mut closing_signed_a) = node_a.handle_shutdown(&node_b.get_our_node_id(), &shutdown_b.unwrap()).unwrap();
2870-
assert!(empty_a.is_none());
2871-
if close_inbound_first {
2872-
assert!(closing_signed_a.is_none());
2873-
closing_signed_a = node_a.handle_closing_signed(&node_b.get_our_node_id(), &closing_signed_b.unwrap()).unwrap();
2920+
2921+
node_a.handle_shutdown(&node_b.get_our_node_id(), &shutdown_b).unwrap();
2922+
let (as_update, bs_update) = if close_inbound_first {
2923+
assert!(node_a.get_and_clear_pending_msg_events().is_empty());
2924+
node_a.handle_closing_signed(&node_b.get_our_node_id(), &closing_signed_b.unwrap()).unwrap();
28742925
assert_eq!(broadcaster_a.txn_broadcasted.lock().unwrap().len(), 1);
28752926
tx_a = broadcaster_a.txn_broadcasted.lock().unwrap().remove(0);
2927+
let (as_update, closing_signed_a) = get_closing_signed_broadcast!(node_a, node_b.get_our_node_id());
28762928

2877-
let empty_b = node_b.handle_closing_signed(&node_a.get_our_node_id(), &closing_signed_a.unwrap()).unwrap();
2878-
assert!(empty_b.is_none());
2929+
node_b.handle_closing_signed(&node_a.get_our_node_id(), &closing_signed_a.unwrap()).unwrap();
2930+
let (bs_update, none_b) = get_closing_signed_broadcast!(node_b, node_a.get_our_node_id());
2931+
assert!(none_b.is_none());
28792932
assert_eq!(broadcaster_b.txn_broadcasted.lock().unwrap().len(), 1);
28802933
tx_b = broadcaster_b.txn_broadcasted.lock().unwrap().remove(0);
2934+
(as_update, bs_update)
28812935
} else {
2882-
closing_signed_b = node_b.handle_closing_signed(&node_a.get_our_node_id(), &closing_signed_a.unwrap()).unwrap();
2936+
let closing_signed_a = get_event_msg!(struct_a, MessageSendEvent::SendClosingSigned, node_b.get_our_node_id());
2937+
2938+
node_b.handle_closing_signed(&node_a.get_our_node_id(), &closing_signed_a).unwrap();
28832939
assert_eq!(broadcaster_b.txn_broadcasted.lock().unwrap().len(), 1);
28842940
tx_b = broadcaster_b.txn_broadcasted.lock().unwrap().remove(0);
2941+
let (bs_update, closing_signed_b) = get_closing_signed_broadcast!(node_b, node_a.get_our_node_id());
28852942

2886-
let empty_a2 = node_a.handle_closing_signed(&node_b.get_our_node_id(), &closing_signed_b.unwrap()).unwrap();
2887-
assert!(empty_a2.is_none());
2943+
node_a.handle_closing_signed(&node_b.get_our_node_id(), &closing_signed_b.unwrap()).unwrap();
2944+
let (as_update, none_a) = get_closing_signed_broadcast!(node_a, node_b.get_our_node_id());
2945+
assert!(none_a.is_none());
28882946
assert_eq!(broadcaster_a.txn_broadcasted.lock().unwrap().len(), 1);
28892947
tx_a = broadcaster_a.txn_broadcasted.lock().unwrap().remove(0);
2890-
}
2948+
(as_update, bs_update)
2949+
};
28912950
assert_eq!(tx_a, tx_b);
28922951
check_spends!(tx_a, funding_tx);
28932952

2894-
let events_2 = node_a.get_and_clear_pending_msg_events();
2895-
assert_eq!(events_2.len(), 1);
2896-
let as_update = match events_2[0] {
2897-
MessageSendEvent::BroadcastChannelUpdate { ref msg } => {
2898-
msg.clone()
2899-
},
2900-
_ => panic!("Unexpected event"),
2901-
};
2902-
2903-
let events_3 = node_b.get_and_clear_pending_msg_events();
2904-
assert_eq!(events_3.len(), 1);
2905-
let bs_update = match events_3[0] {
2906-
MessageSendEvent::BroadcastChannelUpdate { ref msg } => {
2907-
msg.clone()
2908-
},
2909-
_ => panic!("Unexpected event"),
2910-
};
2911-
29122953
(as_update, bs_update)
29132954
}
29142955

src/ln/msgs.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -235,12 +235,14 @@ pub struct FundingLocked {
235235
}
236236

237237
/// A shutdown message to be sent or received from a peer
238+
#[derive(Clone)]
238239
pub struct Shutdown {
239240
pub(crate) channel_id: [u8; 32],
240241
pub(crate) scriptpubkey: Script,
241242
}
242243

243244
/// A closing_signed message to be sent or received from a peer
245+
#[derive(Clone)]
244246
pub struct ClosingSigned {
245247
pub(crate) channel_id: [u8; 32],
246248
pub(crate) fee_satoshis: u64,
@@ -528,9 +530,9 @@ pub trait ChannelMessageHandler : events::MessageSendEventsProvider + Send + Syn
528530

529531
// Channl close:
530532
/// Handle an incoming shutdown message from the given peer.
531-
fn handle_shutdown(&self, their_node_id: &PublicKey, msg: &Shutdown) -> Result<(Option<Shutdown>, Option<ClosingSigned>), HandleError>;
533+
fn handle_shutdown(&self, their_node_id: &PublicKey, msg: &Shutdown) -> Result<(), HandleError>;
532534
/// Handle an incoming closing_signed message from the given peer.
533-
fn handle_closing_signed(&self, their_node_id: &PublicKey, msg: &ClosingSigned) -> Result<Option<ClosingSigned>, HandleError>;
535+
fn handle_closing_signed(&self, their_node_id: &PublicKey, msg: &ClosingSigned) -> Result<(), HandleError>;
534536

535537
// HTLC handling:
536538
/// Handle an incoming update_add_htlc message from the given peer.

src/ln/peer_handler.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -581,20 +581,11 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
581581

582582
38 => {
583583
let msg = try_potential_decodeerror!(msgs::Shutdown::read(&mut reader));
584-
let resp_options = try_potential_handleerror!(self.message_handler.chan_handler.handle_shutdown(&peer.their_node_id.unwrap(), &msg));
585-
if let Some(resp) = resp_options.0 {
586-
encode_and_send_msg!(resp, 38);
587-
}
588-
if let Some(resp) = resp_options.1 {
589-
encode_and_send_msg!(resp, 39);
590-
}
584+
try_potential_handleerror!(self.message_handler.chan_handler.handle_shutdown(&peer.their_node_id.unwrap(), &msg));
591585
},
592586
39 => {
593587
let msg = try_potential_decodeerror!(msgs::ClosingSigned::read(&mut reader));
594-
let resp_option = try_potential_handleerror!(self.message_handler.chan_handler.handle_closing_signed(&peer.their_node_id.unwrap(), &msg));
595-
if let Some(resp) = resp_option {
596-
encode_and_send_msg!(resp, 39);
597-
}
588+
try_potential_handleerror!(self.message_handler.chan_handler.handle_closing_signed(&peer.their_node_id.unwrap(), &msg));
598589
},
599590

600591
128 => {
@@ -886,6 +877,16 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
886877
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 133)));
887878
Self::do_attempt_write_data(&mut descriptor, peer);
888879
},
880+
MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => {
881+
log_trace!(self, "Handling SendClosingSigned event in peer_handler for node {} for channel {}",
882+
log_pubkey!(node_id),
883+
log_bytes!(msg.channel_id));
884+
let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
885+
//TODO: Do whatever we're gonna do for handling dropped messages
886+
});
887+
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 39)));
888+
Self::do_attempt_write_data(&mut descriptor, peer);
889+
},
889890
MessageSendEvent::SendShutdown { ref node_id, ref msg } => {
890891
log_trace!(self, "Handling Shutdown event in peer_handler for node {} for channel {}",
891892
log_pubkey!(node_id),

src/util/events.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,13 @@ pub enum MessageSendEvent {
146146
/// The message which should be sent.
147147
msg: msgs::RevokeAndACK,
148148
},
149+
/// Used to indicate that a closing_signed message should be sent to the peer with the given node_id.
150+
SendClosingSigned {
151+
/// The node_id of the node which should receive this message
152+
node_id: PublicKey,
153+
/// The message which should be sent.
154+
msg: msgs::ClosingSigned,
155+
},
149156
/// Used to indicate that a shutdown message should be sent to the peer with the given node_id.
150157
SendShutdown {
151158
/// The node_id of the node which should receive this message

src/util/test_utils.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,10 +101,10 @@ impl msgs::ChannelMessageHandler for TestChannelMessageHandler {
101101
fn handle_funding_locked(&self, _their_node_id: &PublicKey, _msg: &msgs::FundingLocked) -> Result<(), HandleError> {
102102
Err(HandleError { err: "", action: None })
103103
}
104-
fn handle_shutdown(&self, _their_node_id: &PublicKey, _msg: &msgs::Shutdown) -> Result<(Option<msgs::Shutdown>, Option<msgs::ClosingSigned>), HandleError> {
104+
fn handle_shutdown(&self, _their_node_id: &PublicKey, _msg: &msgs::Shutdown) -> Result<(), HandleError> {
105105
Err(HandleError { err: "", action: None })
106106
}
107-
fn handle_closing_signed(&self, _their_node_id: &PublicKey, _msg: &msgs::ClosingSigned) -> Result<Option<msgs::ClosingSigned>, HandleError> {
107+
fn handle_closing_signed(&self, _their_node_id: &PublicKey, _msg: &msgs::ClosingSigned) -> Result<(), HandleError> {
108108
Err(HandleError { err: "", action: None })
109109
}
110110
fn handle_update_add_htlc(&self, _their_node_id: &PublicKey, _msg: &msgs::UpdateAddHTLC) -> Result<(), HandleError> {

0 commit comments

Comments
 (0)