Skip to content

Commit 570329b

Browse files
committed
adds stream::skip combinator
1 parent 55ea367 commit 570329b

File tree

2 files changed

+68
-0
lines changed

2 files changed

+68
-0
lines changed

src/stream/stream/mod.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ mod min_by;
3333
mod next;
3434
mod nth;
3535
mod scan;
36+
mod skip;
3637
mod take;
3738
mod zip;
3839

@@ -51,6 +52,7 @@ use fold::FoldFuture;
5152
use min_by::MinByFuture;
5253
use next::NextFuture;
5354
use nth::NthFuture;
55+
use skip::Skip;
5456

5557
use std::cmp::Ordering;
5658
use std::marker::PhantomData;
@@ -661,6 +663,31 @@ pub trait Stream {
661663
Scan::new(self, initial_state, f)
662664
}
663665

666+
/// Creates a combinator that skips the first `n` elements.
667+
///
668+
/// ## Examples
669+
///
670+
/// ```
671+
/// # fn main() { async_std::task::block_on(async {
672+
/// #
673+
/// use std::collections::VecDeque;
674+
/// use async_std::stream::Stream;
675+
///
676+
/// let s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
677+
/// let mut skipped = s.skip(2);
678+
///
679+
/// assert_eq!(skipped.next().await, Some(3));
680+
/// assert_eq!(skipped.next().await, None);
681+
/// #
682+
/// # }) }
683+
/// ```
684+
fn skip(self, n: usize) -> Skip<Self>
685+
where
686+
Self: Sized,
687+
{
688+
Skip::new(self, n)
689+
}
690+
664691
/// 'Zips up' two streams into a single stream of pairs.
665692
///
666693
/// `zip()` returns a new stream that will iterate over two other streams, returning a tuple

src/stream/stream/skip.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
use std::pin::Pin;
2+
use std::task::{Context, Poll};
3+
4+
use crate::stream::Stream;
5+
6+
#[doc(hidden)]
7+
#[allow(missing_debug_implementations)]
8+
pub struct Skip<S> {
9+
stream: S,
10+
n: usize,
11+
}
12+
13+
impl<S> Skip<S> {
14+
pin_utils::unsafe_pinned!(stream: S);
15+
pin_utils::unsafe_unpinned!(n: usize);
16+
17+
pub(crate) fn new(stream: S, n: usize) -> Self {
18+
Skip { stream, n }
19+
}
20+
}
21+
22+
impl<S> Stream for Skip<S>
23+
where
24+
S: Stream,
25+
{
26+
type Item = S::Item;
27+
28+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
29+
loop {
30+
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
31+
32+
match next {
33+
Some(v) => match self.n {
34+
0 => return Poll::Ready(Some(v)),
35+
_ => *self.as_mut().n() -= 1,
36+
},
37+
None => return Poll::Ready(None),
38+
}
39+
}
40+
}
41+
}

0 commit comments

Comments
 (0)