1
+ use std:: borrow:: Borrow ;
1
2
use std:: collections:: BTreeMap ;
3
+ use std:: fmt;
2
4
use std:: future:: Future ;
3
5
use std:: io;
6
+ use std:: marker:: PhantomData ;
4
7
use std:: mem;
5
8
#[ cfg( unix) ]
6
9
use std:: os:: unix:: io:: RawFd ;
@@ -443,19 +446,29 @@ impl Source {
443
446
}
444
447
445
448
/// Waits until the I/O source is readable.
446
- pub ( crate ) fn readable ( self : & Arc < Self > ) -> Readable {
447
- Readable ( self . ready ( READ ) )
449
+ pub ( crate ) fn readable < T > ( handle : & crate :: Async < T > ) -> Readable < ' _ , T > {
450
+ Readable ( Self :: ready ( handle, READ ) )
451
+ }
452
+
453
+ /// Waits until the I/O source is readable.
454
+ pub ( crate ) fn readable_owned < T > ( handle : Arc < crate :: Async < T > > ) -> ReadableOwned < T > {
455
+ ReadableOwned ( Self :: ready ( handle, READ ) )
456
+ }
457
+
458
+ /// Waits until the I/O source is writable.
459
+ pub ( crate ) fn writable < T > ( handle : & crate :: Async < T > ) -> Writable < ' _ , T > {
460
+ Writable ( Self :: ready ( handle, WRITE ) )
448
461
}
449
462
450
463
/// Waits until the I/O source is writable.
451
- pub ( crate ) fn writable ( self : & Arc < Self > ) -> Writable {
452
- Writable ( self . ready ( WRITE ) )
464
+ pub ( crate ) fn writable_owned < T > ( handle : Arc < crate :: Async < T > > ) -> WritableOwned < T > {
465
+ WritableOwned ( Self :: ready ( handle , WRITE ) )
453
466
}
454
467
455
468
/// Waits until the I/O source is readable or writable.
456
- fn ready ( self : & Arc < Self > , dir : usize ) -> Ready {
469
+ fn ready < H : Borrow < crate :: Async < T > > + Clone , T > ( handle : H , dir : usize ) -> Ready < H , T > {
457
470
Ready {
458
- source : self . clone ( ) ,
471
+ handle ,
459
472
dir,
460
473
ticks : None ,
461
474
index : None ,
@@ -465,57 +478,109 @@ impl Source {
465
478
}
466
479
467
480
/// Future for [`Async::readable`](crate::Async::readable).
468
- #[ derive( Debug ) ]
469
481
#[ must_use = "futures do nothing unless you `.await` or poll them" ]
470
- pub struct Readable ( Ready ) ;
482
+ pub struct Readable < ' a , T > ( Ready < & ' a crate :: Async < T > , T > ) ;
483
+
484
+ impl < T > Future for Readable < ' _ , T > {
485
+ type Output = io:: Result < ( ) > ;
486
+
487
+ fn poll ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
488
+ ready ! ( Pin :: new( & mut self . 0 ) . poll( cx) ) ?;
489
+ log:: trace!( "readable: fd={}" , self . 0 . handle. source. raw) ;
490
+ Poll :: Ready ( Ok ( ( ) ) )
491
+ }
492
+ }
493
+
494
+ impl < T > fmt:: Debug for Readable < ' _ , T > {
495
+ fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
496
+ f. debug_struct ( "Readable" ) . finish ( )
497
+ }
498
+ }
499
+
500
+ /// Future for [`Async::readable_owned`](crate::Async::readable_owned).
501
+ #[ must_use = "futures do nothing unless you `.await` or poll them" ]
502
+ pub struct ReadableOwned < T > ( Ready < Arc < crate :: Async < T > > , T > ) ;
471
503
472
- impl Future for Readable {
504
+ impl < T > Future for ReadableOwned < T > {
473
505
type Output = io:: Result < ( ) > ;
474
506
475
507
fn poll ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
476
508
ready ! ( Pin :: new( & mut self . 0 ) . poll( cx) ) ?;
477
- log:: trace!( "readable : fd={}" , self . 0 . source. raw) ;
509
+ log:: trace!( "readable_owned : fd={}" , self . 0 . handle . source. raw) ;
478
510
Poll :: Ready ( Ok ( ( ) ) )
479
511
}
480
512
}
481
513
514
+ impl < T > fmt:: Debug for ReadableOwned < T > {
515
+ fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
516
+ f. debug_struct ( "ReadableOwned" ) . finish ( )
517
+ }
518
+ }
519
+
482
520
/// Future for [`Async::writable`](crate::Async::writable).
483
- #[ derive( Debug ) ]
484
521
#[ must_use = "futures do nothing unless you `.await` or poll them" ]
485
- pub struct Writable ( Ready ) ;
522
+ pub struct Writable < ' a , T > ( Ready < & ' a crate :: Async < T > , T > ) ;
486
523
487
- impl Future for Writable {
524
+ impl < T > Future for Writable < ' _ , T > {
488
525
type Output = io:: Result < ( ) > ;
489
526
490
527
fn poll ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
491
528
ready ! ( Pin :: new( & mut self . 0 ) . poll( cx) ) ?;
492
- log:: trace!( "writable: fd={}" , self . 0 . source. raw) ;
529
+ log:: trace!( "writable: fd={}" , self . 0 . handle . source. raw) ;
493
530
Poll :: Ready ( Ok ( ( ) ) )
494
531
}
495
532
}
496
533
497
- #[ derive( Debug ) ]
498
- struct Ready {
499
- source : Arc < Source > ,
534
+ impl < T > fmt:: Debug for Writable < ' _ , T > {
535
+ fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
536
+ f. debug_struct ( "Writable" ) . finish ( )
537
+ }
538
+ }
539
+
540
+ /// Future for [`Async::writable_owned`](crate::Async::writable_owned).
541
+ #[ must_use = "futures do nothing unless you `.await` or poll them" ]
542
+ pub struct WritableOwned < T > ( Ready < Arc < crate :: Async < T > > , T > ) ;
543
+
544
+ impl < T > Future for WritableOwned < T > {
545
+ type Output = io:: Result < ( ) > ;
546
+
547
+ fn poll ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
548
+ ready ! ( Pin :: new( & mut self . 0 ) . poll( cx) ) ?;
549
+ log:: trace!( "writable_owned: fd={}" , self . 0 . handle. source. raw) ;
550
+ Poll :: Ready ( Ok ( ( ) ) )
551
+ }
552
+ }
553
+
554
+ impl < T > fmt:: Debug for WritableOwned < T > {
555
+ fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
556
+ f. debug_struct ( "WritableOwned" ) . finish ( )
557
+ }
558
+ }
559
+
560
+ struct Ready < H : Borrow < crate :: Async < T > > , T > {
561
+ handle : H ,
500
562
dir : usize ,
501
563
ticks : Option < ( usize , usize ) > ,
502
564
index : Option < usize > ,
503
- _guard : Option < RemoveOnDrop > ,
565
+ _guard : Option < RemoveOnDrop < H , T > > ,
504
566
}
505
567
506
- impl Future for Ready {
568
+ impl < H : Borrow < crate :: Async < T > > , T > Unpin for Ready < H , T > { }
569
+
570
+ impl < H : Borrow < crate :: Async < T > > + Clone , T > Future for Ready < H , T > {
507
571
type Output = io:: Result < ( ) > ;
508
572
509
573
fn poll ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
510
574
let Self {
511
- source ,
575
+ ref handle ,
512
576
dir,
513
577
ticks,
514
578
index,
515
579
_guard,
580
+ ..
516
581
} = & mut * self ;
517
582
518
- let mut state = source. state . lock ( ) . unwrap ( ) ;
583
+ let mut state = handle . borrow ( ) . source . state . lock ( ) . unwrap ( ) ;
519
584
520
585
// Check if the reactor has delivered an event.
521
586
if let Some ( ( a, b) ) = * ticks {
@@ -534,9 +599,10 @@ impl Future for Ready {
534
599
None => {
535
600
let i = state[ * dir] . wakers . insert ( None ) ;
536
601
* _guard = Some ( RemoveOnDrop {
537
- source : source . clone ( ) ,
602
+ handle : handle . clone ( ) ,
538
603
dir : * dir,
539
604
key : i,
605
+ _marker : PhantomData ,
540
606
} ) ;
541
607
* index = Some ( i) ;
542
608
* ticks = Some ( ( Reactor :: get ( ) . ticker ( ) , state[ * dir] . tick ) ) ;
@@ -548,9 +614,9 @@ impl Future for Ready {
548
614
// Update interest in this I/O handle.
549
615
if was_empty {
550
616
Reactor :: get ( ) . poller . modify (
551
- source. raw ,
617
+ handle . borrow ( ) . source . raw ,
552
618
Event {
553
- key : source. key ,
619
+ key : handle . borrow ( ) . source . key ,
554
620
readable : !state[ READ ] . is_empty ( ) ,
555
621
writable : !state[ WRITE ] . is_empty ( ) ,
556
622
} ,
@@ -562,16 +628,16 @@ impl Future for Ready {
562
628
}
563
629
564
630
/// Remove waker when dropped.
565
- #[ derive( Debug ) ]
566
- struct RemoveOnDrop {
567
- source : Arc < Source > ,
631
+ struct RemoveOnDrop < H : Borrow < crate :: Async < T > > , T > {
632
+ handle : H ,
568
633
dir : usize ,
569
634
key : usize ,
635
+ _marker : PhantomData < fn ( ) -> T > ,
570
636
}
571
637
572
- impl Drop for RemoveOnDrop {
638
+ impl < H : Borrow < crate :: Async < T > > , T > Drop for RemoveOnDrop < H , T > {
573
639
fn drop ( & mut self ) {
574
- let mut state = self . source . state . lock ( ) . unwrap ( ) ;
640
+ let mut state = self . handle . borrow ( ) . source . state . lock ( ) . unwrap ( ) ;
575
641
let wakers = & mut state[ self . dir ] . wakers ;
576
642
if wakers. contains ( self . key ) {
577
643
wakers. remove ( self . key ) ;
0 commit comments