Skip to content

Commit 3150752

Browse files
Refactor TryStream combinator
1 parent 520212d commit 3150752

File tree

2 files changed

+19
-21
lines changed

2 files changed

+19
-21
lines changed

futures-util/src/try_stream/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@ impl<S: TryStream> TryStreamExt for S {}
1515

1616
/// Adapters specific to `Result`-returning streams
1717
pub trait TryStreamExt: TryStream {
18-
/// Attempt to Collect all of the values of this stream into a vector, returning a
19-
/// future representing the result of that computation.
18+
/// Attempt to Collect all of the values of this stream into a vector,
19+
/// returning a future representing the result of that computation.
2020
///
2121
/// This combinator will collect all successful results of this stream and
2222
/// collect them into a `Vec<Self::Item>`. If an error happens then all
@@ -53,6 +53,6 @@ pub trait TryStreamExt: TryStream {
5353
fn try_collect<C: Default + Extend<Self::Ok>>(self) -> TryCollect<Self, C>
5454
where Self: Sized
5555
{
56-
try_collect::new(self)
56+
TryCollect::new(self)
5757
}
5858
}

futures-util/src/try_stream/try_collect.rs

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,39 +10,37 @@ use std::prelude::v1::*;
1010
/// This future is created by the `Stream::try_collect` method.
1111
#[derive(Debug)]
1212
#[must_use = "streams do nothing unless polled"]
13-
pub struct TryCollect<S, C> where S: TryStream {
14-
stream: S,
13+
pub struct TryCollect<St, C> where St: TryStream {
14+
stream: St,
1515
items: C,
1616
}
1717

18-
pub fn new<S, C>(s: S) -> TryCollect<S, C>
19-
where S: TryStream, C: Default
20-
{
21-
TryCollect {
22-
stream: s,
23-
items: Default::default(),
18+
impl<St: TryStream, C: Default> TryCollect<St, C> {
19+
unsafe_pinned!(stream: St);
20+
unsafe_unpinned!(items: C);
21+
22+
pub(super) fn new(s: St) -> TryCollect<St, C> {
23+
TryCollect {
24+
stream: s,
25+
items: Default::default(),
26+
}
2427
}
25-
}
2628

27-
impl<S: TryStream, C: Default> TryCollect<S, C> {
2829
fn finish(mut self: PinMut<Self>) -> C {
2930
mem::replace(self.items(), Default::default())
3031
}
31-
32-
unsafe_pinned!(stream: S);
33-
unsafe_unpinned!(items: C);
3432
}
3533

36-
impl<S: Unpin + TryStream, C> Unpin for TryCollect<S, C> {}
34+
impl<St: Unpin + TryStream, C> Unpin for TryCollect<St, C> {}
3735

38-
impl<S, C> Future for TryCollect<S, C>
39-
where S: TryStream, C: Default + Extend<S::Ok>
36+
impl<St, C> Future for TryCollect<St, C>
37+
where St: TryStream, C: Default + Extend<St::Ok>
4038
{
41-
type Output = Result<C, S::Error>;
39+
type Output = Result<C, St::Error>;
4240

4341
fn poll(
4442
mut self: PinMut<Self>,
45-
cx: &mut task::Context
43+
cx: &mut task::Context,
4644
) -> Poll<Self::Output> {
4745
loop {
4846
match ready!(self.stream().try_poll_next(cx)) {

0 commit comments

Comments
 (0)