Skip to content

Commit 951959d

Browse files
authored
Merge pull request #10 from async-rs/traits
Use a trait-based design for stop-token.
2 parents 7e42721 + 3082b37 commit 951959d

File tree

9 files changed

+482
-152
lines changed

9 files changed

+482
-152
lines changed

.github/workflows/ci.yml

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,15 @@ jobs:
2525
toolchain: nightly
2626
override: true
2727

28-
- name: tests
28+
- name: tests async-io
2929
uses: actions-rs/cargo@v1
3030
with:
3131
command: test
32+
args: --features async-io
33+
34+
- name: tests tokio
35+
uses: actions-rs/cargo@v1
36+
with:
37+
command: test
38+
args: --features tokio
39+

Cargo.toml

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,20 @@ repository = "https://github.com/async-rs/stop-token"
88

99
description = "Experimental cooperative cancellation for async-std"
1010

11+
[package.metadata.docs.rs]
12+
features = ["docs"]
13+
rustdoc-args = ["--cfg", "feature=\"docs\""]
14+
15+
[features]
16+
docs = ["async-io"]
17+
1118
[dependencies]
1219
pin-project-lite = "0.2.0"
13-
async-std = "1.8"
20+
async-channel = "1.6.1"
21+
futures-core = "0.3.17"
22+
tokio = { version = "1.12.0", features = ["time"], optional = true }
23+
async-io = { version = "1.6.0", optional = true }
24+
25+
[dev-dependencies]
26+
async-std = "1.10.0"
27+
tokio = { version = "1.12.0", features = ["rt", "macros"] }

src/deadline.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
use core::fmt;
2+
use std::{error::Error, future::Future, io};
3+
4+
/// An error returned when a future times out.
5+
#[derive(Clone, Copy, Eq, PartialEq, PartialOrd, Ord)]
6+
pub struct TimedOutError {
7+
_private: (),
8+
}
9+
10+
impl fmt::Debug for TimedOutError {
11+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
12+
f.debug_struct("TimeoutError").finish()
13+
}
14+
}
15+
16+
impl TimedOutError {
17+
pub(crate) fn new() -> Self {
18+
Self { _private: () }
19+
}
20+
}
21+
22+
impl Error for TimedOutError {}
23+
24+
impl Into<io::Error> for TimedOutError {
25+
fn into(self) -> io::Error {
26+
io::Error::new(io::ErrorKind::TimedOut, "Future has timed out")
27+
}
28+
}
29+
30+
impl fmt::Display for TimedOutError {
31+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32+
"Future has timed out".fmt(f)
33+
}
34+
}
35+
36+
/// Conversion into a deadline.
37+
///
38+
/// A deadline is a future which resolves after a certain period or event.
39+
pub trait IntoDeadline {
40+
/// Which kind of future are we turning this into?
41+
type Deadline: Future<Output = ()>;
42+
43+
/// Creates a deadline from a value.
44+
fn into_deadline(self) -> Self::Deadline;
45+
}

src/future.rs

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
//! Extension methods and types for the `Future` trait.
2+
3+
use crate::{deadline::TimedOutError, IntoDeadline};
4+
use core::future::Future;
5+
use core::pin::Pin;
6+
7+
use pin_project_lite::pin_project;
8+
use std::task::{Context, Poll};
9+
10+
/// Extend the `Future` trait with the `until` method.
11+
pub trait FutureExt: Future {
12+
/// Run a future until it resolves, or until a deadline is hit.
13+
fn until<T, D>(self, target: T) -> Stop<Self, D>
14+
where
15+
Self: Sized,
16+
T: IntoDeadline<Deadline = D>,
17+
{
18+
Stop {
19+
deadline: target.into_deadline(),
20+
future: self,
21+
}
22+
}
23+
}
24+
25+
pin_project! {
26+
/// Run a future until it resolves, or until a deadline is hit.
27+
///
28+
/// This method is returned by [`FutureExt::deadline`].
29+
#[must_use = "Futures do nothing unless polled or .awaited"]
30+
#[derive(Debug)]
31+
pub struct Stop<F, D> {
32+
#[pin]
33+
future: F,
34+
#[pin]
35+
deadline: D,
36+
}
37+
}
38+
39+
impl<F, D> Future for Stop<F, D>
40+
where
41+
F: Future,
42+
D: Future<Output = ()>,
43+
{
44+
type Output = Result<F::Output, TimedOutError>;
45+
46+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
47+
let this = self.project();
48+
if let Poll::Ready(()) = this.deadline.poll(cx) {
49+
return Poll::Ready(Err(TimedOutError::new()));
50+
}
51+
match this.future.poll(cx) {
52+
Poll::Pending => Poll::Pending,
53+
Poll::Ready(it) => Poll::Ready(Ok(it)),
54+
}
55+
}
56+
}

src/lib.rs

Lines changed: 24 additions & 146 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,6 @@
55
//! Experimental. The library works as is, breaking changes will bump major
66
//! version, but there are no guarantees of long-term support.
77
//!
8-
//! Additionally, this library uses unstable cargo feature feature of `async-std` and, for
9-
//! this reason, should be used like this:
10-
//!
11-
//! ```toml
12-
//! [dependencies.stop-token]
13-
//! version = "0.1.0"
14-
//! features = [ "unstable" ]
15-
//! ```
16-
//!
178
//! # Motivation
189
//!
1910
//! Rust futures come with a build-in cancellation mechanism: dropping a future
@@ -47,13 +38,14 @@
4738
//!
4839
//! ```
4940
//! use async_std::prelude::*;
41+
//! use stop_token::prelude::*;
5042
//! use stop_token::StopToken;
5143
//!
5244
//! struct Event;
5345
//!
54-
//! async fn do_work(work: impl Stream<Item = Event> + Unpin, stop_token: StopToken) {
55-
//! let mut work = stop_token.stop_stream(work);
56-
//! while let Some(event) = work.next().await {
46+
//! async fn do_work(work: impl Stream<Item = Event> + Unpin, stop: StopToken) {
47+
//! let mut work = work.until(stop);
48+
//! while let Some(Ok(event)) = work.next().await {
5749
//! process_event(event).await
5850
//! }
5951
//! }
@@ -62,145 +54,31 @@
6254
//! }
6355
//! ```
6456
//!
57+
//! # Features
58+
//!
59+
//! The `time` submodule is empty when no features are enabled. To implement [`Deadline`]
60+
//! for `Instant` and `Duration` you can enable one of the following features:
61+
//!
62+
//! - `async-io`: for use with the `async-std` or `smol` runtimes.
63+
//! - `tokio`: for use with the `tokio` runtime.
64+
//!
6565
//! # Lineage
6666
//!
6767
//! The cancellation system is a subset of `C#` [`CancellationToken / CancellationTokenSource`](https://docs.microsoft.com/en-us/dotnet/standard/threading/cancellation-in-managed-threads).
68-
//! The `StopToken / StopTokenSource` terminology is borrowed from C++ paper P0660: https://wg21.link/p0660.
69-
70-
use std::pin::Pin;
71-
use std::task::{Context, Poll};
72-
73-
use async_std::prelude::*;
68+
//! The `StopToken / StopTokenSource` terminology is borrowed from [C++ paper P0660](https://wg21.link/p0660).
7469
75-
use async_std::channel::{self, Receiver, Sender};
76-
use pin_project_lite::pin_project;
77-
78-
enum Never {}
79-
80-
/// `StopSource` produces `StopToken` and cancels all of its tokens on drop.
81-
///
82-
/// # Example:
83-
///
84-
/// ```ignore
85-
/// let stop_source = StopSource::new();
86-
/// let stop_token = stop_source.stop_token();
87-
/// schedule_some_work(stop_token);
88-
/// drop(stop_source); // At this point, scheduled work notices that it is canceled.
89-
/// ```
90-
#[derive(Debug)]
91-
pub struct StopSource {
92-
/// Solely for `Drop`.
93-
_chan: Sender<Never>,
94-
stop_token: StopToken,
95-
}
96-
97-
/// `StopToken` is a future which completes when the associated `StopSource` is dropped.
98-
#[derive(Debug, Clone)]
99-
pub struct StopToken {
100-
chan: Receiver<Never>,
101-
}
102-
103-
impl Default for StopSource {
104-
fn default() -> StopSource {
105-
let (sender, receiver) = channel::bounded::<Never>(1);
106-
107-
StopSource {
108-
_chan: sender,
109-
stop_token: StopToken { chan: receiver },
110-
}
111-
}
112-
}
70+
pub mod future;
71+
pub mod stream;
72+
pub mod time;
11373

114-
impl StopSource {
115-
/// Creates a new `StopSource`.
116-
pub fn new() -> StopSource {
117-
StopSource::default()
118-
}
119-
120-
/// Produces a new `StopToken`, associated with this source.
121-
///
122-
/// Once the source is destroyed, `StopToken` future completes.
123-
pub fn stop_token(&self) -> StopToken {
124-
self.stop_token.clone()
125-
}
126-
}
127-
128-
impl Future for StopToken {
129-
type Output = ();
130-
131-
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
132-
let chan = Pin::new(&mut self.chan);
133-
match Stream::poll_next(chan, cx) {
134-
Poll::Pending => Poll::Pending,
135-
Poll::Ready(Some(never)) => match never {},
136-
Poll::Ready(None) => Poll::Ready(()),
137-
}
138-
}
139-
}
140-
141-
impl StopToken {
142-
/// Applies the token to the `stream`, such that the resulting stream
143-
/// produces no more items once the token becomes cancelled.
144-
pub fn stop_stream<S: Stream>(&self, stream: S) -> StopStream<S> {
145-
StopStream {
146-
stop_token: self.clone(),
147-
stream,
148-
}
149-
}
150-
151-
/// Applies the token to the `future`, such that the resulting future
152-
/// completes with `None` if the token is cancelled.
153-
pub fn stop_future<F: Future>(&self, future: F) -> StopFuture<F> {
154-
StopFuture {
155-
stop_token: self.clone(),
156-
future,
157-
}
158-
}
159-
}
160-
161-
pin_project! {
162-
#[derive(Debug)]
163-
pub struct StopStream<S> {
164-
#[pin]
165-
stop_token: StopToken,
166-
#[pin]
167-
stream: S,
168-
}
169-
}
170-
171-
impl<S: Stream> Stream for StopStream<S> {
172-
type Item = S::Item;
173-
174-
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
175-
let this = self.project();
176-
if let Poll::Ready(()) = this.stop_token.poll(cx) {
177-
return Poll::Ready(None);
178-
}
179-
this.stream.poll_next(cx)
180-
}
181-
}
182-
183-
pin_project! {
184-
#[derive(Debug)]
185-
pub struct StopFuture<F> {
186-
#[pin]
187-
stop_token: StopToken,
188-
#[pin]
189-
future: F,
190-
}
191-
}
74+
mod deadline;
75+
mod stop_source;
19276

193-
impl<F: Future> Future for StopFuture<F> {
194-
type Output = Option<F::Output>;
77+
pub use deadline::{IntoDeadline, TimedOutError};
78+
pub use stop_source::{StopSource, StopToken};
19579

196-
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<F::Output>> {
197-
let this = self.project();
198-
if let Poll::Ready(()) = this.stop_token.poll(cx) {
199-
return Poll::Ready(None);
200-
}
201-
match this.future.poll(cx) {
202-
Poll::Pending => Poll::Pending,
203-
Poll::Ready(it) => Poll::Ready(Some(it)),
204-
}
205-
}
80+
/// A prelude for `stop-token`.
81+
pub mod prelude {
82+
pub use crate::future::FutureExt as _;
83+
pub use crate::stream::StreamExt as _;
20684
}

0 commit comments

Comments
 (0)