Skip to content

Commit 41ffbc0

Browse files
author
Stjepan Glavina
committed
Switch to polling crate
1 parent c00b5d3 commit 41ffbc0

File tree

8 files changed

+68
-773
lines changed

8 files changed

+68
-773
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ futures-lite = "0.1.10"
1919
libc = "0.2.74"
2020
once_cell = "1.4.0"
2121
parking = "2.0.0"
22+
polling = "0.1.0"
2223
socket2 = { version = "0.3.12", features = ["pair", "unix"] }
2324
vec-arena = "0.5.0"
2425

@@ -28,7 +29,6 @@ winapi = { version = "0.3.9", features = ["ioapiset"] }
2829

2930
[dev-dependencies]
3031
async-channel = "1.4.0"
31-
async-dup = "1.2.1"
3232
blocking = "0.5.0"
3333
signal-hook = "0.1.16"
3434
tempfile = "3.1.0"

src/lib.rs

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -54,19 +54,19 @@
5454
5555
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
5656

57-
use std::fmt::Debug;
5857
use std::future::Future;
5958
use std::io::{self, IoSlice, IoSliceMut, Read, Write};
60-
use std::net::{SocketAddr, TcpListener, TcpStream, UdpSocket};
59+
use std::mem::ManuallyDrop;
60+
use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream, UdpSocket};
6161
#[cfg(windows)]
62-
use std::os::windows::io::{AsRawSocket, RawSocket};
62+
use std::os::windows::io::{AsRawSocket, FromRawSocket, RawSocket};
6363
use std::pin::Pin;
6464
use std::sync::Arc;
6565
use std::task::{Context, Poll, Waker};
6666
use std::time::{Duration, Instant};
6767
#[cfg(unix)]
6868
use std::{
69-
os::unix::io::{AsRawFd, RawFd},
69+
os::unix::io::{AsRawFd, FromRawFd, RawFd},
7070
os::unix::net::{SocketAddr as UnixSocketAddr, UnixDatagram, UnixListener, UnixStream},
7171
path::Path,
7272
};
@@ -79,9 +79,7 @@ use socket2::{Domain, Protocol, Socket, Type};
7979
use crate::reactor::{Reactor, Source};
8080

8181
pub mod parking;
82-
8382
mod reactor;
84-
mod sys;
8583

8684
/// A timer that expires after a duration of time.
8785
///
@@ -656,7 +654,7 @@ impl<T: Write> AsyncWrite for Async<T> {
656654
}
657655

658656
fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
659-
Poll::Ready(sys::shutdown_write(self.source.raw))
657+
Poll::Ready(shutdown_write(self.source.raw))
660658
}
661659
}
662660

@@ -685,7 +683,7 @@ where
685683
}
686684

687685
fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
688-
Poll::Ready(sys::shutdown_write(self.source.raw))
686+
Poll::Ready(shutdown_write(self.source.raw))
689687
}
690688
}
691689

@@ -1312,3 +1310,25 @@ async fn optimistic(fut: impl Future<Output = io::Result<()>>) -> io::Result<()>
13121310
})
13131311
.await
13141312
}
1313+
1314+
/// Shuts down the write side of a socket.
1315+
///
1316+
/// If this source is not a socket, the `shutdown()` syscall error is ignored.
1317+
pub fn shutdown_write(#[cfg(unix)] raw: RawFd, #[cfg(windows)] raw: RawSocket) -> io::Result<()> {
1318+
// This may not be a TCP stream, but that's okay. All we do is attempt a `shutdown()` on the
1319+
// raw descriptor and ignore errors.
1320+
let stream = unsafe {
1321+
ManuallyDrop::new(
1322+
#[cfg(unix)]
1323+
TcpStream::from_raw_fd(raw),
1324+
#[cfg(windows)]
1325+
TcpStream::from_raw_socket(raw),
1326+
)
1327+
};
1328+
1329+
// If the socket is a TCP stream, the only actual error can be ENOTCONN.
1330+
match stream.shutdown(Shutdown::Write) {
1331+
Err(err) if err.kind() == io::ErrorKind::NotConnected => Err(err),
1332+
_ => Ok(()),
1333+
}
1334+
}

src/parking.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,10 @@ pub fn pair() -> (Parker, Unparker) {
7373
/// Waits for a notification.
7474
#[derive(Debug)]
7575
pub struct Parker {
76+
/// The inner parker implementation.
7677
inner: parking::Parker,
78+
79+
/// Set to `true` when the parker is polling I/O.
7780
io: Arc<AtomicBool>,
7881
}
7982

@@ -289,7 +292,10 @@ impl Default for Parker {
289292
/// Notifies a parker.
290293
#[derive(Clone, Debug)]
291294
pub struct Unparker {
295+
/// The inner unparker implementation.
292296
inner: parking::Unparker,
297+
298+
/// Set to `true` when the parker is polling I/O.
293299
io: Arc<AtomicBool>,
294300
}
295301

src/reactor.rs

Lines changed: 33 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,9 @@ use std::time::{Duration, Instant};
1515
use concurrent_queue::ConcurrentQueue;
1616
use futures_lite::*;
1717
use once_cell::sync::Lazy;
18+
use polling::{Event, Poller};
1819
use vec_arena::Arena;
1920

20-
use crate::sys;
21-
2221
/// The reactor.
2322
///
2423
/// There is only one global instance of this type, accessible by [`Reactor::get()`].
@@ -29,8 +28,8 @@ pub(crate) struct Reactor {
2928
/// Unparks the async-io thread.
3029
thread_unparker: parking::Unparker,
3130

32-
/// Raw bindings to epoll/kqueue/wepoll.
33-
sys: sys::Reactor,
31+
/// Bindings to epoll/kqueue/wepoll.
32+
poller: Poller,
3433

3534
/// Ticker bumped before polling.
3635
ticker: AtomicUsize,
@@ -39,7 +38,7 @@ pub(crate) struct Reactor {
3938
sources: Mutex<Arena<Arc<Source>>>,
4039

4140
/// Temporary storage for I/O events when polling the reactor.
42-
events: Mutex<sys::Events>,
41+
events: Mutex<Vec<Event>>,
4342

4443
/// An ordered map of registered timers.
4544
///
@@ -74,10 +73,10 @@ impl Reactor {
7473
Reactor {
7574
parker_count: AtomicUsize::new(0),
7675
thread_unparker: unparker,
77-
sys: sys::Reactor::new().expect("cannot initialize I/O event notification"),
76+
poller: Poller::new().expect("cannot initialize I/O event notification"),
7877
ticker: AtomicUsize::new(0),
7978
sources: Mutex::new(Arena::new()),
80-
events: Mutex::new(sys::Events::new()),
79+
events: Mutex::new(Vec::new()),
8180
timers: Mutex::new(BTreeMap::new()),
8281
timer_ops: ConcurrentQueue::bounded(1000),
8382
}
@@ -143,7 +142,7 @@ impl Reactor {
143142

144143
/// Notifies the thread blocked on the reactor.
145144
pub(crate) fn notify(&self) {
146-
self.sys.notify().expect("failed to notify reactor");
145+
self.poller.notify().expect("failed to notify reactor");
147146
}
148147

149148
/// Registers an I/O source in the reactor.
@@ -153,7 +152,7 @@ impl Reactor {
153152
#[cfg(windows)] raw: RawSocket,
154153
) -> io::Result<Arc<Source>> {
155154
// Register the file descriptor.
156-
self.sys.insert(raw)?;
155+
self.poller.insert(raw)?;
157156

158157
// Create an I/O source for this file descriptor.
159158
let mut sources = self.sources.lock().unwrap();
@@ -177,7 +176,7 @@ impl Reactor {
177176
pub(crate) fn remove_io(&self, source: &Source) -> io::Result<()> {
178177
let mut sources = self.sources.lock().unwrap();
179178
sources.remove(source.key);
180-
self.sys.remove(source.raw)
179+
self.poller.remove(source.raw)
181180
}
182181

183182
/// Registers a timer in the reactor.
@@ -287,7 +286,7 @@ impl Reactor {
287286
/// A lock on the reactor.
288287
pub(crate) struct ReactorLock<'a> {
289288
reactor: &'a Reactor,
290-
events: MutexGuard<'a, sys::Events>,
289+
events: MutexGuard<'a, Vec<Event>>,
291290
}
292291

293292
impl ReactorLock<'_> {
@@ -313,7 +312,7 @@ impl ReactorLock<'_> {
313312
.wrapping_add(1);
314313

315314
// Block on I/O events.
316-
let res = match self.reactor.sys.wait(&mut self.events, timeout) {
315+
let res = match self.reactor.poller.wait(&mut self.events, timeout) {
317316
// No I/O events occurred.
318317
Ok(0) => {
319318
if timeout != Some(Duration::from_secs(0)) {
@@ -350,11 +349,13 @@ impl ReactorLock<'_> {
350349
// previously interested in both readability and
351350
// writability, but only one of them was emitted.
352351
if !(w.writers.is_empty() && w.readers.is_empty()) {
353-
self.reactor.sys.interest(
352+
self.reactor.poller.interest(
354353
source.raw,
355-
source.key,
356-
!w.readers.is_empty(),
357-
!w.writers.is_empty(),
354+
Event {
355+
key: source.key,
356+
readable: !w.readers.is_empty(),
357+
writable: !w.writers.is_empty(),
358+
},
358359
)?;
359360
}
360361
}
@@ -442,9 +443,14 @@ impl Source {
442443

443444
// If there are no other readers, re-register in the reactor.
444445
if w.readers.is_empty() {
445-
Reactor::get()
446-
.sys
447-
.interest(self.raw, self.key, true, !w.writers.is_empty())?;
446+
Reactor::get().poller.interest(
447+
self.raw,
448+
Event {
449+
key: self.key,
450+
readable: true,
451+
writable: !w.writers.is_empty(),
452+
},
453+
)?;
448454
}
449455

450456
// Register the current task's waker if not present already.
@@ -483,9 +489,14 @@ impl Source {
483489

484490
// If there are no other writers, re-register in the reactor.
485491
if w.writers.is_empty() {
486-
Reactor::get()
487-
.sys
488-
.interest(self.raw, self.key, !w.readers.is_empty(), true)?;
492+
Reactor::get().poller.interest(
493+
self.raw,
494+
Event {
495+
key: self.key,
496+
readable: !w.readers.is_empty(),
497+
writable: true,
498+
},
499+
)?;
489500
}
490501

491502
// Register the current task's waker if not present already.

0 commit comments

Comments
 (0)