Skip to content

Commit 0f55a99

Browse files
committed
Add deadlock detection
1 parent 046336e commit 0f55a99

File tree

4 files changed

+156
-21
lines changed

4 files changed

+156
-21
lines changed

rayon-core/src/lib.rs

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@
2020
//! succeed.
2121
2222
#![doc(html_root_url = "https://docs.rs/rayon-core/1.4")]
23-
#![deny(missing_debug_implementations)]
24-
#![deny(missing_docs)]
2523

2624
use std::any::Any;
2725
use std::env;
@@ -65,6 +63,7 @@ pub mod tlv;
6563
pub mod internal;
6664
pub use join::{join, join_context};
6765
pub use scope::{scope, Scope};
66+
pub use registry::{Registry, mark_blocked, mark_unblocked};
6867
pub use spawn::spawn;
6968
pub use worker_local::WorkerLocal;
7069

@@ -141,6 +140,9 @@ pub struct ThreadPoolBuilder {
141140
/// The stack size for the created worker threads
142141
stack_size: Option<usize>,
143142

143+
/// Closure invoked on deadlock.
144+
deadlock_handler: Option<Box<DeadlockHandler>>,
145+
144146
/// Closure invoked on worker thread start.
145147
start_handler: Option<Box<StartHandler>>,
146148

@@ -169,6 +171,9 @@ pub struct Configuration {
169171
/// may be invoked multiple times in parallel.
170172
type PanicHandler = Fn(Box<Any + Send>) + Send + Sync;
171173

174+
/// The type for a closure that gets invoked when the Rayon thread pool deadlocks
175+
type DeadlockHandler = Fn() + Send + Sync;
176+
172177
/// The type for a closure that gets invoked when a thread starts. The
173178
/// closure is passed the index of the thread on which it is invoked.
174179
/// Note that this same closure may be invoked multiple times in parallel.
@@ -358,6 +363,19 @@ impl ThreadPoolBuilder {
358363
self.breadth_first
359364
}
360365

366+
/// Takes the current deadlock callback, leaving `None`.
367+
fn take_deadlock_handler(&mut self) -> Option<Box<DeadlockHandler>> {
368+
self.deadlock_handler.take()
369+
}
370+
371+
/// Set a callback to be invoked on current deadlock.
372+
pub fn deadlock_handler<H>(mut self, deadlock_handler: H) -> ThreadPoolBuilder
373+
where H: Fn() + Send + Sync + 'static
374+
{
375+
self.deadlock_handler = Some(Box::new(deadlock_handler));
376+
self
377+
}
378+
361379
/// Takes the current thread start callback, leaving `None`.
362380
fn take_start_handler(&mut self) -> Option<Box<StartHandler>> {
363381
self.start_handler.take()
@@ -528,6 +546,7 @@ impl fmt::Debug for ThreadPoolBuilder {
528546
ref get_thread_name,
529547
ref panic_handler,
530548
ref stack_size,
549+
ref deadlock_handler,
531550
ref start_handler,
532551
ref main_handler,
533552
ref exit_handler,
@@ -544,6 +563,7 @@ impl fmt::Debug for ThreadPoolBuilder {
544563
}
545564
let get_thread_name = get_thread_name.as_ref().map(|_| ClosurePlaceholder);
546565
let panic_handler = panic_handler.as_ref().map(|_| ClosurePlaceholder);
566+
let deadlock_handler = deadlock_handler.as_ref().map(|_| ClosurePlaceholder);
547567
let start_handler = start_handler.as_ref().map(|_| ClosurePlaceholder);
548568
let exit_handler = exit_handler.as_ref().map(|_| ClosurePlaceholder);
549569
let main_handler = main_handler.as_ref().map(|_| ClosurePlaceholder);
@@ -553,6 +573,7 @@ impl fmt::Debug for ThreadPoolBuilder {
553573
.field("get_thread_name", &get_thread_name)
554574
.field("panic_handler", &panic_handler)
555575
.field("stack_size", &stack_size)
576+
.field("deadlock_handler", &deadlock_handler)
556577
.field("start_handler", &start_handler)
557578
.field("exit_handler", &exit_handler)
558579
.field("main_handler", &main_handler)

rayon-core/src/registry.rs

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use std::thread;
1818
use std::usize;
1919
use unwind;
2020
use util::leak;
21-
use {ErrorKind, ExitHandler, PanicHandler, StartHandler,
21+
use {ErrorKind, ExitHandler, PanicHandler, DeadlockHandler, StartHandler,
2222
MainHandler, ThreadPoolBuildError, ThreadPoolBuilder};
2323

2424
pub struct Registry {
@@ -27,6 +27,7 @@ pub struct Registry {
2727
sleep: Sleep,
2828
job_uninjector: Stealer<JobRef>,
2929
panic_handler: Option<Box<PanicHandler>>,
30+
deadlock_handler: Option<Box<DeadlockHandler>>,
3031
start_handler: Option<Box<StartHandler>>,
3132
exit_handler: Option<Box<ExitHandler>>,
3233
main_handler: Option<Box<MainHandler>>,
@@ -114,10 +115,11 @@ impl Registry {
114115
let registry = Arc::new(Registry {
115116
thread_infos: stealers.into_iter().map(|s| ThreadInfo::new(s)).collect(),
116117
state: Mutex::new(RegistryState::new(inj_worker)),
117-
sleep: Sleep::new(),
118+
sleep: Sleep::new(n_threads),
118119
job_uninjector: inj_stealer,
119120
terminate_latch: CountLatch::new(),
120121
panic_handler: builder.take_panic_handler(),
122+
deadlock_handler: builder.take_deadlock_handler(),
121123
start_handler: builder.take_start_handler(),
122124
main_handler: builder.take_main_handler(),
123125
exit_handler: builder.take_exit_handler(),
@@ -367,14 +369,11 @@ impl Registry {
367369
{
368370
// This thread isn't a member of *any* thread pool, so just block.
369371
debug_assert!(WorkerThread::current().is_null());
370-
let job = StackJob::new(
371-
|injected| {
372-
let worker_thread = WorkerThread::current();
373-
assert!(injected && !worker_thread.is_null());
374-
op(&*worker_thread, true)
375-
},
376-
LockLatch::new(),
377-
);
372+
let job = StackJob::new(|injected| {
373+
let worker_thread = WorkerThread::current();
374+
assert!(injected && !worker_thread.is_null());
375+
op(&*worker_thread, true)
376+
}, LockLatch::new());
378377
self.inject(&[job.as_job_ref()]);
379378
job.latch.wait();
380379
job.into_result()
@@ -436,6 +435,24 @@ impl Registry {
436435
}
437436
}
438437

438+
/// Mark a Rayon worker thread as blocked. This triggers the deadlock handler
439+
/// if no other worker thread is active
440+
#[inline]
441+
pub fn mark_blocked() {
442+
let worker_thread = WorkerThread::current();
443+
assert!(!worker_thread.is_null());
444+
unsafe {
445+
let registry = &(*worker_thread).registry;
446+
registry.sleep.mark_blocked(&registry.deadlock_handler)
447+
}
448+
}
449+
450+
/// Mark a previously blocked Rayon worker thread as unblocked
451+
#[inline]
452+
pub fn mark_unblocked(registry: &Registry) {
453+
registry.sleep.mark_unblocked()
454+
}
455+
439456
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
440457
pub struct RegistryId {
441458
addr: usize,
@@ -594,7 +611,11 @@ impl WorkerThread {
594611
yields = self.registry.sleep.work_found(self.index, yields);
595612
self.execute(job);
596613
} else {
597-
yields = self.registry.sleep.no_work_found(self.index, yields);
614+
yields = self.registry.sleep.no_work_found(
615+
self.index,
616+
yields,
617+
&self.registry.deadlock_handler
618+
);
598619
}
599620
}
600621

rayon-core/src/sleep/README.md

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,3 +386,36 @@ some of them were hit hard:
386386
- 8-10% overhead on nbody-parreduce
387387
- 35% overhead on increment-all
388388
- 245% overhead on join-recursively
389+
390+
# Deadlock detection
391+
392+
This module tracks a number of variables in order to detect deadlocks due to user code blocking.
393+
These variables are stored in the `SleepData` struct which itself is kept behind a mutex.
394+
It contains the following fields:
395+
- `worker_count` - The number of threads in the thread pool.
396+
- `active_threads` - The number of threads in the thread pool which are running
397+
and aren't blocked in user code or sleeping.
398+
- `blocked_threads` - The number of threads which are blocked in user code.
399+
This doesn't include threads blocked by Rayon.
400+
401+
User code can indicate blocking by calling `mark_blocked` before blocking and
402+
calling `mark_unblocked` before unblocking a thread.
403+
This will adjust `active_threads` and `blocked_threads` accordingly.
404+
405+
When we tickle the thread pool in `Sleep::tickle_cold`, we set `active_threads` to
406+
`worker_count` - `blocked_threads` since we wake up all Rayon threads, but not thread blocked
407+
by user code.
408+
409+
A deadlock is detected by checking if `active_threads` is 0 and `blocked_threads` is above 0.
410+
If we ignored `blocked_threads` we would have a deadlock
411+
immediately when creating the thread pool.
412+
We would also deadlock once the thread pool ran out of work.
413+
It is not possible for Rayon itself to deadlock.
414+
Deadlocks can only be caused by user code blocking, so this condition doesn't miss any deadlocks.
415+
416+
We check for the deadlock condition when
417+
threads fall asleep in `mark_unblocked` and in `Sleep::sleep`.
418+
If there's a deadlock detected we call the user provided deadlock handler while we hold the
419+
lock to `SleepData`. This means the deadlock handler cannot call `mark_blocked` and
420+
`mark_unblocked`. The user is expected to handle the deadlock in some non-Rayon thread.
421+
Once the deadlock handler returns, the thread which called the deadlock handler will go to sleep.

rayon-core/src/sleep/mod.rs

Lines changed: 68 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,39 @@
11
//! Code that decides when workers should go to sleep. See README.md
22
//! for an overview.
33
4+
use DeadlockHandler;
45
use log::Event::*;
56
use std::sync::atomic::{AtomicUsize, Ordering};
67
use std::sync::{Condvar, Mutex};
78
use std::thread;
89
use std::usize;
910

11+
struct SleepData {
12+
/// The number of threads in the thread pool.
13+
worker_count: usize,
14+
15+
/// The number of threads in the thread pool which are running and
16+
/// aren't blocked in user code or sleeping.
17+
active_threads: usize,
18+
19+
/// The number of threads which are blocked in user code.
20+
/// This doesn't include threads blocked by this module.
21+
blocked_threads: usize,
22+
}
23+
24+
impl SleepData {
25+
/// Checks if the conditions for a deadlock holds and if so calls the deadlock handler
26+
#[inline]
27+
pub fn deadlock_check(&self, deadlock_handler: &Option<Box<DeadlockHandler>>) {
28+
if self.active_threads == 0 && self.blocked_threads > 0 {
29+
(deadlock_handler.as_ref().unwrap())();
30+
}
31+
}
32+
}
33+
1034
pub struct Sleep {
1135
state: AtomicUsize,
12-
data: Mutex<()>,
36+
data: Mutex<SleepData>,
1337
tickle: Condvar,
1438
}
1539

@@ -20,14 +44,42 @@ const ROUNDS_UNTIL_SLEEPY: usize = 32;
2044
const ROUNDS_UNTIL_ASLEEP: usize = 64;
2145

2246
impl Sleep {
23-
pub fn new() -> Sleep {
47+
pub fn new(worker_count: usize) -> Sleep {
2448
Sleep {
2549
state: AtomicUsize::new(AWAKE),
26-
data: Mutex::new(()),
50+
data: Mutex::new(SleepData {
51+
worker_count,
52+
active_threads: worker_count,
53+
blocked_threads: 0,
54+
}),
2755
tickle: Condvar::new(),
2856
}
2957
}
3058

59+
/// Mark a Rayon worker thread as blocked. This triggers the deadlock handler
60+
/// if no other worker thread is active
61+
#[inline]
62+
pub fn mark_blocked(&self, deadlock_handler: &Option<Box<DeadlockHandler>>) {
63+
let mut data = self.data.lock().unwrap();
64+
debug_assert!(data.active_threads > 0);
65+
debug_assert!(data.blocked_threads < data.worker_count);
66+
debug_assert!(data.active_threads > 0);
67+
data.active_threads -= 1;
68+
data.blocked_threads += 1;
69+
70+
data.deadlock_check(deadlock_handler);
71+
}
72+
73+
/// Mark a previously blocked Rayon worker thread as unblocked
74+
#[inline]
75+
pub fn mark_unblocked(&self) {
76+
let mut data = self.data.lock().unwrap();
77+
debug_assert!(data.active_threads < data.worker_count);
78+
debug_assert!(data.blocked_threads > 0);
79+
data.active_threads += 1;
80+
data.blocked_threads -= 1;
81+
}
82+
3183
fn anyone_sleeping(&self, state: usize) -> bool {
3284
state & SLEEPING != 0
3385
}
@@ -61,7 +113,7 @@ impl Sleep {
61113
}
62114

63115
#[inline]
64-
pub fn no_work_found(&self, worker_index: usize, yields: usize) -> usize {
116+
pub fn no_work_found(&self, worker_index: usize, yields: usize, deadlock_handler: &Option<Box<DeadlockHandler>>) -> usize {
65117
log!(DidNotFindWork {
66118
worker: worker_index,
67119
yields: yields,
@@ -88,7 +140,7 @@ impl Sleep {
88140
}
89141
} else {
90142
debug_assert_eq!(yields, ROUNDS_UNTIL_ASLEEP);
91-
self.sleep(worker_index);
143+
self.sleep(worker_index, deadlock_handler);
92144
0
93145
}
94146
}
@@ -122,7 +174,10 @@ impl Sleep {
122174
old_state: old_state,
123175
});
124176
if self.anyone_sleeping(old_state) {
125-
let _data = self.data.lock().unwrap();
177+
let mut data = self.data.lock().unwrap();
178+
// Set the active threads to the number of workers,
179+
// excluding threads blocked by the user since we won't wake those up
180+
data.active_threads = data.worker_count - data.blocked_threads;
126181
self.tickle.notify_all();
127182
}
128183
}
@@ -188,7 +243,7 @@ impl Sleep {
188243
self.worker_is_sleepy(state, worker_index)
189244
}
190245

191-
fn sleep(&self, worker_index: usize) {
246+
fn sleep(&self, worker_index: usize, deadlock_handler: &Option<Box<DeadlockHandler>>) {
192247
loop {
193248
// Acquire here suffices. If we observe that the current worker is still
194249
// sleepy, then in fact we know that no writes have occurred, and anyhow
@@ -235,7 +290,7 @@ impl Sleep {
235290
// reason for the `compare_exchange` to fail is if an
236291
// awaken comes, in which case the next cycle around
237292
// the loop will just return.
238-
let data = self.data.lock().unwrap();
293+
let mut data = self.data.lock().unwrap();
239294

240295
// This must be SeqCst on success because we want to
241296
// ensure:
@@ -264,6 +319,11 @@ impl Sleep {
264319
log!(FellAsleep {
265320
worker: worker_index
266321
});
322+
323+
// Decrement the number of active threads and check for a deadlock
324+
data.active_threads -= 1;
325+
data.deadlock_check(deadlock_handler);
326+
267327
let _ = self.tickle.wait(data).unwrap();
268328
log!(GotAwoken {
269329
worker: worker_index

0 commit comments

Comments
 (0)