Skip to content

Commit 913b236

Browse files
authored
feat: Expose other kqueue filters (#112)
1 parent 0c3f75f commit 913b236

File tree

9 files changed

+562
-32
lines changed

9 files changed

+562
-32
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ concurrent-queue = "2.2.0"
2929
futures-lite = "1.11.0"
3030
log = "0.4.11"
3131
parking = "2.0.0"
32-
polling = "2.0.0"
32+
polling = "2.6.0"
3333
rustix = { version = "0.37.1", default-features = false, features = ["std", "fs"] }
3434
slab = "0.4.2"
3535
socket2 = { version = "0.4.2", features = ["all"] }

examples/kqueue-process.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
//! Uses the `async_io::os::kqueue` module to wait for a process to terminate.
2+
//!
3+
//! Run with:
4+
//!
5+
//! ```
6+
//! cargo run --example kqueue-process
7+
//! ```
8+
9+
#[cfg(any(
10+
target_os = "macos",
11+
target_os = "ios",
12+
target_os = "tvos",
13+
target_os = "watchos",
14+
target_os = "freebsd",
15+
target_os = "netbsd",
16+
target_os = "openbsd",
17+
target_os = "dragonfly",
18+
))]
19+
fn main() -> std::io::Result<()> {
20+
use std::process::Command;
21+
22+
use async_io::os::kqueue::{Exit, Filter};
23+
use futures_lite::future;
24+
25+
future::block_on(async {
26+
// Spawn a process.
27+
let process = Command::new("sleep")
28+
.arg("3")
29+
.spawn()
30+
.expect("failed to spawn process");
31+
32+
// Wrap the process in an `Async` object that waits for it to exit.
33+
let process = Filter::new(Exit::new(process))?;
34+
35+
// Wait for the process to exit.
36+
process.ready().await?;
37+
38+
Ok(())
39+
})
40+
}
41+
42+
#[cfg(not(any(
43+
target_os = "macos",
44+
target_os = "ios",
45+
target_os = "tvos",
46+
target_os = "watchos",
47+
target_os = "freebsd",
48+
target_os = "netbsd",
49+
target_os = "openbsd",
50+
target_os = "dragonfly",
51+
)))]
52+
fn main() {
53+
println!("This example only works for kqueue-enabled platforms.");
54+
}

src/lib.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ use crate::reactor::{Reactor, Source};
8888
mod driver;
8989
mod reactor;
9090

91+
pub mod os;
92+
9193
pub use driver::block_on;
9294
pub use reactor::{Readable, ReadableOwned, Writable, WritableOwned};
9395

@@ -685,7 +687,7 @@ impl<T: AsRawFd> Async<T> {
685687
#[cfg(unix)]
686688
impl<T: AsRawFd> AsRawFd for Async<T> {
687689
fn as_raw_fd(&self) -> RawFd {
688-
self.source.raw
690+
self.get_ref().as_raw_fd()
689691
}
690692
}
691693

@@ -761,7 +763,7 @@ impl<T: AsRawSocket> Async<T> {
761763
#[cfg(windows)]
762764
impl<T: AsRawSocket> AsRawSocket for Async<T> {
763765
fn as_raw_socket(&self) -> RawSocket {
764-
self.source.raw
766+
self.get_ref().as_raw_socket()
765767
}
766768
}
767769

src/os.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
//! Platform-specific functionality.
2+
3+
#[cfg(any(
4+
target_os = "macos",
5+
target_os = "ios",
6+
target_os = "tvos",
7+
target_os = "watchos",
8+
target_os = "freebsd",
9+
target_os = "netbsd",
10+
target_os = "openbsd",
11+
target_os = "dragonfly",
12+
))]
13+
pub mod kqueue;

src/os/kqueue.rs

Lines changed: 269 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,269 @@
1+
//! Functionality that is only available for `kqueue`-based platforms.
2+
3+
use __private::QueueableSealed;
4+
5+
use crate::reactor::{Reactor, Readable, Registration};
6+
use crate::Async;
7+
8+
use std::convert::{TryFrom, TryInto};
9+
use std::future::Future;
10+
use std::io::{Error, Result};
11+
use std::os::unix::io::{AsRawFd, RawFd};
12+
use std::pin::Pin;
13+
use std::process::Child;
14+
use std::task::{Context, Poll};
15+
16+
#[cfg(not(async_io_no_io_safety))]
17+
use std::os::unix::io::{AsFd, BorrowedFd, OwnedFd};
18+
19+
/// A wrapper around a queueable object that waits until it is ready.
20+
///
21+
/// The underlying `kqueue` implementation can be used to poll for events besides file descriptor
22+
/// read/write readiness. This API makes these faculties available to the user.
23+
///
24+
/// See the [`Queueable`] trait and its implementors for objects that currently support being registered
25+
/// into the reactor.
26+
#[derive(Debug)]
27+
pub struct Filter<T>(Async<T>);
28+
29+
impl<T> AsRef<T> for Filter<T> {
30+
fn as_ref(&self) -> &T {
31+
self.0.as_ref()
32+
}
33+
}
34+
35+
impl<T> AsMut<T> for Filter<T> {
36+
fn as_mut(&mut self) -> &mut T {
37+
self.0.as_mut()
38+
}
39+
}
40+
41+
impl<T: Queueable> Filter<T> {
42+
/// Create a new [`Filter`] around a [`Queueable`].
43+
///
44+
/// # Examples
45+
///
46+
/// ```no_run
47+
/// use std::process::Command;
48+
/// use async_io::os::kqueue::{Exit, Filter};
49+
///
50+
/// // Create a new process to wait for.
51+
/// let mut child = Command::new("sleep").arg("5").spawn().unwrap();
52+
///
53+
/// // Wrap the process in an `Async` object that waits for it to exit.
54+
/// let process = Filter::new(Exit::new(child)).unwrap();
55+
///
56+
/// // Wait for the process to exit.
57+
/// # async_io::block_on(async {
58+
/// process.ready().await.unwrap();
59+
/// # });
60+
/// ```
61+
pub fn new(mut filter: T) -> Result<Self> {
62+
Ok(Self(Async {
63+
source: Reactor::get().insert_io(filter.registration())?,
64+
io: Some(filter),
65+
}))
66+
}
67+
}
68+
69+
impl<T: AsRawFd> AsRawFd for Filter<T> {
70+
fn as_raw_fd(&self) -> RawFd {
71+
self.0.as_raw_fd()
72+
}
73+
}
74+
75+
#[cfg(not(async_io_no_io_safety))]
76+
impl<T: AsFd> AsFd for Filter<T> {
77+
fn as_fd(&self) -> BorrowedFd<'_> {
78+
self.0.as_fd()
79+
}
80+
}
81+
82+
#[cfg(not(async_io_no_io_safety))]
83+
impl<T: AsRawFd + From<OwnedFd>> TryFrom<OwnedFd> for Filter<T> {
84+
type Error = Error;
85+
86+
fn try_from(fd: OwnedFd) -> Result<Self> {
87+
Ok(Self(Async::try_from(fd)?))
88+
}
89+
}
90+
91+
#[cfg(not(async_io_no_io_safety))]
92+
impl<T: Into<OwnedFd>> TryFrom<Filter<T>> for OwnedFd {
93+
type Error = Error;
94+
95+
fn try_from(filter: Filter<T>) -> Result<Self> {
96+
filter.0.try_into()
97+
}
98+
}
99+
100+
impl<T> Filter<T> {
101+
/// Gets a reference to the underlying [`Queueable`] object.
102+
///
103+
/// # Examples
104+
///
105+
/// ```
106+
/// use async_io::os::kqueue::{Exit, Filter};
107+
///
108+
/// # futures_lite::future::block_on(async {
109+
/// let child = std::process::Command::new("sleep").arg("5").spawn().unwrap();
110+
/// let process = Filter::new(Exit::new(child)).unwrap();
111+
/// let inner = process.get_ref();
112+
/// # });
113+
/// ```
114+
pub fn get_ref(&self) -> &T {
115+
self.0.get_ref()
116+
}
117+
118+
/// Gets a mutable reference to the underlying [`Queueable`] object.
119+
///
120+
/// # Examples
121+
///
122+
/// ```
123+
/// use async_io::os::kqueue::{Exit, Filter};
124+
///
125+
/// # futures_lite::future::block_on(async {
126+
/// let child = std::process::Command::new("sleep").arg("5").spawn().unwrap();
127+
/// let mut process = Filter::new(Exit::new(child)).unwrap();
128+
/// let inner = process.get_mut();
129+
/// # });
130+
/// ```
131+
pub fn get_mut(&mut self) -> &mut T {
132+
self.0.get_mut()
133+
}
134+
135+
/// Unwraps the inner [`Queueable`] object.
136+
///
137+
/// # Examples
138+
///
139+
/// ```
140+
/// use async_io::os::kqueue::{Exit, Filter};
141+
///
142+
/// # futures_lite::future::block_on(async {
143+
/// let child = std::process::Command::new("sleep").arg("5").spawn().unwrap();
144+
/// let process = Filter::new(Exit::new(child)).unwrap();
145+
/// let inner = process.into_inner().unwrap();
146+
/// # });
147+
/// ```
148+
pub fn into_inner(self) -> Result<T> {
149+
self.0.into_inner()
150+
}
151+
152+
/// Waits until the [`Queueable`] object is ready.
153+
///
154+
/// This method completes when the underlying [`Queueable`] object has completed. See the documentation
155+
/// for the [`Queueable`] object for more information.
156+
///
157+
/// # Examples
158+
///
159+
/// ```no_run
160+
/// use std::process::Command;
161+
/// use async_io::os::kqueue::{Exit, Filter};
162+
///
163+
/// # futures_lite::future::block_on(async {
164+
/// let child = Command::new("sleep").arg("5").spawn()?;
165+
/// let process = Filter::new(Exit::new(child))?;
166+
///
167+
/// // Wait for the process to exit.
168+
/// process.ready().await?;
169+
/// # std::io::Result::Ok(()) });
170+
/// ```
171+
pub fn ready(&self) -> Ready<'_, T> {
172+
Ready(self.0.readable())
173+
}
174+
175+
/// Polls the I/O handle for readiness.
176+
///
177+
/// When this method returns [`Poll::Ready`], that means that the OS has delivered a notification
178+
/// that the underlying [`Queueable`] object is ready. See the documentation for the [`Queueable`]
179+
/// object for more information.
180+
///
181+
/// # Caveats
182+
///
183+
/// Two different tasks should not call this method concurrently. Otherwise, conflicting tasks
184+
/// will just keep waking each other in turn, thus wasting CPU time.
185+
///
186+
/// # Examples
187+
///
188+
/// ```no_run
189+
/// use std::process::Command;
190+
/// use async_io::os::kqueue::{Exit, Filter};
191+
/// use futures_lite::future;
192+
///
193+
/// # futures_lite::future::block_on(async {
194+
/// let child = Command::new("sleep").arg("5").spawn()?;
195+
/// let process = Filter::new(Exit::new(child))?;
196+
///
197+
/// // Wait for the process to exit.
198+
/// future::poll_fn(|cx| process.poll_ready(cx)).await?;
199+
/// # std::io::Result::Ok(()) });
200+
/// ```
201+
pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<()>> {
202+
self.0.poll_readable(cx)
203+
}
204+
}
205+
206+
/// Future for [`Filter::ready`].
207+
#[must_use = "futures do nothing unless you `.await` or poll them"]
208+
#[derive(Debug)]
209+
pub struct Ready<'a, T>(Readable<'a, T>);
210+
211+
impl<T> Future for Ready<'_, T> {
212+
type Output = Result<()>;
213+
214+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
215+
Pin::new(&mut self.0).poll(cx)
216+
}
217+
}
218+
219+
/// Objects that can be registered into the reactor via a [`Async`](crate::Async).
220+
///
221+
/// These objects represent other filters associated with the `kqueue` runtime aside from readability
222+
/// and writability. Rather than waiting on readable/writable, they wait on "readiness". This is
223+
/// typically used for signals and child process exits.
224+
pub trait Queueable: QueueableSealed {}
225+
226+
/// An object representing a signal.
227+
///
228+
/// When registered into [`Async`](crate::Async) via [`with_filter`](AsyncKqueueExt::with_filter),
229+
/// it will return a [`readable`](crate::Async::readable) event when the signal is received.
230+
#[derive(Debug, Copy, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)]
231+
pub struct Signal(pub i32);
232+
233+
impl QueueableSealed for Signal {
234+
fn registration(&mut self) -> Registration {
235+
Registration::Signal(*self)
236+
}
237+
}
238+
impl Queueable for Signal {}
239+
240+
/// Wait for a child process to exit.
241+
///
242+
/// When registered into [`Async`](crate::Async) via [`with_filter`](AsyncKqueueExt::with_filter),
243+
/// it will return a [`readable`](crate::Async::readable) event when the child process exits.
244+
#[derive(Debug)]
245+
pub struct Exit(Option<Child>);
246+
247+
impl Exit {
248+
/// Create a new `Exit` object.
249+
pub fn new(child: Child) -> Self {
250+
Self(Some(child))
251+
}
252+
}
253+
254+
impl QueueableSealed for Exit {
255+
fn registration(&mut self) -> Registration {
256+
Registration::Process(self.0.take().expect("Cannot reregister child"))
257+
}
258+
}
259+
impl Queueable for Exit {}
260+
261+
mod __private {
262+
use crate::reactor::Registration;
263+
264+
#[doc(hidden)]
265+
pub trait QueueableSealed {
266+
/// Get a registration object for this filter.
267+
fn registration(&mut self) -> Registration;
268+
}
269+
}

0 commit comments

Comments
 (0)