Skip to content

Add a way to batch spawn tasks #92

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions benches/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,21 @@ fn running_benches(c: &mut Criterion) {
);
});

group.bench_function("executor::spawn_batch", |b| {
run(
|| {
let mut handles = vec![];

b.iter(|| {
EX.spawn_many((0..250).map(|_| future::yield_now()), &mut handles);
});

handles.clear();
},
*multithread,
)
});

group.bench_function("executor::spawn_many_local", |b| {
run(
|| {
Expand Down
203 changes: 170 additions & 33 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,85 @@ impl<'a> Executor<'a> {
pub fn spawn<T: Send + 'a>(&self, future: impl Future<Output = T> + Send + 'a) -> Task<T> {
let mut active = self.state().active.lock().unwrap();

// SAFETY: `T` and the future are `Send`.
unsafe { self.spawn_inner(future, &mut active) }
}

/// Spawns many tasks onto the executor.
///
/// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and
/// spawns all of the tasks in one go. With large amounts of tasks this can improve
/// contention.
///
/// For very large numbers of tasks the lock is occasionally dropped and re-acquired to
/// prevent runner thread starvation. It is assumed that the iterator provided does not
/// block; blocking iterators can lock up the internal mutex and therefore the entire
/// executor.
///
/// ## Example
///
/// ```
/// use async_executor::Executor;
/// use futures_lite::{stream, prelude::*};
/// use std::future::ready;
///
/// # futures_lite::future::block_on(async {
/// let mut ex = Executor::new();
///
/// let futures = [
/// ready(1),
/// ready(2),
/// ready(3)
/// ];
///
/// // Spawn all of the futures onto the executor at once.
/// let mut tasks = vec![];
/// ex.spawn_many(futures, &mut tasks);
///
/// // Await all of them.
/// let results = ex.run(async move {
/// stream::iter(tasks).then(|x| x).collect::<Vec<_>>().await
/// }).await;
/// assert_eq!(results, [1, 2, 3]);
/// # });
/// ```
///
/// [`spawn`]: Executor::spawn
pub fn spawn_many<T: Send + 'a, F: Future<Output = T> + Send + 'a>(
&self,
futures: impl IntoIterator<Item = F>,
handles: &mut impl Extend<Task<F::Output>>,
) {
let mut active = Some(self.state().active.lock().unwrap());

// Convert the futures into tasks.
let tasks = futures.into_iter().enumerate().map(move |(i, future)| {
// SAFETY: `T` and the future are `Send`.
let task = unsafe { self.spawn_inner(future, active.as_mut().unwrap()) };

// Yield the lock every once in a while to ease contention.
if i.wrapping_sub(1) % 500 == 0 {
drop(active.take());
active = Some(self.state().active.lock().unwrap());
}

task
});

// Push the tasks to the user's collection.
handles.extend(tasks);
}

/// Spawn a future while holding the inner lock.
///
/// # Safety
///
/// If this is an `Executor`, `F` and `T` must be `Send`.
unsafe fn spawn_inner<T: 'a>(
&self,
future: impl Future<Output = T> + 'a,
active: &mut Slab<Waker>,
) -> Task<T> {
// Remove the task from the set of active tasks when the future finishes.
let entry = active.vacant_entry();
let index = entry.key();
Expand All @@ -159,11 +238,30 @@ impl<'a> Executor<'a> {
};

// Create the task and register it in the set of active tasks.
let (runnable, task) = unsafe {
Builder::new()
.propagate_panic(true)
.spawn_unchecked(|()| future, self.schedule())
};
//
// SAFETY:
//
// If `future` is not `Send`, this must be a `LocalExecutor` as per this
// function's unsafe precondition. Since `LocalExecutor` is `!Sync`,
// `try_tick`, `tick` and `run` can only be called from the origin
// thread of the `LocalExecutor`. Similarly, `spawn` can only be called
// from the origin thread, ensuring that `future` and the executor share
// the same origin thread. The `Runnable` can be scheduled from other
// threads, but because of the above `Runnable` can only be called or
// dropped on the origin thread.
//
// `future` is not `'static`, but we make sure that the `Runnable` does
// not outlive `'a`. When the executor is dropped, the `active` field is
// drained and all of the `Waker`s are woken. Then, the queue inside of
// the `Executor` is drained of all of its runnables. This ensures that
// runnables are dropped and this precondition is satisfied.
//
// `self.schedule()` is `Send`, `Sync` and `'static`, as checked below.
// Therefore we do not need to worry about what is done with the
// `Waker`.
let (runnable, task) = Builder::new()
.propagate_panic(true)
.spawn_unchecked(|()| future, self.schedule());
entry.insert(runnable.waker());

runnable.schedule();
Expand Down Expand Up @@ -292,7 +390,7 @@ impl<'a> Executor<'a> {
impl Drop for Executor<'_> {
fn drop(&mut self) {
if let Some(state) = self.state.get() {
let mut active = state.active.lock().unwrap();
let mut active = state.active.lock().unwrap_or_else(|e| e.into_inner());
for w in active.drain() {
w.wake();
}
Expand Down Expand Up @@ -397,25 +495,70 @@ impl<'a> LocalExecutor<'a> {
pub fn spawn<T: 'a>(&self, future: impl Future<Output = T> + 'a) -> Task<T> {
let mut active = self.inner().state().active.lock().unwrap();

// Remove the task from the set of active tasks when the future finishes.
let entry = active.vacant_entry();
let index = entry.key();
let state = self.inner().state().clone();
let future = async move {
let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().try_remove(index)));
future.await
};
// SAFETY: This executor is not thread safe, so the future and its result
// cannot be sent to another thread.
unsafe { self.inner().spawn_inner(future, &mut active) }
}

// Create the task and register it in the set of active tasks.
let (runnable, task) = unsafe {
Builder::new()
.propagate_panic(true)
.spawn_unchecked(|()| future, self.schedule())
};
entry.insert(runnable.waker());
/// Spawns many tasks onto the executor.
///
/// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and
/// spawns all of the tasks in one go. With large amounts of tasks this can improve
/// contention.
///
/// It is assumed that the iterator provided does not block; blocking iterators can lock up
/// the internal mutex and therefore the entire executor. Unlike [`Executor::spawn`], the
/// mutex is not released, as there are no other threads that can poll this executor.
///
/// ## Example
///
/// ```
/// use async_executor::LocalExecutor;
/// use futures_lite::{stream, prelude::*};
/// use std::future::ready;
///
/// # futures_lite::future::block_on(async {
/// let mut ex = LocalExecutor::new();
///
/// let futures = [
/// ready(1),
/// ready(2),
/// ready(3)
/// ];
///
/// // Spawn all of the futures onto the executor at once.
/// let mut tasks = vec![];
/// ex.spawn_many(futures, &mut tasks);
///
/// // Await all of them.
/// let results = ex.run(async move {
/// stream::iter(tasks).then(|x| x).collect::<Vec<_>>().await
/// }).await;
/// assert_eq!(results, [1, 2, 3]);
/// # });
/// ```
///
/// [`spawn`]: LocalExecutor::spawn
/// [`Executor::spawn_many`]: Executor::spawn_many
pub fn spawn_many<T: Send + 'a, F: Future<Output = T> + Send + 'a>(
&self,
futures: impl IntoIterator<Item = F>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This forces the futures to be the same type and also requires something to collect the tasks. If we're ok with users potentially injecting extra non-blocking code via iterator implementations (with the doc comments as a deterrent), wouldn't a SpawnScope<'a> be a more flexible API?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's much harder to accidentally create a blocking Iterator than it is to accidentally hold a lock type across an await point. I've seen the former a couple of times and the latter all the time.

handles: &mut impl Extend<Task<F::Output>>,
) {
let mut active = self.inner().state().active.lock().unwrap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For a local executor, why does it need a lock at all? Probably not necessary to address in this PR, but I thought the local executor itself could be !Send and !Sync, so wouldn't a RefCell be sufficient for a LocalExecutor state?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would require a reimplementation of LocalExecutor's internals, which is something I don't feel like doing right now as the current implementation works well enough. I would accept a PR for this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, as a change this would propagate through the entire code base because it would require a different State struct to actually eliminate all the mutexes, etc, which then probably needs either a duplication of all the other internal structs which hold references to State, or an abstraction over the two variants of State.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's probably doable with macro_rules, but that might have a serious impact on readability.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a proper abstraction would require a lot of interior mutability, we just can't directly abstract all that without pulling in niche dependencies, I suppose..

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally thread unsafe primitives are provided by unsend instead of smol. In this case see unsend::Executor


runnable.schedule();
task
// Convert all of the futures to tasks.
let tasks = futures.into_iter().map(|future| {
// SAFETY: This executor is not thread safe, so the future and its result
// cannot be sent to another thread.
unsafe { self.inner().spawn_inner(future, &mut active) }

// As only one thread can spawn or poll tasks at a time, there is no need
// to release lock contention here.
});

// Push them to the user's collection.
handles.extend(tasks);
}

/// Attempts to run a task if at least one is scheduled.
Expand Down Expand Up @@ -481,16 +624,6 @@ impl<'a> LocalExecutor<'a> {
self.inner().run(future).await
}

/// Returns a function that schedules a runnable task when it gets woken up.
fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static {
let state = self.inner().state().clone();

move |runnable| {
state.queue.push(runnable).unwrap();
state.notify();
}
}

/// Returns a reference to the inner executor.
fn inner(&self) -> &Executor<'a> {
&self.inner
Expand Down Expand Up @@ -953,6 +1086,7 @@ fn _ensure_send_and_sync() {

fn is_send<T: Send>(_: T) {}
fn is_sync<T: Sync>(_: T) {}
fn is_static<T: 'static>(_: T) {}

is_send::<Executor<'_>>(Executor::new());
is_sync::<Executor<'_>>(Executor::new());
Expand All @@ -962,6 +1096,9 @@ fn _ensure_send_and_sync() {
is_sync(ex.run(pending::<()>()));
is_send(ex.tick());
is_sync(ex.tick());
is_send(ex.schedule());
is_sync(ex.schedule());
is_static(ex.schedule());

/// ```compile_fail
/// use async_executor::LocalExecutor;
Expand Down
14 changes: 14 additions & 0 deletions tests/drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,20 @@ fn drop_finished_task_and_then_drop_executor() {
assert_eq!(DROP.load(Ordering::SeqCst), 1);
}

#[test]
fn iterator_panics_mid_run() {
let ex = Executor::new();

let panic = std::panic::catch_unwind(|| {
let mut handles = vec![];
ex.spawn_many(
(0..50).map(|i| if i == 25 { panic!() } else { future::ready(i) }),
&mut handles,
)
});
assert!(panic.is_err());
}

struct CallOnDrop<F: Fn()>(F);

impl<F: Fn()> Drop for CallOnDrop<F> {
Expand Down
45 changes: 45 additions & 0 deletions tests/spawn_many.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use async_executor::{Executor, LocalExecutor};
use futures_lite::future;

#[cfg(not(miri))]
const READY_COUNT: usize = 50_000;
#[cfg(miri)]
const READY_COUNT: usize = 505;

#[test]
fn spawn_many() {
future::block_on(async {
let ex = Executor::new();

// Spawn a lot of tasks.
let mut tasks = vec![];
ex.spawn_many((0..READY_COUNT).map(future::ready), &mut tasks);

// Run all of the tasks in parallel.
ex.run(async move {
for (i, task) in tasks.into_iter().enumerate() {
assert_eq!(task.await, i);
}
})
.await;
});
}

#[test]
fn spawn_many_local() {
future::block_on(async {
let ex = LocalExecutor::new();

// Spawn a lot of tasks.
let mut tasks = vec![];
ex.spawn_many((0..READY_COUNT).map(future::ready), &mut tasks);

// Run all of the tasks in parallel.
ex.run(async move {
for (i, task) in tasks.into_iter().enumerate() {
assert_eq!(task.await, i);
}
})
.await;
});
}