Skip to content

Commit b4553ab

Browse files
bors[bot]cuviper
andcommitted
615: Implement RFC #1: FIFO spawns r=nikomatsakis a=cuviper This implements rayon-rs/rfcs#1, adding `spawn_fifo`, `scope_fifo`, and `ScopeFifo`, and deprecating the `breadth_first` flag. Fixes rayon-rs#590. Closes rayon-rs#601. Co-authored-by: Josh Stone <[email protected]>
2 parents 346bcdf + 88d6a4e commit b4553ab

File tree

12 files changed

+803
-109
lines changed

12 files changed

+803
-109
lines changed

rayon-core/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ lazy_static = "1"
2323
[dependencies.crossbeam-deque]
2424
version = "0.2.0"
2525

26+
# Also held back for rustc compatibility
27+
[dependencies.crossbeam]
28+
version = "0.3.0"
29+
2630
[dev-dependencies]
2731
rand = "0.5"
2832

rayon-core/src/job.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crossbeam::sync::SegQueue;
12
use latch::Latch;
23
use std::any::Any;
34
use std::cell::UnsafeCell;
@@ -179,3 +180,35 @@ impl<T> JobResult<T> {
179180
}
180181
}
181182
}
183+
184+
/// Indirect queue to provide FIFO job priority.
185+
pub struct JobFifo {
186+
inner: SegQueue<JobRef>,
187+
}
188+
189+
impl JobFifo {
190+
pub fn new() -> Self {
191+
JobFifo {
192+
inner: SegQueue::new(),
193+
}
194+
}
195+
196+
pub unsafe fn push(&self, job_ref: JobRef) -> JobRef {
197+
// A little indirection ensures that spawns are always prioritized in FIFO order. The
198+
// jobs in a thread's deque may be popped from the back (LIFO) or stolen from the front
199+
// (FIFO), but either way they will end up popping from the front of this queue.
200+
self.inner.push(job_ref);
201+
JobRef::new(self)
202+
}
203+
}
204+
205+
impl Job for JobFifo {
206+
unsafe fn execute(this: *const Self) {
207+
// We "execute" a queue by executing its first job, FIFO.
208+
(*this)
209+
.inner
210+
.try_pop()
211+
.expect("job in fifo queue")
212+
.execute()
213+
}
214+
}

rayon-core/src/lib.rs

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use std::io;
3131
use std::marker::PhantomData;
3232
use std::str::FromStr;
3333

34+
extern crate crossbeam;
3435
extern crate crossbeam_deque;
3536
#[macro_use]
3637
extern crate lazy_static;
@@ -61,7 +62,8 @@ mod test;
6162
pub mod internal;
6263
pub use join::{join, join_context};
6364
pub use scope::{scope, Scope};
64-
pub use spawn::spawn;
65+
pub use scope::{scope_fifo, ScopeFifo};
66+
pub use spawn::{spawn, spawn_fifo};
6567
pub use thread_pool::current_thread_has_pending_tasks;
6668
pub use thread_pool::current_thread_index;
6769
pub use thread_pool::ThreadPool;
@@ -314,16 +316,17 @@ impl ThreadPoolBuilder {
314316
self
315317
}
316318

317-
/// Suggest to worker threads that they execute spawned jobs in a
318-
/// "breadth-first" fashion. Typically, when a worker thread is
319-
/// idle or blocked, it will attempt to execute the job from the
320-
/// *top* of its local deque of work (i.e., the job most recently
321-
/// spawned). If this flag is set to true, however, workers will
322-
/// prefer to execute in a *breadth-first* fashion -- that is,
323-
/// they will search for jobs at the *bottom* of their local
324-
/// deque. (At present, workers *always* steal from the bottom of
325-
/// other worker's deques, regardless of the setting of this
326-
/// flag.)
319+
/// **(DEPRECATED)** Suggest to worker threads that they execute
320+
/// spawned jobs in a "breadth-first" fashion.
321+
///
322+
/// Typically, when a worker thread is idle or blocked, it will
323+
/// attempt to execute the job from the *top* of its local deque of
324+
/// work (i.e., the job most recently spawned). If this flag is set
325+
/// to true, however, workers will prefer to execute in a
326+
/// *breadth-first* fashion -- that is, they will search for jobs at
327+
/// the *bottom* of their local deque. (At present, workers *always*
328+
/// steal from the bottom of other worker's deques, regardless of
329+
/// the setting of this flag.)
327330
///
328331
/// If you think of the tasks as a tree, where a parent task
329332
/// spawns its children in the tree, then this flag loosely
@@ -334,6 +337,14 @@ impl ThreadPoolBuilder {
334337
/// execution is highly dynamic and the precise order in which
335338
/// independent tasks are executed is not intended to be
336339
/// guaranteed.
340+
///
341+
/// This `breadth_first()` method is now deprecated per [RFC #1],
342+
/// and in the future its effect may be removed. Consider using
343+
/// [`scope_fifo()`] for a similar effect.
344+
///
345+
/// [RFC #1]: https://github.com/rayon-rs/rfcs/blob/master/accepted/rfc0001-scope-scheduling.md
346+
/// [`scope_fifo()`]: fn.scope_fifo.html
347+
#[deprecated(note = "use `scope_fifo` and `spawn_fifo` for similar effect")]
337348
pub fn breadth_first(mut self) -> Self {
338349
self.breadth_first = true;
339350
self

rayon-core/src/registry.rs

Lines changed: 45 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1+
use crossbeam::sync::SegQueue;
12
use crossbeam_deque::{Deque, Steal, Stealer};
23
#[cfg(rayon_unstable)]
34
use internal::task::Task;
45
#[cfg(rayon_unstable)]
56
use job::Job;
6-
use job::{JobRef, StackJob};
7+
use job::{JobFifo, JobRef, StackJob};
78
use latch::{CountLatch, Latch, LatchProbe, LockLatch, SpinLatch, TickleLatch};
89
use log::Event::*;
910
use sleep::Sleep;
@@ -13,7 +14,7 @@ use std::collections::hash_map::DefaultHasher;
1314
use std::hash::Hasher;
1415
use std::mem;
1516
use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
16-
use std::sync::{Arc, Mutex, Once, ONCE_INIT};
17+
use std::sync::{Arc, Once, ONCE_INIT};
1718
use std::thread;
1819
use std::usize;
1920
use unwind;
@@ -22,9 +23,8 @@ use {ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError, T
2223

2324
pub struct Registry {
2425
thread_infos: Vec<ThreadInfo>,
25-
state: Mutex<RegistryState>,
2626
sleep: Sleep,
27-
job_uninjector: Stealer<JobRef>,
27+
injected_jobs: SegQueue<JobRef>,
2828
panic_handler: Option<Box<PanicHandler>>,
2929
start_handler: Option<Box<StartHandler>>,
3030
exit_handler: Option<Box<ExitHandler>>,
@@ -45,10 +45,6 @@ pub struct Registry {
4545
terminate_latch: CountLatch,
4646
}
4747

48-
struct RegistryState {
49-
job_injector: Deque<JobRef>,
50-
}
51-
5248
/// ////////////////////////////////////////////////////////////////////////
5349
/// Initialization
5450
@@ -104,16 +100,13 @@ impl Registry {
104100
let n_threads = builder.get_num_threads();
105101
let breadth_first = builder.get_breadth_first();
106102

107-
let inj_worker = Deque::new();
108-
let inj_stealer = inj_worker.stealer();
109103
let workers: Vec<_> = (0..n_threads).map(|_| Deque::new()).collect();
110104
let stealers: Vec<_> = workers.iter().map(|d| d.stealer()).collect();
111105

112106
let registry = Arc::new(Registry {
113107
thread_infos: stealers.into_iter().map(|s| ThreadInfo::new(s)).collect(),
114-
state: Mutex::new(RegistryState::new(inj_worker)),
115108
sleep: Sleep::new(),
116-
job_uninjector: inj_stealer,
109+
injected_jobs: SegQueue::new(),
117110
terminate_latch: CountLatch::new(),
118111
panic_handler: builder.take_panic_handler(),
119112
start_handler: builder.take_start_handler(),
@@ -175,6 +168,18 @@ impl Registry {
175168
}
176169
}
177170

171+
/// Returns the current `WorkerThread` if it's part of this `Registry`.
172+
pub fn current_thread(&self) -> Option<&WorkerThread> {
173+
unsafe {
174+
if let Some(worker) = WorkerThread::current().as_ref() {
175+
if worker.registry().id() == self.id() {
176+
return Some(worker);
177+
}
178+
}
179+
None
180+
}
181+
}
182+
178183
/// Returns an opaque identifier for this registry.
179184
pub fn id(&self) -> RegistryId {
180185
// We can rely on `self` not to change since we only ever create
@@ -297,39 +302,31 @@ impl Registry {
297302
log!(InjectJobs {
298303
count: injected_jobs.len()
299304
});
300-
{
301-
let state = self.state.lock().unwrap();
302-
303-
// It should not be possible for `state.terminate` to be true
304-
// here. It is only set to true when the user creates (and
305-
// drops) a `ThreadPool`; and, in that case, they cannot be
306-
// calling `inject()` later, since they dropped their
307-
// `ThreadPool`.
308-
assert!(
309-
!self.terminate_latch.probe(),
310-
"inject() sees state.terminate as true"
311-
);
312-
313-
for &job_ref in injected_jobs {
314-
state.job_injector.push(job_ref);
315-
}
305+
306+
// It should not be possible for `state.terminate` to be true
307+
// here. It is only set to true when the user creates (and
308+
// drops) a `ThreadPool`; and, in that case, they cannot be
309+
// calling `inject()` later, since they dropped their
310+
// `ThreadPool`.
311+
assert!(
312+
!self.terminate_latch.probe(),
313+
"inject() sees state.terminate as true"
314+
);
315+
316+
for &job_ref in injected_jobs {
317+
self.injected_jobs.push(job_ref);
316318
}
317319
self.sleep.tickle(usize::MAX);
318320
}
319321

320322
fn pop_injected_job(&self, worker_index: usize) -> Option<JobRef> {
321-
loop {
322-
match self.job_uninjector.steal() {
323-
Steal::Empty => return None,
324-
Steal::Data(d) => {
325-
log!(UninjectedWork {
326-
worker: worker_index
327-
});
328-
return Some(d);
329-
}
330-
Steal::Retry => {}
331-
}
323+
let job = self.injected_jobs.try_pop();
324+
if job.is_some() {
325+
log!(UninjectedWork {
326+
worker: worker_index
327+
});
332328
}
329+
job
333330
}
334331

335332
/// If already in a worker-thread of this registry, just execute `op`.
@@ -439,14 +436,6 @@ pub struct RegistryId {
439436
addr: usize,
440437
}
441438

442-
impl RegistryState {
443-
pub fn new(job_injector: Deque<JobRef>) -> RegistryState {
444-
RegistryState {
445-
job_injector: job_injector,
446-
}
447-
}
448-
}
449-
450439
struct ThreadInfo {
451440
/// Latch set once thread has started and we are entering into the
452441
/// main loop. Used to wait for worker threads to become primed,
@@ -478,6 +467,9 @@ pub struct WorkerThread {
478467
/// the "worker" half of our local deque
479468
worker: Deque<JobRef>,
480469

470+
/// local queue used for `spawn_fifo` indirection
471+
fifo: JobFifo,
472+
481473
index: usize,
482474

483475
/// are these workers configured to steal breadth-first or not?
@@ -534,6 +526,11 @@ impl WorkerThread {
534526
self.registry.sleep.tickle(self.index);
535527
}
536528

529+
#[inline]
530+
pub unsafe fn push_fifo(&self, job: JobRef) {
531+
self.push(self.fifo.push(job));
532+
}
533+
537534
#[inline]
538535
pub fn local_deque_is_empty(&self) -> bool {
539536
self.worker.len() == 0
@@ -663,6 +660,7 @@ unsafe fn main_loop(
663660
) {
664661
let worker_thread = WorkerThread {
665662
worker: worker,
663+
fifo: JobFifo::new(),
666664
breadth_first: breadth_first,
667665
index: index,
668666
rng: XorShift64Star::new(),

rayon-core/src/scope/internal.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#![cfg(rayon_unstable)]
22

3-
use super::Scope;
3+
use super::{Scope, ScopeBase};
44
use internal::task::{ScopeHandle, Task, ToScopeHandle};
55
use std::any::Any;
66
use std::mem;
@@ -16,16 +16,16 @@ impl<'scope> ToScopeHandle<'scope> for Scope<'scope> {
1616

1717
#[derive(Debug)]
1818
pub struct LocalScopeHandle<'scope> {
19-
scope: *const Scope<'scope>,
19+
scope: *const ScopeBase<'scope>,
2020
}
2121

2222
impl<'scope> LocalScopeHandle<'scope> {
2323
/// Caller guarantees that `*scope` will remain valid
2424
/// until the scope completes. Since we acquire a ref,
2525
/// that means it will remain valid until we release it.
2626
unsafe fn new(scope: &Scope<'scope>) -> Self {
27-
scope.job_completed_latch.increment();
28-
LocalScopeHandle { scope: scope }
27+
scope.base.increment();
28+
LocalScopeHandle { scope: &scope.base }
2929
}
3030
}
3131

0 commit comments

Comments
 (0)