Skip to content

Commit b2bc5b1

Browse files
committed
refactor: Move drain_messages() behind Transport
1 parent 462bc10 commit b2bc5b1

File tree

2 files changed

+91
-58
lines changed

2 files changed

+91
-58
lines changed

lightning/src/ln/peers/handler.rs

Lines changed: 5 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use ln::msgs::{ChannelMessageHandler, LightningError, RoutingMessageHandler};
2323
use ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager};
2424
use util::ser::{Writeable};
2525
use ln::wire;
26-
use ln::wire::Encode;
26+
use ln::wire::{Encode, Message};
2727
use util::byte_utils;
2828
use util::events::{MessageSendEvent, MessageSendEventsProvider};
2929
use util::logger::Logger;
@@ -60,6 +60,9 @@ pub(super) trait ITransport {
6060
/// Returns true if the connection is established and encrypted messages can be sent.
6161
fn is_connected(&self) -> bool;
6262

63+
/// Returns all Messages that have been received and can be parsed by the Transport
64+
fn drain_messages<L: Deref>(&mut self, logger: L) -> Result<Vec<Message>, PeerHandleError> where L::Target: Logger;
65+
6366
/// Encodes, encrypts, and enqueues a message to the outbound queue. Panics if the connection is
6467
/// not established yet.
6568
fn enqueue_message<M: Encode + Writeable, Q: PayloadQueuer>(&mut self, message: &M, output_buffer: &mut Q);
@@ -580,60 +583,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
580583
}
581584
}
582585

583-
let mut received_messages = vec![];
584-
match peer.transport.conduit {
585-
None => { }
586-
Some(ref mut conduit) => {
587-
// Using Iterators that can error requires special handling
588-
// The item returned from next() has type Option<Result<Option<Vec>, String>>
589-
// The Some wrapper is stripped for each item inside the loop
590-
// There are 3 valid match cases:
591-
// 1) Some(Ok(Some(msg_data))) => Indicates a valid decrypted msg accessed via msg_data
592-
// 2) Some(Err(_)) => Indicates an error during decryption that should be handled
593-
// 3) None -> Indicates there were no messages available to decrypt
594-
// Invalid Cases
595-
// 1) Some(Ok(None)) => Translated to None case above so users of iterators can stop correctly
596-
for msg_data_result in &mut conduit.decryptor {
597-
match msg_data_result {
598-
Ok(Some(msg_data)) => {
599-
let mut reader = ::std::io::Cursor::new(&msg_data[..]);
600-
let message_result = wire::read(&mut reader);
601-
let message = match message_result {
602-
Ok(x) => x,
603-
Err(e) => {
604-
match e {
605-
msgs::DecodeError::UnknownVersion => return Err(PeerHandleError { no_connection_possible: false }),
606-
msgs::DecodeError::UnknownRequiredFeature => {
607-
log_debug!(self.logger, "Got a channel/node announcement with an known required feature flag, you may want to update!");
608-
continue;
609-
}
610-
msgs::DecodeError::InvalidValue => {
611-
log_debug!(self.logger, "Got an invalid value while deserializing message");
612-
return Err(PeerHandleError { no_connection_possible: false });
613-
}
614-
msgs::DecodeError::ShortRead => {
615-
log_debug!(self.logger, "Deserialization failed due to shortness of message");
616-
return Err(PeerHandleError { no_connection_possible: false });
617-
}
618-
msgs::DecodeError::BadLengthDescriptor => return Err(PeerHandleError { no_connection_possible: false }),
619-
msgs::DecodeError::Io(_) => return Err(PeerHandleError { no_connection_possible: false }),
620-
}
621-
}
622-
};
623-
624-
received_messages.push(message);
625-
},
626-
Err(e) => {
627-
log_trace!(self.logger, "Message decryption failed due to: {}", e);
628-
return Err(PeerHandleError { no_connection_possible: false });
629-
}
630-
Ok(None) => {
631-
panic!("Invalid behavior. Conduit iterator should never return this match.")
632-
}
633-
}
634-
}
635-
}
636-
}
586+
let received_messages = peer.transport.drain_messages(&*self.logger)?;
637587

638588
for message in received_messages {
639589
macro_rules! try_potential_handleerror {

lightning/src/ln/peers/transport.rs

Lines changed: 86 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@
33
use bitcoin::secp256k1::{SecretKey, PublicKey};
44

55
use ln::peers::conduit::Conduit;
6-
use ln::peers::handler::{ITransport, PayloadQueuer};
6+
use ln::peers::handler::{ITransport, PeerHandleError, PayloadQueuer};
77
use ln::peers::handshake::PeerHandshake;
8-
use ln::wire;
9-
use ln::wire::Encode;
8+
use ln::{wire, msgs};
9+
use ln::wire::{Encode, Message};
1010

1111
use util::ser::{Writeable, VecWriter};
12+
use util::logger::Logger;
13+
use std::ops::Deref;
1214

1315
/// Interface used by Transport to interact with a handshake object
1416
pub trait IPeerHandshake {
@@ -78,6 +80,68 @@ impl<PeerHandshakeImpl: IPeerHandshake> ITransport for Transport<PeerHandshakeIm
7880
Ok(())
7981
}
8082

83+
fn drain_messages<L: Deref>(&mut self, logger: L) -> Result<Vec<Message>, PeerHandleError>
84+
where L::Target: Logger {
85+
86+
let mut received_messages = vec![];
87+
88+
match self.conduit {
89+
None => {}
90+
Some(ref mut conduit) => {
91+
// Using Iterators that can error requires special handling
92+
// The item returned from next() has type Option<Result<Option<Vec>, String>>
93+
// The Some wrapper is stripped for each item inside the loop
94+
// There are 3 valid match cases:
95+
// 1) Some(Ok(Some(msg_data))) => Indicates a valid decrypted msg accessed via msg_data
96+
// 2) Some(Err(_)) => Indicates an error during decryption that should be handled
97+
// 3) None -> Indicates there were no messages available to decrypt
98+
// Invalid Cases
99+
// 1) Some(Ok(None)) => Translated to None case above so users of iterators can stop correctly
100+
for msg_data_result in &mut conduit.decryptor {
101+
match msg_data_result {
102+
Ok(Some(msg_data)) => {
103+
let mut reader = ::std::io::Cursor::new(&msg_data[..]);
104+
let message_result = wire::read(&mut reader);
105+
let message = match message_result {
106+
Ok(x) => x,
107+
Err(e) => {
108+
match e {
109+
msgs::DecodeError::UnknownVersion => return Err(PeerHandleError { no_connection_possible: false }),
110+
msgs::DecodeError::UnknownRequiredFeature => {
111+
log_debug!(logger, "Got a channel/node announcement with an known required feature flag, you may want to update!");
112+
continue;
113+
}
114+
msgs::DecodeError::InvalidValue => {
115+
log_debug!(logger, "Got an invalid value while deserializing message");
116+
return Err(PeerHandleError { no_connection_possible: false });
117+
}
118+
msgs::DecodeError::ShortRead => {
119+
log_debug!(logger, "Deserialization failed due to shortness of message");
120+
return Err(PeerHandleError { no_connection_possible: false });
121+
}
122+
msgs::DecodeError::BadLengthDescriptor => return Err(PeerHandleError { no_connection_possible: false }),
123+
msgs::DecodeError::Io(_) => return Err(PeerHandleError { no_connection_possible: false }),
124+
}
125+
}
126+
};
127+
128+
received_messages.push(message);
129+
},
130+
Err(e) => {
131+
log_trace!(logger, "Message decryption failed due to: {}", e);
132+
return Err(PeerHandleError { no_connection_possible: false });
133+
}
134+
Ok(None) => {
135+
panic!("Invalid behavior. Conduit iterator should never return this match.")
136+
}
137+
}
138+
}
139+
}
140+
}
141+
142+
Ok(received_messages)
143+
}
144+
81145
fn is_connected(&self) -> bool {
82146
self.conduit.is_some()
83147
}
@@ -102,6 +166,7 @@ mod tests {
102166
use bitcoin::secp256k1;
103167
use bitcoin::secp256k1::key::{PublicKey, SecretKey};
104168
use ln::msgs;
169+
use util::test_utils::TestLogger;
105170

106171
fn create_outbound_for_test<PeerHandshakeImpl: IPeerHandshake>() -> Transport<PeerHandshakeImpl> {
107172
let curve = secp256k1::Secp256k1::new();
@@ -254,4 +319,22 @@ mod tests {
254319

255320
assert_matches!(&spy[..], [_]);
256321
}
322+
323+
#[test]
324+
fn inbound_not_connected_empty() {
325+
let logger = TestLogger::new();
326+
let mut transport = create_inbound_for_test::<PeerHandshakeTestStubComplete>();
327+
328+
let messages = transport.drain_messages(&logger).unwrap();
329+
assert_eq!(messages.len(), 0);
330+
}
331+
332+
#[test]
333+
fn outbound_not_connected_empty() {
334+
let logger = TestLogger::new();
335+
let mut transport = create_outbound_for_test::<PeerHandshakeTestStubComplete>();
336+
337+
let messages = transport.drain_messages(&logger).unwrap();
338+
assert_eq!(messages.len(), 0);
339+
}
257340
}

0 commit comments

Comments
 (0)