@@ -3,26 +3,25 @@ use std::pin::Pin;
3
3
use std:: time:: Duration ;
4
4
5
5
use futures_timer:: Delay ;
6
+ use pin_project_lite:: pin_project;
6
7
7
8
use crate :: stream:: Stream ;
8
9
use crate :: task:: { Context , Poll } ;
9
10
10
- /// A stream that only yields one element once every `duration`, and applies backpressure. Does not drop any elements.
11
- /// #[doc(hidden)]
12
- #[ allow( missing_debug_implementations) ]
13
- pub struct Throttle < S > {
14
- stream : S ,
15
- duration : Duration ,
16
- delay : Option < Delay > ,
11
+ pin_project ! {
12
+ /// A stream that only yields one element once every `duration`, and applies backpressure. Does not drop any elements.
13
+ #[ doc( hidden) ]
14
+ #[ allow( missing_debug_implementations) ]
15
+ pub struct Throttle <S > {
16
+ #[ pin]
17
+ stream: S ,
18
+ duration: Duration ,
19
+ #[ pin]
20
+ delay: Option <Delay >,
21
+ }
17
22
}
18
23
19
- impl < S : Unpin > Unpin for Throttle < S > { }
20
-
21
24
impl < S : Stream > Throttle < S > {
22
- pin_utils:: unsafe_pinned!( stream: S ) ;
23
- pin_utils:: unsafe_unpinned!( duration: Duration ) ;
24
- pin_utils:: unsafe_pinned!( delay: Option <Delay >) ;
25
-
26
25
pub ( super ) fn new ( stream : S , duration : Duration ) -> Self {
27
26
Throttle {
28
27
stream,
@@ -35,23 +34,24 @@ impl<S: Stream> Throttle<S> {
35
34
impl < S : Stream > Stream for Throttle < S > {
36
35
type Item = S :: Item ;
37
36
38
- fn poll_next ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < S :: Item > > {
39
- if let Some ( d) = self . as_mut ( ) . delay ( ) . as_pin_mut ( ) {
37
+ fn poll_next ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < S :: Item > > {
38
+ let mut this = self . project ( ) ;
39
+ if let Some ( d) = this. delay . as_mut ( ) . as_pin_mut ( ) {
40
40
if d. poll ( cx) . is_ready ( ) {
41
- * self . as_mut ( ) . delay ( ) = None ;
41
+ this . delay . set ( None ) ;
42
42
} else {
43
43
return Poll :: Pending ;
44
44
}
45
45
}
46
46
47
- match self . as_mut ( ) . stream ( ) . poll_next ( cx) {
47
+ match this . stream . poll_next ( cx) {
48
48
Poll :: Pending => {
49
49
cx. waker ( ) . wake_by_ref ( ) ; // Continue driving even though emitting Pending
50
50
Poll :: Pending
51
51
}
52
52
Poll :: Ready ( None ) => Poll :: Ready ( None ) ,
53
53
Poll :: Ready ( Some ( v) ) => {
54
- * self . as_mut ( ) . delay ( ) = Some ( Delay :: new ( self . duration ) ) ;
54
+ this . delay . set ( Some ( Delay :: new ( * this . duration ) ) ) ;
55
55
Poll :: Ready ( Some ( v) )
56
56
}
57
57
}
0 commit comments