Skip to content

Commit 1634117

Browse files
authored
Merge pull request async-rs#66 from smol-rs/taiki-e/owned
Add Async::{readable_owned, writable_owned}
2 parents e82546d + 790058b commit 1634117

File tree

2 files changed

+114
-34
lines changed

2 files changed

+114
-34
lines changed

src/lib.rs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ mod driver;
8585
mod reactor;
8686

8787
pub use driver::block_on;
88-
pub use reactor::{Readable, Writable};
88+
pub use reactor::{Readable, ReadableOwned, Writable, WritableOwned};
8989

9090
/// Use `Duration::MAX` once `duration_constants` are stabilized.
9191
fn duration_max() -> Duration {
@@ -687,8 +687,15 @@ impl<T> Async<T> {
687687
/// listener.readable().await?;
688688
/// # std::io::Result::Ok(()) });
689689
/// ```
690-
pub fn readable(&self) -> Readable {
691-
self.source.readable()
690+
pub fn readable(&self) -> Readable<'_, T> {
691+
Source::readable(self)
692+
}
693+
694+
/// Waits until the I/O handle is readable.
695+
///
696+
/// This method completes when a read operation on this I/O handle wouldn't block.
697+
pub fn readable_owned(self: Arc<Self>) -> ReadableOwned<T> {
698+
Source::readable_owned(self)
692699
}
693700

694701
/// Waits until the I/O handle is writable.
@@ -709,8 +716,15 @@ impl<T> Async<T> {
709716
/// stream.writable().await?;
710717
/// # std::io::Result::Ok(()) });
711718
/// ```
712-
pub fn writable(&self) -> Writable {
713-
self.source.writable()
719+
pub fn writable(&self) -> Writable<'_, T> {
720+
Source::writable(self)
721+
}
722+
723+
/// Waits until the I/O handle is writable.
724+
///
725+
/// This method completes when a write operation on this I/O handle wouldn't block.
726+
pub fn writable_owned(self: Arc<Self>) -> WritableOwned<T> {
727+
Source::writable_owned(self)
714728
}
715729

716730
/// Polls the I/O handle for readability.

src/reactor.rs

Lines changed: 95 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1+
use std::borrow::Borrow;
12
use std::collections::BTreeMap;
3+
use std::fmt;
24
use std::future::Future;
35
use std::io;
6+
use std::marker::PhantomData;
47
use std::mem;
58
#[cfg(unix)]
69
use std::os::unix::io::RawFd;
@@ -443,19 +446,29 @@ impl Source {
443446
}
444447

445448
/// Waits until the I/O source is readable.
446-
pub(crate) fn readable(self: &Arc<Self>) -> Readable {
447-
Readable(self.ready(READ))
449+
pub(crate) fn readable<T>(handle: &crate::Async<T>) -> Readable<'_, T> {
450+
Readable(Self::ready(handle, READ))
451+
}
452+
453+
/// Waits until the I/O source is readable.
454+
pub(crate) fn readable_owned<T>(handle: Arc<crate::Async<T>>) -> ReadableOwned<T> {
455+
ReadableOwned(Self::ready(handle, READ))
456+
}
457+
458+
/// Waits until the I/O source is writable.
459+
pub(crate) fn writable<T>(handle: &crate::Async<T>) -> Writable<'_, T> {
460+
Writable(Self::ready(handle, WRITE))
448461
}
449462

450463
/// Waits until the I/O source is writable.
451-
pub(crate) fn writable(self: &Arc<Self>) -> Writable {
452-
Writable(self.ready(WRITE))
464+
pub(crate) fn writable_owned<T>(handle: Arc<crate::Async<T>>) -> WritableOwned<T> {
465+
WritableOwned(Self::ready(handle, WRITE))
453466
}
454467

455468
/// Waits until the I/O source is readable or writable.
456-
fn ready(self: &Arc<Self>, dir: usize) -> Ready {
469+
fn ready<H: Borrow<crate::Async<T>> + Clone, T>(handle: H, dir: usize) -> Ready<H, T> {
457470
Ready {
458-
source: self.clone(),
471+
handle,
459472
dir,
460473
ticks: None,
461474
index: None,
@@ -465,57 +478,109 @@ impl Source {
465478
}
466479

467480
/// Future for [`Async::readable`](crate::Async::readable).
468-
#[derive(Debug)]
469481
#[must_use = "futures do nothing unless you `.await` or poll them"]
470-
pub struct Readable(Ready);
482+
pub struct Readable<'a, T>(Ready<&'a crate::Async<T>, T>);
483+
484+
impl<T> Future for Readable<'_, T> {
485+
type Output = io::Result<()>;
486+
487+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
488+
ready!(Pin::new(&mut self.0).poll(cx))?;
489+
log::trace!("readable: fd={}", self.0.handle.source.raw);
490+
Poll::Ready(Ok(()))
491+
}
492+
}
493+
494+
impl<T> fmt::Debug for Readable<'_, T> {
495+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
496+
f.debug_struct("Readable").finish()
497+
}
498+
}
499+
500+
/// Future for [`Async::readable_owned`](crate::Async::readable_owned).
501+
#[must_use = "futures do nothing unless you `.await` or poll them"]
502+
pub struct ReadableOwned<T>(Ready<Arc<crate::Async<T>>, T>);
471503

472-
impl Future for Readable {
504+
impl<T> Future for ReadableOwned<T> {
473505
type Output = io::Result<()>;
474506

475507
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
476508
ready!(Pin::new(&mut self.0).poll(cx))?;
477-
log::trace!("readable: fd={}", self.0.source.raw);
509+
log::trace!("readable_owned: fd={}", self.0.handle.source.raw);
478510
Poll::Ready(Ok(()))
479511
}
480512
}
481513

514+
impl<T> fmt::Debug for ReadableOwned<T> {
515+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
516+
f.debug_struct("ReadableOwned").finish()
517+
}
518+
}
519+
482520
/// Future for [`Async::writable`](crate::Async::writable).
483-
#[derive(Debug)]
484521
#[must_use = "futures do nothing unless you `.await` or poll them"]
485-
pub struct Writable(Ready);
522+
pub struct Writable<'a, T>(Ready<&'a crate::Async<T>, T>);
486523

487-
impl Future for Writable {
524+
impl<T> Future for Writable<'_, T> {
488525
type Output = io::Result<()>;
489526

490527
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
491528
ready!(Pin::new(&mut self.0).poll(cx))?;
492-
log::trace!("writable: fd={}", self.0.source.raw);
529+
log::trace!("writable: fd={}", self.0.handle.source.raw);
493530
Poll::Ready(Ok(()))
494531
}
495532
}
496533

497-
#[derive(Debug)]
498-
struct Ready {
499-
source: Arc<Source>,
534+
impl<T> fmt::Debug for Writable<'_, T> {
535+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
536+
f.debug_struct("Writable").finish()
537+
}
538+
}
539+
540+
/// Future for [`Async::writable_owned`](crate::Async::writable_owned).
541+
#[must_use = "futures do nothing unless you `.await` or poll them"]
542+
pub struct WritableOwned<T>(Ready<Arc<crate::Async<T>>, T>);
543+
544+
impl<T> Future for WritableOwned<T> {
545+
type Output = io::Result<()>;
546+
547+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
548+
ready!(Pin::new(&mut self.0).poll(cx))?;
549+
log::trace!("writable_owned: fd={}", self.0.handle.source.raw);
550+
Poll::Ready(Ok(()))
551+
}
552+
}
553+
554+
impl<T> fmt::Debug for WritableOwned<T> {
555+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
556+
f.debug_struct("WritableOwned").finish()
557+
}
558+
}
559+
560+
struct Ready<H: Borrow<crate::Async<T>>, T> {
561+
handle: H,
500562
dir: usize,
501563
ticks: Option<(usize, usize)>,
502564
index: Option<usize>,
503-
_guard: Option<RemoveOnDrop>,
565+
_guard: Option<RemoveOnDrop<H, T>>,
504566
}
505567

506-
impl Future for Ready {
568+
impl<H: Borrow<crate::Async<T>>, T> Unpin for Ready<H, T> {}
569+
570+
impl<H: Borrow<crate::Async<T>> + Clone, T> Future for Ready<H, T> {
507571
type Output = io::Result<()>;
508572

509573
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
510574
let Self {
511-
source,
575+
ref handle,
512576
dir,
513577
ticks,
514578
index,
515579
_guard,
580+
..
516581
} = &mut *self;
517582

518-
let mut state = source.state.lock().unwrap();
583+
let mut state = handle.borrow().source.state.lock().unwrap();
519584

520585
// Check if the reactor has delivered an event.
521586
if let Some((a, b)) = *ticks {
@@ -534,9 +599,10 @@ impl Future for Ready {
534599
None => {
535600
let i = state[*dir].wakers.insert(None);
536601
*_guard = Some(RemoveOnDrop {
537-
source: source.clone(),
602+
handle: handle.clone(),
538603
dir: *dir,
539604
key: i,
605+
_marker: PhantomData,
540606
});
541607
*index = Some(i);
542608
*ticks = Some((Reactor::get().ticker(), state[*dir].tick));
@@ -548,9 +614,9 @@ impl Future for Ready {
548614
// Update interest in this I/O handle.
549615
if was_empty {
550616
Reactor::get().poller.modify(
551-
source.raw,
617+
handle.borrow().source.raw,
552618
Event {
553-
key: source.key,
619+
key: handle.borrow().source.key,
554620
readable: !state[READ].is_empty(),
555621
writable: !state[WRITE].is_empty(),
556622
},
@@ -562,16 +628,16 @@ impl Future for Ready {
562628
}
563629

564630
/// Remove waker when dropped.
565-
#[derive(Debug)]
566-
struct RemoveOnDrop {
567-
source: Arc<Source>,
631+
struct RemoveOnDrop<H: Borrow<crate::Async<T>>, T> {
632+
handle: H,
568633
dir: usize,
569634
key: usize,
635+
_marker: PhantomData<fn() -> T>,
570636
}
571637

572-
impl Drop for RemoveOnDrop {
638+
impl<H: Borrow<crate::Async<T>>, T> Drop for RemoveOnDrop<H, T> {
573639
fn drop(&mut self) {
574-
let mut state = self.source.state.lock().unwrap();
640+
let mut state = self.handle.borrow().source.state.lock().unwrap();
575641
let wakers = &mut state[self.dir].wakers;
576642
if wakers.contains(self.key) {
577643
wakers.remove(self.key);

0 commit comments

Comments
 (0)