Skip to content

Refactor the networking driver #188

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from Sep 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
250 changes: 57 additions & 193 deletions src/net/driver/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
use std::fmt;
use std::io::{Read as _, Write as _};
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

use futures_io::{AsyncRead, AsyncWrite};
use lazy_static::lazy_static;
use mio::{self, Evented};
use slab::Slab;
Expand All @@ -19,9 +15,6 @@ struct Entry {
/// A unique identifier.
token: mio::Token,

/// Indicates whether this I/O handle is ready for reading, writing, or if it is disconnected.
readiness: AtomicUsize,

/// Tasks that are blocked on reading from this I/O handle.
readers: Mutex<Vec<Waker>>,

Expand Down Expand Up @@ -75,7 +68,6 @@ impl Reactor {
// Allocate an entry and insert it into the slab.
let entry = Arc::new(Entry {
token,
readiness: AtomicUsize::new(mio::Ready::empty().as_usize()),
readers: Mutex::new(Vec::new()),
writers: Mutex::new(Vec::new()),
});
Expand Down Expand Up @@ -151,9 +143,6 @@ fn main_loop() -> io::Result<()> {
if let Some(entry) = entries.get(token.0) {
// Set the readiness flags from this I/O event.
let readiness = event.readiness();
entry
.readiness
.fetch_or(readiness.as_usize(), Ordering::SeqCst);

// Wake up reader tasks blocked on this I/O handle.
if !(readiness & reader_interests()).is_empty() {
Expand All @@ -178,21 +167,21 @@ fn main_loop() -> io::Result<()> {
///
/// This handle wraps an I/O event source and exposes a "futurized" interface on top of it,
/// implementing traits `AsyncRead` and `AsyncWrite`.
pub struct IoHandle<T: Evented> {
pub struct Watcher<T: Evented> {
/// Data associated with the I/O handle.
entry: Arc<Entry>,

/// The I/O event source.
source: Option<T>,
}

impl<T: Evented> IoHandle<T> {
impl<T: Evented> Watcher<T> {
/// Creates a new I/O handle.
///
/// The provided I/O event source will be kept registered inside the reactor's poller for the
/// lifetime of the returned I/O handle.
pub fn new(source: T) -> IoHandle<T> {
IoHandle {
pub fn new(source: T) -> Watcher<T> {
Watcher {
entry: REACTOR
.register(&source)
.expect("cannot register an I/O event source"),
Expand All @@ -205,91 +194,76 @@ impl<T: Evented> IoHandle<T> {
self.source.as_ref().unwrap()
}

/// Polls the I/O handle for reading.
/// Polls the inner I/O source for a non-blocking read operation.
///
/// If reading from the I/O handle would block, `Poll::Pending` will be returned.
pub fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let mask = reader_interests();
let mut readiness = mio::Ready::from_usize(self.entry.readiness.load(Ordering::SeqCst));

if (readiness & mask).is_empty() {
let mut list = self.entry.readers.lock().unwrap();
if list.iter().all(|w| !w.will_wake(cx.waker())) {
list.push(cx.waker().clone());
}

readiness = mio::Ready::from_usize(self.entry.readiness.fetch_or(0, Ordering::SeqCst));
/// If the operation returns an error of the `io::ErrorKind::WouldBlock` kind, the current task
/// will be registered for wakeup when the I/O source becomes readable.
pub fn poll_read_with<'a, F, R>(&'a self, cx: &mut Context<'_>, mut f: F) -> Poll<io::Result<R>>
where
F: FnMut(&'a T) -> io::Result<R>,
{
// If the operation isn't blocked, return its result.
match f(self.source.as_ref().unwrap()) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return Poll::Ready(res),
}

if (readiness & mask).is_empty() {
Poll::Pending
} else {
Poll::Ready(Ok(()))
// Lock the waker list.
let mut list = self.entry.readers.lock().unwrap();

// Try running the operation again.
match f(self.source.as_ref().unwrap()) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return Poll::Ready(res),
}
}

/// Clears the readability status.
///
/// This method is usually called when an attempt at reading from the OS-level I/O handle
/// returns `io::ErrorKind::WouldBlock`.
pub fn clear_readable(&self, cx: &mut Context<'_>) -> io::Result<()> {
let mask = reader_interests() - hup();
self.entry
.readiness
.fetch_and(!mask.as_usize(), Ordering::SeqCst);

if self.poll_readable(cx)?.is_ready() {
// Wake the current task.
cx.waker().wake_by_ref();
// Register the task if it isn't registered already.
if list.iter().all(|w| !w.will_wake(cx.waker())) {
list.push(cx.waker().clone());
}

Ok(())
Poll::Pending
}

/// Polls the I/O handle for writing.
/// Polls the inner I/O source for a non-blocking write operation.
///
/// If writing into the I/O handle would block, `Poll::Pending` will be returned.
pub fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let mask = writer_interests();
let mut readiness = mio::Ready::from_usize(self.entry.readiness.load(Ordering::SeqCst));

if (readiness & mask).is_empty() {
let mut list = self.entry.writers.lock().unwrap();
if list.iter().all(|w| !w.will_wake(cx.waker())) {
list.push(cx.waker().clone());
}

readiness = mio::Ready::from_usize(self.entry.readiness.fetch_or(0, Ordering::SeqCst));
/// If the operation returns an error of the `io::ErrorKind::WouldBlock` kind, the current task
/// will be registered for wakeup when the I/O source becomes writable.
pub fn poll_write_with<'a, F, R>(
&'a self,
cx: &mut Context<'_>,
mut f: F,
) -> Poll<io::Result<R>>
where
F: FnMut(&'a T) -> io::Result<R>,
{
// If the operation isn't blocked, return its result.
match f(self.source.as_ref().unwrap()) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return Poll::Ready(res),
}

if (readiness & mask).is_empty() {
Poll::Pending
} else {
Poll::Ready(Ok(()))
// Lock the waker list.
let mut list = self.entry.writers.lock().unwrap();

// Try running the operation again.
match f(self.source.as_ref().unwrap()) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return Poll::Ready(res),
}
}

/// Clears the writability status.
///
/// This method is usually called when an attempt at writing from the OS-level I/O handle
/// returns `io::ErrorKind::WouldBlock`.
pub fn clear_writable(&self, cx: &mut Context<'_>) -> io::Result<()> {
let mask = writer_interests() - hup();
self.entry
.readiness
.fetch_and(!mask.as_usize(), Ordering::SeqCst);

if self.poll_writable(cx)?.is_ready() {
// Wake the current task.
cx.waker().wake_by_ref();
// Register the task if it isn't registered already.
if list.iter().all(|w| !w.will_wake(cx.waker())) {
list.push(cx.waker().clone());
}

Ok(())
Poll::Pending
}

/// Deregisters and returns the inner I/O source.
///
/// This method is typically used to convert `IoHandle`s to raw file descriptors/handles.
/// This method is typically used to convert `Watcher`s to raw file descriptors/handles.
#[allow(dead_code)]
pub fn into_inner(mut self) -> T {
let source = self.source.take().unwrap();
REACTOR
Expand All @@ -299,7 +273,7 @@ impl<T: Evented> IoHandle<T> {
}
}

impl<T: Evented> Drop for IoHandle<T> {
impl<T: Evented> Drop for Watcher<T> {
fn drop(&mut self) {
if let Some(ref source) = self.source {
REACTOR
Expand All @@ -309,125 +283,15 @@ impl<T: Evented> Drop for IoHandle<T> {
}
}

impl<T: Evented + fmt::Debug> fmt::Debug for IoHandle<T> {
impl<T: Evented + fmt::Debug> fmt::Debug for Watcher<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("IoHandle")
f.debug_struct("Watcher")
.field("entry", &self.entry)
.field("source", &self.source)
.finish()
}
}

impl<T: Evented + std::io::Read + Unpin> AsyncRead for IoHandle<T> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
futures_core::ready!(Pin::new(&mut *self).poll_readable(cx)?);

match self.source.as_mut().unwrap().read(buf) {
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
self.clear_readable(cx)?;
Poll::Pending
}
res => Poll::Ready(res),
}
}
}

impl<'a, T: Evented + Unpin> AsyncRead for &'a IoHandle<T>
where
&'a T: std::io::Read,
{
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
futures_core::ready!(Pin::new(&mut *self).poll_readable(cx)?);

match self.source.as_ref().unwrap().read(buf) {
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
self.clear_readable(cx)?;
Poll::Pending
}
res => Poll::Ready(res),
}
}
}

impl<T: Evented + std::io::Write + Unpin> AsyncWrite for IoHandle<T> {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
futures_core::ready!(self.poll_writable(cx)?);

match self.source.as_mut().unwrap().write(buf) {
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
self.clear_writable(cx)?;
Poll::Pending
}
res => Poll::Ready(res),
}
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
futures_core::ready!(self.poll_writable(cx)?);

match self.source.as_mut().unwrap().flush() {
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
self.clear_writable(cx)?;
Poll::Pending
}
res => Poll::Ready(res),
}
}

fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
}

impl<'a, T: Evented + Unpin> AsyncWrite for &'a IoHandle<T>
where
&'a T: std::io::Write,
{
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
futures_core::ready!(self.poll_writable(cx)?);

match self.get_ref().write(buf) {
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
self.clear_writable(cx)?;
Poll::Pending
}
res => Poll::Ready(res),
}
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
futures_core::ready!(self.poll_writable(cx)?);

match self.get_ref().flush() {
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
self.clear_writable(cx)?;
Poll::Pending
}
res => Poll::Ready(res),
}
}

fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
}

/// Returns a mask containing flags that interest tasks reading from I/O handles.
#[inline]
fn reader_interests() -> mio::Ready {
Expand Down
Loading