Skip to content

Commit 4f3073f

Browse files
committed
Add stream::take implementation
1 parent dbc03f9 commit 4f3073f

File tree

2 files changed

+26
-1
lines changed

2 files changed

+26
-1
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ Stream
4848
- [x] stream::next
4949
- [ ] stream::skip
5050
- [ ] stream::skip_while
51-
- [ ] stream::take
51+
- [x] stream::take
5252
- [ ] stream::take_while
5353
- [ ] stream::then
5454
- [ ] stream::unfold

src/stream.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,23 @@ pub async fn for_each<St, Fut, F>(stream: St, f: F) -> ()
118118
}
119119
}
120120

121+
pub fn take<St>(stream: St, n: u64) -> impl Stream<Item = St::Item>
122+
where St: Stream,
123+
{
124+
let stream = Box::pin(stream);
125+
futures::stream::unfold((stream, n), async move | (mut stream, n)| {
126+
if n == 0 {
127+
None
128+
} else {
129+
if let Some(item) = await!(next(&mut stream)) {
130+
Some((item, (stream, n - 1)))
131+
} else {
132+
None
133+
}
134+
}
135+
})
136+
}
137+
121138
#[cfg(test)]
122139
mod tests {
123140
use futures::executor;
@@ -214,4 +231,12 @@ mod tests {
214231

215232
assert_eq!(x, 6);
216233
}
234+
235+
#[test]
236+
fn test_take() {
237+
let stream = iter(1..=10);
238+
let stream = take(stream, 3);
239+
240+
assert_eq!(vec![1, 2, 3], executor::block_on(collect::<_, Vec<_>>(stream)));
241+
}
217242
}

0 commit comments

Comments
 (0)