Add a way to batch spawn tasks #92
@@ -149,6 +149,85 @@ impl<'a> Executor<'a> {
    pub fn spawn<T: Send + 'a>(&self, future: impl Future<Output = T> + Send + 'a) -> Task<T> {
        let mut active = self.state().active.lock().unwrap();

        // SAFETY: `T` and the future are `Send`.
        unsafe { self.spawn_inner(future, &mut active) }
    }

    /// Spawns many tasks onto the executor.
    ///
    /// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and
    /// spawns all of the tasks in one go. With large amounts of tasks this can improve
    /// contention.
    ///
    /// For very large numbers of tasks the lock is occasionally dropped and re-acquired to
    /// prevent runner thread starvation. It is assumed that the iterator provided does not
    /// block; blocking iterators can lock up the internal mutex and therefore the entire
    /// executor.
    ///
    /// ## Example
    ///
    /// ```
    /// use async_executor::Executor;
    /// use futures_lite::{stream, prelude::*};
    /// use std::future::ready;
    ///
    /// # futures_lite::future::block_on(async {
    /// let mut ex = Executor::new();
    ///
    /// let futures = [
    ///     ready(1),
    ///     ready(2),
    ///     ready(3)
    /// ];
    ///
    /// // Spawn all of the futures onto the executor at once.
    /// let mut tasks = vec![];
    /// ex.spawn_many(futures, &mut tasks);
    ///
    /// // Await all of them.
    /// let results = ex.run(async move {
    ///     stream::iter(tasks).then(|x| x).collect::<Vec<_>>().await
    /// }).await;
    /// assert_eq!(results, [1, 2, 3]);
    /// # });
    /// ```
    ///
    /// [`spawn`]: Executor::spawn
    pub fn spawn_many<T: Send + 'a, F: Future<Output = T> + Send + 'a>(
        &self,
        futures: impl IntoIterator<Item = F>,
        handles: &mut impl Extend<Task<F::Output>>,
    ) {
        let mut active = Some(self.state().active.lock().unwrap());

        // Convert the futures into tasks.
        let tasks = futures.into_iter().enumerate().map(move |(i, future)| {
            // SAFETY: `T` and the future are `Send`.
            let task = unsafe { self.spawn_inner(future, active.as_mut().unwrap()) };

            // Yield the lock every once in a while to ease contention.
            if i.wrapping_sub(1) % 500 == 0 {
                drop(active.take());
                active = Some(self.state().active.lock().unwrap());
            }

            task
        });

        // Push the tasks to the user's collection.
        handles.extend(tasks);
    }

    /// Spawn a future while holding the inner lock.
    ///
    /// # Safety
    ///
    /// If this is an `Executor`, `F` and `T` must be `Send`.
    unsafe fn spawn_inner<T: 'a>(
        &self,
        future: impl Future<Output = T> + 'a,
        active: &mut Slab<Waker>,
    ) -> Task<T> {
        // Remove the task from the set of active tasks when the future finishes.
        let entry = active.vacant_entry();
        let index = entry.key();

@@ -159,11 +238,30 @@ impl<'a> Executor<'a> {
        };

        // Create the task and register it in the set of active tasks.
        let (runnable, task) = unsafe {
            Builder::new()
                .propagate_panic(true)
                .spawn_unchecked(|()| future, self.schedule())
        };
        //
        // SAFETY:
        //
        // If `future` is not `Send`, this must be a `LocalExecutor` as per this
        // function's unsafe precondition. Since `LocalExecutor` is `!Sync`,
        // `try_tick`, `tick` and `run` can only be called from the origin
        // thread of the `LocalExecutor`. Similarly, `spawn` can only be called
        // from the origin thread, ensuring that `future` and the executor share
        // the same origin thread. The `Runnable` can be scheduled from other
        // threads, but because of the above `Runnable` can only be called or
        // dropped on the origin thread.
        //
        // `future` is not `'static`, but we make sure that the `Runnable` does
        // not outlive `'a`. When the executor is dropped, the `active` field is
        // drained and all of the `Waker`s are woken. Then, the queue inside of
        // the `Executor` is drained of all of its runnables. This ensures that
        // runnables are dropped and this precondition is satisfied.
        //
        // `self.schedule()` is `Send`, `Sync` and `'static`, as checked below.
        // Therefore we do not need to worry about what is done with the
        // `Waker`.
        let (runnable, task) = Builder::new()
            .propagate_panic(true)
            .spawn_unchecked(|()| future, self.schedule());
        entry.insert(runnable.waker());

        runnable.schedule();

@@ -292,7 +390,7 @@ impl<'a> Executor<'a> {
impl Drop for Executor<'_> {
    fn drop(&mut self) {
        if let Some(state) = self.state.get() {
            let mut active = state.active.lock().unwrap();
            let mut active = state.active.lock().unwrap_or_else(|e| e.into_inner());
            for w in active.drain() {
                w.wake();
            }

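The one-line change in this hunk swaps `unwrap()` for `unwrap_or_else(|e| e.into_inner())` so that a lock poisoned by a panicking task does not make `Drop` panic a second time. A minimal standalone sketch of the same idiom, using plain `std` types rather than this crate's internals:

```rust
use std::sync::{Mutex, PoisonError};
use std::task::Waker;

// If a thread panicked while holding the lock, the mutex is poisoned and
// `lock()` returns Err; `into_inner` still hands back the guard so cleanup
// can proceed instead of panicking again.
fn drain_even_if_poisoned(wakers: &Mutex<Vec<Waker>>) {
    let mut guard = wakers.lock().unwrap_or_else(PoisonError::into_inner);
    for waker in guard.drain(..) {
        waker.wake();
    }
}
```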
@@ -397,25 +495,70 @@ impl<'a> LocalExecutor<'a> {
    pub fn spawn<T: 'a>(&self, future: impl Future<Output = T> + 'a) -> Task<T> {
        let mut active = self.inner().state().active.lock().unwrap();

        // Remove the task from the set of active tasks when the future finishes.
        let entry = active.vacant_entry();
        let index = entry.key();
        let state = self.inner().state().clone();
        let future = async move {
            let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().try_remove(index)));
            future.await
        };
        // SAFETY: This executor is not thread safe, so the future and its result
        // cannot be sent to another thread.
        unsafe { self.inner().spawn_inner(future, &mut active) }
    }

        // Create the task and register it in the set of active tasks.
        let (runnable, task) = unsafe {
            Builder::new()
                .propagate_panic(true)
                .spawn_unchecked(|()| future, self.schedule())
        };
        entry.insert(runnable.waker());
    /// Spawns many tasks onto the executor.
    ///
    /// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and
    /// spawns all of the tasks in one go. With large amounts of tasks this can improve
    /// contention.
    ///
    /// It is assumed that the iterator provided does not block; blocking iterators can lock up
    /// the internal mutex and therefore the entire executor. Unlike [`Executor::spawn`], the
    /// mutex is not released, as there are no other threads that can poll this executor.
    ///
    /// ## Example
    ///
    /// ```
    /// use async_executor::LocalExecutor;
    /// use futures_lite::{stream, prelude::*};
    /// use std::future::ready;
    ///
    /// # futures_lite::future::block_on(async {
    /// let mut ex = LocalExecutor::new();
    ///
    /// let futures = [
    ///     ready(1),
    ///     ready(2),
    ///     ready(3)
    /// ];
    ///
    /// // Spawn all of the futures onto the executor at once.
    /// let mut tasks = vec![];
    /// ex.spawn_many(futures, &mut tasks);
    ///
    /// // Await all of them.
    /// let results = ex.run(async move {
    ///     stream::iter(tasks).then(|x| x).collect::<Vec<_>>().await
    /// }).await;
    /// assert_eq!(results, [1, 2, 3]);
    /// # });
    /// ```
    ///
    /// [`spawn`]: LocalExecutor::spawn
    /// [`Executor::spawn_many`]: Executor::spawn_many
    pub fn spawn_many<T: Send + 'a, F: Future<Output = T> + Send + 'a>(
        &self,
        futures: impl IntoIterator<Item = F>,
Review comment: This forces the futures to be the same type and also requires something to collect the tasks. If we're ok with users potentially injecting extra non-blocking code via iterator implementations (with the doc comments as a deterrent), wouldn't a …

Reply: It's much harder to accidentally create a blocking …
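As an illustration of the first point in the thread above (not part of this PR's diff): callers who do need to mix differently-typed futures can erase the types themselves before handing them to `spawn_many`, at the cost of one allocation per future. The `boxed` helper below is hypothetical and is written against the `spawn_many` signature introduced in this PR:

```rust
use std::future::Future;
use std::pin::Pin;

use async_executor::Executor;
use futures_lite::future;

// A boxed, type-erased future so that differently-shaped futures share one item type.
type BoxedFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

// Hypothetical helper: erase a concrete future type behind a trait object.
fn boxed<'a, T>(fut: impl Future<Output = T> + Send + 'a) -> BoxedFuture<'a, T> {
    Box::pin(fut)
}

fn main() {
    future::block_on(async {
        let ex = Executor::new();

        // Two futures with different concrete types, erased to a common one.
        let futures = vec![boxed(async { 1 }), boxed(future::ready(2))];

        let mut tasks = Vec::new();
        ex.spawn_many(futures, &mut tasks);

        // Drive the executor until every spawned task has completed.
        let sum = ex
            .run(async move {
                let mut sum = 0;
                for task in tasks {
                    sum += task.await;
                }
                sum
            })
            .await;
        assert_eq!(sum, 3);
    });
}
```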
        handles: &mut impl Extend<Task<F::Output>>,
    ) {
        let mut active = self.inner().state().active.lock().unwrap();
Review comment: For a local executor, why does it need a lock at all? Probably not necessary to address in this PR, but I thought the local executor itself could be …

Reply: It would require a reimplementation of …

Reply: ok, as a change this would propagate through the entire code base because it would require a different …

Reply: It's probably doable with …

Reply: I think a proper abstraction would require a lot of interior mutability, we just can't directly abstract all that without pulling in niche dependencies, I suppose…

Reply: Generally thread unsafe primitives are provided by …
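A hypothetical sketch of the idea being discussed above; this is not code from this crate, and the `LocalActive` type is invented for illustration. It shows how a single-threaded active-task set could sit behind a `RefCell` (which is `!Sync`) instead of a `Mutex`, relying on the same single-thread property `LocalExecutor` already has:

```rust
use std::cell::RefCell;
use std::task::Waker;

use slab::Slab;

// A single-threaded "active tasks" set guarded by RefCell instead of Mutex.
// RefCell is !Sync, so a type holding it cannot be shared across threads.
struct LocalActive {
    wakers: RefCell<Slab<Waker>>,
}

impl LocalActive {
    fn insert(&self, waker: Waker) -> usize {
        // borrow_mut panics on re-entrant borrows, but in this sketch the
        // borrow is released before any user code can run again.
        self.wakers.borrow_mut().insert(waker)
    }

    fn remove(&self, index: usize) -> Option<Waker> {
        self.wakers.borrow_mut().try_remove(index)
    }
}
```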

        runnable.schedule();
        task
        // Convert all of the futures to tasks.
        let tasks = futures.into_iter().map(|future| {
            // SAFETY: This executor is not thread safe, so the future and its result
            // cannot be sent to another thread.
            unsafe { self.inner().spawn_inner(future, &mut active) }

            // As only one thread can spawn or poll tasks at a time, there is no need
            // to release lock contention here.
        });

        // Push them to the user's collection.
        handles.extend(tasks);
    }

    /// Attempts to run a task if at least one is scheduled.

@@ -481,16 +624,6 @@ impl<'a> LocalExecutor<'a> {
        self.inner().run(future).await
    }

    /// Returns a function that schedules a runnable task when it gets woken up.
    fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static {
        let state = self.inner().state().clone();

        move |runnable| {
            state.queue.push(runnable).unwrap();
            state.notify();
        }
    }

    /// Returns a reference to the inner executor.
    fn inner(&self) -> &Executor<'a> {
        &self.inner

@@ -953,6 +1086,7 @@ fn _ensure_send_and_sync() {

    fn is_send<T: Send>(_: T) {}
    fn is_sync<T: Sync>(_: T) {}
    fn is_static<T: 'static>(_: T) {}

    is_send::<Executor<'_>>(Executor::new());
    is_sync::<Executor<'_>>(Executor::new());

@@ -962,6 +1096,9 @@ fn _ensure_send_and_sync() {
    is_sync(ex.run(pending::<()>()));
    is_send(ex.tick());
    is_sync(ex.tick());
    is_send(ex.schedule());
    is_sync(ex.schedule());
    is_static(ex.schedule());

    /// ```compile_fail
    /// use async_executor::LocalExecutor;
@@ -0,0 +1,45 @@
use async_executor::{Executor, LocalExecutor};
use futures_lite::future;

#[cfg(not(miri))]
const READY_COUNT: usize = 50_000;
#[cfg(miri)]
const READY_COUNT: usize = 505;

#[test]
fn spawn_many() {
    future::block_on(async {
        let ex = Executor::new();

        // Spawn a lot of tasks.
        let mut tasks = vec![];
        ex.spawn_many((0..READY_COUNT).map(future::ready), &mut tasks);

        // Run all of the tasks in parallel.
        ex.run(async move {
            for (i, task) in tasks.into_iter().enumerate() {
                assert_eq!(task.await, i);
            }
        })
        .await;
    });
}

#[test]
fn spawn_many_local() {
    future::block_on(async {
        let ex = LocalExecutor::new();

        // Spawn a lot of tasks.
        let mut tasks = vec![];
        ex.spawn_many((0..READY_COUNT).map(future::ready), &mut tasks);

        // Run all of the tasks in parallel.
        ex.run(async move {
            for (i, task) in tasks.into_iter().enumerate() {
                assert_eq!(task.await, i);
            }
        })
        .await;
    });
}