Skip to content

Commit b139978

Browse files
committed
chore: Refractor out common logic
Signed-off-by: John Nunley <[email protected]>
1 parent 89d0fb5 commit b139978

File tree

1 file changed

+40
-74
lines changed

1 file changed

+40
-74
lines changed

src/lib.rs

Lines changed: 40 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -150,24 +150,8 @@ impl<'a> Executor<'a> {
150150
pub fn spawn<T: Send + 'a>(&self, future: impl Future<Output = T> + Send + 'a) -> Task<T> {
151151
let mut active = self.state().active.lock().unwrap();
152152

153-
// Remove the task from the set of active tasks when the future finishes.
154-
let index = active.vacant_entry().key();
155-
let state = self.state().clone();
156-
let future = async move {
157-
let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().try_remove(index)));
158-
future.await
159-
};
160-
161-
// Create the task and register it in the set of active tasks.
162-
let (runnable, task) = unsafe {
163-
Builder::new()
164-
.propagate_panic(true)
165-
.spawn_unchecked(|()| future, self.schedule())
166-
};
167-
active.insert(runnable.waker());
168-
169-
runnable.schedule();
170-
task
153+
// SAFETY: `T` and the future are `Send`.
154+
unsafe { self.spawn_inner(future, &mut active) }
171155
}
172156

173157
/// Spawns many tasks onto the executor.
@@ -218,25 +202,8 @@ impl<'a> Executor<'a> {
218202
let mut active = self.state().active.lock().unwrap();
219203

220204
for (i, future) in futures.into_iter().enumerate() {
221-
// Remove the task from the set of active tasks when the future finishes.
222-
let index = active.vacant_entry().key();
223-
let state = self.state().clone();
224-
let future = async move {
225-
let _guard =
226-
CallOnDrop(move || drop(state.active.lock().unwrap().try_remove(index)));
227-
future.await
228-
};
229-
230-
// Create the task and register it in the set of active tasks.
231-
let (runnable, task) = unsafe {
232-
Builder::new()
233-
.propagate_panic(true)
234-
.spawn_unchecked(|()| future, self.schedule())
235-
};
236-
active.insert(runnable.waker());
237-
238-
runnable.schedule();
239-
handles.extend(Some(task));
205+
// SAFETY: `T` and the future are `Send`.
206+
handles.extend(Some(unsafe { self.spawn_inner(future, &mut active) }));
240207

241208
// Yield the lock every once in a while to ease contention.
242209
if i.wrapping_sub(1) % 500 == 0 {
@@ -246,6 +213,36 @@ impl<'a> Executor<'a> {
246213
}
247214
}
248215

216+
/// Spawn a future using the inner lock.
217+
///
218+
/// # Safety
219+
///
220+
/// If this is an `Executor`, `F` and `T` must be `Send`.
221+
unsafe fn spawn_inner<T: 'a>(
222+
&self,
223+
future: impl Future<Output = T> + 'a,
224+
active: &mut Slab<Waker>,
225+
) -> Task<T> {
226+
// Remove the task from the set of active tasks when the future finishes.
227+
let index = active.vacant_entry().key();
228+
let state = self.state().clone();
229+
let future = async move {
230+
let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().try_remove(index)));
231+
future.await
232+
};
233+
234+
// Create the task and register it in the set of active tasks.
235+
let (runnable, task) = unsafe {
236+
Builder::new()
237+
.propagate_panic(true)
238+
.spawn_unchecked(|()| future, self.schedule())
239+
};
240+
active.insert(runnable.waker());
241+
242+
runnable.schedule();
243+
task
244+
}
245+
249246
/// Attempts to run a task if at least one is scheduled.
250247
///
251248
/// Running a scheduled task means simply polling its future once.
@@ -473,24 +470,8 @@ impl<'a> LocalExecutor<'a> {
473470
pub fn spawn<T: 'a>(&self, future: impl Future<Output = T> + 'a) -> Task<T> {
474471
let mut active = self.inner().state().active.lock().unwrap();
475472

476-
// Remove the task from the set of active tasks when the future finishes.
477-
let index = active.vacant_entry().key();
478-
let state = self.inner().state().clone();
479-
let future = async move {
480-
let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().try_remove(index)));
481-
future.await
482-
};
483-
484-
// Create the task and register it in the set of active tasks.
485-
let (runnable, task) = unsafe {
486-
Builder::new()
487-
.propagate_panic(true)
488-
.spawn_unchecked(|()| future, self.schedule())
489-
};
490-
active.insert(runnable.waker());
491-
492-
runnable.schedule();
493-
task
473+
// SAFETY: This future is not thread safe.
474+
unsafe { self.inner().spawn_inner(future, &mut active) }
494475
}
495476

496477
/// Spawns many tasks onto the executor.
@@ -541,25 +522,10 @@ impl<'a> LocalExecutor<'a> {
541522
let mut active = self.inner().state().active.lock().unwrap();
542523

543524
for future in futures {
544-
// Remove the task from the set of active tasks when the future finishes.
545-
let index = active.vacant_entry().key();
546-
let state = self.inner().state().clone();
547-
let future = async move {
548-
let _guard =
549-
CallOnDrop(move || drop(state.active.lock().unwrap().try_remove(index)));
550-
future.await
551-
};
552-
553-
// Create the task and register it in the set of active tasks.
554-
let (runnable, task) = unsafe {
555-
Builder::new()
556-
.propagate_panic(true)
557-
.spawn_unchecked(|()| future, self.schedule())
558-
};
559-
active.insert(runnable.waker());
560-
561-
runnable.schedule();
562-
handles.extend(Some(task));
525+
// SAFETY: F and T are both `Send`.
526+
handles.extend(Some(unsafe {
527+
self.inner().spawn_inner(future, &mut active)
528+
}));
563529
}
564530
}
565531

0 commit comments

Comments
 (0)