
Commit b0c6f7a

feat: Add a way to batch spawn tasks
Some workloads spawn many tasks at a time. Each call to `spawn` locks and unlocks the executor's inner lock, which becomes expensive when spawning a large number of tasks. This commit exposes a new `spawn_many` method on both executor types, allowing the user to spawn an entire set of tasks at once.

Closes #91

Signed-off-by: John Nunley <[email protected]>
1 parent 17720b0 commit b0c6f7a
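
The call pattern the new method enables looks roughly like the sketch below. It is assembled from the `spawn_many` signature and the doc examples in the diff that follows, using the `futures_lite` helpers the crate already uses in its own examples; the batch size of 1,000 is an arbitrary illustration, not something prescribed by the commit.

```rust
use async_executor::Executor;
use futures_lite::future;

fn main() {
    let ex = Executor::new();

    // Before this commit, each task paid for its own lock/unlock of the
    // executor's inner task lock:
    //
    //     for i in 0..1_000usize {
    //         tasks.push(ex.spawn(future::ready(i)));
    //     }
    //
    // With `spawn_many`, the whole batch is spawned under (mostly) a single
    // lock acquisition, and the handles land in a user-provided collection.
    let mut tasks = Vec::new();
    ex.spawn_many((0..1_000usize).map(future::ready), &mut tasks);

    // Drive the executor until every spawned task has completed.
    future::block_on(ex.run(async move {
        for (i, task) in tasks.into_iter().enumerate() {
            assert_eq!(task.await, i);
        }
    }));
}
```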

File tree

4 files changed (+244, -33 lines)


benches/executor.rs

Lines changed: 15 additions & 0 deletions
@@ -51,6 +51,21 @@ fn running_benches(c: &mut Criterion) {
                 );
             });
 
+            group.bench_function("executor::spawn_batch", |b| {
+                run(
+                    || {
+                        let mut handles = vec![];
+
+                        b.iter(|| {
+                            EX.spawn_many((0..250).map(|_| future::yield_now()), &mut handles);
+                        });
+
+                        handles.clear();
+                    },
+                    *multithread,
+                )
+            });
+
             group.bench_function("executor::spawn_many_local", |b| {
                 run(
                     || {

src/lib.rs

Lines changed: 170 additions & 33 deletions
@@ -149,6 +149,85 @@ impl<'a> Executor<'a> {
     pub fn spawn<T: Send + 'a>(&self, future: impl Future<Output = T> + Send + 'a) -> Task<T> {
         let mut active = self.state().active.lock().unwrap();
 
+        // SAFETY: `T` and the future are `Send`.
+        unsafe { self.spawn_inner(future, &mut active) }
+    }
+
+    /// Spawns many tasks onto the executor.
+    ///
+    /// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and
+    /// spawns all of the tasks in one go. With large amounts of tasks this can improve
+    /// contention.
+    ///
+    /// For very large numbers of tasks the lock is occasionally dropped and re-acquired to
+    /// prevent runner thread starvation. It is assumed that the iterator provided does not
+    /// block; blocking iterators can lock up the internal mutex and therefore the entire
+    /// executor.
+    ///
+    /// ## Example
+    ///
+    /// ```
+    /// use async_executor::Executor;
+    /// use futures_lite::{stream, prelude::*};
+    /// use std::future::ready;
+    ///
+    /// # futures_lite::future::block_on(async {
+    /// let mut ex = Executor::new();
+    ///
+    /// let futures = [
+    ///     ready(1),
+    ///     ready(2),
+    ///     ready(3)
+    /// ];
+    ///
+    /// // Spawn all of the futures onto the executor at once.
+    /// let mut tasks = vec![];
+    /// ex.spawn_many(futures, &mut tasks);
+    ///
+    /// // Await all of them.
+    /// let results = ex.run(async move {
+    ///     stream::iter(tasks).then(|x| x).collect::<Vec<_>>().await
+    /// }).await;
+    /// assert_eq!(results, [1, 2, 3]);
+    /// # });
+    /// ```
+    ///
+    /// [`spawn`]: Executor::spawn
+    pub fn spawn_many<T: Send + 'a, F: Future<Output = T> + Send + 'a>(
+        &self,
+        futures: impl IntoIterator<Item = F>,
+        handles: &mut impl Extend<Task<F::Output>>,
+    ) {
+        let mut active = Some(self.state().active.lock().unwrap());
+
+        // Convert the futures into tasks.
+        let tasks = futures.into_iter().enumerate().map(move |(i, future)| {
+            // SAFETY: `T` and the future are `Send`.
+            let task = unsafe { self.spawn_inner(future, active.as_mut().unwrap()) };
+
+            // Yield the lock every once in a while to ease contention.
+            if i.wrapping_sub(1) % 500 == 0 {
+                drop(active.take());
+                active = Some(self.state().active.lock().unwrap());
+            }
+
+            task
+        });
+
+        // Push the tasks to the user's collection.
+        handles.extend(tasks);
+    }
+
+    /// Spawn a future while holding the inner lock.
+    ///
+    /// # Safety
+    ///
+    /// If this is an `Executor`, `F` and `T` must be `Send`.
+    unsafe fn spawn_inner<T: 'a>(
+        &self,
+        future: impl Future<Output = T> + 'a,
+        active: &mut Slab<Waker>,
+    ) -> Task<T> {
         // Remove the task from the set of active tasks when the future finishes.
         let entry = active.vacant_entry();
         let index = entry.key();
@@ -159,11 +238,30 @@ impl<'a> Executor<'a> {
         };
 
         // Create the task and register it in the set of active tasks.
-        let (runnable, task) = unsafe {
-            Builder::new()
-                .propagate_panic(true)
-                .spawn_unchecked(|()| future, self.schedule())
-        };
+        //
+        // SAFETY:
+        //
+        // If `future` is not `Send`, this must be a `LocalExecutor` as per this
+        // function's unsafe precondition. Since `LocalExecutor` is `!Sync`,
+        // `try_tick`, `tick` and `run` can only be called from the origin
+        // thread of the `LocalExecutor`. Similarly, `spawn` can only be called
+        // from the origin thread, ensuring that `future` and the executor share
+        // the same origin thread. The `Runnable` can be scheduled from other
+        // threads, but because of the above `Runnable` can only be called or
+        // dropped on the origin thread.
+        //
+        // `future` is not `'static`, but we make sure that the `Runnable` does
+        // not outlive `'a`. When the executor is dropped, the `active` field is
+        // drained and all of the `Waker`s are woken. Then, the queue inside of
+        // the `Executor` is drained of all of its runnables. This ensures that
+        // runnables are dropped and this precondition is satisfied.
+        //
+        // `self.schedule()` is `Send`, `Sync` and `'static`, as checked below.
+        // Therefore we do not need to worry about what is done with the
+        // `Waker`.
+        let (runnable, task) = Builder::new()
+            .propagate_panic(true)
+            .spawn_unchecked(|()| future, self.schedule());
         entry.insert(runnable.waker());
 
         runnable.schedule();
@@ -292,7 +390,7 @@ impl<'a> Executor<'a> {
 impl Drop for Executor<'_> {
     fn drop(&mut self) {
         if let Some(state) = self.state.get() {
-            let mut active = state.active.lock().unwrap();
+            let mut active = state.active.lock().unwrap_or_else(|e| e.into_inner());
             for w in active.drain() {
                 w.wake();
             }
@@ -397,25 +495,70 @@ impl<'a> LocalExecutor<'a> {
     pub fn spawn<T: 'a>(&self, future: impl Future<Output = T> + 'a) -> Task<T> {
         let mut active = self.inner().state().active.lock().unwrap();
 
-        // Remove the task from the set of active tasks when the future finishes.
-        let entry = active.vacant_entry();
-        let index = entry.key();
-        let state = self.inner().state().clone();
-        let future = async move {
-            let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().try_remove(index)));
-            future.await
-        };
+        // SAFETY: This executor is not thread safe, so the future and its result
+        // cannot be sent to another thread.
+        unsafe { self.inner().spawn_inner(future, &mut active) }
+    }
 
-        // Create the task and register it in the set of active tasks.
-        let (runnable, task) = unsafe {
-            Builder::new()
-                .propagate_panic(true)
-                .spawn_unchecked(|()| future, self.schedule())
-        };
-        entry.insert(runnable.waker());
+    /// Spawns many tasks onto the executor.
+    ///
+    /// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and
+    /// spawns all of the tasks in one go. With large amounts of tasks this can improve
+    /// contention.
+    ///
+    /// It is assumed that the iterator provided does not block; blocking iterators can lock up
+    /// the internal mutex and therefore the entire executor. Unlike [`Executor::spawn`], the
+    /// mutex is not released, as there are no other threads that can poll this executor.
+    ///
+    /// ## Example
+    ///
+    /// ```
+    /// use async_executor::LocalExecutor;
+    /// use futures_lite::{stream, prelude::*};
+    /// use std::future::ready;
+    ///
+    /// # futures_lite::future::block_on(async {
+    /// let mut ex = LocalExecutor::new();
+    ///
+    /// let futures = [
+    ///     ready(1),
+    ///     ready(2),
+    ///     ready(3)
+    /// ];
+    ///
+    /// // Spawn all of the futures onto the executor at once.
+    /// let mut tasks = vec![];
+    /// ex.spawn_many(futures, &mut tasks);
+    ///
+    /// // Await all of them.
+    /// let results = ex.run(async move {
+    ///     stream::iter(tasks).then(|x| x).collect::<Vec<_>>().await
+    /// }).await;
+    /// assert_eq!(results, [1, 2, 3]);
+    /// # });
+    /// ```
+    ///
+    /// [`spawn`]: LocalExecutor::spawn
+    /// [`Executor::spawn_many`]: Executor::spawn_many
+    pub fn spawn_many<T: Send + 'a, F: Future<Output = T> + Send + 'a>(
+        &self,
+        futures: impl IntoIterator<Item = F>,
+        handles: &mut impl Extend<Task<F::Output>>,
+    ) {
+        let mut active = self.inner().state().active.lock().unwrap();
 
-        runnable.schedule();
-        task
+        // Convert all of the futures to tasks.
+        let tasks = futures.into_iter().map(|future| {
+            // SAFETY: This executor is not thread safe, so the future and its result
+            // cannot be sent to another thread.
+            unsafe { self.inner().spawn_inner(future, &mut active) }
+
+            // As only one thread can spawn or poll tasks at a time, there is no need
+            // to release lock contention here.
+        });
+
+        // Push them to the user's collection.
+        handles.extend(tasks);
     }
 
     /// Attempts to run a task if at least one is scheduled.
@@ -481,16 +624,6 @@ impl<'a> LocalExecutor<'a> {
         self.inner().run(future).await
     }
 
-    /// Returns a function that schedules a runnable task when it gets woken up.
-    fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static {
-        let state = self.inner().state().clone();
-
-        move |runnable| {
-            state.queue.push(runnable).unwrap();
-            state.notify();
-        }
-    }
-
     /// Returns a reference to the inner executor.
     fn inner(&self) -> &Executor<'a> {
         &self.inner
@@ -953,6 +1086,7 @@ fn _ensure_send_and_sync() {
 
     fn is_send<T: Send>(_: T) {}
     fn is_sync<T: Sync>(_: T) {}
+    fn is_static<T: 'static>(_: T) {}
 
     is_send::<Executor<'_>>(Executor::new());
     is_sync::<Executor<'_>>(Executor::new());
@@ -962,6 +1096,9 @@ fn _ensure_send_and_sync() {
     is_sync(ex.run(pending::<()>()));
     is_send(ex.tick());
     is_sync(ex.tick());
+    is_send(ex.schedule());
+    is_sync(ex.schedule());
+    is_static(ex.schedule());
 
     /// ```compile_fail
     /// use async_executor::LocalExecutor;

tests/drop.rs

Lines changed: 14 additions & 0 deletions
@@ -121,6 +121,20 @@ fn drop_finished_task_and_then_drop_executor() {
     assert_eq!(DROP.load(Ordering::SeqCst), 1);
 }
 
+#[test]
+fn iterator_panics_mid_run() {
+    let ex = Executor::new();
+
+    let panic = std::panic::catch_unwind(|| {
+        let mut handles = vec![];
+        ex.spawn_many(
+            (0..50).map(|i| if i == 25 { panic!() } else { future::ready(i) }),
+            &mut handles,
+        )
+    });
+    assert!(panic.is_err());
+}
+
 struct CallOnDrop<F: Fn()>(F);
 
 impl<F: Fn()> Drop for CallOnDrop<F> {

tests/spawn_many.rs

Lines changed: 45 additions & 0 deletions
@@ -0,0 +1,45 @@
+use async_executor::{Executor, LocalExecutor};
+use futures_lite::future;
+
+#[cfg(not(miri))]
+const READY_COUNT: usize = 50_000;
+#[cfg(miri)]
+const READY_COUNT: usize = 505;
+
+#[test]
+fn spawn_many() {
+    future::block_on(async {
+        let ex = Executor::new();
+
+        // Spawn a lot of tasks.
+        let mut tasks = vec![];
+        ex.spawn_many((0..READY_COUNT).map(future::ready), &mut tasks);
+
+        // Run all of the tasks in parallel.
+        ex.run(async move {
+            for (i, task) in tasks.into_iter().enumerate() {
+                assert_eq!(task.await, i);
+            }
+        })
+        .await;
+    });
+}
+
+#[test]
+fn spawn_many_local() {
+    future::block_on(async {
+        let ex = LocalExecutor::new();
+
+        // Spawn a lot of tasks.
+        let mut tasks = vec![];
+        ex.spawn_many((0..READY_COUNT).map(future::ready), &mut tasks);
+
+        // Run all of the tasks in parallel.
+        ex.run(async move {
+            for (i, task) in tasks.into_iter().enumerate() {
+                assert_eq!(task.await, i);
+            }
+        })
+        .await;
+    });
+}
