Skip to content

Commit 327075d

Browse files
committed
Add callbacks for when threads start and stop doing work
1 parent 630975d commit 327075d

File tree

3 files changed

+92
-9
lines changed

3 files changed

+92
-9
lines changed

rayon-core/src/lib.rs

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,12 @@ pub struct ThreadPoolBuilder {
151151
/// Closure invoked on worker thread start.
152152
main_handler: Option<Box<MainHandler>>,
153153

154+
/// Closure invoked when starting computations in a thread.
155+
acquire_thread_handler: Option<Box<AcquireThreadHandler>>,
156+
157+
/// Closure invoked when blocking in a thread.
158+
release_thread_handler: Option<Box<ReleaseThreadHandler>>,
159+
154160
/// If false, worker threads will execute spawned jobs in a
155161
/// "depth-first" fashion. If true, they will do a "breadth-first"
156162
/// fashion. Depth-first is the default.
@@ -189,6 +195,14 @@ type ExitHandler = Fn(usize) + Send + Sync;
189195
/// Note that this same closure may be invoked multiple times in parallel.
190196
type MainHandler = Fn(usize, &mut FnMut()) + Send + Sync;
191197

198+
/// The type for a closure that gets invoked before starting computations in a thread.
199+
/// Note that this same closure may be invoked multiple times in parallel.
200+
type AcquireThreadHandler = Fn() + Send + Sync;
201+
202+
/// The type for a closure that gets invoked before blocking in a thread.
203+
/// Note that this same closure may be invoked multiple times in parallel.
204+
type ReleaseThreadHandler = Fn() + Send + Sync;
205+
192206
impl ThreadPoolBuilder {
193207
/// Creates and returns a valid rayon thread pool builder, but does not initialize it.
194208
pub fn new() -> ThreadPoolBuilder {
@@ -362,6 +376,32 @@ impl ThreadPoolBuilder {
362376
self.breadth_first
363377
}
364378

379+
/// Takes the current acquire thread callback, leaving `None`.
380+
fn take_acquire_thread_handler(&mut self) -> Option<Box<AcquireThreadHandler>> {
381+
self.acquire_thread_handler.take()
382+
}
383+
384+
/// Set a callback to be invoked when starting computations in a thread.
385+
pub fn acquire_thread_handler<H>(mut self, acquire_thread_handler: H) -> ThreadPoolBuilder
386+
where H: Fn() + Send + Sync + 'static
387+
{
388+
self.acquire_thread_handler = Some(Box::new(acquire_thread_handler));
389+
self
390+
}
391+
392+
/// Takes the current release thread callback, leaving `None`.
393+
fn take_release_thread_handler(&mut self) -> Option<Box<ReleaseThreadHandler>> {
394+
self.release_thread_handler.take()
395+
}
396+
397+
/// Set a callback to be invoked when blocking in thread.
398+
pub fn release_thread_handler<H>(mut self, release_thread_handler: H) -> ThreadPoolBuilder
399+
where H: Fn() + Send + Sync + 'static
400+
{
401+
self.release_thread_handler = Some(Box::new(release_thread_handler));
402+
self
403+
}
404+
365405
/// Takes the current deadlock callback, leaving `None`.
366406
fn take_deadlock_handler(&mut self) -> Option<Box<DeadlockHandler>> {
367407
self.deadlock_handler.take()
@@ -545,10 +585,12 @@ impl fmt::Debug for ThreadPoolBuilder {
545585
ref get_thread_name,
546586
ref panic_handler,
547587
ref stack_size,
548-
ref deadlock_handler,
588+
ref deadlock_handler,
549589
ref start_handler,
550590
ref main_handler,
551591
ref exit_handler,
592+
ref acquire_thread_handler,
593+
ref release_thread_handler,
552594
ref breadth_first,
553595
} = *self;
554596

@@ -566,6 +608,8 @@ impl fmt::Debug for ThreadPoolBuilder {
566608
let start_handler = start_handler.as_ref().map(|_| ClosurePlaceholder);
567609
let exit_handler = exit_handler.as_ref().map(|_| ClosurePlaceholder);
568610
let main_handler = main_handler.as_ref().map(|_| ClosurePlaceholder);
611+
let acquire_thread_handler = acquire_thread_handler.as_ref().map(|_| ClosurePlaceholder);
612+
let release_thread_handler = release_thread_handler.as_ref().map(|_| ClosurePlaceholder);
569613

570614
f.debug_struct("ThreadPoolBuilder")
571615
.field("num_threads", num_threads)
@@ -576,6 +620,8 @@ impl fmt::Debug for ThreadPoolBuilder {
576620
.field("start_handler", &start_handler)
577621
.field("exit_handler", &exit_handler)
578622
.field("main_handler", &main_handler)
623+
.field("acquire_thread_handler", &acquire_thread_handler)
624+
.field("release_thread_handler", &release_thread_handler)
579625
.field("breadth_first", &breadth_first)
580626
.finish()
581627
}

rayon-core/src/registry.rs

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,21 @@ use std::thread;
1818
use std::usize;
1919
use unwind;
2020
use util::leak;
21-
use {ErrorKind, ExitHandler, PanicHandler, DeadlockHandler, StartHandler,
22-
MainHandler, ThreadPoolBuildError, ThreadPoolBuilder};
21+
use {ErrorKind, ExitHandler, PanicHandler, DeadlockHandler, StartHandler, AcquireThreadHandler,
22+
MainHandler, ThreadPoolBuildError, ThreadPoolBuilder, ReleaseThreadHandler};
2323

2424
pub struct Registry {
2525
thread_infos: Vec<ThreadInfo>,
2626
state: Mutex<RegistryState>,
2727
sleep: Sleep,
2828
job_uninjector: Stealer<JobRef>,
2929
panic_handler: Option<Box<PanicHandler>>,
30-
deadlock_handler: Option<Box<DeadlockHandler>>,
30+
pub(crate) deadlock_handler: Option<Box<DeadlockHandler>>,
3131
start_handler: Option<Box<StartHandler>>,
3232
exit_handler: Option<Box<ExitHandler>>,
3333
main_handler: Option<Box<MainHandler>>,
34+
pub(crate) acquire_thread_handler: Option<Box<AcquireThreadHandler>>,
35+
pub(crate) release_thread_handler: Option<Box<ReleaseThreadHandler>>,
3436

3537
// When this latch reaches 0, it means that all work on this
3638
// registry must be complete. This is ensured in the following ways:
@@ -123,6 +125,8 @@ impl Registry {
123125
start_handler: builder.take_start_handler(),
124126
main_handler: builder.take_main_handler(),
125127
exit_handler: builder.take_exit_handler(),
128+
acquire_thread_handler: builder.take_acquire_thread_handler(),
129+
release_thread_handler: builder.take_release_thread_handler(),
126130
});
127131

128132
// If we return early or panic, make sure to terminate existing threads.
@@ -222,9 +226,23 @@ impl Registry {
222226
/// Waits for the worker threads to stop. This is used for testing
223227
/// -- so we can check that termination actually works.
224228
pub(crate) fn wait_until_stopped(&self) {
229+
self.release_thread();
225230
for info in &self.thread_infos {
226231
info.stopped.wait();
227232
}
233+
self.acquire_thread();
234+
}
235+
236+
pub(crate) fn acquire_thread(&self) {
237+
if let Some(ref acquire_thread_handler) = self.acquire_thread_handler {
238+
acquire_thread_handler();
239+
}
240+
}
241+
242+
pub(crate) fn release_thread(&self) {
243+
if let Some(ref release_thread_handler) = self.release_thread_handler {
244+
release_thread_handler();
245+
}
228246
}
229247

230248
/// ////////////////////////////////////////////////////////////////////////
@@ -379,7 +397,9 @@ impl Registry {
379397
LockLatch::new(),
380398
);
381399
self.inject(&[job.as_job_ref()]);
400+
self.release_thread();
382401
job.latch.wait();
402+
self.acquire_thread();
383403
job.into_result()
384404
}
385405

@@ -619,7 +639,7 @@ impl WorkerThread {
619639
yields = self.registry.sleep.no_work_found(
620640
self.index,
621641
yields,
622-
&self.registry.deadlock_handler
642+
&self.registry
623643
);
624644
}
625645
}
@@ -721,6 +741,8 @@ unsafe fn main_loop(
721741
worker_thread.wait_until(&registry.terminate_latch);
722742
};
723743

744+
registry.acquire_thread();
745+
724746
if let Some(ref handler) = registry.main_handler {
725747
match unwind::halt_unwinding(|| handler(index, &mut work)) {
726748
Ok(()) => {
@@ -753,6 +775,8 @@ unsafe fn main_loop(
753775
}
754776
// We're already exiting the thread, there's nothing else to do.
755777
}
778+
779+
registry.release_thread();
756780
}
757781

758782
/// If already in a worker-thread, just execute `op`. Otherwise,

rayon-core/src/sleep/mod.rs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
44
use DeadlockHandler;
55
use log::Event::*;
6+
use registry::Registry;
67
use std::sync::atomic::{AtomicUsize, Ordering};
78
use std::sync::{Condvar, Mutex};
89
use std::thread;
@@ -113,7 +114,12 @@ impl Sleep {
113114
}
114115

115116
#[inline]
116-
pub fn no_work_found(&self, worker_index: usize, yields: usize, deadlock_handler: &Option<Box<DeadlockHandler>>) -> usize {
117+
pub fn no_work_found(
118+
&self,
119+
worker_index: usize,
120+
yields: usize,
121+
registry: &Registry,
122+
) -> usize {
117123
log!(DidNotFindWork {
118124
worker: worker_index,
119125
yields: yields,
@@ -140,7 +146,7 @@ impl Sleep {
140146
}
141147
} else {
142148
debug_assert_eq!(yields, ROUNDS_UNTIL_ASLEEP);
143-
self.sleep(worker_index, deadlock_handler);
149+
self.sleep(worker_index, registry);
144150
0
145151
}
146152
}
@@ -243,7 +249,11 @@ impl Sleep {
243249
self.worker_is_sleepy(state, worker_index)
244250
}
245251

246-
fn sleep(&self, worker_index: usize, deadlock_handler: &Option<Box<DeadlockHandler>>) {
252+
fn sleep(
253+
&self,
254+
worker_index: usize,
255+
registry: &Registry,
256+
) {
247257
loop {
248258
// Acquire here suffices. If we observe that the current worker is still
249259
// sleepy, then in fact we know that no writes have occurred, and anyhow
@@ -322,12 +332,15 @@ impl Sleep {
322332

323333
// Decrement the number of active threads and check for a deadlock
324334
data.active_threads -= 1;
325-
data.deadlock_check(deadlock_handler);
335+
data.deadlock_check(&registry.deadlock_handler);
336+
337+
registry.release_thread();
326338

327339
let _ = self.tickle.wait(data).unwrap();
328340
log!(GotAwoken {
329341
worker: worker_index
330342
});
343+
registry.acquire_thread();
331344
return;
332345
}
333346
} else {

0 commit comments

Comments
 (0)