Skip to content

Commit d9aec10

Browse files
feat(io): implement Read::chain
1 parent e681e29 commit d9aec10

File tree

2 files changed

+238
-1
lines changed

2 files changed

+238
-1
lines changed

src/io/read/chain.rs

Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
use crate::io::IoSliceMut;
2+
use std::fmt;
3+
use std::pin::Pin;
4+
5+
use crate::io::{self, BufRead, Read};
6+
use crate::task::{Context, Poll};
7+
8+
/// Adaptor to chain together two readers.
9+
///
10+
/// This struct is generally created by calling [`chain`] on a reader.
11+
/// Please see the documentation of [`chain`] for more details.
12+
///
13+
/// [`chain`]: trait.Read.html#method.chain
14+
pub struct Chain<T, U> {
15+
pub(crate) first: T,
16+
pub(crate) second: U,
17+
pub(crate) done_first: bool,
18+
}
19+
20+
impl<T, U> Chain<T, U> {
21+
/// Consumes the `Chain`, returning the wrapped readers.
22+
///
23+
/// # Examples
24+
///
25+
/// ```no_run
26+
/// use async_std::io;
27+
/// use async_std::prelude::*;
28+
/// use async_std::fs::File;
29+
///
30+
/// fn main() -> io::Result<()> { async_std::task::block_on(async {
31+
/// let foo_file = File::open("foo.txt").await?;
32+
/// let bar_file = File::open("bar.txt").await?;
33+
///
34+
/// let chain = foo_file.chain(bar_file);
35+
/// let (foo_file, bar_file) = chain.into_inner();
36+
/// Ok(())
37+
/// }) }
38+
/// ```
39+
pub fn into_inner(self) -> (T, U) {
40+
(self.first, self.second)
41+
}
42+
43+
/// Gets references to the underlying readers in this `Chain`.
44+
///
45+
/// # Examples
46+
///
47+
/// ```no_run
48+
/// use async_std::io;
49+
/// use async_std::prelude::*;
50+
/// use async_std::fs::File;
51+
///
52+
/// fn main() -> io::Result<()> { async_std::task::block_on(async {
53+
/// let foo_file = File::open("foo.txt").await?;
54+
/// let bar_file = File::open("bar.txt").await?;
55+
///
56+
/// let chain = foo_file.chain(bar_file);
57+
/// let (foo_file, bar_file) = chain.get_ref();
58+
/// Ok(())
59+
/// }) }
60+
/// ```
61+
pub fn get_ref(&self) -> (&T, &U) {
62+
(&self.first, &self.second)
63+
}
64+
65+
/// Gets mutable references to the underlying readers in this `Chain`.
66+
///
67+
/// Care should be taken to avoid modifying the internal I/O state of the
68+
/// underlying readers as doing so may corrupt the internal state of this
69+
/// `Chain`.
70+
///
71+
/// # Examples
72+
///
73+
/// ```no_run
74+
/// use async_std::io;
75+
/// use async_std::prelude::*;
76+
/// use async_std::fs::File;
77+
///
78+
/// fn main() -> io::Result<()> { async_std::task::block_on(async {
79+
/// let foo_file = File::open("foo.txt").await?;
80+
/// let bar_file = File::open("bar.txt").await?;
81+
///
82+
/// let mut chain = foo_file.chain(bar_file);
83+
/// let (foo_file, bar_file) = chain.get_mut();
84+
/// Ok(())
85+
/// }) }
86+
/// ```
87+
pub fn get_mut(&mut self) -> (&mut T, &mut U) {
88+
(&mut self.first, &mut self.second)
89+
}
90+
}
91+
92+
impl<T: fmt::Debug, U: fmt::Debug> fmt::Debug for Chain<T, U> {
93+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
94+
f.debug_struct("Chain")
95+
.field("t", &self.first)
96+
.field("u", &self.second)
97+
.finish()
98+
}
99+
}
100+
101+
impl<T: Read + Unpin, U: Read + Unpin> Read for Chain<T, U> {
102+
fn poll_read(
103+
mut self: Pin<&mut Self>,
104+
cx: &mut Context<'_>,
105+
buf: &mut [u8],
106+
) -> Poll<io::Result<usize>> {
107+
if !self.done_first {
108+
let rd = Pin::new(&mut self.first);
109+
110+
match futures_core::ready!(rd.poll_read(cx, buf)) {
111+
Ok(0) if !buf.is_empty() => self.done_first = true,
112+
Ok(n) => return Poll::Ready(Ok(n)),
113+
Err(err) => return Poll::Ready(Err(err)),
114+
}
115+
}
116+
117+
let rd = Pin::new(&mut self.second);
118+
rd.poll_read(cx, buf)
119+
}
120+
121+
fn poll_read_vectored(
122+
mut self: Pin<&mut Self>,
123+
cx: &mut Context<'_>,
124+
bufs: &mut [IoSliceMut<'_>],
125+
) -> Poll<io::Result<usize>> {
126+
if !self.done_first {
127+
let rd = Pin::new(&mut self.first);
128+
129+
match futures_core::ready!(rd.poll_read_vectored(cx, bufs)) {
130+
Ok(0) if !bufs.is_empty() => self.done_first = true,
131+
Ok(n) => return Poll::Ready(Ok(n)),
132+
Err(err) => return Poll::Ready(Err(err)),
133+
}
134+
}
135+
136+
let rd = Pin::new(&mut self.second);
137+
rd.poll_read_vectored(cx, bufs)
138+
}
139+
}
140+
141+
impl<T: BufRead + Unpin, U: BufRead + Unpin> BufRead for Chain<T, U> {
142+
fn poll_fill_buf(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
143+
// FIXME: how to make this compile?
144+
145+
// let Self {
146+
// first,
147+
// second,
148+
// done_first
149+
// } = &mut *self;
150+
151+
// if !*done_first {
152+
// let rd = Pin::new(first);
153+
154+
// match futures_core::ready!(rd.poll_fill_buf(cx)) {
155+
// Ok(buf) if buf.is_empty() => { *done_first = true; }
156+
// Ok(buf) => return Poll::Ready(Ok(buf)),
157+
// Err(err) => return Poll::Ready(Err(err)),
158+
// }
159+
// }
160+
161+
// let rd = Pin::new(second);
162+
// rd.poll_fill_buf(cx)
163+
unimplemented!()
164+
}
165+
166+
fn consume(mut self: Pin<&mut Self>, amt: usize) {
167+
if !self.done_first {
168+
let rd = Pin::new(&mut self.first);
169+
rd.consume(amt)
170+
} else {
171+
let rd = Pin::new(&mut self.second);
172+
rd.consume(amt)
173+
}
174+
}
175+
}
176+
177+
#[cfg(test)]
178+
mod tests {
179+
use crate::io;
180+
use crate::prelude::*;
181+
use crate::task;
182+
183+
#[test]
184+
fn test_chain_basics() -> std::io::Result<()> {
185+
let source1: io::Cursor<Vec<u8>> = io::Cursor::new(vec![0, 1, 2]);
186+
let source2: io::Cursor<Vec<u8>> = io::Cursor::new(vec![3, 4, 5]);
187+
188+
task::block_on(async move {
189+
let mut buffer = Vec::new();
190+
191+
let mut source = source1.chain(source2);
192+
193+
assert_eq!(6, source.read_to_end(&mut buffer).await?);
194+
assert_eq!(buffer, vec![0, 1, 2, 3, 4, 5]);
195+
196+
Ok(())
197+
})
198+
}
199+
}

src/io/read/mod.rs

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
mod bytes;
2+
mod chain;
23
mod read;
34
mod read_exact;
45
mod read_to_end;
@@ -344,7 +345,7 @@ extension_trait! {
344345
fn by_ref(&mut self) -> &mut Self where Self: Sized { self }
345346

346347

347-
#[doc=r#"
348+
#[doc = r#"
348349
Transforms this `Read` instance to a `Stream` over its bytes.
349350
350351
The returned type implements `Stream` where the `Item` is
@@ -377,6 +378,43 @@ extension_trait! {
377378
fn bytes(self) -> bytes::Bytes<Self> where Self: Sized {
378379
bytes::Bytes { inner: self }
379380
}
381+
382+
#[doc = r#"
383+
Creates an adaptor which will chain this stream with another.
384+
385+
The returned `Read` instance will first read all bytes from this object
386+
until EOF is encountered. Afterwards the output is equivalent to the
387+
output of `next`.
388+
389+
# Examples
390+
391+
[`File`][file]s implement `Read`:
392+
393+
[file]: ../fs/struct.File.html
394+
395+
```no_run
396+
use async_std::io;
397+
use async_std::prelude::*;
398+
use async_std::fs::File;
399+
400+
fn main() -> io::Result<()> { async_std::task::block_on(async {
401+
let f1 = File::open("foo.txt").await?;
402+
let f2 = File::open("bar.txt").await?;
403+
404+
let mut handle = f1.chain(f2);
405+
let mut buffer = String::new();
406+
407+
// read the value into a String. We could use any Read method here,
408+
// this is just one example.
409+
handle.read_to_string(&mut buffer).await?;
410+
Ok(())
411+
}) }
412+
```
413+
"#]
414+
fn chain<R: Read>(self, next: R) -> chain::Chain<Self, R> where Self: Sized {
415+
chain::Chain { first: self, second: next, done_first: false }
416+
}
417+
380418
}
381419

382420
impl<T: Read + Unpin + ?Sized> Read for Box<T> {

0 commit comments

Comments
 (0)