Skip to content

Commit 49fb38a

Browse files
bors[bot]cuviper
andcommitted
636: Add the ability to customize thread spawning r=nikomatsakis a=cuviper As an alternative to `ThreadPoolBuilder::build()` and `build_global()`, the new `spawn()` and `spawn_global()` methods take a closure which will be responsible for spawning the actual threads. This is called with a `ThreadBuilder` argument that provides the thread index, name, and stack size, with the expectation to call its `run()` method in the new thread. The motivating use cases for this are: - experimental WASM threading, to be externally implemented. - scoped threads, like the new test using `scoped_tls`. Co-authored-by: Josh Stone <[email protected]>
2 parents 12afe12 + 249ad3f commit 49fb38a

File tree

8 files changed

+533
-50
lines changed

8 files changed

+533
-50
lines changed

rayon-core/Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@ num_cpus = "1.2"
1919
lazy_static = "1"
2020
crossbeam-deque = "0.6.3"
2121
crossbeam-queue = "0.1.2"
22+
crossbeam-utils = "0.6.5"
2223

2324
[dev-dependencies]
2425
rand = "0.6"
2526
rand_xorshift = "0.1"
27+
scoped-tls = "1.0"
2628

2729
[target.'cfg(unix)'.dev-dependencies]
2830
libc = "0.2"
@@ -49,3 +51,7 @@ path = "tests/scope_join.rs"
4951
[[test]]
5052
name = "simple_panic"
5153
path = "tests/simple_panic.rs"
54+
55+
[[test]]
56+
name = "scoped_threadpool"
57+
path = "tests/scoped_threadpool.rs"

rayon-core/src/lib.rs

Lines changed: 189 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use std::str::FromStr;
3434

3535
extern crate crossbeam_deque;
3636
extern crate crossbeam_queue;
37+
extern crate crossbeam_utils;
3738
#[cfg(any(debug_assertions, rayon_unstable))]
3839
#[macro_use]
3940
extern crate lazy_static;
@@ -46,6 +47,8 @@ extern crate rand_xorshift;
4647

4748
#[macro_use]
4849
mod log;
50+
#[macro_use]
51+
mod private;
4952

5053
mod job;
5154
mod join;
@@ -64,13 +67,16 @@ mod test;
6467
#[cfg(rayon_unstable)]
6568
pub mod internal;
6669
pub use join::{join, join_context};
70+
pub use registry::ThreadBuilder;
6771
pub use scope::{scope, Scope};
6872
pub use scope::{scope_fifo, ScopeFifo};
6973
pub use spawn::{spawn, spawn_fifo};
7074
pub use thread_pool::current_thread_has_pending_tasks;
7175
pub use thread_pool::current_thread_index;
7276
pub use thread_pool::ThreadPool;
7377

78+
use registry::{CustomSpawn, DefaultSpawn, ThreadSpawn};
79+
7480
/// Returns the number of threads in the current registry. If this
7581
/// code is executing within a Rayon thread-pool, then this will be
7682
/// the number of threads for the thread-pool of the current
@@ -123,8 +129,7 @@ enum ErrorKind {
123129
///
124130
/// [`ThreadPool`]: struct.ThreadPool.html
125131
/// [`build_global()`]: struct.ThreadPoolBuilder.html#method.build_global
126-
#[derive(Default)]
127-
pub struct ThreadPoolBuilder {
132+
pub struct ThreadPoolBuilder<S = DefaultSpawn> {
128133
/// The number of threads in the rayon thread pool.
129134
/// If zero will use the RAYON_NUM_THREADS environment variable.
130135
/// If RAYON_NUM_THREADS is invalid or zero will use the default.
@@ -146,6 +151,9 @@ pub struct ThreadPoolBuilder {
146151
/// Closure invoked on worker thread exit.
147152
exit_handler: Option<Box<ExitHandler>>,
148153

154+
/// Closure invoked to spawn threads.
155+
spawn_handler: S,
156+
149157
/// If false, worker threads will execute spawned jobs in a
150158
/// "depth-first" fashion. If true, they will do a "breadth-first"
151159
/// fashion. Depth-first is the default.
@@ -174,12 +182,35 @@ type StartHandler = Fn(usize) + Send + Sync;
174182
/// Note that this same closure may be invoked multiple times in parallel.
175183
type ExitHandler = Fn(usize) + Send + Sync;
176184

185+
// NB: We can't `#[derive(Default)]` because `S` is left ambiguous.
186+
impl Default for ThreadPoolBuilder {
187+
fn default() -> Self {
188+
ThreadPoolBuilder {
189+
num_threads: 0,
190+
panic_handler: None,
191+
get_thread_name: None,
192+
stack_size: None,
193+
start_handler: None,
194+
exit_handler: None,
195+
spawn_handler: DefaultSpawn,
196+
breadth_first: false,
197+
}
198+
}
199+
}
200+
177201
impl ThreadPoolBuilder {
178202
/// Creates and returns a valid rayon thread pool builder, but does not initialize it.
179-
pub fn new() -> ThreadPoolBuilder {
180-
ThreadPoolBuilder::default()
203+
pub fn new() -> Self {
204+
Self::default()
181205
}
206+
}
182207

208+
/// Note: the `S: ThreadSpawn` constraint is an internal implementation detail for the
209+
/// default spawn and those set by [`spawn_handler`](#method.spawn_handler).
210+
impl<S> ThreadPoolBuilder<S>
211+
where
212+
S: ThreadSpawn,
213+
{
183214
/// Create a new `ThreadPool` initialized using this configuration.
184215
pub fn build(self) -> Result<ThreadPool, ThreadPoolBuildError> {
185216
ThreadPool::build(self)
@@ -207,6 +238,154 @@ impl ThreadPoolBuilder {
207238
registry.wait_until_primed();
208239
Ok(())
209240
}
241+
}
242+
243+
impl ThreadPoolBuilder {
244+
/// Create a scoped `ThreadPool` initialized using this configuration.
245+
///
246+
/// This is a convenience function for building a pool using [`crossbeam::scope`]
247+
/// to spawn threads in a [`spawn_handler`](#method.spawn_handler).
248+
/// The threads in this pool will start by calling `wrapper`, which should
249+
/// do initialization and continue by calling `ThreadBuilder::run()`.
250+
///
251+
/// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.7/crossbeam/fn.scope.html
252+
///
253+
/// # Examples
254+
///
255+
/// A scoped pool may be useful in combination with scoped thread-local variables.
256+
///
257+
/// ```
258+
/// #[macro_use]
259+
/// extern crate scoped_tls;
260+
/// # use rayon_core as rayon;
261+
///
262+
/// scoped_thread_local!(static POOL_DATA: Vec<i32>);
263+
///
264+
/// fn main() -> Result<(), rayon::ThreadPoolBuildError> {
265+
/// let pool_data = vec![1, 2, 3];
266+
///
267+
/// // We haven't assigned any TLS data yet.
268+
/// assert!(!POOL_DATA.is_set());
269+
///
270+
/// rayon::ThreadPoolBuilder::new()
271+
/// .build_scoped(
272+
/// // Borrow `pool_data` in TLS for each thread.
273+
/// |thread| POOL_DATA.set(&pool_data, || thread.run()),
274+
/// // Do some work that needs the TLS data.
275+
/// |pool| pool.install(|| assert!(POOL_DATA.is_set())),
276+
/// )?;
277+
///
278+
/// // Once we've returned, `pool_data` is no longer borrowed.
279+
/// drop(pool_data);
280+
/// Ok(())
281+
/// }
282+
/// ```
283+
pub fn build_scoped<W, F, R>(self, wrapper: W, with_pool: F) -> Result<R, ThreadPoolBuildError>
284+
where
285+
W: Fn(ThreadBuilder) + Sync, // expected to call `run()`
286+
F: FnOnce(&ThreadPool) -> R,
287+
{
288+
let result = crossbeam_utils::thread::scope(|scope| {
289+
let wrapper = &wrapper;
290+
let pool = self
291+
.spawn_handler(|thread| {
292+
let mut builder = scope.builder();
293+
if let Some(name) = thread.name() {
294+
builder = builder.name(name.to_string());
295+
}
296+
if let Some(size) = thread.stack_size() {
297+
builder = builder.stack_size(size);
298+
}
299+
builder.spawn(move |_| wrapper(thread))?;
300+
Ok(())
301+
})
302+
.build()?;
303+
Ok(with_pool(&pool))
304+
});
305+
306+
match result {
307+
Ok(result) => result,
308+
Err(err) => unwind::resume_unwinding(err),
309+
}
310+
}
311+
}
312+
313+
impl<S> ThreadPoolBuilder<S> {
314+
/// Set a custom function for spawning threads.
315+
///
316+
/// Note that the threads will not exit until after the pool is dropped. It
317+
/// is up to the caller to wait for thread termination if that is important
318+
/// for any invariants. For instance, threads created in [`crossbeam::scope`]
319+
/// will be joined before that scope returns, and this will block indefinitely
320+
/// if the pool is leaked. Furthermore, the global thread pool doesn't terminate
321+
/// until the entire process exits!
322+
///
323+
/// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.7/crossbeam/fn.scope.html
324+
///
325+
/// # Examples
326+
///
327+
/// A minimal spawn handler just needs to call `run()` from an independent thread.
328+
///
329+
/// ```
330+
/// # use rayon_core as rayon;
331+
/// fn main() -> Result<(), rayon::ThreadPoolBuildError> {
332+
/// let pool = rayon::ThreadPoolBuilder::new()
333+
/// .spawn_handler(|thread| {
334+
/// std::thread::spawn(|| thread.run());
335+
/// Ok(())
336+
/// })
337+
/// .build()?;
338+
///
339+
/// pool.install(|| println!("Hello from my custom thread!"));
340+
/// Ok(())
341+
/// }
342+
/// ```
343+
///
344+
/// The default spawn handler sets the name and stack size if given, and propagates
345+
/// any errors from the thread builder.
346+
///
347+
/// ```
348+
/// # use rayon_core as rayon;
349+
/// fn main() -> Result<(), rayon::ThreadPoolBuildError> {
350+
/// let pool = rayon::ThreadPoolBuilder::new()
351+
/// .spawn_handler(|thread| {
352+
/// let mut b = std::thread::Builder::new();
353+
/// if let Some(name) = thread.name() {
354+
/// b = b.name(name.to_owned());
355+
/// }
356+
/// if let Some(stack_size) = thread.stack_size() {
357+
/// b = b.stack_size(stack_size);
358+
/// }
359+
/// b.spawn(|| thread.run())?;
360+
/// Ok(())
361+
/// })
362+
/// .build()?;
363+
///
364+
/// pool.install(|| println!("Hello from my fully custom thread!"));
365+
/// Ok(())
366+
/// }
367+
/// ```
368+
pub fn spawn_handler<F>(self, spawn: F) -> ThreadPoolBuilder<CustomSpawn<F>>
369+
where
370+
F: FnMut(ThreadBuilder) -> io::Result<()>,
371+
{
372+
ThreadPoolBuilder {
373+
spawn_handler: CustomSpawn::new(spawn),
374+
// ..self
375+
num_threads: self.num_threads,
376+
panic_handler: self.panic_handler,
377+
get_thread_name: self.get_thread_name,
378+
stack_size: self.stack_size,
379+
start_handler: self.start_handler,
380+
exit_handler: self.exit_handler,
381+
breadth_first: self.breadth_first,
382+
}
383+
}
384+
385+
/// Returns a reference to the current spawn handler.
386+
fn get_spawn_handler(&mut self) -> &mut S {
387+
&mut self.spawn_handler
388+
}
210389

211390
/// Get the number of threads that will be used for the thread
212391
/// pool. See `num_threads()` for more information.
@@ -276,7 +455,7 @@ impl ThreadPoolBuilder {
276455
/// replacement of the now deprecated `RAYON_RS_NUM_CPUS` environment
277456
/// variable. If both variables are specified, `RAYON_NUM_THREADS` will
278457
/// be prefered.
279-
pub fn num_threads(mut self, num_threads: usize) -> ThreadPoolBuilder {
458+
pub fn num_threads(mut self, num_threads: usize) -> Self {
280459
self.num_threads = num_threads;
281460
self
282461
}
@@ -300,7 +479,7 @@ impl ThreadPoolBuilder {
300479
/// If the panic handler itself panics, this will abort the
301480
/// process. To prevent this, wrap the body of your panic handler
302481
/// in a call to `std::panic::catch_unwind()`.
303-
pub fn panic_handler<H>(mut self, panic_handler: H) -> ThreadPoolBuilder
482+
pub fn panic_handler<H>(mut self, panic_handler: H) -> Self
304483
where
305484
H: Fn(Box<Any + Send>) + Send + Sync + 'static,
306485
{
@@ -368,7 +547,7 @@ impl ThreadPoolBuilder {
368547
/// Note that this same closure may be invoked multiple times in parallel.
369548
/// If this closure panics, the panic will be passed to the panic handler.
370549
/// If that handler returns, then startup will continue normally.
371-
pub fn start_handler<H>(mut self, start_handler: H) -> ThreadPoolBuilder
550+
pub fn start_handler<H>(mut self, start_handler: H) -> Self
372551
where
373552
H: Fn(usize) + Send + Sync + 'static,
374553
{
@@ -387,7 +566,7 @@ impl ThreadPoolBuilder {
387566
/// Note that this same closure may be invoked multiple times in parallel.
388567
/// If this closure panics, the panic will be passed to the panic handler.
389568
/// If that handler returns, then the thread will exit normally.
390-
pub fn exit_handler<H>(mut self, exit_handler: H) -> ThreadPoolBuilder
569+
pub fn exit_handler<H>(mut self, exit_handler: H) -> Self
391570
where
392571
H: Fn(usize) + Send + Sync + 'static,
393572
{
@@ -503,7 +682,7 @@ pub fn initialize(config: Configuration) -> Result<(), Box<Error>> {
503682
config.into_builder().build_global().map_err(Box::from)
504683
}
505684

506-
impl fmt::Debug for ThreadPoolBuilder {
685+
impl<S> fmt::Debug for ThreadPoolBuilder<S> {
507686
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
508687
let ThreadPoolBuilder {
509688
ref num_threads,
@@ -512,6 +691,7 @@ impl fmt::Debug for ThreadPoolBuilder {
512691
ref stack_size,
513692
ref start_handler,
514693
ref exit_handler,
694+
spawn_handler: _,
515695
ref breadth_first,
516696
} = *self;
517697

rayon-core/src/private.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
//! The public parts of this private module are used to create traits
2+
//! that cannot be implemented outside of our own crate. This way we
3+
//! can feel free to extend those traits without worrying about it
4+
//! being a breaking change for other implementations.
5+
6+
/// If this type is pub but not publicly reachable, third parties
7+
/// can't name it and can't implement traits using it.
8+
#[allow(missing_debug_implementations)]
9+
pub struct PrivateMarker;
10+
11+
macro_rules! private_decl {
12+
() => {
13+
/// This trait is private; this method exists to make it
14+
/// impossible to implement outside the crate.
15+
#[doc(hidden)]
16+
fn __rayon_private__(&self) -> ::private::PrivateMarker;
17+
}
18+
}
19+
20+
macro_rules! private_impl {
21+
() => {
22+
fn __rayon_private__(&self) -> ::private::PrivateMarker {
23+
::private::PrivateMarker
24+
}
25+
}
26+
}

0 commit comments

Comments
 (0)