Skip to content

Commit 914c1b0

Browse files
committed
Add stream::filter implementation
1 parent d573a1f commit 914c1b0

File tree

1 file changed

+32
-3
lines changed

1 file changed

+32
-3
lines changed

src/stream.rs

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use futures::stream::Stream;
2+
use futures::future::Future;
23
use core::pin::Pin;
34
use pin_utils::pin_mut;
45

@@ -24,8 +25,8 @@ pub async fn collect<St, C>(stream: St) -> C
2425
}
2526

2627
pub fn map<St, U, F>(stream: St, f: F) -> impl Stream<Item = U>
27-
where F: FnMut(St::Item) -> U,
28-
St: Stream,
28+
where St: Stream,
29+
F: FnMut(St::Item) -> U,
2930
{
3031
let stream = Box::pin(stream);
3132
futures::stream::unfold((stream, f), async move | (mut stream, mut f)| {
@@ -34,9 +35,28 @@ pub fn map<St, U, F>(stream: St, f: F) -> impl Stream<Item = U>
3435
})
3536
}
3637

38+
pub fn filter<St, Fut, F>(stream: St, f: F) -> impl Stream<Item = St::Item>
39+
where St: Stream,
40+
F: FnMut(&St::Item) -> Fut,
41+
Fut: Future<Output = bool>
42+
{
43+
let stream = Box::pin(stream);
44+
futures::stream::unfold((stream, f), async move | (mut stream, mut f)| {
45+
while let Some(item) = await!(next(&mut stream)) {
46+
let matched = await!(f(&item));
47+
if matched {
48+
return Some((item, (stream, f)))
49+
} else {
50+
continue;
51+
}
52+
};
53+
None
54+
})
55+
}
56+
3757
#[cfg(test)]
3858
mod tests {
39-
use futures::{stream, executor};
59+
use futures::{executor, future, stream};
4060
use crate::stream::*;
4161

4262
#[test]
@@ -64,4 +84,13 @@ mod tests {
6484

6585
assert_eq!(vec![2, 4, 6], executor::block_on(collect::<_, Vec<_>>(stream)));
6686
}
87+
88+
#[test]
89+
fn test_filter() {
90+
let stream = stream::iter(1..=10);
91+
let evens = filter(stream, |x| future::ready(x % 2 == 0));
92+
93+
assert_eq!(vec![2, 4, 6, 8, 10], executor::block_on(collect::<_, Vec<_>>(evens)));
94+
}
95+
6796
}

0 commit comments

Comments
 (0)