Skip to content

Commit 61e339a

Browse files
committed
Add stream::filter_map implementation
1 parent 914c1b0 commit 61e339a

File tree

1 file changed

+28
-0
lines changed

1 file changed

+28
-0
lines changed

src/stream.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,24 @@ pub fn filter<St, Fut, F>(stream: St, f: F) -> impl Stream<Item = St::Item>
5454
})
5555
}
5656

57+
pub fn filter_map<St, Fut, F, U>(stream: St, f: F) -> impl Stream<Item = U>
58+
where St: Stream,
59+
F: FnMut(St::Item) -> Fut,
60+
Fut: Future<Output = Option<U>>
61+
{
62+
let stream = Box::pin(stream);
63+
futures::stream::unfold((stream, f), async move | (mut stream, mut f)| {
64+
while let Some(item) = await!(next(&mut stream)) {
65+
if let Some(item) = await!(f(item)) {
66+
return Some((item, (stream, f)))
67+
} else {
68+
continue;
69+
}
70+
};
71+
None
72+
})
73+
}
74+
5775
#[cfg(test)]
5876
mod tests {
5977
use futures::{executor, future, stream};
@@ -93,4 +111,14 @@ mod tests {
93111
assert_eq!(vec![2, 4, 6, 8, 10], executor::block_on(collect::<_, Vec<_>>(evens)));
94112
}
95113

114+
#[test]
115+
fn test_filter_map() {
116+
let stream = stream::iter(1..=10);
117+
let evens = filter_map(stream, |x| {
118+
let ret = if x % 2 == 0 { Some(x + 1) } else { None };
119+
future::ready(ret)
120+
});
121+
122+
assert_eq!(vec![3, 5, 7, 9, 11], executor::block_on(collect::<_, Vec<_>>(evens)));
123+
}
96124
}

0 commit comments

Comments
 (0)