Skip to content

Commit 64aa315

Browse files
committed
Add Readable and Writable futures
1 parent 68f9d40 commit 64aa315

File tree

2 files changed

+126
-63
lines changed

2 files changed

+126
-63
lines changed

src/lib.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ mod driver;
8585
mod reactor;
8686

8787
pub use driver::block_on;
88+
pub use reactor::{Readable, Writable};
8889

8990
/// Use `Duration::MAX` once `duration_constants` are stabilized.
9091
fn duration_max() -> Duration {
@@ -686,8 +687,8 @@ impl<T> Async<T> {
686687
/// listener.readable().await?;
687688
/// # std::io::Result::Ok(()) });
688689
/// ```
689-
pub async fn readable(&self) -> io::Result<()> {
690-
self.source.readable().await
690+
pub fn readable(&self) -> Readable {
691+
self.source.readable()
691692
}
692693

693694
/// Waits until the I/O handle is writable.
@@ -708,8 +709,8 @@ impl<T> Async<T> {
708709
/// stream.writable().await?;
709710
/// # std::io::Result::Ok(()) });
710711
/// ```
711-
pub async fn writable(&self) -> io::Result<()> {
712-
self.source.writable().await
712+
pub fn writable(&self) -> Writable {
713+
self.source.writable()
713714
}
714715

715716
/// Polls the I/O handle for readability.

src/reactor.rs

Lines changed: 121 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,20 @@
11
use std::collections::BTreeMap;
2+
use std::future::Future;
23
use std::io;
34
use std::mem;
45
#[cfg(unix)]
56
use std::os::unix::io::RawFd;
67
#[cfg(windows)]
78
use std::os::windows::io::RawSocket;
89
use std::panic;
10+
use std::pin::Pin;
911
use std::sync::atomic::{AtomicUsize, Ordering};
1012
use std::sync::{Arc, Mutex, MutexGuard};
1113
use std::task::{Context, Poll, Waker};
1214
use std::time::{Duration, Instant};
1315

1416
use concurrent_queue::ConcurrentQueue;
15-
use futures_lite::future;
17+
use futures_lite::ready;
1618
use once_cell::sync::Lazy;
1719
use polling::{Event, Poller};
1820
use slab::Slab;
@@ -441,78 +443,138 @@ impl Source {
441443
}
442444

443445
/// Waits until the I/O source is readable.
444-
pub(crate) async fn readable(&self) -> io::Result<()> {
445-
self.ready(READ).await?;
446-
log::trace!("readable: fd={}", self.raw);
447-
Ok(())
446+
pub(crate) fn readable(self: &Arc<Self>) -> Readable {
447+
Readable(self.ready(READ))
448448
}
449449

450450
/// Waits until the I/O source is writable.
451-
pub(crate) async fn writable(&self) -> io::Result<()> {
452-
self.ready(WRITE).await?;
453-
log::trace!("writable: fd={}", self.raw);
454-
Ok(())
451+
pub(crate) fn writable(self: &Arc<Self>) -> Writable {
452+
Writable(self.ready(WRITE))
455453
}
456454

457455
/// Waits until the I/O source is readable or writable.
458-
async fn ready(&self, dir: usize) -> io::Result<()> {
459-
let mut ticks = None;
460-
let mut index = None;
461-
let mut _guard = None;
462-
463-
future::poll_fn(|cx| {
464-
let mut state = self.state.lock().unwrap();
465-
466-
// Check if the reactor has delivered an event.
467-
if let Some((a, b)) = ticks {
468-
// If `state[dir].tick` has changed to a value other than the old reactor tick,
469-
// that means a newer reactor tick has delivered an event.
470-
if state[dir].tick != a && state[dir].tick != b {
471-
return Poll::Ready(Ok(()));
472-
}
456+
fn ready(self: &Arc<Self>, dir: usize) -> Ready {
457+
Ready {
458+
source: self.clone(),
459+
dir,
460+
ticks: None,
461+
index: None,
462+
_guard: None,
463+
}
464+
}
465+
}
466+
467+
/// Future for [`Async::readable`](crate::Async::readable).
468+
#[derive(Debug)]
469+
#[must_use = "futures do nothing unless you `.await` or poll them"]
470+
pub struct Readable(Ready);
471+
472+
impl Future for Readable {
473+
type Output = io::Result<()>;
474+
475+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
476+
ready!(Pin::new(&mut self.0).poll(cx))?;
477+
log::trace!("readable: fd={}", self.0.source.raw);
478+
Poll::Ready(Ok(()))
479+
}
480+
}
481+
482+
/// Future for [`Async::writable`](crate::Async::writable).
483+
#[derive(Debug)]
484+
#[must_use = "futures do nothing unless you `.await` or poll them"]
485+
pub struct Writable(Ready);
486+
487+
impl Future for Writable {
488+
type Output = io::Result<()>;
489+
490+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
491+
ready!(Pin::new(&mut self.0).poll(cx))?;
492+
log::trace!("writable: fd={}", self.0.source.raw);
493+
Poll::Ready(Ok(()))
494+
}
495+
}
496+
497+
#[derive(Debug)]
498+
struct Ready {
499+
source: Arc<Source>,
500+
dir: usize,
501+
ticks: Option<(usize, usize)>,
502+
index: Option<usize>,
503+
_guard: Option<RemoveOnDrop>,
504+
}
505+
506+
impl Future for Ready {
507+
type Output = io::Result<()>;
508+
509+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
510+
let Self {
511+
source,
512+
dir,
513+
ticks,
514+
index,
515+
_guard,
516+
} = &mut *self;
517+
518+
let mut state = source.state.lock().unwrap();
519+
520+
// Check if the reactor has delivered an event.
521+
if let Some((a, b)) = *ticks {
522+
// If `state[dir].tick` has changed to a value other than the old reactor tick,
523+
// that means a newer reactor tick has delivered an event.
524+
if state[*dir].tick != a && state[*dir].tick != b {
525+
return Poll::Ready(Ok(()));
473526
}
527+
}
474528

475-
let was_empty = state[dir].is_empty();
476-
477-
// Register the current task's waker.
478-
let i = match index {
479-
Some(i) => i,
480-
None => {
481-
let i = state[dir].wakers.insert(None);
482-
_guard = Some(CallOnDrop(move || {
483-
let mut state = self.state.lock().unwrap();
484-
state[dir].wakers.remove(i);
485-
}));
486-
index = Some(i);
487-
ticks = Some((Reactor::get().ticker(), state[dir].tick));
488-
i
489-
}
490-
};
491-
state[dir].wakers[i] = Some(cx.waker().clone());
492-
493-
// Update interest in this I/O handle.
494-
if was_empty {
495-
Reactor::get().poller.modify(
496-
self.raw,
497-
Event {
498-
key: self.key,
499-
readable: !state[READ].is_empty(),
500-
writable: !state[WRITE].is_empty(),
501-
},
502-
)?;
529+
let was_empty = state[*dir].is_empty();
530+
531+
// Register the current task's waker.
532+
let i = match *index {
533+
Some(i) => i,
534+
None => {
535+
let i = state[*dir].wakers.insert(None);
536+
*_guard = Some(RemoveOnDrop {
537+
source: source.clone(),
538+
dir: *dir,
539+
key: i,
540+
});
541+
*index = Some(i);
542+
*ticks = Some((Reactor::get().ticker(), state[*dir].tick));
543+
i
503544
}
545+
};
546+
state[*dir].wakers[i] = Some(cx.waker().clone());
504547

505-
Poll::Pending
506-
})
507-
.await
548+
// Update interest in this I/O handle.
549+
if was_empty {
550+
Reactor::get().poller.modify(
551+
source.raw,
552+
Event {
553+
key: source.key,
554+
readable: !state[READ].is_empty(),
555+
writable: !state[WRITE].is_empty(),
556+
},
557+
)?;
558+
}
559+
560+
Poll::Pending
508561
}
509562
}
510563

511-
/// Runs a closure when dropped.
512-
struct CallOnDrop<F: Fn()>(F);
564+
/// Remove waker when dropped.
565+
#[derive(Debug)]
566+
struct RemoveOnDrop {
567+
source: Arc<Source>,
568+
dir: usize,
569+
key: usize,
570+
}
513571

514-
impl<F: Fn()> Drop for CallOnDrop<F> {
572+
impl Drop for RemoveOnDrop {
515573
fn drop(&mut self) {
516-
(self.0)();
574+
let mut state = self.source.state.lock().unwrap();
575+
let wakers = &mut state[self.dir].wakers;
576+
if wakers.contains(self.key) {
577+
wakers.remove(self.key);
578+
}
517579
}
518580
}

0 commit comments

Comments
 (0)