Skip to content

Commit 922c4f7

Browse files
committed
Send channel_reestablish out-of-band to ensure ordered deliver
1 parent 93a8429 commit 922c4f7

File tree

5 files changed

+63
-22
lines changed

5 files changed

+63
-22
lines changed

src/ln/channelmanager.rs

Lines changed: 41 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2703,9 +2703,10 @@ impl ChannelMessageHandler for ChannelManager {
27032703
}
27042704
}
27052705

2706-
fn peer_connected(&self, their_node_id: &PublicKey) -> Vec<msgs::ChannelReestablish> {
2707-
let mut res = Vec::new();
2708-
let mut channel_state = self.channel_state.lock().unwrap();
2706+
fn peer_connected(&self, their_node_id: &PublicKey) {
2707+
let mut channel_state_lock = self.channel_state.lock().unwrap();
2708+
let channel_state = channel_state_lock.borrow_parts();
2709+
let pending_msg_events = channel_state.pending_msg_events;
27092710
channel_state.by_id.retain(|_, chan| {
27102711
if chan.get_their_node_id() == *their_node_id {
27112712
if !chan.have_received_message() {
@@ -2715,13 +2716,15 @@ impl ChannelMessageHandler for ChannelManager {
27152716
// drop it.
27162717
false
27172718
} else {
2718-
res.push(chan.get_channel_reestablish());
2719+
pending_msg_events.push(events::MessageSendEvent::SendChannelReestablish {
2720+
node_id: chan.get_their_node_id(),
2721+
msg: chan.get_channel_reestablish(),
2722+
});
27192723
true
27202724
}
27212725
} else { true }
27222726
});
27232727
//TODO: Also re-broadcast announcement_signatures
2724-
res
27252728
}
27262729

27272730
fn handle_error(&self, their_node_id: &PublicKey, msg: &msgs::ErrorMessage) {
@@ -5224,6 +5227,23 @@ mod tests {
52245227
assert_eq!(channel_state.short_to_id.len(), 0);
52255228
}
52265229

5230+
macro_rules! get_chan_reestablish_msgs {
5231+
($src_node: expr, $dst_node: expr) => {
5232+
{
5233+
let mut res = Vec::with_capacity(1);
5234+
for msg in $src_node.node.get_and_clear_pending_msg_events() {
5235+
if let MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } = msg {
5236+
assert_eq!(*node_id, $dst_node.node.get_our_node_id());
5237+
res.push(msg.clone());
5238+
} else {
5239+
panic!("Unexpected event")
5240+
}
5241+
}
5242+
res
5243+
}
5244+
}
5245+
}
5246+
52275247
macro_rules! handle_chan_reestablish_msgs {
52285248
($src_node: expr, $dst_node: expr) => {
52295249
{
@@ -5282,8 +5302,10 @@ mod tests {
52825302
/// pending_htlc_adds includes both the holding cell and in-flight update_add_htlcs, whereas
52835303
/// for claims/fails they are separated out.
52845304
fn reconnect_nodes(node_a: &Node, node_b: &Node, pre_all_htlcs: bool, pending_htlc_adds: (i64, i64), pending_htlc_claims: (usize, usize), pending_cell_htlc_claims: (usize, usize), pending_cell_htlc_fails: (usize, usize), pending_raa: (bool, bool)) {
5285-
let reestablish_1 = node_a.node.peer_connected(&node_b.node.get_our_node_id());
5286-
let reestablish_2 = node_b.node.peer_connected(&node_a.node.get_our_node_id());
5305+
node_a.node.peer_connected(&node_b.node.get_our_node_id());
5306+
let reestablish_1 = get_chan_reestablish_msgs!(node_a, node_b);
5307+
node_b.node.peer_connected(&node_a.node.get_our_node_id());
5308+
let reestablish_2 = get_chan_reestablish_msgs!(node_b, node_a);
52875309

52885310
let mut resp_1 = Vec::new();
52895311
for msg in reestablish_1 {
@@ -5781,9 +5803,11 @@ mod tests {
57815803
nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false);
57825804
nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
57835805

5784-
let reestablish_1 = nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id());
5806+
nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id());
5807+
let reestablish_1 = get_chan_reestablish_msgs!(nodes[0], nodes[1]);
57855808
assert_eq!(reestablish_1.len(), 1);
5786-
let reestablish_2 = nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id());
5809+
nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id());
5810+
let reestablish_2 = get_chan_reestablish_msgs!(nodes[1], nodes[0]);
57875811
assert_eq!(reestablish_2.len(), 1);
57885812

57895813
nodes[0].node.handle_channel_reestablish(&nodes[1].node.get_our_node_id(), &reestablish_2[0]).unwrap();
@@ -6069,9 +6093,11 @@ mod tests {
60696093
nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false);
60706094
nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
60716095

6072-
let reestablish_1 = nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id());
6096+
nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id());
6097+
let reestablish_1 = get_chan_reestablish_msgs!(nodes[0], nodes[1]);
60736098
assert_eq!(reestablish_1.len(), 1);
6074-
let reestablish_2 = nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id());
6099+
nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id());
6100+
let reestablish_2 = get_chan_reestablish_msgs!(nodes[1], nodes[0]);
60756101
assert_eq!(reestablish_2.len(), 1);
60766102

60776103
nodes[0].node.handle_channel_reestablish(&nodes[1].node.get_our_node_id(), &reestablish_2[0]).unwrap();
@@ -6089,9 +6115,11 @@ mod tests {
60896115
assert!(nodes[0].node.get_and_clear_pending_events().is_empty());
60906116
assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
60916117

6092-
let reestablish_1 = nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id());
6118+
nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id());
6119+
let reestablish_1 = get_chan_reestablish_msgs!(nodes[0], nodes[1]);
60936120
assert_eq!(reestablish_1.len(), 1);
6094-
let reestablish_2 = nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id());
6121+
nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id());
6122+
let reestablish_2 = get_chan_reestablish_msgs!(nodes[1], nodes[0]);
60956123
assert_eq!(reestablish_2.len(), 1);
60966124

60976125
nodes[0].node.handle_channel_reestablish(&nodes[1].node.get_our_node_id(), &reestablish_2[0]).unwrap();

src/ln/msgs.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -308,14 +308,14 @@ pub struct UpdateFee {
308308
pub(crate) feerate_per_kw: u32,
309309
}
310310

311-
#[derive(PartialEq)]
311+
#[derive(PartialEq, Clone)]
312312
pub(crate) struct DataLossProtect {
313313
pub(crate) your_last_per_commitment_secret: [u8; 32],
314314
pub(crate) my_current_per_commitment_point: PublicKey,
315315
}
316316

317317
/// A channel_reestablish message to be sent or received from a peer
318-
#[derive(PartialEq)]
318+
#[derive(PartialEq, Clone)]
319319
pub struct ChannelReestablish {
320320
pub(crate) channel_id: [u8; 32],
321321
pub(crate) next_local_commitment_number: u64,
@@ -563,7 +563,7 @@ pub trait ChannelMessageHandler : events::MessageSendEventsProvider + Send + Syn
563563
fn peer_disconnected(&self, their_node_id: &PublicKey, no_connection_possible: bool);
564564

565565
/// Handle a peer reconnecting, possibly generating channel_reestablish message(s).
566-
fn peer_connected(&self, their_node_id: &PublicKey) -> Vec<ChannelReestablish>;
566+
fn peer_connected(&self, their_node_id: &PublicKey);
567567
/// Handle an incoming channel_reestablish message from the given peer.
568568
fn handle_channel_reestablish(&self, their_node_id: &PublicKey, msg: &ChannelReestablish) -> Result<(), HandleError>;
569569

src/ln/peer_handler.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -520,9 +520,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
520520
}, 16);
521521
}
522522

523-
for msg in self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap()) {
524-
encode_and_send_msg!(msg, 136);
525-
}
523+
self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap());
526524
},
527525
17 => {
528526
let msg = try_potential_decodeerror!(msgs::ErrorMessage::read(&mut reader));
@@ -834,6 +832,16 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
834832
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 38)));
835833
Self::do_attempt_write_data(&mut descriptor, peer);
836834
},
835+
MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => {
836+
log_trace!(self, "Handling SendChannelReestablish event in peer_handler for node {} for channel {}",
837+
log_pubkey!(node_id),
838+
log_bytes!(msg.channel_id));
839+
let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
840+
//TODO: Do whatever we're gonna do for handling dropped messages
841+
});
842+
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 136)));
843+
Self::do_attempt_write_data(&mut descriptor, peer);
844+
},
837845
MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
838846
log_trace!(self, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id);
839847
if self.message_handler.route_handler.handle_channel_announcement(msg).is_ok() && self.message_handler.route_handler.handle_channel_update(update_msg).is_ok() {

src/util/events.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,13 @@ pub enum MessageSendEvent {
164164
/// The message which should be sent.
165165
msg: msgs::Shutdown,
166166
},
167+
/// Used to indicate that a channel_reestablish message should be sent to the peer with the given node_id.
168+
SendChannelReestablish {
169+
/// The node_id of the node which should receive this message
170+
node_id: PublicKey,
171+
/// The message which should be sent.
172+
msg: msgs::ChannelReestablish,
173+
},
167174
/// Used to indicate that a channel_announcement and channel_update should be broadcast to all
168175
/// peers (except the peer with node_id either msg.contents.node_id_1 or msg.contents.node_id_2).
169176
BroadcastChannelAnnouncement {

src/util/test_utils.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -135,9 +135,7 @@ impl msgs::ChannelMessageHandler for TestChannelMessageHandler {
135135
Err(HandleError { err: "", action: None })
136136
}
137137
fn peer_disconnected(&self, _their_node_id: &PublicKey, _no_connection_possible: bool) {}
138-
fn peer_connected(&self, _their_node_id: &PublicKey) -> Vec<msgs::ChannelReestablish> {
139-
Vec::new()
140-
}
138+
fn peer_connected(&self, _their_node_id: &PublicKey) {}
141139
fn handle_error(&self, _their_node_id: &PublicKey, _msg: &msgs::ErrorMessage) {}
142140
}
143141

0 commit comments

Comments
 (0)