Skip to content

Commit 380f032

Browse files
committed
feat: Expose other kqueue filters
1 parent 5fffa8f commit 380f032

File tree

7 files changed

+405
-32
lines changed

7 files changed

+405
-32
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ concurrent-queue = "2"
2525
futures-lite = "1.11.0"
2626
log = "0.4.11"
2727
parking = "2.0.0"
28-
polling = "2.0.0"
28+
polling = { git = "https://github.com/smol-rs/polling.git" }
2929
rustix = { version = "0.36.0", default-features = false, features = ["std", "fs"] }
3030
slab = "0.4.2"
3131
socket2 = { version = "0.4.2", features = ["all"] }

examples/kqueue-process.rs

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
//! Uses the `async_io::os::kqueue` module to wait for a process to terminate.
2+
//!
3+
//! Run with:
4+
//!
5+
//! ```
6+
//! cargo run --example kqueue-process
7+
//! ```
8+
9+
#[cfg(any(
10+
target_os = "macos",
11+
target_os = "ios",
12+
target_os = "tvos",
13+
target_os = "watchos",
14+
target_os = "freebsd",
15+
target_os = "netbsd",
16+
target_os = "openbsd",
17+
target_os = "dragonfly",
18+
))]
19+
fn main() -> std::io::Result<()> {
20+
use std::process::Command;
21+
22+
use async_io::os::kqueue::{AsyncKqueueExt, Exit};
23+
use async_io::Async;
24+
use futures_lite::future;
25+
26+
future::block_on(async {
27+
// Spawn a process.
28+
let process = Command::new("sleep")
29+
.arg("3")
30+
.spawn()
31+
.expect("failed to spawn process");
32+
33+
// Wrap the process in an `Async` object that waits for it to exit.
34+
let process = Async::with_filter(Exit::new(process))?;
35+
36+
// Wait for the process to exit.
37+
process.readable().await?;
38+
39+
Ok(())
40+
})
41+
}
42+
43+
#[cfg(not(any(
44+
target_os = "macos",
45+
target_os = "ios",
46+
target_os = "tvos",
47+
target_os = "watchos",
48+
target_os = "freebsd",
49+
target_os = "netbsd",
50+
target_os = "openbsd",
51+
target_os = "dragonfly",
52+
)))]
53+
fn main() {
54+
println!("This example only works for kqueue-enabled platforms.");
55+
}

src/lib.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ use crate::reactor::{Reactor, Source};
8888
mod driver;
8989
mod reactor;
9090

91+
pub mod os;
92+
9193
pub use driver::block_on;
9294
pub use reactor::{Readable, ReadableOwned, Writable, WritableOwned};
9395

@@ -664,7 +666,7 @@ impl<T: AsRawFd> Async<T> {
664666
#[cfg(unix)]
665667
impl<T: AsRawFd> AsRawFd for Async<T> {
666668
fn as_raw_fd(&self) -> RawFd {
667-
self.source.raw
669+
self.get_ref().as_raw_fd()
668670
}
669671
}
670672

@@ -740,7 +742,7 @@ impl<T: AsRawSocket> Async<T> {
740742
#[cfg(windows)]
741743
impl<T: AsRawSocket> AsRawSocket for Async<T> {
742744
fn as_raw_socket(&self) -> RawSocket {
743-
self.source.raw
745+
self.get_ref().as_raw_socket()
744746
}
745747
}
746748

src/os.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
//! Platform-specific functionality.
2+
3+
#[cfg(any(
4+
target_os = "macos",
5+
target_os = "ios",
6+
target_os = "tvos",
7+
target_os = "watchos",
8+
target_os = "freebsd",
9+
target_os = "netbsd",
10+
target_os = "openbsd",
11+
target_os = "dragonfly",
12+
))]
13+
pub mod kqueue;
14+
15+
mod __private {
16+
#[doc(hidden)]
17+
pub trait AsyncSealed {}
18+
19+
impl<T> AsyncSealed for crate::Async<T> {}
20+
}

src/os/kqueue.rs

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
//! Functionality that is only available for `kqueue`-based platforms.
2+
3+
use super::__private::AsyncSealed;
4+
use __private::FilterSealed;
5+
6+
use crate::reactor::{Reactor, Registration};
7+
use crate::Async;
8+
9+
use std::io::Result;
10+
use std::process::Child;
11+
12+
/// An extension trait for [`Async`](crate::Async) that provides the ability to register other
13+
/// queueable objects into the reactor.
14+
///
15+
/// The underlying `kqueue` implementation can be used to poll for events besides file descriptor
16+
/// read/write readiness. This API makes these faculties available to the user.
17+
///
18+
/// See the [`Filter`] trait and its implementors for objects that currently support being registered
19+
/// into the reactor.
20+
pub trait AsyncKqueueExt<T: Filter>: AsyncSealed {
21+
/// Create a new [`Async`](crate::Async) around a [`Filter`].
22+
///
23+
/// # Examples
24+
///
25+
/// ```no_run
26+
/// use std::process::Command;
27+
///
28+
/// use async_io::Async;
29+
/// use async_io::os::kqueue::{AsyncKqueueExt, Exit};
30+
///
31+
/// // Create a new process to wait for.
32+
/// let mut child = Command::new("sleep").arg("5").spawn().unwrap();
33+
///
34+
/// // Wrap the process in an `Async` object that waits for it to exit.
35+
/// let process = Async::with_filter(Exit::new(child)).unwrap();
36+
///
37+
/// // Wait for the process to exit.
38+
/// # async_io::block_on(async {
39+
/// process.readable().await.unwrap();
40+
/// # });
41+
/// ```
42+
fn with_filter(filter: T) -> Result<Async<T>>;
43+
}
44+
45+
impl<T: Filter> AsyncKqueueExt<T> for Async<T> {
46+
fn with_filter(mut filter: T) -> Result<Async<T>> {
47+
Ok(Async {
48+
source: Reactor::get().insert_io(filter.registration())?,
49+
io: Some(filter),
50+
})
51+
}
52+
}
53+
54+
/// Objects that can be registered into the reactor via a [`Async`](crate::Async).
55+
pub trait Filter: FilterSealed {}
56+
57+
/// An object representing a signal.
58+
///
59+
/// When registered into [`Async`](crate::Async) via [`with_filter`](AsyncKqueueExt::with_filter),
60+
/// it will return a [`readable`](crate::Async::readable) event when the signal is received.
61+
#[derive(Debug, Copy, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)]
62+
pub struct Signal(pub i32);
63+
64+
impl FilterSealed for Signal {
65+
fn registration(&mut self) -> Registration {
66+
(*self).into()
67+
}
68+
}
69+
impl Filter for Signal {}
70+
71+
/// Wait for a child process to exit.
72+
///
73+
/// When registered into [`Async`](crate::Async) via [`with_filter`](AsyncKqueueExt::with_filter),
74+
/// it will return a [`readable`](crate::Async::readable) event when the child process exits.
75+
#[derive(Debug)]
76+
pub struct Exit(Option<Child>);
77+
78+
impl Exit {
79+
/// Create a new `Exit` object.
80+
pub fn new(child: Child) -> Self {
81+
Self(Some(child))
82+
}
83+
}
84+
85+
impl FilterSealed for Exit {
86+
fn registration(&mut self) -> Registration {
87+
self.0.take().expect("Cannot reregister child").into()
88+
}
89+
}
90+
impl Filter for Exit {}
91+
92+
mod __private {
93+
use crate::reactor::Registration;
94+
95+
#[doc(hidden)]
96+
pub trait FilterSealed {
97+
/// Get a registration object for this filter.
98+
fn registration(&mut self) -> Registration;
99+
}
100+
}

src/reactor.rs

Lines changed: 25 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,6 @@ use std::future::Future;
55
use std::io;
66
use std::marker::PhantomData;
77
use std::mem;
8-
#[cfg(unix)]
9-
use std::os::unix::io::RawFd;
10-
#[cfg(windows)]
11-
use std::os::windows::io::RawSocket;
128
use std::panic;
139
use std::pin::Pin;
1410
use std::sync::atomic::{AtomicUsize, Ordering};
@@ -22,6 +18,9 @@ use futures_lite::ready;
2218
use polling::{Event, Poller};
2319
use slab::Slab;
2420

21+
mod registration;
22+
pub use registration::Registration;
23+
2524
const READ: usize = 0;
2625
const WRITE: usize = 1;
2726

@@ -88,17 +87,13 @@ impl Reactor {
8887
}
8988

9089
/// Registers an I/O source in the reactor.
91-
pub(crate) fn insert_io(
92-
&self,
93-
#[cfg(unix)] raw: RawFd,
94-
#[cfg(windows)] raw: RawSocket,
95-
) -> io::Result<Arc<Source>> {
90+
pub(crate) fn insert_io(&self, raw: impl Into<Registration>) -> io::Result<Arc<Source>> {
9691
// Create an I/O source for this file descriptor.
9792
let source = {
9893
let mut sources = self.sources.lock().unwrap();
9994
let key = sources.vacant_entry().key();
10095
let source = Arc::new(Source {
101-
raw,
96+
registration: raw.into(),
10297
key,
10398
state: Default::default(),
10499
});
@@ -107,7 +102,7 @@ impl Reactor {
107102
};
108103

109104
// Register the file descriptor.
110-
if let Err(err) = self.poller.add(raw, Event::none(source.key)) {
105+
if let Err(err) = source.registration.add(&self.poller, source.key) {
111106
let mut sources = self.sources.lock().unwrap();
112107
sources.remove(source.key);
113108
return Err(err);
@@ -120,7 +115,7 @@ impl Reactor {
120115
pub(crate) fn remove_io(&self, source: &Source) -> io::Result<()> {
121116
let mut sources = self.sources.lock().unwrap();
122117
sources.remove(source.key);
123-
self.poller.delete(source.raw)
118+
source.registration.delete(&self.poller)
124119
}
125120

126121
/// Registers a timer in the reactor.
@@ -299,8 +294,8 @@ impl ReactorLock<'_> {
299294
// e.g. we were previously interested in both readability and writability,
300295
// but only one of them was emitted.
301296
if !state[READ].is_empty() || !state[WRITE].is_empty() {
302-
self.reactor.poller.modify(
303-
source.raw,
297+
source.registration.modify(
298+
&self.reactor.poller,
304299
Event {
305300
key: source.key,
306301
readable: !state[READ].is_empty(),
@@ -341,13 +336,8 @@ enum TimerOp {
341336
/// A registered source of I/O events.
342337
#[derive(Debug)]
343338
pub(crate) struct Source {
344-
/// Raw file descriptor on Unix platforms.
345-
#[cfg(unix)]
346-
pub(crate) raw: RawFd,
347-
348-
/// Raw socket handle on Windows.
349-
#[cfg(windows)]
350-
pub(crate) raw: RawSocket,
339+
/// This source's registration into the reactor.
340+
registration: Registration,
351341

352342
/// The key of this source obtained during registration.
353343
key: usize,
@@ -436,8 +426,8 @@ impl Source {
436426

437427
// Update interest in this I/O handle.
438428
if was_empty {
439-
Reactor::get().poller.modify(
440-
self.raw,
429+
self.registration.modify(
430+
&Reactor::get().poller,
441431
Event {
442432
key: self.key,
443433
readable: !state[READ].is_empty(),
@@ -490,7 +480,7 @@ impl<T> Future for Readable<'_, T> {
490480

491481
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
492482
ready!(Pin::new(&mut self.0).poll(cx))?;
493-
log::trace!("readable: fd={}", self.0.handle.source.raw);
483+
log::trace!("readable: fd={:?}", &self.0.handle.source.registration);
494484
Poll::Ready(Ok(()))
495485
}
496486
}
@@ -510,7 +500,10 @@ impl<T> Future for ReadableOwned<T> {
510500

511501
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
512502
ready!(Pin::new(&mut self.0).poll(cx))?;
513-
log::trace!("readable_owned: fd={}", self.0.handle.source.raw);
503+
log::trace!(
504+
"readable_owned: fd={:?}",
505+
&self.0.handle.source.registration
506+
);
514507
Poll::Ready(Ok(()))
515508
}
516509
}
@@ -530,7 +523,7 @@ impl<T> Future for Writable<'_, T> {
530523

531524
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
532525
ready!(Pin::new(&mut self.0).poll(cx))?;
533-
log::trace!("writable: fd={}", self.0.handle.source.raw);
526+
log::trace!("writable: fd={:?}", &self.0.handle.source.registration);
534527
Poll::Ready(Ok(()))
535528
}
536529
}
@@ -550,7 +543,10 @@ impl<T> Future for WritableOwned<T> {
550543

551544
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
552545
ready!(Pin::new(&mut self.0).poll(cx))?;
553-
log::trace!("writable_owned: fd={}", self.0.handle.source.raw);
546+
log::trace!(
547+
"writable_owned: fd={:?}",
548+
&self.0.handle.source.registration
549+
);
554550
Poll::Ready(Ok(()))
555551
}
556552
}
@@ -610,8 +606,8 @@ impl<H: Borrow<crate::Async<T>> + Clone, T> Future for Ready<H, T> {
610606

611607
// Update interest in this I/O handle.
612608
if was_empty {
613-
Reactor::get().poller.modify(
614-
handle.borrow().source.raw,
609+
handle.borrow().source.registration.modify(
610+
&Reactor::get().poller,
615611
Event {
616612
key: handle.borrow().source.key,
617613
readable: !state[READ].is_empty(),

0 commit comments

Comments
 (0)