Skip to content

Commit 85a066b

Browse files
committed
Add stream::take_while implementation
1 parent 64f1ed9 commit 85a066b

File tree

1 file changed

+27
-0
lines changed

1 file changed

+27
-0
lines changed

src/stream.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,25 @@ pub fn chain<St>(stream: St, other: St) -> impl Stream<Item = St::Item>
245245
})
246246
}
247247

248+
pub fn take_while<St, F, Fut>(stream: St, f: F) -> impl Stream<Item = St::Item>
249+
where St: Stream,
250+
F: FnMut(&St::Item) -> Fut,
251+
Fut: Future<Output = bool>,
252+
{
253+
let stream = Box::pin(stream);
254+
futures::stream::unfold((stream, f), async move | (mut stream, mut f)| {
255+
if let Some(item) = await!(next(&mut stream)) {
256+
if await!(f(&item)) {
257+
Some((item, (stream, f)))
258+
} else {
259+
None
260+
}
261+
} else {
262+
None
263+
}
264+
})
265+
}
266+
248267
#[cfg(test)]
249268
mod tests {
250269
use futures::executor;
@@ -403,4 +422,12 @@ mod tests {
403422

404423
assert_eq!(vec![1, 2, 3, 4], executor::block_on(collect::<_, Vec<_>>(stream)));
405424
}
425+
426+
#[test]
427+
fn test_take_while() {
428+
let stream = iter(1..=10);
429+
let stream = take_while(stream, |x| ready(*x <= 5));
430+
431+
assert_eq!(vec![1, 2, 3, 4, 5], executor::block_on(collect::<_, Vec<_>>(stream)));
432+
}
406433
}

0 commit comments

Comments
 (0)