Skip to content

Commit 86060c2

Browse files
committed
Add callbacks for when threads start and stop doing work
1 parent 0f55a99 commit 86060c2

File tree

3 files changed

+92
-9
lines changed

3 files changed

+92
-9
lines changed

rayon-core/src/lib.rs

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,12 @@ pub struct ThreadPoolBuilder {
152152
/// Closure invoked on worker thread start.
153153
main_handler: Option<Box<MainHandler>>,
154154

155+
/// Closure invoked when starting computations in a thread.
156+
acquire_thread_handler: Option<Box<AcquireThreadHandler>>,
157+
158+
/// Closure invoked when blocking in a thread.
159+
release_thread_handler: Option<Box<ReleaseThreadHandler>>,
160+
155161
/// If false, worker threads will execute spawned jobs in a
156162
/// "depth-first" fashion. If true, they will do a "breadth-first"
157163
/// fashion. Depth-first is the default.
@@ -190,6 +196,14 @@ type ExitHandler = Fn(usize) + Send + Sync;
190196
/// Note that this same closure may be invoked multiple times in parallel.
191197
type MainHandler = Fn(usize, &mut FnMut()) + Send + Sync;
192198

199+
/// The type for a closure that gets invoked before starting computations in a thread.
200+
/// Note that this same closure may be invoked multiple times in parallel.
201+
type AcquireThreadHandler = Fn() + Send + Sync;
202+
203+
/// The type for a closure that gets invoked before blocking in a thread.
204+
/// Note that this same closure may be invoked multiple times in parallel.
205+
type ReleaseThreadHandler = Fn() + Send + Sync;
206+
193207
impl ThreadPoolBuilder {
194208
/// Creates and returns a valid rayon thread pool builder, but does not initialize it.
195209
pub fn new() -> ThreadPoolBuilder {
@@ -363,6 +377,32 @@ impl ThreadPoolBuilder {
363377
self.breadth_first
364378
}
365379

380+
/// Takes the current acquire thread callback, leaving `None`.
381+
fn take_acquire_thread_handler(&mut self) -> Option<Box<AcquireThreadHandler>> {
382+
self.acquire_thread_handler.take()
383+
}
384+
385+
/// Set a callback to be invoked when starting computations in a thread.
386+
pub fn acquire_thread_handler<H>(mut self, acquire_thread_handler: H) -> ThreadPoolBuilder
387+
where H: Fn() + Send + Sync + 'static
388+
{
389+
self.acquire_thread_handler = Some(Box::new(acquire_thread_handler));
390+
self
391+
}
392+
393+
/// Takes the current release thread callback, leaving `None`.
394+
fn take_release_thread_handler(&mut self) -> Option<Box<ReleaseThreadHandler>> {
395+
self.release_thread_handler.take()
396+
}
397+
398+
/// Set a callback to be invoked when blocking in thread.
399+
pub fn release_thread_handler<H>(mut self, release_thread_handler: H) -> ThreadPoolBuilder
400+
where H: Fn() + Send + Sync + 'static
401+
{
402+
self.release_thread_handler = Some(Box::new(release_thread_handler));
403+
self
404+
}
405+
366406
/// Takes the current deadlock callback, leaving `None`.
367407
fn take_deadlock_handler(&mut self) -> Option<Box<DeadlockHandler>> {
368408
self.deadlock_handler.take()
@@ -546,10 +586,12 @@ impl fmt::Debug for ThreadPoolBuilder {
546586
ref get_thread_name,
547587
ref panic_handler,
548588
ref stack_size,
549-
ref deadlock_handler,
589+
ref deadlock_handler,
550590
ref start_handler,
551591
ref main_handler,
552592
ref exit_handler,
593+
ref acquire_thread_handler,
594+
ref release_thread_handler,
553595
ref breadth_first,
554596
} = *self;
555597

@@ -567,6 +609,8 @@ impl fmt::Debug for ThreadPoolBuilder {
567609
let start_handler = start_handler.as_ref().map(|_| ClosurePlaceholder);
568610
let exit_handler = exit_handler.as_ref().map(|_| ClosurePlaceholder);
569611
let main_handler = main_handler.as_ref().map(|_| ClosurePlaceholder);
612+
let acquire_thread_handler = acquire_thread_handler.as_ref().map(|_| ClosurePlaceholder);
613+
let release_thread_handler = release_thread_handler.as_ref().map(|_| ClosurePlaceholder);
570614

571615
f.debug_struct("ThreadPoolBuilder")
572616
.field("num_threads", num_threads)
@@ -577,6 +621,8 @@ impl fmt::Debug for ThreadPoolBuilder {
577621
.field("start_handler", &start_handler)
578622
.field("exit_handler", &exit_handler)
579623
.field("main_handler", &main_handler)
624+
.field("acquire_thread_handler", &acquire_thread_handler)
625+
.field("release_thread_handler", &release_thread_handler)
580626
.field("breadth_first", &breadth_first)
581627
.finish()
582628
}

rayon-core/src/registry.rs

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,21 @@ use std::thread;
1818
use std::usize;
1919
use unwind;
2020
use util::leak;
21-
use {ErrorKind, ExitHandler, PanicHandler, DeadlockHandler, StartHandler,
22-
MainHandler, ThreadPoolBuildError, ThreadPoolBuilder};
21+
use {ErrorKind, ExitHandler, PanicHandler, DeadlockHandler, StartHandler, AcquireThreadHandler,
22+
MainHandler, ThreadPoolBuildError, ThreadPoolBuilder, ReleaseThreadHandler};
2323

2424
pub struct Registry {
2525
thread_infos: Vec<ThreadInfo>,
2626
state: Mutex<RegistryState>,
2727
sleep: Sleep,
2828
job_uninjector: Stealer<JobRef>,
2929
panic_handler: Option<Box<PanicHandler>>,
30-
deadlock_handler: Option<Box<DeadlockHandler>>,
30+
pub(crate) deadlock_handler: Option<Box<DeadlockHandler>>,
3131
start_handler: Option<Box<StartHandler>>,
3232
exit_handler: Option<Box<ExitHandler>>,
3333
main_handler: Option<Box<MainHandler>>,
34+
pub(crate) acquire_thread_handler: Option<Box<AcquireThreadHandler>>,
35+
pub(crate) release_thread_handler: Option<Box<ReleaseThreadHandler>>,
3436

3537
// When this latch reaches 0, it means that all work on this
3638
// registry must be complete. This is ensured in the following ways:
@@ -123,6 +125,8 @@ impl Registry {
123125
start_handler: builder.take_start_handler(),
124126
main_handler: builder.take_main_handler(),
125127
exit_handler: builder.take_exit_handler(),
128+
acquire_thread_handler: builder.take_acquire_thread_handler(),
129+
release_thread_handler: builder.take_release_thread_handler(),
126130
});
127131

128132
// If we return early or panic, make sure to terminate existing threads.
@@ -222,9 +226,23 @@ impl Registry {
222226
/// Waits for the worker threads to stop. This is used for testing
223227
/// -- so we can check that termination actually works.
224228
pub(crate) fn wait_until_stopped(&self) {
229+
self.release_thread();
225230
for info in &self.thread_infos {
226231
info.stopped.wait();
227232
}
233+
self.acquire_thread();
234+
}
235+
236+
pub(crate) fn acquire_thread(&self) {
237+
if let Some(ref acquire_thread_handler) = self.acquire_thread_handler {
238+
acquire_thread_handler();
239+
}
240+
}
241+
242+
pub(crate) fn release_thread(&self) {
243+
if let Some(ref release_thread_handler) = self.release_thread_handler {
244+
release_thread_handler();
245+
}
228246
}
229247

230248
/// ////////////////////////////////////////////////////////////////////////
@@ -375,7 +393,9 @@ impl Registry {
375393
op(&*worker_thread, true)
376394
}, LockLatch::new());
377395
self.inject(&[job.as_job_ref()]);
396+
self.release_thread();
378397
job.latch.wait();
398+
self.acquire_thread();
379399
job.into_result()
380400
}
381401

@@ -614,7 +634,7 @@ impl WorkerThread {
614634
yields = self.registry.sleep.no_work_found(
615635
self.index,
616636
yields,
617-
&self.registry.deadlock_handler
637+
&self.registry
618638
);
619639
}
620640
}
@@ -716,6 +736,8 @@ unsafe fn main_loop(
716736
worker_thread.wait_until(&registry.terminate_latch);
717737
};
718738

739+
registry.acquire_thread();
740+
719741
if let Some(ref handler) = registry.main_handler {
720742
match unwind::halt_unwinding(|| handler(index, &mut work)) {
721743
Ok(()) => {
@@ -748,6 +770,8 @@ unsafe fn main_loop(
748770
}
749771
// We're already exiting the thread, there's nothing else to do.
750772
}
773+
774+
registry.release_thread();
751775
}
752776

753777
/// If already in a worker-thread, just execute `op`. Otherwise,

rayon-core/src/sleep/mod.rs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
44
use DeadlockHandler;
55
use log::Event::*;
6+
use registry::Registry;
67
use std::sync::atomic::{AtomicUsize, Ordering};
78
use std::sync::{Condvar, Mutex};
89
use std::thread;
@@ -113,7 +114,12 @@ impl Sleep {
113114
}
114115

115116
#[inline]
116-
pub fn no_work_found(&self, worker_index: usize, yields: usize, deadlock_handler: &Option<Box<DeadlockHandler>>) -> usize {
117+
pub fn no_work_found(
118+
&self,
119+
worker_index: usize,
120+
yields: usize,
121+
registry: &Registry,
122+
) -> usize {
117123
log!(DidNotFindWork {
118124
worker: worker_index,
119125
yields: yields,
@@ -140,7 +146,7 @@ impl Sleep {
140146
}
141147
} else {
142148
debug_assert_eq!(yields, ROUNDS_UNTIL_ASLEEP);
143-
self.sleep(worker_index, deadlock_handler);
149+
self.sleep(worker_index, registry);
144150
0
145151
}
146152
}
@@ -243,7 +249,11 @@ impl Sleep {
243249
self.worker_is_sleepy(state, worker_index)
244250
}
245251

246-
fn sleep(&self, worker_index: usize, deadlock_handler: &Option<Box<DeadlockHandler>>) {
252+
fn sleep(
253+
&self,
254+
worker_index: usize,
255+
registry: &Registry,
256+
) {
247257
loop {
248258
// Acquire here suffices. If we observe that the current worker is still
249259
// sleepy, then in fact we know that no writes have occurred, and anyhow
@@ -322,12 +332,15 @@ impl Sleep {
322332

323333
// Decrement the number of active threads and check for a deadlock
324334
data.active_threads -= 1;
325-
data.deadlock_check(deadlock_handler);
335+
data.deadlock_check(&registry.deadlock_handler);
336+
337+
registry.release_thread();
326338

327339
let _ = self.tickle.wait(data).unwrap();
328340
log!(GotAwoken {
329341
worker: worker_index
330342
});
343+
registry.acquire_thread();
331344
return;
332345
}
333346
} else {

0 commit comments

Comments
 (0)