Skip to content

Commit e1881b5

Browse files
committed
f - Implement async chunked transfer encoding using chunked_transfer::Decoder
Buffering asynchronously first using read_to_end won't work since the server may not close the stream. Instead read the chunk header and use the decoder only to get the chunk size.
1 parent 115c936 commit e1881b5

File tree

2 files changed

+40
-11
lines changed

2 files changed

+40
-11
lines changed

lightning-block-sync/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ bitcoin = "0.24"
1717
lightning = { version = "0.0.12", path = "../lightning" }
1818
tokio = { version = "1.0", features = [ "io-util", "net" ], optional = true }
1919
serde_json = { version = "1.0", optional = true }
20-
chunked_transfer = { version = "1.3.0", optional = true }
20+
chunked_transfer = { version = "1.4.0", optional = true }
2121
futures = { version = "0.3.8" }
2222

2323
[dev-dependencies]

lightning-block-sync/src/http.rs

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -274,17 +274,46 @@ impl HttpClient {
274274
Err(std::io::Error::new(
275275
std::io::ErrorKind::InvalidInput, "unsupported transfer coding"))
276276
} else {
277-
#[cfg(feature = "tokio")]
278-
let reader = {
279-
// Read the stream into a buffer since chunked_transfer isn't async.
280-
let mut buffer = Vec::new();
281-
reader.read_to_end(&mut buffer).await?;
282-
std::io::Cursor::new(buffer)
283-
};
284-
let mut decoder = chunked_transfer::Decoder::new(reader);
285277
let mut content = Vec::new();
286-
decoder.read_to_end(&mut content)?;
287-
Ok(content)
278+
#[cfg(feature = "tokio")]
279+
{
280+
// Since chunked_transfer doesn't have an async interface, only use it to
281+
// determine the size of each chunk to read.
282+
//
283+
// TODO: Replace with an async interface when available.
284+
// https://github.com/frewsxcv/rust-chunked-transfer/issues/7
285+
loop {
286+
// Read the chunk header which contains the chunk size.
287+
let mut chunk_header = String::new();
288+
reader.read_line(&mut chunk_header).await?;
289+
if chunk_header == "0\r\n" {
290+
// Read the terminator chunk since the decoder consumes the CRLF
291+
// immediately when this chunk is encountered.
292+
reader.read_line(&mut chunk_header).await?;
293+
}
294+
295+
// Decode the chunk header to obtain the chunk size.
296+
let mut buffer = Vec::new();
297+
let mut decoder = chunked_transfer::Decoder::new(chunk_header.as_bytes());
298+
decoder.read_to_end(&mut buffer)?;
299+
300+
// Read the chunk body.
301+
let chunk_size = match decoder.remaining_chunks_size() {
302+
None => break,
303+
Some(chunk_size) => chunk_size,
304+
};
305+
let mut chunk_body = vec![0; chunk_size + "\r\n".len()];
306+
reader.read_exact(&mut chunk_body[..]).await?;
307+
content.extend_from_slice(&chunk_body[..chunk_size]);
308+
}
309+
Ok(content)
310+
}
311+
#[cfg(not(feature = "tokio"))]
312+
{
313+
let mut decoder = chunked_transfer::Decoder::new(reader);
314+
decoder.read_to_end(&mut content)?;
315+
Ok(content)
316+
}
288317
}
289318
},
290319
}

0 commit comments

Comments
 (0)