Skip to content

Commit dbc03f9

Browse files
committed
Add stream::for_each implementation
1 parent 4a265a0 commit dbc03f9

File tree

2 files changed

+29
-1
lines changed

2 files changed

+29
-1
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ Stream
4040
- [x] stream::filter_map
4141
- [ ] stream::flatten
4242
- [ ] stream::fold
43-
- [ ] stream::for_each
43+
- [x] stream::for_each
4444
- [ ] stream::for_each_concurrent
4545
- [x] stream::into_future
4646
- [x] stream::iter

src/stream.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,18 @@ pub async fn concat<St>(stream: St) -> St::Item
106106
collection
107107
}
108108

109+
pub async fn for_each<St, Fut, F>(stream: St, f: F) -> ()
110+
where St: Stream,
111+
F: FnMut(St::Item) -> Fut,
112+
Fut: Future<Output = ()>,
113+
{
114+
pin_mut!(stream);
115+
let mut f = f;
116+
while let Some(item) = await!(next(&mut stream)) {
117+
f(item);
118+
}
119+
}
120+
109121
#[cfg(test)]
110122
mod tests {
111123
use futures::executor;
@@ -186,4 +198,20 @@ mod tests {
186198
let collection : Vec<i32> = executor::block_on(concat(stream));
187199
assert_eq!(collection, vec![1, 2, 3, 4, 5]);
188200
}
201+
202+
#[test]
203+
fn test_for_each() {
204+
let mut x = 0;
205+
206+
{
207+
let stream = iter(1..=3);
208+
let future = for_each(stream, |item| {
209+
x += item;
210+
ready(())
211+
});
212+
executor::block_on(future);
213+
}
214+
215+
assert_eq!(x, 6);
216+
}
189217
}

0 commit comments

Comments
 (0)