Skip to content

Commit cb25b4e

Browse files
committed
Send shutdown/closing_signed msgs out-of-band for ordered delivery
1 parent 3d3f8d0 commit cb25b4e

File tree

5 files changed

+120
-69
lines changed

5 files changed

+120
-69
lines changed

src/ln/channelmanager.rs

Lines changed: 95 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1785,8 +1785,8 @@ impl ChannelManager {
17851785
}
17861786
}
17871787

1788-
fn internal_shutdown(&self, their_node_id: &PublicKey, msg: &msgs::Shutdown) -> Result<(Option<msgs::Shutdown>, Option<msgs::ClosingSigned>), MsgHandleErrInternal> {
1789-
let (mut res, chan_option) = {
1788+
fn internal_shutdown(&self, their_node_id: &PublicKey, msg: &msgs::Shutdown) -> Result<(), MsgHandleErrInternal> {
1789+
let (mut dropped_htlcs, chan_option) = {
17901790
let mut channel_state_lock = self.channel_state.lock().unwrap();
17911791
let channel_state = channel_state_lock.borrow_parts();
17921792

@@ -1796,18 +1796,30 @@ impl ChannelManager {
17961796
//TODO: here and below MsgHandleErrInternal, #153 case
17971797
return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!", msg.channel_id));
17981798
}
1799-
let res = chan_entry.get_mut().shutdown(&*self.fee_estimator, &msg).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))?;
1799+
let (shutdown, closing_signed, dropped_htlcs) = chan_entry.get_mut().shutdown(&*self.fee_estimator, &msg).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))?;
1800+
if let Some(msg) = shutdown {
1801+
channel_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
1802+
node_id: their_node_id.clone(),
1803+
msg,
1804+
});
1805+
}
1806+
if let Some(msg) = closing_signed {
1807+
channel_state.pending_msg_events.push(events::MessageSendEvent::SendClosingSigned {
1808+
node_id: their_node_id.clone(),
1809+
msg,
1810+
});
1811+
}
18001812
if chan_entry.get().is_shutdown() {
18011813
if let Some(short_id) = chan_entry.get().get_short_channel_id() {
18021814
channel_state.short_to_id.remove(&short_id);
18031815
}
1804-
(res, Some(chan_entry.remove_entry().1))
1805-
} else { (res, None) }
1816+
(dropped_htlcs, Some(chan_entry.remove_entry().1))
1817+
} else { (dropped_htlcs, None) }
18061818
},
18071819
hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.channel_id))
18081820
}
18091821
};
1810-
for htlc_source in res.2.drain(..) {
1822+
for htlc_source in dropped_htlcs.drain(..) {
18111823
// unknown_next_peer...I dunno who that is anymore....
18121824
self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_source.0, &htlc_source.1, HTLCFailReason::Reason { failure_code: 0x4000 | 10, data: Vec::new() });
18131825
}
@@ -1819,11 +1831,11 @@ impl ChannelManager {
18191831
});
18201832
}
18211833
}
1822-
Ok((res.0, res.1))
1834+
Ok(())
18231835
}
18241836

1825-
fn internal_closing_signed(&self, their_node_id: &PublicKey, msg: &msgs::ClosingSigned) -> Result<Option<msgs::ClosingSigned>, MsgHandleErrInternal> {
1826-
let (res, chan_option) = {
1837+
fn internal_closing_signed(&self, their_node_id: &PublicKey, msg: &msgs::ClosingSigned) -> Result<(), MsgHandleErrInternal> {
1838+
let (tx, chan_option) = {
18271839
let mut channel_state_lock = self.channel_state.lock().unwrap();
18281840
let channel_state = channel_state_lock.borrow_parts();
18291841
match channel_state.by_id.entry(msg.channel_id.clone()) {
@@ -1832,8 +1844,14 @@ impl ChannelManager {
18321844
//TODO: here and below MsgHandleErrInternal, #153 case
18331845
return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!", msg.channel_id));
18341846
}
1835-
let res = chan_entry.get_mut().closing_signed(&*self.fee_estimator, &msg).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))?;
1836-
if res.1.is_some() {
1847+
let (closing_signed, tx) = chan_entry.get_mut().closing_signed(&*self.fee_estimator, &msg).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))?;
1848+
if let Some(msg) = closing_signed {
1849+
channel_state.pending_msg_events.push(events::MessageSendEvent::SendClosingSigned {
1850+
node_id: their_node_id.clone(),
1851+
msg,
1852+
});
1853+
}
1854+
if tx.is_some() {
18371855
// We're done with this channel, we've got a signed closing transaction and
18381856
// will send the closing_signed back to the remote peer upon return. This
18391857
// also implies there are no pending HTLCs left on the channel, so we can
@@ -1842,13 +1860,13 @@ impl ChannelManager {
18421860
if let Some(short_id) = chan_entry.get().get_short_channel_id() {
18431861
channel_state.short_to_id.remove(&short_id);
18441862
}
1845-
(res, Some(chan_entry.remove_entry().1))
1846-
} else { (res, None) }
1863+
(tx, Some(chan_entry.remove_entry().1))
1864+
} else { (tx, None) }
18471865
},
18481866
hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.channel_id))
18491867
}
18501868
};
1851-
if let Some(broadcast_tx) = res.1 {
1869+
if let Some(broadcast_tx) = tx {
18521870
self.tx_broadcaster.broadcast_transaction(&broadcast_tx);
18531871
}
18541872
if let Some(chan) = chan_option {
@@ -1859,7 +1877,7 @@ impl ChannelManager {
18591877
});
18601878
}
18611879
}
1862-
Ok(res.0)
1880+
Ok(())
18631881
}
18641882

18651883
fn internal_update_add_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) -> Result<(), MsgHandleErrInternal> {
@@ -2522,11 +2540,11 @@ impl ChannelMessageHandler for ChannelManager {
25222540
handle_error!(self, self.internal_funding_locked(their_node_id, msg), their_node_id)
25232541
}
25242542

2525-
fn handle_shutdown(&self, their_node_id: &PublicKey, msg: &msgs::Shutdown) -> Result<(Option<msgs::Shutdown>, Option<msgs::ClosingSigned>), HandleError> {
2543+
fn handle_shutdown(&self, their_node_id: &PublicKey, msg: &msgs::Shutdown) -> Result<(), HandleError> {
25262544
handle_error!(self, self.internal_shutdown(their_node_id, msg), their_node_id)
25272545
}
25282546

2529-
fn handle_closing_signed(&self, their_node_id: &PublicKey, msg: &msgs::ClosingSigned) -> Result<Option<msgs::ClosingSigned>, HandleError> {
2547+
fn handle_closing_signed(&self, their_node_id: &PublicKey, msg: &msgs::ClosingSigned) -> Result<(), HandleError> {
25302548
handle_error!(self, self.internal_closing_signed(their_node_id, msg), their_node_id)
25312549
}
25322550

@@ -3050,68 +3068,91 @@ mod tests {
30503068
}
30513069

30523070
fn close_channel(outbound_node: &Node, inbound_node: &Node, channel_id: &[u8; 32], funding_tx: Transaction, close_inbound_first: bool) -> (msgs::ChannelUpdate, msgs::ChannelUpdate) {
3053-
let (node_a, broadcaster_a) = if close_inbound_first { (&inbound_node.node, &inbound_node.tx_broadcaster) } else { (&outbound_node.node, &outbound_node.tx_broadcaster) };
3071+
let (node_a, broadcaster_a, struct_a) = if close_inbound_first { (&inbound_node.node, &inbound_node.tx_broadcaster, inbound_node) } else { (&outbound_node.node, &outbound_node.tx_broadcaster, outbound_node) };
30543072
let (node_b, broadcaster_b) = if close_inbound_first { (&outbound_node.node, &outbound_node.tx_broadcaster) } else { (&inbound_node.node, &inbound_node.tx_broadcaster) };
30553073
let (tx_a, tx_b);
30563074

30573075
node_a.close_channel(channel_id).unwrap();
3058-
let events_1 = node_a.get_and_clear_pending_msg_events();
3059-
assert_eq!(events_1.len(), 1);
3060-
let shutdown_a = match events_1[0] {
3076+
node_b.handle_shutdown(&node_a.get_our_node_id(), &get_event_msg!(struct_a, MessageSendEvent::SendShutdown, node_b.get_our_node_id())).unwrap();
3077+
3078+
let events_1 = node_b.get_and_clear_pending_msg_events();
3079+
assert!(events_1.len() >= 1);
3080+
let shutdown_b = match events_1[0] {
30613081
MessageSendEvent::SendShutdown { ref node_id, ref msg } => {
3062-
assert_eq!(node_id, &node_b.get_our_node_id());
3082+
assert_eq!(node_id, &node_a.get_our_node_id());
30633083
msg.clone()
30643084
},
30653085
_ => panic!("Unexpected event"),
30663086
};
30673087

3068-
let (shutdown_b, mut closing_signed_b) = node_b.handle_shutdown(&node_a.get_our_node_id(), &shutdown_a).unwrap();
3069-
if !close_inbound_first {
3070-
assert!(closing_signed_b.is_none());
3088+
let closing_signed_b = if !close_inbound_first {
3089+
assert_eq!(events_1.len(), 1);
3090+
None
3091+
} else {
3092+
Some(match events_1[1] {
3093+
MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => {
3094+
assert_eq!(node_id, &node_a.get_our_node_id());
3095+
msg.clone()
3096+
},
3097+
_ => panic!("Unexpected event"),
3098+
})
3099+
};
3100+
3101+
macro_rules! get_closing_signed_broadcast {
3102+
($node: expr, $dest_pubkey: expr) => {
3103+
{
3104+
let events = $node.get_and_clear_pending_msg_events();
3105+
assert!(events.len() == 1 || events.len() == 2);
3106+
(match events[events.len() - 1] {
3107+
MessageSendEvent::BroadcastChannelUpdate { ref msg } => {
3108+
msg.clone()
3109+
},
3110+
_ => panic!("Unexpected event"),
3111+
}, if events.len() == 2 {
3112+
match events[0] {
3113+
MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => {
3114+
assert_eq!(*node_id, $dest_pubkey);
3115+
Some(msg.clone())
3116+
},
3117+
_ => panic!("Unexpected event"),
3118+
}
3119+
} else { None })
3120+
}
3121+
}
30713122
}
3072-
let (empty_a, mut closing_signed_a) = node_a.handle_shutdown(&node_b.get_our_node_id(), &shutdown_b.unwrap()).unwrap();
3073-
assert!(empty_a.is_none());
3074-
if close_inbound_first {
3075-
assert!(closing_signed_a.is_none());
3076-
closing_signed_a = node_a.handle_closing_signed(&node_b.get_our_node_id(), &closing_signed_b.unwrap()).unwrap();
3123+
3124+
node_a.handle_shutdown(&node_b.get_our_node_id(), &shutdown_b).unwrap();
3125+
let (as_update, bs_update) = if close_inbound_first {
3126+
assert!(node_a.get_and_clear_pending_msg_events().is_empty());
3127+
node_a.handle_closing_signed(&node_b.get_our_node_id(), &closing_signed_b.unwrap()).unwrap();
30773128
assert_eq!(broadcaster_a.txn_broadcasted.lock().unwrap().len(), 1);
30783129
tx_a = broadcaster_a.txn_broadcasted.lock().unwrap().remove(0);
3130+
let (as_update, closing_signed_a) = get_closing_signed_broadcast!(node_a, node_b.get_our_node_id());
30793131

3080-
let empty_b = node_b.handle_closing_signed(&node_a.get_our_node_id(), &closing_signed_a.unwrap()).unwrap();
3081-
assert!(empty_b.is_none());
3132+
node_b.handle_closing_signed(&node_a.get_our_node_id(), &closing_signed_a.unwrap()).unwrap();
3133+
let (bs_update, none_b) = get_closing_signed_broadcast!(node_b, node_a.get_our_node_id());
3134+
assert!(none_b.is_none());
30823135
assert_eq!(broadcaster_b.txn_broadcasted.lock().unwrap().len(), 1);
30833136
tx_b = broadcaster_b.txn_broadcasted.lock().unwrap().remove(0);
3137+
(as_update, bs_update)
30843138
} else {
3085-
closing_signed_b = node_b.handle_closing_signed(&node_a.get_our_node_id(), &closing_signed_a.unwrap()).unwrap();
3139+
let closing_signed_a = get_event_msg!(struct_a, MessageSendEvent::SendClosingSigned, node_b.get_our_node_id());
3140+
3141+
node_b.handle_closing_signed(&node_a.get_our_node_id(), &closing_signed_a).unwrap();
30863142
assert_eq!(broadcaster_b.txn_broadcasted.lock().unwrap().len(), 1);
30873143
tx_b = broadcaster_b.txn_broadcasted.lock().unwrap().remove(0);
3144+
let (bs_update, closing_signed_b) = get_closing_signed_broadcast!(node_b, node_a.get_our_node_id());
30883145

3089-
let empty_a2 = node_a.handle_closing_signed(&node_b.get_our_node_id(), &closing_signed_b.unwrap()).unwrap();
3090-
assert!(empty_a2.is_none());
3146+
node_a.handle_closing_signed(&node_b.get_our_node_id(), &closing_signed_b.unwrap()).unwrap();
3147+
let (as_update, none_a) = get_closing_signed_broadcast!(node_a, node_b.get_our_node_id());
3148+
assert!(none_a.is_none());
30913149
assert_eq!(broadcaster_a.txn_broadcasted.lock().unwrap().len(), 1);
30923150
tx_a = broadcaster_a.txn_broadcasted.lock().unwrap().remove(0);
3093-
}
3151+
(as_update, bs_update)
3152+
};
30943153
assert_eq!(tx_a, tx_b);
30953154
check_spends!(tx_a, funding_tx);
30963155

3097-
let events_2 = node_a.get_and_clear_pending_msg_events();
3098-
assert_eq!(events_2.len(), 1);
3099-
let as_update = match events_2[0] {
3100-
MessageSendEvent::BroadcastChannelUpdate { ref msg } => {
3101-
msg.clone()
3102-
},
3103-
_ => panic!("Unexpected event"),
3104-
};
3105-
3106-
let events_3 = node_b.get_and_clear_pending_msg_events();
3107-
assert_eq!(events_3.len(), 1);
3108-
let bs_update = match events_3[0] {
3109-
MessageSendEvent::BroadcastChannelUpdate { ref msg } => {
3110-
msg.clone()
3111-
},
3112-
_ => panic!("Unexpected event"),
3113-
};
3114-
31153156
(as_update, bs_update)
31163157
}
31173158

src/ln/msgs.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -235,12 +235,14 @@ pub struct FundingLocked {
235235
}
236236

237237
/// A shutdown message to be sent or received from a peer
238+
#[derive(Clone)]
238239
pub struct Shutdown {
239240
pub(crate) channel_id: [u8; 32],
240241
pub(crate) scriptpubkey: Script,
241242
}
242243

243244
/// A closing_signed message to be sent or received from a peer
245+
#[derive(Clone)]
244246
pub struct ClosingSigned {
245247
pub(crate) channel_id: [u8; 32],
246248
pub(crate) fee_satoshis: u64,
@@ -540,9 +542,9 @@ pub trait ChannelMessageHandler : events::MessageSendEventsProvider + Send + Syn
540542

541543
// Channl close:
542544
/// Handle an incoming shutdown message from the given peer.
543-
fn handle_shutdown(&self, their_node_id: &PublicKey, msg: &Shutdown) -> Result<(Option<Shutdown>, Option<ClosingSigned>), HandleError>;
545+
fn handle_shutdown(&self, their_node_id: &PublicKey, msg: &Shutdown) -> Result<(), HandleError>;
544546
/// Handle an incoming closing_signed message from the given peer.
545-
fn handle_closing_signed(&self, their_node_id: &PublicKey, msg: &ClosingSigned) -> Result<Option<ClosingSigned>, HandleError>;
547+
fn handle_closing_signed(&self, their_node_id: &PublicKey, msg: &ClosingSigned) -> Result<(), HandleError>;
546548

547549
// HTLC handling:
548550
/// Handle an incoming update_add_htlc message from the given peer.

src/ln/peer_handler.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -581,20 +581,11 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
581581

582582
38 => {
583583
let msg = try_potential_decodeerror!(msgs::Shutdown::read(&mut reader));
584-
let resp_options = try_potential_handleerror!(self.message_handler.chan_handler.handle_shutdown(&peer.their_node_id.unwrap(), &msg));
585-
if let Some(resp) = resp_options.0 {
586-
encode_and_send_msg!(resp, 38);
587-
}
588-
if let Some(resp) = resp_options.1 {
589-
encode_and_send_msg!(resp, 39);
590-
}
584+
try_potential_handleerror!(self.message_handler.chan_handler.handle_shutdown(&peer.their_node_id.unwrap(), &msg));
591585
},
592586
39 => {
593587
let msg = try_potential_decodeerror!(msgs::ClosingSigned::read(&mut reader));
594-
let resp_option = try_potential_handleerror!(self.message_handler.chan_handler.handle_closing_signed(&peer.their_node_id.unwrap(), &msg));
595-
if let Some(resp) = resp_option {
596-
encode_and_send_msg!(resp, 39);
597-
}
588+
try_potential_handleerror!(self.message_handler.chan_handler.handle_closing_signed(&peer.their_node_id.unwrap(), &msg));
598589
},
599590

600591
128 => {
@@ -883,6 +874,16 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
883874
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 133)));
884875
Self::do_attempt_write_data(&mut descriptor, peer);
885876
},
877+
MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => {
878+
log_trace!(self, "Handling SendClosingSigned event in peer_handler for node {} for channel {}",
879+
log_pubkey!(node_id),
880+
log_bytes!(msg.channel_id));
881+
let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
882+
//TODO: Do whatever we're gonna do for handling dropped messages
883+
});
884+
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 39)));
885+
Self::do_attempt_write_data(&mut descriptor, peer);
886+
},
886887
MessageSendEvent::SendShutdown { ref node_id, ref msg } => {
887888
log_trace!(self, "Handling Shutdown event in peer_handler for node {} for channel {}",
888889
log_pubkey!(node_id),

src/util/events.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,13 @@ pub enum MessageSendEvent {
150150
/// The message which should be sent.
151151
msg: msgs::RevokeAndACK,
152152
},
153+
/// Used to indicate that a closing_signed message should be sent to the peer with the given node_id.
154+
SendClosingSigned {
155+
/// The node_id of the node which should receive this message
156+
node_id: PublicKey,
157+
/// The message which should be sent.
158+
msg: msgs::ClosingSigned,
159+
},
153160
/// Used to indicate that a shutdown message should be sent to the peer with the given node_id.
154161
SendShutdown {
155162
/// The node_id of the node which should receive this message

src/util/test_utils.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,10 +101,10 @@ impl msgs::ChannelMessageHandler for TestChannelMessageHandler {
101101
fn handle_funding_locked(&self, _their_node_id: &PublicKey, _msg: &msgs::FundingLocked) -> Result<(), HandleError> {
102102
Err(HandleError { err: "", action: None })
103103
}
104-
fn handle_shutdown(&self, _their_node_id: &PublicKey, _msg: &msgs::Shutdown) -> Result<(Option<msgs::Shutdown>, Option<msgs::ClosingSigned>), HandleError> {
104+
fn handle_shutdown(&self, _their_node_id: &PublicKey, _msg: &msgs::Shutdown) -> Result<(), HandleError> {
105105
Err(HandleError { err: "", action: None })
106106
}
107-
fn handle_closing_signed(&self, _their_node_id: &PublicKey, _msg: &msgs::ClosingSigned) -> Result<Option<msgs::ClosingSigned>, HandleError> {
107+
fn handle_closing_signed(&self, _their_node_id: &PublicKey, _msg: &msgs::ClosingSigned) -> Result<(), HandleError> {
108108
Err(HandleError { err: "", action: None })
109109
}
110110
fn handle_update_add_htlc(&self, _their_node_id: &PublicKey, _msg: &msgs::UpdateAddHTLC) -> Result<(), HandleError> {

0 commit comments

Comments
 (0)