13
13
//!
14
14
//! [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
15
15
16
+ use crate :: sync:: Mutex ;
16
17
use alloc:: sync:: Arc ;
17
18
use core:: mem;
18
- use crate :: sync:: Mutex ;
19
19
20
20
#[ allow( unused_imports) ]
21
21
use crate :: prelude:: * ;
@@ -26,9 +26,8 @@ use crate::sync::Condvar;
26
26
use std:: time:: Duration ;
27
27
28
28
use core:: future:: Future as StdFuture ;
29
- use core:: task:: { Context , Poll } ;
30
29
use core:: pin:: Pin ;
31
-
30
+ use core :: task :: { Context , Poll } ;
32
31
33
32
/// Used to signal to one of many waiters that the condition they're waiting on has happened.
34
33
pub ( crate ) struct Notifier {
@@ -37,9 +36,7 @@ pub(crate) struct Notifier {
37
36
38
37
impl Notifier {
39
38
pub ( crate ) fn new ( ) -> Self {
40
- Self {
41
- notify_pending : Mutex :: new ( ( false , None ) ) ,
42
- }
39
+ Self { notify_pending : Mutex :: new ( ( false , None ) ) }
43
40
}
44
41
45
42
/// Wake waiters, tracking that wake needs to occur even if there are currently no waiters.
@@ -199,7 +196,9 @@ impl Future {
199
196
if state. complete {
200
197
state. callbacks_made = true ;
201
198
true
202
- } else { false }
199
+ } else {
200
+ false
201
+ }
203
202
}
204
203
}
205
204
@@ -252,11 +251,8 @@ impl Sleeper {
252
251
// Note that this is the common case - a ChannelManager, a ChainMonitor, and an
253
252
// OnionMessenger.
254
253
pub fn from_three_futures ( fut_a : & Future , fut_b : & Future , fut_c : & Future ) -> Self {
255
- let notifiers = vec ! [
256
- Arc :: clone( & fut_a. state) ,
257
- Arc :: clone( & fut_b. state) ,
258
- Arc :: clone( & fut_c. state)
259
- ] ;
254
+ let notifiers =
255
+ vec ! [ Arc :: clone( & fut_a. state) , Arc :: clone( & fut_b. state) , Arc :: clone( & fut_c. state) ] ;
260
256
Self { notifiers }
261
257
}
262
258
/// Constructs a new sleeper on many futures, allowing blocking on all at once.
@@ -290,8 +286,11 @@ impl Sleeper {
290
286
/// Wait until one of the [`Future`]s registered with this [`Sleeper`] has completed.
291
287
pub fn wait ( & self ) {
292
288
let ( cv, notified_fut_mtx) = self . setup_wait ( ) ;
293
- let notified_fut = cv. wait_while ( notified_fut_mtx. lock ( ) . unwrap ( ) , |fut_opt| fut_opt. is_none ( ) )
294
- . unwrap ( ) . take ( ) . expect ( "CV wait shouldn't have returned until the notifying future was set" ) ;
289
+ let notified_fut = cv
290
+ . wait_while ( notified_fut_mtx. lock ( ) . unwrap ( ) , |fut_opt| fut_opt. is_none ( ) )
291
+ . unwrap ( )
292
+ . take ( )
293
+ . expect ( "CV wait shouldn't have returned until the notifying future was set" ) ;
295
294
notified_fut. lock ( ) . unwrap ( ) . callbacks_made = true ;
296
295
}
297
296
@@ -301,10 +300,13 @@ impl Sleeper {
301
300
pub fn wait_timeout ( & self , max_wait : Duration ) -> bool {
302
301
let ( cv, notified_fut_mtx) = self . setup_wait ( ) ;
303
302
let notified_fut =
304
- match cv. wait_timeout_while ( notified_fut_mtx. lock ( ) . unwrap ( ) , max_wait, |fut_opt| fut_opt. is_none ( ) ) {
303
+ match cv. wait_timeout_while ( notified_fut_mtx. lock ( ) . unwrap ( ) , max_wait, |fut_opt| {
304
+ fut_opt. is_none ( )
305
+ } ) {
305
306
Ok ( ( _, e) ) if e. timed_out ( ) => return false ,
306
- Ok ( ( mut notified_fut, _) ) =>
307
- notified_fut. take ( ) . expect ( "CV wait shouldn't have returned until the notifying future was set" ) ,
307
+ Ok ( ( mut notified_fut, _) ) => notified_fut
308
+ . take ( )
309
+ . expect ( "CV wait shouldn't have returned until the notifying future was set" ) ,
308
310
Err ( _) => panic ! ( "Previous panic while a lock was held led to a lock panic" ) ,
309
311
} ;
310
312
notified_fut. lock ( ) . unwrap ( ) . callbacks_made = true ;
@@ -315,8 +317,8 @@ impl Sleeper {
315
317
#[ cfg( test) ]
316
318
mod tests {
317
319
use super :: * ;
318
- use core:: sync:: atomic:: { AtomicBool , Ordering } ;
319
320
use core:: future:: Future as FutureTrait ;
321
+ use core:: sync:: atomic:: { AtomicBool , Ordering } ;
320
322
use core:: task:: { RawWaker , RawWakerVTable } ;
321
323
322
324
#[ test]
@@ -329,7 +331,9 @@ mod tests {
329
331
330
332
let callback = Arc :: new ( AtomicBool :: new ( false ) ) ;
331
333
let callback_ref = Arc :: clone ( & callback) ;
332
- notifier. get_future ( ) . register_callback ( Box :: new ( move || assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) ) ) ) ;
334
+ notifier. get_future ( ) . register_callback ( Box :: new ( move || {
335
+ assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) )
336
+ } ) ) ;
333
337
assert ! ( callback. load( Ordering :: SeqCst ) ) ;
334
338
}
335
339
@@ -344,15 +348,19 @@ mod tests {
344
348
// a second `notify`.
345
349
let callback = Arc :: new ( AtomicBool :: new ( false ) ) ;
346
350
let callback_ref = Arc :: clone ( & callback) ;
347
- notifier. get_future ( ) . register_callback ( Box :: new ( move || assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) ) ) ) ;
351
+ notifier. get_future ( ) . register_callback ( Box :: new ( move || {
352
+ assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) )
353
+ } ) ) ;
348
354
assert ! ( !callback. load( Ordering :: SeqCst ) ) ;
349
355
350
356
notifier. notify ( ) ;
351
357
assert ! ( callback. load( Ordering :: SeqCst ) ) ;
352
358
353
359
let callback = Arc :: new ( AtomicBool :: new ( false ) ) ;
354
360
let callback_ref = Arc :: clone ( & callback) ;
355
- notifier. get_future ( ) . register_callback ( Box :: new ( move || assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) ) ) ) ;
361
+ notifier. get_future ( ) . register_callback ( Box :: new ( move || {
362
+ assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) )
363
+ } ) ) ;
356
364
assert ! ( !callback. load( Ordering :: SeqCst ) ) ;
357
365
358
366
notifier. notify ( ) ;
@@ -366,12 +374,16 @@ mod tests {
366
374
367
375
let callback = Arc :: new ( AtomicBool :: new ( false ) ) ;
368
376
let callback_ref = Arc :: clone ( & callback) ;
369
- future. register_callback ( Box :: new ( move || assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) ) ) ) ;
377
+ future. register_callback ( Box :: new ( move || {
378
+ assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) )
379
+ } ) ) ;
370
380
assert ! ( callback. load( Ordering :: SeqCst ) ) ;
371
381
372
382
let callback = Arc :: new ( AtomicBool :: new ( false ) ) ;
373
383
let callback_ref = Arc :: clone ( & callback) ;
374
- notifier. get_future ( ) . register_callback ( Box :: new ( move || assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) ) ) ) ;
384
+ notifier. get_future ( ) . register_callback ( Box :: new ( move || {
385
+ assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) )
386
+ } ) ) ;
375
387
assert ! ( !callback. load( Ordering :: SeqCst ) ) ;
376
388
}
377
389
@@ -385,12 +397,16 @@ mod tests {
385
397
386
398
let callback = Arc :: new ( AtomicBool :: new ( false ) ) ;
387
399
let callback_ref = Arc :: clone ( & callback) ;
388
- notifier. get_future ( ) . register_callback ( Box :: new ( move || assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) ) ) ) ;
400
+ notifier. get_future ( ) . register_callback ( Box :: new ( move || {
401
+ assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) )
402
+ } ) ) ;
389
403
assert ! ( callback. load( Ordering :: SeqCst ) ) ;
390
404
391
405
let callback = Arc :: new ( AtomicBool :: new ( false ) ) ;
392
406
let callback_ref = Arc :: clone ( & callback) ;
393
- notifier. get_future ( ) . register_callback ( Box :: new ( move || assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) ) ) ) ;
407
+ notifier. get_future ( ) . register_callback ( Box :: new ( move || {
408
+ assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) )
409
+ } ) ) ;
394
410
assert ! ( !callback. load( Ordering :: SeqCst ) ) ;
395
411
396
412
notifier. notify ( ) ;
@@ -408,12 +424,10 @@ mod tests {
408
424
409
425
let exit_thread = Arc :: new ( AtomicBool :: new ( false ) ) ;
410
426
let exit_thread_clone = exit_thread. clone ( ) ;
411
- thread:: spawn ( move || {
412
- loop {
413
- thread_notifier. notify ( ) ;
414
- if exit_thread_clone. load ( Ordering :: SeqCst ) {
415
- break
416
- }
427
+ thread:: spawn ( move || loop {
428
+ thread_notifier. notify ( ) ;
429
+ if exit_thread_clone. load ( Ordering :: SeqCst ) {
430
+ break ;
417
431
}
418
432
} ) ;
419
433
@@ -424,7 +438,7 @@ mod tests {
424
438
// available.
425
439
loop {
426
440
if persistence_notifier. get_future ( ) . wait_timeout ( Duration :: from_millis ( 100 ) ) {
427
- break
441
+ break ;
428
442
}
429
443
}
430
444
@@ -434,7 +448,7 @@ mod tests {
434
448
// are available.
435
449
loop {
436
450
if !persistence_notifier. get_future ( ) . wait_timeout ( Duration :: from_millis ( 100 ) ) {
437
- break
451
+ break ;
438
452
}
439
453
}
440
454
}
@@ -494,7 +508,9 @@ mod tests {
494
508
} ;
495
509
let callback = Arc :: new ( AtomicBool :: new ( false ) ) ;
496
510
let callback_ref = Arc :: clone ( & callback) ;
497
- future. register_callback ( Box :: new ( move || assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) ) ) ) ;
511
+ future. register_callback ( Box :: new ( move || {
512
+ assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) )
513
+ } ) ) ;
498
514
499
515
assert ! ( !callback. load( Ordering :: SeqCst ) ) ;
500
516
complete_future ( & future. state ) ;
@@ -519,7 +535,9 @@ mod tests {
519
535
520
536
let callback = Arc :: new ( AtomicBool :: new ( false ) ) ;
521
537
let callback_ref = Arc :: clone ( & callback) ;
522
- future. register_callback ( Box :: new ( move || assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) ) ) ) ;
538
+ future. register_callback ( Box :: new ( move || {
539
+ assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) )
540
+ } ) ) ;
523
541
524
542
assert ! ( callback. load( Ordering :: SeqCst ) ) ;
525
543
assert ! ( future. state. lock( ) . unwrap( ) . callbacks. is_empty( ) ) ;
@@ -530,17 +548,27 @@ mod tests {
530
548
// compared to a raw VTable). Instead, we have to write out a lot of boilerplate to build a
531
549
// waker, which we do here with a trivial Arc<AtomicBool> data element to track woke-ness.
532
550
const WAKER_V_TABLE : RawWakerVTable = RawWakerVTable :: new ( waker_clone, wake, wake_by_ref, drop) ;
533
- unsafe fn wake_by_ref ( ptr : * const ( ) ) { let p = ptr as * const Arc < AtomicBool > ; assert ! ( !( * p) . fetch_or( true , Ordering :: SeqCst ) ) ; }
534
- unsafe fn drop ( ptr : * const ( ) ) { let p = ptr as * mut Arc < AtomicBool > ; let _freed = Box :: from_raw ( p) ; }
535
- unsafe fn wake ( ptr : * const ( ) ) { wake_by_ref ( ptr) ; drop ( ptr) ; }
551
+ unsafe fn wake_by_ref ( ptr : * const ( ) ) {
552
+ let p = ptr as * const Arc < AtomicBool > ;
553
+ assert ! ( !( * p) . fetch_or( true , Ordering :: SeqCst ) ) ;
554
+ }
555
+ unsafe fn drop ( ptr : * const ( ) ) {
556
+ let p = ptr as * mut Arc < AtomicBool > ;
557
+ let _freed = Box :: from_raw ( p) ;
558
+ }
559
+ unsafe fn wake ( ptr : * const ( ) ) {
560
+ wake_by_ref ( ptr) ;
561
+ drop ( ptr) ;
562
+ }
536
563
unsafe fn waker_clone ( ptr : * const ( ) ) -> RawWaker {
537
564
let p = ptr as * const Arc < AtomicBool > ;
538
565
RawWaker :: new ( Box :: into_raw ( Box :: new ( Arc :: clone ( & * p) ) ) as * const ( ) , & WAKER_V_TABLE )
539
566
}
540
567
541
568
fn create_waker ( ) -> ( Arc < AtomicBool > , Waker ) {
542
569
let a = Arc :: new ( AtomicBool :: new ( false ) ) ;
543
- let waker = unsafe { Waker :: from_raw ( waker_clone ( ( & a as * const Arc < AtomicBool > ) as * const ( ) ) ) } ;
570
+ let waker =
571
+ unsafe { Waker :: from_raw ( waker_clone ( ( & a as * const Arc < AtomicBool > ) as * const ( ) ) ) } ;
544
572
( a, waker)
545
573
}
546
574
@@ -564,14 +592,20 @@ mod tests {
564
592
assert ! ( !woken. load( Ordering :: SeqCst ) ) ;
565
593
566
594
let ( second_woken, second_waker) = create_waker ( ) ;
567
- assert_eq ! ( Pin :: new( & mut second_future) . poll( & mut Context :: from_waker( & second_waker) ) , Poll :: Pending ) ;
595
+ assert_eq ! (
596
+ Pin :: new( & mut second_future) . poll( & mut Context :: from_waker( & second_waker) ) ,
597
+ Poll :: Pending
598
+ ) ;
568
599
assert ! ( !second_woken. load( Ordering :: SeqCst ) ) ;
569
600
570
601
complete_future ( & future. state ) ;
571
602
assert ! ( woken. load( Ordering :: SeqCst ) ) ;
572
603
assert ! ( second_woken. load( Ordering :: SeqCst ) ) ;
573
604
assert_eq ! ( Pin :: new( & mut future) . poll( & mut Context :: from_waker( & waker) ) , Poll :: Ready ( ( ) ) ) ;
574
- assert_eq ! ( Pin :: new( & mut second_future) . poll( & mut Context :: from_waker( & second_waker) ) , Poll :: Ready ( ( ) ) ) ;
605
+ assert_eq ! (
606
+ Pin :: new( & mut second_future) . poll( & mut Context :: from_waker( & second_waker) ) ,
607
+ Poll :: Ready ( ( ) )
608
+ ) ;
575
609
}
576
610
577
611
#[ test]
@@ -714,8 +748,12 @@ mod tests {
714
748
let callback_b = Arc :: new ( AtomicBool :: new ( false ) ) ;
715
749
let callback_a_ref = Arc :: clone ( & callback_a) ;
716
750
let callback_b_ref = Arc :: clone ( & callback_b) ;
717
- notifier_a. get_future ( ) . register_callback ( Box :: new ( move || assert ! ( !callback_a_ref. fetch_or( true , Ordering :: SeqCst ) ) ) ) ;
718
- notifier_b. get_future ( ) . register_callback ( Box :: new ( move || assert ! ( !callback_b_ref. fetch_or( true , Ordering :: SeqCst ) ) ) ) ;
751
+ notifier_a. get_future ( ) . register_callback ( Box :: new ( move || {
752
+ assert ! ( !callback_a_ref. fetch_or( true , Ordering :: SeqCst ) )
753
+ } ) ) ;
754
+ notifier_b. get_future ( ) . register_callback ( Box :: new ( move || {
755
+ assert ! ( !callback_b_ref. fetch_or( true , Ordering :: SeqCst ) )
756
+ } ) ) ;
719
757
assert ! ( callback_a. load( Ordering :: SeqCst ) ^ callback_b. load( Ordering :: SeqCst ) ) ;
720
758
721
759
// If we now notify both notifiers again, the other callback will fire, completing the
@@ -740,14 +778,23 @@ mod tests {
740
778
741
779
// Test that simply polling a future twice doesn't result in two pending `Waker`s.
742
780
let mut future_a = notifier. get_future ( ) ;
743
- assert_eq ! ( Pin :: new( & mut future_a) . poll( & mut Context :: from_waker( & create_waker( ) . 1 ) ) , Poll :: Pending ) ;
781
+ assert_eq ! (
782
+ Pin :: new( & mut future_a) . poll( & mut Context :: from_waker( & create_waker( ) . 1 ) ) ,
783
+ Poll :: Pending
784
+ ) ;
744
785
assert_eq ! ( future_state. lock( ) . unwrap( ) . std_future_callbacks. len( ) , 1 ) ;
745
- assert_eq ! ( Pin :: new( & mut future_a) . poll( & mut Context :: from_waker( & create_waker( ) . 1 ) ) , Poll :: Pending ) ;
786
+ assert_eq ! (
787
+ Pin :: new( & mut future_a) . poll( & mut Context :: from_waker( & create_waker( ) . 1 ) ) ,
788
+ Poll :: Pending
789
+ ) ;
746
790
assert_eq ! ( future_state. lock( ) . unwrap( ) . std_future_callbacks. len( ) , 1 ) ;
747
791
748
792
// If we poll a second future, however, that will store a second `Waker`.
749
793
let mut future_b = notifier. get_future ( ) ;
750
- assert_eq ! ( Pin :: new( & mut future_b) . poll( & mut Context :: from_waker( & create_waker( ) . 1 ) ) , Poll :: Pending ) ;
794
+ assert_eq ! (
795
+ Pin :: new( & mut future_b) . poll( & mut Context :: from_waker( & create_waker( ) . 1 ) ) ,
796
+ Poll :: Pending
797
+ ) ;
751
798
assert_eq ! ( future_state. lock( ) . unwrap( ) . std_future_callbacks. len( ) , 2 ) ;
752
799
753
800
// but when we drop the `Future`s, the pending Wakers will also be dropped.
@@ -758,13 +805,22 @@ mod tests {
758
805
759
806
// Further, after polling a future twice, if the notifier is woken all Wakers are dropped.
760
807
let mut future_a = notifier. get_future ( ) ;
761
- assert_eq ! ( Pin :: new( & mut future_a) . poll( & mut Context :: from_waker( & create_waker( ) . 1 ) ) , Poll :: Pending ) ;
808
+ assert_eq ! (
809
+ Pin :: new( & mut future_a) . poll( & mut Context :: from_waker( & create_waker( ) . 1 ) ) ,
810
+ Poll :: Pending
811
+ ) ;
762
812
assert_eq ! ( future_state. lock( ) . unwrap( ) . std_future_callbacks. len( ) , 1 ) ;
763
- assert_eq ! ( Pin :: new( & mut future_a) . poll( & mut Context :: from_waker( & create_waker( ) . 1 ) ) , Poll :: Pending ) ;
813
+ assert_eq ! (
814
+ Pin :: new( & mut future_a) . poll( & mut Context :: from_waker( & create_waker( ) . 1 ) ) ,
815
+ Poll :: Pending
816
+ ) ;
764
817
assert_eq ! ( future_state. lock( ) . unwrap( ) . std_future_callbacks. len( ) , 1 ) ;
765
818
notifier. notify ( ) ;
766
819
assert_eq ! ( future_state. lock( ) . unwrap( ) . std_future_callbacks. len( ) , 0 ) ;
767
- assert_eq ! ( Pin :: new( & mut future_a) . poll( & mut Context :: from_waker( & create_waker( ) . 1 ) ) , Poll :: Ready ( ( ) ) ) ;
820
+ assert_eq ! (
821
+ Pin :: new( & mut future_a) . poll( & mut Context :: from_waker( & create_waker( ) . 1 ) ) ,
822
+ Poll :: Ready ( ( ) )
823
+ ) ;
768
824
assert_eq ! ( future_state. lock( ) . unwrap( ) . std_future_callbacks. len( ) , 0 ) ;
769
825
}
770
826
}
0 commit comments