Skip to content

Commit f77c5dd

Browse files
committed
feat: Add a way to batch spawn tasks
For some workloads many tasks are spawned at a time. This requires locking and unlocking the executor's inner lock every time you spawn a task. If you spawn many tasks this can be expensive. This commit exposes a new "spawn_batch" method on both types. This method allows the user to spawn an entire set of tasks at a time. Closes #91 Signed-off-by: John Nunley <[email protected]>
1 parent 0baba46 commit f77c5dd

File tree

2 files changed

+175
-0
lines changed

2 files changed

+175
-0
lines changed

benches/executor.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,24 @@ fn running_benches(c: &mut Criterion) {
5252
);
5353
});
5454

55+
group.bench_function("executor::spawn_batch", |b| {
56+
run(
57+
|| {
58+
let mut handles = vec![];
59+
60+
b.iter(|| {
61+
EX.spawn_many(
62+
(0..250).map(|_| future::yield_now()),
63+
&mut handles
64+
);
65+
});
66+
67+
handles.clear();
68+
},
69+
*multithread,
70+
)
71+
});
72+
5573
group.bench_function("executor::spawn_many_local", |b| {
5674
run(
5775
|| {

src/lib.rs

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,82 @@ impl<'a> Executor<'a> {
170170
task
171171
}
172172

173+
/// Spawns many tasks onto the executor.
174+
///
175+
/// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and
176+
/// spawns all of the tasks in one go. With large amounts of tasks this can improve
177+
/// contention.
178+
///
179+
/// For very large numbers of tasks the lock is occasionally dropped and re-acquired to
180+
/// prevent runner thread starvation. It is assumed that the iterator provided does not
181+
/// block; blocking iterators can lock up the internal mutex and therefore the entire
182+
/// executor.
183+
///
184+
/// ## Example
185+
///
186+
/// ```
187+
/// use async_executor::Executor;
188+
/// use futures_lite::{stream, prelude::*};
189+
/// use std::future::ready;
190+
///
191+
/// # futures_lite::future::block_on(async {
192+
/// let mut ex = Executor::new();
193+
///
194+
/// let futures = [
195+
/// ready(1),
196+
/// ready(2),
197+
/// ready(3)
198+
/// ];
199+
///
200+
/// // Spawn all of the futures onto the executor at once.
201+
/// let mut tasks = vec![];
202+
/// ex.spawn_many(futures, &mut tasks);
203+
///
204+
/// // Await all of them.
205+
/// let results = ex.run(async move {
206+
/// stream::iter(tasks).then(|x| x).collect::<Vec<_>>().await
207+
/// }).await;
208+
/// assert_eq!(results, [1, 2, 3]);
209+
/// # });
210+
/// ```
211+
///
212+
/// [`spawn`]: Executor::spawn
213+
pub fn spawn_many<T: Send + 'a, F: Future<Output = T> + Send + 'a>(
214+
&self,
215+
futures: impl IntoIterator<Item = F>,
216+
handles: &mut impl Extend<Task<F::Output>>
217+
) {
218+
let mut active = self.state().active.lock().unwrap();
219+
220+
for (i, future) in futures.into_iter().enumerate() {
221+
// Remove the task from the set of active tasks when the future finishes.
222+
let index = active.vacant_entry().key();
223+
let state = self.state().clone();
224+
let future = async move {
225+
let _guard =
226+
CallOnDrop(move || drop(state.active.lock().unwrap().try_remove(index)));
227+
future.await
228+
};
229+
230+
// Create the task and register it in the set of active tasks.
231+
let (runnable, task) = unsafe {
232+
Builder::new()
233+
.propagate_panic(true)
234+
.spawn_unchecked(|()| future, self.schedule())
235+
};
236+
active.insert(runnable.waker());
237+
238+
runnable.schedule();
239+
handles.extend(Some(task));
240+
241+
// Yield the lock every once in a while to ease contention.
242+
if i.wrapping_sub(1) % 500 == 0 {
243+
drop(active);
244+
active = self.state().active.lock().unwrap();
245+
}
246+
}
247+
}
248+
173249
/// Attempts to run a task if at least one is scheduled.
174250
///
175251
/// Running a scheduled task means simply polling its future once.
@@ -417,6 +493,76 @@ impl<'a> LocalExecutor<'a> {
417493
task
418494
}
419495

496+
/// Spawns many tasks onto the executor.
497+
///
498+
/// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and
499+
/// spawns all of the tasks in one go. With large amounts of tasks this can improve
500+
/// contention.
501+
///
502+
/// It is assumed that the iterator provided does not block; blocking iterators can lock up
503+
/// the internal mutex and therefore the entire executor. Unlike [`Executor::spawn`], the
504+
/// mutex is not released, as there are no other threads that can poll this executor.
505+
///
506+
/// ## Example
507+
///
508+
/// ```
509+
/// use async_executor::LocalExecutor;
510+
/// use futures_lite::{stream, prelude::*};
511+
/// use std::future::ready;
512+
///
513+
/// # futures_lite::future::block_on(async {
514+
/// let mut ex = LocalExecutor::new();
515+
///
516+
/// let futures = [
517+
/// ready(1),
518+
/// ready(2),
519+
/// ready(3)
520+
/// ];
521+
///
522+
/// // Spawn all of the futures onto the executor at once.
523+
/// let mut tasks = vec![];
524+
/// ex.spawn_many(futures, &mut tasks);
525+
///
526+
/// // Await all of them.
527+
/// let results = ex.run(async move {
528+
/// stream::iter(tasks).then(|x| x).collect::<Vec<_>>().await
529+
/// }).await;
530+
/// assert_eq!(results, [1, 2, 3]);
531+
/// # });
532+
/// ```
533+
///
534+
/// [`spawn`]: LocalExecutor::spawn
535+
/// [`Executor::spawn_many`]: Executor::spawn_many
536+
pub fn spawn_many<T: Send + 'a, F: Future<Output = T> + Send + 'a>(
537+
&self,
538+
futures: impl IntoIterator<Item = F>,
539+
handles: &mut impl Extend<Task<F::Output>>
540+
) {
541+
let mut active = self.inner().state().active.lock().unwrap();
542+
543+
for future in futures {
544+
// Remove the task from the set of active tasks when the future finishes.
545+
let index = active.vacant_entry().key();
546+
let state = self.inner().state().clone();
547+
let future = async move {
548+
let _guard =
549+
CallOnDrop(move || drop(state.active.lock().unwrap().try_remove(index)));
550+
future.await
551+
};
552+
553+
// Create the task and register it in the set of active tasks.
554+
let (runnable, task) = unsafe {
555+
Builder::new()
556+
.propagate_panic(true)
557+
.spawn_unchecked(|()| future, self.schedule())
558+
};
559+
active.insert(runnable.waker());
560+
561+
runnable.schedule();
562+
handles.extend(Some(task));
563+
}
564+
}
565+
420566
/// Attempts to run a task if at least one is scheduled.
421567
///
422568
/// Running a scheduled task means simply polling its future once.
@@ -942,6 +1088,17 @@ fn debug_executor(executor: &Executor<'_>, name: &str, f: &mut fmt::Formatter<'_
9421088
.finish()
9431089
}
9441090

1091+
/// Container with one item.
1092+
///
1093+
/// This implements `Extend` for one-off cases.
1094+
struct Container<T>(Option<T>);
1095+
1096+
impl<T> Extend<T> for Container<T> {
1097+
fn extend<X: IntoIterator<Item = T>>(&mut self, iter: X) {
1098+
self.0 = iter.into_iter().next();
1099+
}
1100+
}
1101+
9451102
/// Runs a closure when dropped.
9461103
struct CallOnDrop<F: FnMut()>(F);
9471104

0 commit comments

Comments
 (0)