Skip to content

Commit 9f8cb38

Browse files
committed
Add for_each_concurrent
1 parent 94c9cb0 commit 9f8cb38

File tree

3 files changed

+129
-4
lines changed

3 files changed

+129
-4
lines changed
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
use futures_core::{Async, Future, IntoFuture, Poll, Stream};
2+
use futures_core::task;
3+
4+
use super::futures_unordered::FuturesUnordered;
5+
6+
/// A stream combinator which executes a unit closure over each item on a
7+
/// stream concurrently.
8+
///
9+
/// This structure is returned by the `Stream::for_each` method.
10+
#[derive(Debug)]
11+
#[must_use = "streams do nothing unless polled"]
12+
pub struct ForEachConcurrent<S, U, F> where U: IntoFuture {
13+
stream: Option<S>,
14+
stream_done: bool,
15+
f: F,
16+
futures: FuturesUnordered<U::Future>,
17+
}
18+
19+
pub fn new<S, U, F>(s: S, f: F) -> ForEachConcurrent<S, U, F>
20+
where S: Stream,
21+
F: FnMut(S::Item) -> U,
22+
U: IntoFuture<Item = (), Error = S::Error>,
23+
{
24+
ForEachConcurrent {
25+
stream: Some(s),
26+
stream_done: false,
27+
f: f,
28+
futures: FuturesUnordered::new(),
29+
}
30+
}
31+
32+
impl<S, U, F> Future for ForEachConcurrent<S, U, F>
33+
where S: Stream,
34+
F: FnMut(S::Item) -> U,
35+
U: IntoFuture<Item= (), Error = S::Error>,
36+
{
37+
type Item = S;
38+
type Error = S::Error;
39+
40+
fn poll(&mut self, cx: &mut task::Context) -> Poll<S, S::Error> {
41+
loop {
42+
let mut made_progress_this_iter = false;
43+
44+
// Try and pull an item off of the stream
45+
if !self.stream_done {
46+
// `unwrap` is valid because the stream is only taken after `stream_done` is set
47+
match self.stream.as_mut().unwrap().poll_next(cx)? {
48+
Async::Ready(Some(x)) => {
49+
self.futures.push((self.f)(x).into_future());
50+
made_progress_this_iter = true;
51+
}
52+
// The stream completed, so it shouldn't be polled
53+
// anymore.
54+
Async::Ready(None) => self.stream_done = true,
55+
Async::Pending => {},
56+
}
57+
}
58+
59+
match self.futures.poll_next(cx)? {
60+
Async::Ready(Some(())) => made_progress_this_iter = true,
61+
Async::Ready(None) if self.stream_done => {
62+
// We've processed all of self.futures and self.stream,
63+
// so return self.stream
64+
return Ok(Async::Ready(self.stream.take().expect(
65+
"polled for_each_concurrent after completion"
66+
)));
67+
}
68+
Async::Ready(None)
69+
| Async::Pending => {}
70+
}
71+
72+
if !made_progress_this_iter {
73+
return Ok(Async::Pending);
74+
}
75+
}
76+
}
77+
}

futures-util/src/stream/mod.rs

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ if_std! {
8585
mod catch_unwind;
8686
mod chunks;
8787
mod collect;
88+
mod for_each_concurrent;
8889
mod select_all;
8990
mod split;
9091
mod futures_unordered;
@@ -96,6 +97,7 @@ if_std! {
9697
pub use self::collect::Collect;
9798
pub use self::select_all::{select_all, SelectAll};
9899
pub use self::split::{SplitStream, SplitSink, ReuniteError};
100+
pub use self::for_each_concurrent::ForEachConcurrent;
99101
pub use self::futures_unordered::{futures_unordered, FuturesUnordered};
100102
pub use self::futures_ordered::{futures_ordered, FuturesOrdered};
101103
}
@@ -584,10 +586,10 @@ pub trait StreamExt: Stream {
584586
/// to successfully, producing a future. That future will then be executed
585587
/// to completion before moving on to the next item.
586588
///
587-
/// The returned value is a `Future` where the `Item` type is `()` and
588-
/// errors are otherwise threaded through. Any error on the stream or in the
589-
/// closure will cause iteration to be halted immediately and the future
590-
/// will resolve to that error.
589+
/// The returned value is a `Future` where the `Item` type is the completed
590+
/// stream, and errors are otherwise threaded through. Any error on the
591+
/// stream or in the provided future will cause iteration to be halted
592+
/// immediately and the future will resolve to that error.
591593
///
592594
/// To process each item in the stream and produce another stream instead
593595
/// of a single future, use `and_then` instead.
@@ -599,6 +601,30 @@ pub trait StreamExt: Stream {
599601
for_each::new(self, f)
600602
}
601603

604+
/// Runs this stream to completion, executing the provided closure for each
605+
/// element on the stream. This is similar to `for_each` but may begin
606+
/// processing an element while previous elements are still being processed.
607+
///
608+
/// When this stream successfully resolves to an item, the closure will be
609+
/// called to produce a future. That future will then be added to
610+
/// the set of futures to resolve.
611+
///
612+
/// The returned value is a `Future` where the `Item` type is the completed
613+
/// stream, and errors are otherwise threaded through. Any error on the
614+
/// stream or in the provided future will cause iteration to be halted
615+
/// immediately and the future will resolve to that error.
616+
///
617+
/// To process each item in the stream and produce another stream instead
618+
/// of a single future, use `and_then` instead.
619+
#[cfg(feature = "std")]
620+
fn for_each_concurrent<U, F>(self, f: F) -> ForEachConcurrent<Self, U, F>
621+
where F: FnMut(Self::Item) -> U,
622+
U: IntoFuture<Item=(), Error = Self::Error>,
623+
Self: Sized
624+
{
625+
for_each_concurrent::new(self, f)
626+
}
627+
602628
/// Map this stream's error to a different type using the `Into` trait.
603629
///
604630
/// This function does for streams what `try!` does for `Result`,

futures/tests/stream.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,28 @@ fn fold() {
8686
assert_done(|| err_list().fold(0, |a, b| ok::<i32, u32>(a + b)), Err(3));
8787
}
8888

89+
#[test]
90+
fn for_each_concurrent() {
91+
let (sender, receiver) = oneshot::channel::<()>();
92+
let (sender, receiver) = (&mut Some(sender), &mut Some(receiver));
93+
let fut = list().for_each_concurrent(move |num| {
94+
match num {
95+
// The first future is added
96+
1 => receiver.take().unwrap().map_err(|_| 0).left_future(),
97+
// Second future is added and completes immediately
98+
2 => ok::<(), _>(()).right_future(),
99+
// Third future is added, which when run completes the first
100+
3 => {
101+
sender.take().unwrap().send(()).unwrap();
102+
ok::<(), _>(()).right_future()
103+
}
104+
_ => panic!(),
105+
}
106+
}).map(|_| ());
107+
108+
assert_done(|| fut, Ok(()));
109+
}
110+
89111
#[test]
90112
fn filter() {
91113
assert_done(|| list().filter(|a| ok(*a % 2 == 0)).collect(), Ok(vec![2]));

0 commit comments

Comments
 (0)