@@ -69,6 +69,7 @@ impl Notifier {
69
69
} else {
70
70
let state = Arc :: new ( Mutex :: new ( FutureState {
71
71
callbacks : Vec :: new ( ) ,
72
+ callbacks_with_state : Vec :: new ( ) ,
72
73
complete : lock. 0 ,
73
74
callbacks_made : false ,
74
75
} ) ) ;
@@ -112,6 +113,7 @@ pub(crate) struct FutureState {
112
113
// first bool - set to false if we're just calling a Waker, and true if we're calling an actual
113
114
// user-provided function.
114
115
callbacks : Vec < ( bool , Box < dyn FutureCallback > ) > ,
116
+ callbacks_with_state : Vec < ( bool , Box < dyn Fn ( & Arc < Mutex < FutureState > > ) -> ( ) + Send > ) > ,
115
117
complete : bool ,
116
118
callbacks_made : bool ,
117
119
}
@@ -123,6 +125,10 @@ fn complete_future(this: &Arc<Mutex<FutureState>>) -> bool {
123
125
callback. call ( ) ;
124
126
state. callbacks_made |= counts_as_call;
125
127
}
128
+ for ( counts_as_call, callback) in state. callbacks_with_state . drain ( ..) {
129
+ ( callback) ( this) ;
130
+ state. callbacks_made |= counts_as_call;
131
+ }
126
132
state. complete = true ;
127
133
state. callbacks_made
128
134
}
@@ -240,14 +246,13 @@ impl Sleeper {
240
246
for notifier_mtx in self . notifiers . iter ( ) {
241
247
let cv_ref = Arc :: clone ( & cv) ;
242
248
let notified_fut_ref = Arc :: clone ( & notified_fut_mtx) ;
243
- let notifier_ref = Arc :: clone ( & notifier_mtx) ;
244
249
let mut notifier = notifier_mtx. lock ( ) . unwrap ( ) ;
245
250
if notifier. complete {
246
- * notified_fut_mtx. lock ( ) . unwrap ( ) = Some ( notifier_ref ) ;
251
+ * notified_fut_mtx. lock ( ) . unwrap ( ) = Some ( Arc :: clone ( & notifier_mtx ) ) ;
247
252
break ;
248
253
}
249
- notifier. callbacks . push ( ( false , Box :: new ( move || {
250
- * notified_fut_ref. lock ( ) . unwrap ( ) = Some ( Arc :: clone ( & notifier_ref) ) ;
254
+ notifier. callbacks_with_state . push ( ( false , Box :: new ( move |notifier_ref | {
255
+ * notified_fut_ref. lock ( ) . unwrap ( ) = Some ( Arc :: clone ( notifier_ref) ) ;
251
256
cv_ref. notify_all ( ) ;
252
257
} ) ) ) ;
253
258
}
@@ -407,11 +412,50 @@ mod tests {
407
412
}
408
413
}
409
414
415
+ #[ cfg( feature = "std" ) ]
416
+ #[ test]
417
+ fn test_state_drops ( ) {
418
+ // Previously, there was a leak if a `Notifier` was `drop`ed without ever being notified
419
+ // but after having been slept-on. This tests for that leak.
420
+ use crate :: sync:: Arc ;
421
+ use std:: thread;
422
+
423
+ let notifier_a = Arc :: new ( Notifier :: new ( ) ) ;
424
+ let notifier_b = Arc :: new ( Notifier :: new ( ) ) ;
425
+
426
+ let thread_notifier_a = Arc :: clone ( & notifier_a) ;
427
+
428
+ let future_a = notifier_a. get_future ( ) ;
429
+ let future_state_a = Arc :: downgrade ( & future_a. state ) ;
430
+
431
+ let future_b = notifier_b. get_future ( ) ;
432
+ let future_state_b = Arc :: downgrade ( & future_b. state ) ;
433
+
434
+ let join_handle = thread:: spawn ( move || {
435
+ // Let the other thread get to the wait point, then notify it.
436
+ std:: thread:: sleep ( Duration :: from_millis ( 50 ) ) ;
437
+ thread_notifier_a. notify ( ) ;
438
+ } ) ;
439
+
440
+ // Wait on the other thread to finish its sleep, note that the leak only happened if we
441
+ // actually have to sleep here, not if we immediately return.
442
+ Sleeper :: from_two_futures ( future_a, future_b) . wait ( ) ;
443
+
444
+ join_handle. join ( ) . unwrap ( ) ;
445
+
446
+ // then drop the notifiers and make sure the future states are gone.
447
+ mem:: drop ( notifier_a) ;
448
+ mem:: drop ( notifier_b) ;
449
+
450
+ assert ! ( future_state_a. upgrade( ) . is_none( ) && future_state_b. upgrade( ) . is_none( ) ) ;
451
+ }
452
+
410
453
#[ test]
411
454
fn test_future_callbacks ( ) {
412
455
let future = Future {
413
456
state : Arc :: new ( Mutex :: new ( FutureState {
414
457
callbacks : Vec :: new ( ) ,
458
+ callbacks_with_state : Vec :: new ( ) ,
415
459
complete : false ,
416
460
callbacks_made : false ,
417
461
} ) )
@@ -431,6 +475,7 @@ mod tests {
431
475
let future = Future {
432
476
state : Arc :: new ( Mutex :: new ( FutureState {
433
477
callbacks : Vec :: new ( ) ,
478
+ callbacks_with_state : Vec :: new ( ) ,
434
479
complete : false ,
435
480
callbacks_made : false ,
436
481
} ) )
@@ -469,6 +514,7 @@ mod tests {
469
514
let mut future = Future {
470
515
state : Arc :: new ( Mutex :: new ( FutureState {
471
516
callbacks : Vec :: new ( ) ,
517
+ callbacks_with_state : Vec :: new ( ) ,
472
518
complete : false ,
473
519
callbacks_made : false ,
474
520
} ) )
0 commit comments