Skip to content

Commit ea2e16e

Browse files
committed
chore: Refractor out common logic
Signed-off-by: John Nunley <[email protected]>
1 parent bc0a2bf commit ea2e16e

File tree

1 file changed

+52
-78
lines changed

1 file changed

+52
-78
lines changed

src/lib.rs

Lines changed: 52 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -150,25 +150,8 @@ impl<'a> Executor<'a> {
150150
pub fn spawn<T: Send + 'a>(&self, future: impl Future<Output = T> + Send + 'a) -> Task<T> {
151151
let mut active = self.state().active.lock().unwrap();
152152

153-
// Remove the task from the set of active tasks when the future finishes.
154-
let entry = active.vacant_entry();
155-
let index = entry.key();
156-
let state = self.state().clone();
157-
let future = async move {
158-
let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().try_remove(index)));
159-
future.await
160-
};
161-
162-
// Create the task and register it in the set of active tasks.
163-
let (runnable, task) = unsafe {
164-
Builder::new()
165-
.propagate_panic(true)
166-
.spawn_unchecked(|()| future, self.schedule())
167-
};
168-
entry.insert(runnable.waker());
169-
170-
runnable.schedule();
171-
task
153+
// SAFETY: `T` and the future are `Send`.
154+
unsafe { self.spawn_inner(future, &mut active) }
172155
}
173156

174157
/// Spawns many tasks onto the executor.
@@ -219,25 +202,8 @@ impl<'a> Executor<'a> {
219202
let mut active = self.state().active.lock().unwrap();
220203

221204
for (i, future) in futures.into_iter().enumerate() {
222-
// Remove the task from the set of active tasks when the future finishes.
223-
let index = active.vacant_entry().key();
224-
let state = self.state().clone();
225-
let future = async move {
226-
let _guard =
227-
CallOnDrop(move || drop(state.active.lock().unwrap().try_remove(index)));
228-
future.await
229-
};
230-
231-
// Create the task and register it in the set of active tasks.
232-
let (runnable, task) = unsafe {
233-
Builder::new()
234-
.propagate_panic(true)
235-
.spawn_unchecked(|()| future, self.schedule())
236-
};
237-
active.insert(runnable.waker());
238-
239-
runnable.schedule();
240-
handles.extend(Some(task));
205+
// SAFETY: `T` and the future are `Send`.
206+
handles.extend(Some(unsafe { self.spawn_inner(future, &mut active) }));
241207

242208
// Yield the lock every once in a while to ease contention.
243209
if i.wrapping_sub(1) % 500 == 0 {
@@ -247,6 +213,37 @@ impl<'a> Executor<'a> {
247213
}
248214
}
249215

216+
/// Spawn a future using the inner lock.
217+
///
218+
/// # Safety
219+
///
220+
/// If this is an `Executor`, `F` and `T` must be `Send`.
221+
unsafe fn spawn_inner<T: 'a>(
222+
&self,
223+
future: impl Future<Output = T> + 'a,
224+
active: &mut Slab<Waker>,
225+
) -> Task<T> {
226+
// Remove the task from the set of active tasks when the future finishes.
227+
let entry = active.vacant_entry();
228+
let index = entry.key();
229+
let state = self.state().clone();
230+
let future = async move {
231+
let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().try_remove(index)));
232+
future.await
233+
};
234+
235+
// Create the task and register it in the set of active tasks.
236+
let (runnable, task) = unsafe {
237+
Builder::new()
238+
.propagate_panic(true)
239+
.spawn_unchecked(|()| future, self.schedule())
240+
};
241+
entry.insert(runnable.waker());
242+
243+
runnable.schedule();
244+
task
245+
}
246+
250247
/// Attempts to run a task if at least one is scheduled.
251248
///
252249
/// Running a scheduled task means simply polling its future once.
@@ -474,25 +471,9 @@ impl<'a> LocalExecutor<'a> {
474471
pub fn spawn<T: 'a>(&self, future: impl Future<Output = T> + 'a) -> Task<T> {
475472
let mut active = self.inner().state().active.lock().unwrap();
476473

477-
// Remove the task from the set of active tasks when the future finishes.
478-
let entry = active.vacant_entry();
479-
let index = entry.key();
480-
let state = self.inner().state().clone();
481-
let future = async move {
482-
let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().try_remove(index)));
483-
future.await
484-
};
485-
486-
// Create the task and register it in the set of active tasks.
487-
let (runnable, task) = unsafe {
488-
Builder::new()
489-
.propagate_panic(true)
490-
.spawn_unchecked(|()| future, self.schedule())
491-
};
492-
entry.insert(runnable.waker());
493-
494-
runnable.schedule();
495-
task
474+
// SAFETY: This executor is not thread safe, so the future and its result
475+
// cannot be sent to another thread.
476+
unsafe { self.inner().spawn_inner(future, &mut active) }
496477
}
497478

498479
/// Spawns many tasks onto the executor.
@@ -542,27 +523,20 @@ impl<'a> LocalExecutor<'a> {
542523
) {
543524
let mut active = self.inner().state().active.lock().unwrap();
544525

545-
for future in futures {
546-
// Remove the task from the set of active tasks when the future finishes.
547-
let index = active.vacant_entry().key();
548-
let state = self.inner().state().clone();
549-
let future = async move {
550-
let _guard =
551-
CallOnDrop(move || drop(state.active.lock().unwrap().try_remove(index)));
552-
future.await
553-
};
554-
555-
// Create the task and register it in the set of active tasks.
556-
let (runnable, task) = unsafe {
557-
Builder::new()
558-
.propagate_panic(true)
559-
.spawn_unchecked(|()| future, self.schedule())
560-
};
561-
active.insert(runnable.waker());
562-
563-
runnable.schedule();
564-
handles.extend(Some(task));
565-
}
526+
// Convert all of the futures to tasks.
527+
let tasks = futures.into_iter().map(|future| {
528+
// SAFETY: This executor is not thread safe, so the future and its result
529+
// cannot be sent to another thread.
530+
unsafe {
531+
self.inner().spawn_inner(future, &mut active)
532+
}
533+
534+
// As only one thread can spawn or poll tasks at a time, there is no need
535+
// to release lock contention here.
536+
});
537+
538+
// Push them to the user's collection.
539+
handles.extend(tasks);
566540
}
567541

568542
/// Attempts to run a task if at least one is scheduled.

0 commit comments

Comments
 (0)