Skip to content

Commit 9c3a27a

Browse files
committed
Merge rust-bitcoin/rust-bitcoin#680: Deprecate StreamReader
e860333 Fix typos (Riccardo Casatta) 9189539 Use BufReader internally in StreamReader to avoid performance regression on existing callers (Riccardo Casatta) 5dfb93d Deprecate StreamReader (Riccardo Casatta) 9ca6c75 Bench StreamReader (Riccardo Casatta) Pull request description: `StreamReader` performance is extremely poor in case the object decoded is "big enough" for example a full Block. In the common case, the buffer is 64k, so to successfully parse a 1MB block 16 decode attempts are made. Even if a user increases the buffer size, `read` is not going to necessarily fill the buffer, as stated in the doc https://doc.rust-lang.org/stable/std/io/trait.Read.html#tymethod.read. In my tests, the reads are 64kB even with a 1MB buffer. I think this is the root issue of the performance issue found in electrs in romanz/electrs#547 and they now have decided to decode the TCP stream with their own code in romanz/electrs@cd0531b and romanz/electrs@05e0221. Using directly `consensus_encode` seems to make more sense (taking care of using `BufRead` if necessary) so the `StreamReader` is deprecated ACKs for top commit: Kixunil: ACK e860333 apoelstra: ACK e860333 Tree-SHA512: a15a14f3f087be36271da5008d8dfb63866c9ddeb5ceb0e328b4a6d870131132a8b05103f7a3fed231f5bca099865efd07856b4766834d56ce2384b1bcdb889b
2 parents 4fa477c + e860333 commit 9c3a27a

File tree

3 files changed

+57
-65
lines changed

3 files changed

+57
-65
lines changed

examples/handshake.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,10 @@ extern crate bitcoin;
33
use std::net::{IpAddr, Ipv4Addr, Shutdown, SocketAddr, TcpStream};
44
use std::time::{SystemTime, UNIX_EPOCH};
55
use std::{env, process};
6-
use std::io::Write;
6+
use std::io::{Write, BufReader};
77

8-
use bitcoin::consensus::encode;
8+
use bitcoin::consensus::{encode, Decodable};
99
use bitcoin::network::{address, constants, message, message_network};
10-
use bitcoin::network::stream_reader::StreamReader;
1110
use bitcoin::secp256k1;
1211
use bitcoin::secp256k1::rand::Rng;
1312

@@ -41,10 +40,10 @@ fn main() {
4140

4241
// Setup StreamReader
4342
let read_stream = stream.try_clone().unwrap();
44-
let mut stream_reader = StreamReader::new(read_stream, None);
43+
let mut stream_reader = BufReader::new(read_stream);
4544
loop {
4645
// Loop an retrieve new messages
47-
let reply: message::RawNetworkMessage = stream_reader.read_next().unwrap();
46+
let reply = message::RawNetworkMessage::consensus_decode(&mut stream_reader).unwrap();
4847
match reply.payload {
4948
message::NetworkMessage::Version(_) => {
5049
println!("Received version message: {:?}", reply.payload);

src/blockdata/block.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -481,12 +481,33 @@ mod tests {
481481
mod benches {
482482
use super::Block;
483483
use EmptyWrite;
484-
use consensus::{deserialize, Encodable};
484+
use consensus::{deserialize, Encodable, serialize};
485485
use hashes::hex::FromHex;
486486
use test::{black_box, Bencher};
487+
use network::stream_reader::StreamReader;
487488

488489
const SOME_BLOCK: &'static str = "";
489490

491+
#[bench]
492+
#[allow(deprecated)]
493+
pub fn bench_stream_reader(bh: &mut Bencher) {
494+
let raw_block = Vec::from_hex(SOME_BLOCK).unwrap();
495+
let mut block: Block = deserialize(&raw_block).unwrap();
496+
for _ in 0..8 {
497+
// make a big block
498+
block.txdata.extend(block.txdata.clone().into_iter());
499+
}
500+
let big_block = serialize(&block);
501+
assert_eq!(big_block.len(), 1_085_011);
502+
let big_block = black_box(big_block);
503+
504+
bh.iter(|| {
505+
let mut reader = StreamReader::new(&big_block[..], None);
506+
let block: Block = reader.read_next().unwrap();
507+
black_box(&block);
508+
});
509+
}
510+
490511
#[bench]
491512
pub fn bench_block_serialize(bh: &mut Bencher) {
492513
let raw_block = Vec::from_hex(SOME_BLOCK).unwrap();

src/network/stream_reader.rs

Lines changed: 31 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -14,78 +14,53 @@
1414

1515
//! Stream reader.
1616
//!
17-
//! This module defines the `StreamReader` struct and its implementation which
18-
//! is used for parsing an incoming stream into separate `RawNetworkMessage`s,
19-
//! handling assembling messages from multiple packets, or dealing with partial
20-
//! or multiple messages in the stream (e.g. when reading from a TCP socket).
17+
//! Deprecated
18+
//!
19+
//! This module defines `StreamReader` struct and its implementation which is used
20+
//! for parsing incoming stream into separate `Decodable`s, handling assembling
21+
//! messages from multiple packets or dealing with partial or multiple messages in the stream
22+
//! (like can happen with reading from TCP socket)
2123
//!
22-
23-
use prelude::*;
2424
2525
use core::fmt;
26-
use io::{self, Read};
26+
use io::{Read, BufReader};
2727

2828
use consensus::{encode, Decodable};
2929

3030
/// Struct used to configure stream reader function
3131
pub struct StreamReader<R: Read> {
3232
/// Stream to read from
33-
pub stream: R,
34-
/// I/O buffer
35-
data: Vec<u8>,
36-
/// Buffer containing unparsed message part
37-
unparsed: Vec<u8>
33+
pub stream: BufReader<R>,
3834
}
3935

4036
impl<R: Read> fmt::Debug for StreamReader<R> {
4137
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
42-
write!(f, "StreamReader with buffer_size={} and unparsed content {:?}",
43-
self.data.capacity(), self.unparsed)
38+
write!(f, "StreamReader")
4439
}
4540
}
4641

4742
impl<R: Read> StreamReader<R> {
48-
/// Constructs new stream reader for a given input stream `stream` with
49-
/// optional parameter `buffer_size` determining reading buffer size
50-
pub fn new(stream: R, buffer_size: Option<usize>) -> StreamReader<R> {
43+
/// Constructs new stream reader for a given input stream `stream`
44+
#[deprecated(since="0.28.0", note="wrap your stream into a buffered reader if necessary and use consensus_encode directly")]
45+
pub fn new(stream: R, _buffer_size: Option<usize>) -> StreamReader<R> {
5146
StreamReader {
52-
stream,
53-
data: vec![0u8; buffer_size.unwrap_or(64 * 1024)],
54-
unparsed: vec![]
47+
stream: BufReader::new(stream),
5548
}
5649
}
5750

58-
/// Reads stream and parses next message from its current input,
59-
/// also taking into account previously unparsed partial message (if there was such).
51+
/// Reads stream and parses next message from its current input
52+
#[deprecated(since="0.28.0", note="wrap your stream into a buffered reader if necessary and use consensus_encode directly")]
6053
pub fn read_next<D: Decodable>(&mut self) -> Result<D, encode::Error> {
61-
loop {
62-
match encode::deserialize_partial::<D>(&self.unparsed) {
63-
// In this case we just have an incomplete data, so we need to read more
64-
Err(encode::Error::Io(ref err)) if err.kind () == io::ErrorKind::UnexpectedEof => {
65-
let count = self.stream.read(&mut self.data)?;
66-
if count > 0 {
67-
self.unparsed.extend(self.data[0..count].iter());
68-
}
69-
else {
70-
return Err(encode::Error::Io(io::Error::from(io::ErrorKind::UnexpectedEof)));
71-
}
72-
},
73-
Err(err) => return Err(err),
74-
// We have successfully read from the buffer
75-
Ok((message, index)) => {
76-
self.unparsed.drain(..index);
77-
return Ok(message)
78-
},
79-
}
80-
}
54+
Decodable::consensus_decode(&mut self.stream)
8155
}
8256
}
8357

58+
#[allow(deprecated)]
8459
#[cfg(test)]
8560
mod test {
8661
use std::thread;
8762
use std::time::Duration;
88-
use io::{self, BufReader, Write};
63+
use io::{BufReader, Write};
8964
use std::net::{TcpListener, TcpStream, Shutdown};
9065
use std::thread::JoinHandle;
9166
use network::constants::ServiceFlags;
@@ -193,22 +168,6 @@ mod test {
193168
}
194169
}
195170

196-
#[test]
197-
fn parse_multipartmsg_test() {
198-
let stream = io::empty();
199-
let mut reader = StreamReader::new(stream, None);
200-
reader.unparsed = MSG_ALERT[..24].to_vec();
201-
let message: Result<RawNetworkMessage, _> = reader.read_next();
202-
assert!(message.is_err());
203-
assert_eq!(reader.unparsed.len(), 24);
204-
205-
reader.unparsed = MSG_ALERT.to_vec();
206-
let message = reader.read_next().unwrap();
207-
assert_eq!(reader.unparsed.len(), 0);
208-
209-
check_alert_msg(&message);
210-
}
211-
212171
#[test]
213172
fn read_singlemsg_test() {
214173
let stream = MSG_VERSION[..].to_vec();
@@ -346,4 +305,17 @@ mod test {
346305
// should be also ok for a non-witness block as commitment is optional in that case
347306
assert!(block.check_witness_commitment());
348307
}
308+
309+
#[test]
310+
fn parse_multipartmsg_test() {
311+
let mut multi = MSG_ALERT.to_vec();
312+
multi.extend(&MSG_ALERT[..]);
313+
let mut reader = StreamReader::new(&multi[..], None);
314+
let message: Result<RawNetworkMessage, _> = reader.read_next();
315+
assert!(message.is_ok());
316+
check_alert_msg(&message.unwrap());
317+
let message: Result<RawNetworkMessage, _> = reader.read_next();
318+
assert!(message.is_ok());
319+
check_alert_msg(&message.unwrap());
320+
}
349321
}

0 commit comments

Comments
 (0)