Skip to content

Commit 1c843a8

Browse files
committed
Re-implemented Throttle to keep last value in memory
1 parent ced5281 commit 1c843a8

File tree

2 files changed

+28
-17
lines changed

2 files changed

+28
-17
lines changed

src/stream/stream/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ extension_trait! {
281281
TakeWhile::new(self, predicate)
282282
}
283283

284-
fn throttle(self, d: Duration) -> Throttle<Self>
284+
fn throttle(self, d: Duration) -> Throttle<Self, Self::Item>
285285
where
286286
Self: Sized,
287287
{

src/stream/stream/throttle.rs

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,47 +10,58 @@ use crate::task::{Context, Poll};
1010
/// A stream that only yields one element once every `duration`, and drops all others.
1111
/// #[doc(hidden)]
1212
#[allow(missing_debug_implementations)]
13-
pub struct Throttle<S> {
13+
pub struct Throttle<S, T> {
1414
stream: S,
1515
duration: Duration,
1616
delay: Option<Delay>,
17+
last: Option<T>,
1718
}
1819

19-
impl<S: Unpin> Unpin for Throttle<S> {}
20+
impl<S: Unpin, T> Unpin for Throttle<S, T> {}
2021

21-
impl<S: Stream> Throttle<S> {
22+
impl<S: Stream> Throttle<S, S::Item> {
2223
pin_utils::unsafe_pinned!(stream: S);
2324
pin_utils::unsafe_unpinned!(duration: Duration);
2425
pin_utils::unsafe_pinned!(delay: Option<Delay>);
26+
pin_utils::unsafe_unpinned!(last: Option<S::Item>);
2527

2628
pub(super) fn new(stream: S, duration: Duration) -> Self {
2729
Throttle {
2830
stream,
2931
duration,
3032
delay: None,
33+
last: None,
3134
}
3235
}
3336
}
3437

35-
impl<S: Stream> Stream for Throttle<S> {
38+
impl<S: Stream> Stream for Throttle<S, S::Item> {
3639
type Item = S::Item;
3740

3841
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
39-
match self.as_mut().stream().poll_next(cx) {
40-
Poll::Ready(v) => match self.as_mut().delay().as_pin_mut() {
41-
None => {
42+
if let Some(d) = self.as_mut().delay().as_pin_mut() {
43+
if d.poll(cx).is_ready() {
44+
if let Some(v) = self.as_mut().last().take() {
45+
// Sets last to None.
4246
*self.as_mut().delay() = Some(Delay::new(self.duration));
43-
Poll::Ready(v)
47+
return Poll::Ready(Some(v));
4448
}
45-
Some(d) => match d.poll(cx) {
46-
Poll::Ready(_) => {
47-
*self.as_mut().delay() = Some(Delay::new(self.duration));
48-
Poll::Ready(v)
49-
}
50-
Poll::Pending => Poll::Pending,
51-
},
52-
},
49+
}
50+
}
51+
52+
match self.as_mut().stream().poll_next(cx) {
5353
Poll::Pending => Poll::Pending,
54+
Poll::Ready(None) => return Poll::Ready(None),
55+
Poll::Ready(Some(v)) => {
56+
if self.as_mut().delay().is_some() {
57+
*self.as_mut().last() = Some(v);
58+
cx.waker().wake_by_ref(); // Continue driving even though emitting Pending
59+
return Poll::Pending;
60+
}
61+
62+
*self.as_mut().delay() = Some(Delay::new(self.duration));
63+
Poll::Ready(Some(v))
64+
}
5465
}
5566
}
5667
}

0 commit comments

Comments
 (0)