Skip to content

Commit 72febd5

Browse files
committed
chore: Clean up code a little
- Use an iterator when processing many tasks instead of a for loop. - Add a test for spawning large numbers of tasks at a time. - Fix Clippy warnings Signed-off-by: John Nunley <[email protected]>
1 parent ea2e16e commit 72febd5

File tree

2 files changed

+55
-32
lines changed

2 files changed

+55
-32
lines changed

src/lib.rs

Lines changed: 15 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -199,21 +199,27 @@ impl<'a> Executor<'a> {
199199
futures: impl IntoIterator<Item = F>,
200200
handles: &mut impl Extend<Task<F::Output>>,
201201
) {
202-
let mut active = self.state().active.lock().unwrap();
202+
let mut active = Some(self.state().active.lock().unwrap());
203203

204-
for (i, future) in futures.into_iter().enumerate() {
204+
// Convert the futures into tasks.
205+
let tasks = futures.into_iter().enumerate().map(move |(i, future)| {
205206
// SAFETY: `T` and the future are `Send`.
206-
handles.extend(Some(unsafe { self.spawn_inner(future, &mut active) }));
207+
let task = unsafe { self.spawn_inner(future, active.as_mut().unwrap()) };
207208

208209
// Yield the lock every once in a while to ease contention.
209210
if i.wrapping_sub(1) % 500 == 0 {
210-
drop(active);
211-
active = self.state().active.lock().unwrap();
211+
drop(active.take());
212+
active = Some(self.state().active.lock().unwrap());
212213
}
213-
}
214+
215+
task
216+
});
217+
218+
// Push the tasks to the user's collection.
219+
handles.extend(tasks);
214220
}
215221

216-
/// Spawn a future using the inner lock.
222+
/// Spawn a future while holding the inner lock.
217223
///
218224
/// # Safety
219225
///
@@ -527,16 +533,14 @@ impl<'a> LocalExecutor<'a> {
527533
let tasks = futures.into_iter().map(|future| {
528534
// SAFETY: This executor is not thread safe, so the future and its result
529535
// cannot be sent to another thread.
530-
unsafe {
531-
self.inner().spawn_inner(future, &mut active)
532-
}
536+
unsafe { self.inner().spawn_inner(future, &mut active) }
533537

534538
// As only one thread can spawn or poll tasks at a time, there is no need
535539
// to release lock contention here.
536540
});
537541

538542
// Push them to the user's collection.
539-
handles.extend(tasks);
543+
handles.extend(tasks);
540544
}
541545

542546
/// Attempts to run a task if at least one is scheduled.
@@ -602,16 +606,6 @@ impl<'a> LocalExecutor<'a> {
602606
self.inner().run(future).await
603607
}
604608

605-
/// Returns a function that schedules a runnable task when it gets woken up.
606-
fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static {
607-
let state = self.inner().state().clone();
608-
609-
move |runnable| {
610-
state.queue.push(runnable).unwrap();
611-
state.notify();
612-
}
613-
}
614-
615609
/// Returns a reference to the inner executor.
616610
fn inner(&self) -> &Executor<'a> {
617611
&self.inner
@@ -1060,17 +1054,6 @@ fn debug_executor(executor: &Executor<'_>, name: &str, f: &mut fmt::Formatter<'_
10601054
.finish()
10611055
}
10621056

1063-
/// Container with one item.
1064-
///
1065-
/// This implements `Extend` for one-off cases.
1066-
struct Container<T>(Option<T>);
1067-
1068-
impl<T> Extend<T> for Container<T> {
1069-
fn extend<X: IntoIterator<Item = T>>(&mut self, iter: X) {
1070-
self.0 = iter.into_iter().next();
1071-
}
1072-
}
1073-
10741057
/// Runs a closure when dropped.
10751058
struct CallOnDrop<F: FnMut()>(F);
10761059

tests/spawn_many.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
use async_executor::{Executor, LocalExecutor};
2+
use futures_lite::future;
3+
4+
#[test]
5+
fn spawn_many() {
6+
future::block_on(async {
7+
let ex = Executor::new();
8+
9+
// Spawn a lot of tasks.
10+
let mut tasks = vec![];
11+
ex.spawn_many((0..50_000).map(|i| future::ready(i)), &mut tasks);
12+
13+
// Run all of the tasks in parallel.
14+
ex.run(async move {
15+
for (i, task) in tasks.into_iter().enumerate() {
16+
assert_eq!(task.await, i);
17+
}
18+
})
19+
.await;
20+
});
21+
}
22+
23+
#[test]
24+
fn spawn_many_local() {
25+
future::block_on(async {
26+
let ex = LocalExecutor::new();
27+
28+
// Spawn a lot of tasks.
29+
let mut tasks = vec![];
30+
ex.spawn_many((0..50_000).map(|i| future::ready(i)), &mut tasks);
31+
32+
// Run all of the tasks in parallel.
33+
ex.run(async move {
34+
for (i, task) in tasks.into_iter().enumerate() {
35+
assert_eq!(task.await, i);
36+
}
37+
})
38+
.await;
39+
});
40+
}

0 commit comments

Comments
 (0)