Skip to content

Commit 83ff11f

Browse files
committed
Switch cycle to stream
1 parent e1ba87e commit 83ff11f

File tree

1 file changed

+48
-19
lines changed

1 file changed

+48
-19
lines changed

src/stream/cycle.rs

Lines changed: 48 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -7,24 +7,53 @@ use crate::task::{Context, Poll};
77

88
pin_project! {
99
/// A stream that will repeatedly yield the same list of elements
10-
pub struct Cycle<T> {
11-
source: Vec<T>,
10+
pub struct Cycle<S, T> {
11+
#[pin]
12+
source: S,
1213
index: usize,
13-
len: usize,
14+
buffer: Vec<T>,
15+
state: CycleState,
1416
}
1517
}
1618

17-
impl<T: Copy> Stream for Cycle<T> {
18-
type Item = T;
19+
#[derive(Eq, PartialEq)]
20+
enum CycleState {
21+
FromStream,
22+
FromBuffer,
23+
}
24+
25+
impl<S, T> Stream for Cycle<S,T>
26+
where
27+
S: Stream<Item = T>,
28+
T: Copy,
29+
{
30+
31+
type Item = S::Item;
32+
33+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
34+
let this = self.project();
1935

20-
fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
21-
let value = self.source[self.index];
36+
let mut next;
37+
if CycleState::FromStream == *this.state {
38+
next = futures_core::ready!(this.source.poll_next(cx));
2239

23-
let next = self.index + 1;
40+
if let Some(val) = next {
41+
this.buffer.push(val);
42+
} else {
43+
*this.state = CycleState::FromBuffer;
44+
next = Some(this.buffer[*this.index]);
45+
}
46+
} else {
47+
let mut index = *this.index;
48+
if index == this.buffer.len() {
49+
index = 0
50+
}
51+
next = Some(this.buffer[index]);
2452

25-
self.as_mut().index = next % self.len;
53+
*this.index = index + 1;
54+
}
2655

27-
Poll::Ready(Some(value))
56+
Poll::Ready(next)
2857
}
2958
}
3059

@@ -40,21 +69,21 @@ impl<T: Copy> Stream for Cycle<T> {
4069
/// use async_std::prelude::*;
4170
/// use async_std::stream;
4271
///
43-
/// let mut s = stream::cycle(vec![1,2,3]);
72+
/// let mut s = stream::cycle(stream::once(7));
4473
///
45-
/// assert_eq!(s.next().await, Some(1));
46-
/// assert_eq!(s.next().await, Some(2));
47-
/// assert_eq!(s.next().await, Some(3));
48-
/// assert_eq!(s.next().await, Some(1));
49-
/// assert_eq!(s.next().await, Some(2));
74+
/// assert_eq!(s.next().await, Some(7));
75+
/// assert_eq!(s.next().await, Some(7));
76+
/// assert_eq!(s.next().await, Some(7));
77+
/// assert_eq!(s.next().await, Some(7));
78+
/// assert_eq!(s.next().await, Some(7));
5079
/// #
5180
/// # })
5281
/// ```
53-
pub fn cycle<T: Copy>(source: Vec<T>) -> impl Stream<Item = T> {
54-
let len = source.len();
82+
pub fn cycle<S: Stream<Item = T>, T: Copy>(source: S) -> impl Stream<Item = S::Item> {
5583
Cycle {
5684
source,
5785
index: 0,
58-
len,
86+
buffer: Vec::new(),
87+
state: CycleState::FromStream,
5988
}
6089
}

0 commit comments

Comments
 (0)