Skip to content

Commit 3b72c1e

Browse files
Zoxccuviper
authored and committed
Add deadlock detection
1 parent 0690f1a commit 3b72c1e

File tree

4 files changed

+164
-16
lines changed

4 files changed

+164
-16
lines changed

rayon-core/src/lib.rs

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,6 @@
2020
//! succeed.
2121
2222
#![doc(html_root_url = "https://docs.rs/rayon-core/1.6")]
23-
#![deny(missing_debug_implementations)]
24-
#![deny(missing_docs)]
25-
#![deny(unreachable_pub)]
2623

2724
use std::any::Any;
2825
use std::env;
@@ -71,6 +68,7 @@ pub mod tlv;
7168
pub mod internal;
7269
pub use join::{join, join_context};
7370
pub use registry::ThreadBuilder;
71+
pub use registry::{mark_blocked, mark_unblocked, Registry};
7472
pub use scope::{scope, Scope};
7573
pub use scope::{scope_fifo, ScopeFifo};
7674
pub use spawn::{spawn, spawn_fifo};
@@ -149,6 +147,9 @@ pub struct ThreadPoolBuilder<S = DefaultSpawn> {
149147
/// The stack size for the created worker threads
150148
stack_size: Option<usize>,
151149

150+
/// Closure invoked on deadlock.
151+
deadlock_handler: Option<Box<DeadlockHandler>>,
152+
152153
/// Closure invoked on worker thread start.
153154
start_handler: Option<Box<StartHandler>>,
154155

@@ -176,6 +177,9 @@ pub struct Configuration {
176177
/// may be invoked multiple times in parallel.
177178
type PanicHandler = dyn Fn(Box<dyn Any + Send>) + Send + Sync;
178179

180+
/// The type for a closure that gets invoked when the Rayon thread pool deadlocks.
/// The closure takes no arguments and returns nothing.
181+
type DeadlockHandler = dyn Fn() + Send + Sync;
182+
179183
/// The type for a closure that gets invoked when a thread starts. The
180184
/// closure is passed the index of the thread on which it is invoked.
181185
/// Note that this same closure may be invoked multiple times in parallel.
@@ -196,6 +200,7 @@ impl Default for ThreadPoolBuilder {
196200
stack_size: None,
197201
start_handler: None,
198202
exit_handler: None,
203+
deadlock_handler: None,
199204
spawn_handler: DefaultSpawn,
200205
breadth_first: false,
201206
}
@@ -382,6 +387,7 @@ impl<S> ThreadPoolBuilder<S> {
382387
stack_size: self.stack_size,
383388
start_handler: self.start_handler,
384389
exit_handler: self.exit_handler,
390+
deadlock_handler: self.deadlock_handler,
385391
breadth_first: self.breadth_first,
386392
}
387393
}
@@ -540,6 +546,20 @@ impl<S> ThreadPoolBuilder<S> {
540546
self.breadth_first
541547
}
542548

549+
/// Takes the current deadlock callback, leaving `None`.
///
/// Used while constructing the `Registry`, which takes over the handler.
550+
fn take_deadlock_handler(&mut self) -> Option<Box<DeadlockHandler>> {
551+
self.deadlock_handler.take()
552+
}
553+
554+
/// Sets a callback to be invoked when a deadlock of the thread pool is detected.
///
/// The closure is boxed and stored in the builder, replacing any handler set
/// earlier; the builder is returned so that calls can be chained.
///
/// The handler is called while the pool's internal sleep-state lock is held,
/// so it must not call `mark_blocked` or `mark_unblocked` itself.
555+
pub fn deadlock_handler<H>(mut self, deadlock_handler: H) -> Self
556+
where
557+
H: Fn() + Send + Sync + 'static,
558+
{
559+
self.deadlock_handler = Some(Box::new(deadlock_handler));
560+
self
561+
}
562+
543563
/// Takes the current thread start callback, leaving `None`.
544564
fn take_start_handler(&mut self) -> Option<Box<StartHandler>> {
545565
self.start_handler.take()
@@ -693,6 +713,7 @@ impl<S> fmt::Debug for ThreadPoolBuilder<S> {
693713
ref get_thread_name,
694714
ref panic_handler,
695715
ref stack_size,
716+
ref deadlock_handler,
696717
ref start_handler,
697718
ref exit_handler,
698719
spawn_handler: _,
@@ -709,6 +730,7 @@ impl<S> fmt::Debug for ThreadPoolBuilder<S> {
709730
}
710731
let get_thread_name = get_thread_name.as_ref().map(|_| ClosurePlaceholder);
711732
let panic_handler = panic_handler.as_ref().map(|_| ClosurePlaceholder);
733+
let deadlock_handler = deadlock_handler.as_ref().map(|_| ClosurePlaceholder);
712734
let start_handler = start_handler.as_ref().map(|_| ClosurePlaceholder);
713735
let exit_handler = exit_handler.as_ref().map(|_| ClosurePlaceholder);
714736

@@ -717,6 +739,7 @@ impl<S> fmt::Debug for ThreadPoolBuilder<S> {
717739
.field("get_thread_name", &get_thread_name)
718740
.field("panic_handler", &panic_handler)
719741
.field("stack_size", &stack_size)
742+
.field("deadlock_handler", &deadlock_handler)
720743
.field("start_handler", &start_handler)
721744
.field("exit_handler", &exit_handler)
722745
.field("breadth_first", &breadth_first)

rayon-core/src/registry.rs

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,10 @@ use std::thread;
2424
use std::usize;
2525
use unwind;
2626
use util::leak;
27-
use {ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder};
27+
use {
28+
DeadlockHandler, ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError,
29+
ThreadPoolBuilder,
30+
};
2831

2932
/// Thread builder used for customization via
3033
/// [`ThreadPoolBuilder::spawn_handler`](struct.ThreadPoolBuilder.html#method.spawn_handler).
@@ -133,11 +136,12 @@ where
133136
}
134137
}
135138

136-
pub(super) struct Registry {
139+
pub struct Registry {
137140
thread_infos: Vec<ThreadInfo>,
138141
sleep: Sleep,
139142
injected_jobs: SegQueue<JobRef>,
140143
panic_handler: Option<Box<PanicHandler>>,
144+
deadlock_handler: Option<Box<DeadlockHandler>>,
141145
start_handler: Option<Box<StartHandler>>,
142146
exit_handler: Option<Box<ExitHandler>>,
143147

@@ -237,10 +241,11 @@ impl Registry {
237241

238242
let registry = Arc::new(Registry {
239243
thread_infos: stealers.into_iter().map(ThreadInfo::new).collect(),
240-
sleep: Sleep::new(),
244+
sleep: Sleep::new(n_threads),
241245
injected_jobs: SegQueue::new(),
242246
terminate_latch: CountLatch::new(),
243247
panic_handler: builder.take_panic_handler(),
248+
deadlock_handler: builder.take_deadlock_handler(),
244249
start_handler: builder.take_start_handler(),
245250
exit_handler: builder.take_exit_handler(),
246251
});
@@ -272,7 +277,7 @@ impl Registry {
272277
global_registry().clone()
273278
}
274279

275-
pub(super) fn current() -> Arc<Registry> {
280+
pub fn current() -> Arc<Registry> {
276281
unsafe {
277282
let worker_thread = WorkerThread::current();
278283
if worker_thread.is_null() {
@@ -566,6 +571,24 @@ impl Registry {
566571
}
567572
}
568573

574+
/// Marks the current Rayon worker thread as blocked. This triggers the deadlock
/// handler if no other worker thread is active.
///
/// The registry is looked up through the current `WorkerThread`.
///
/// # Panics
///
575+
/// Panics if `WorkerThread::current()` returns a null pointer.
576+
#[inline]
577+
pub fn mark_blocked() {
578+
let worker_thread = WorkerThread::current();
579+
assert!(!worker_thread.is_null());
580+
// SAFETY: the pointer was asserted to be non-null just above.
// NOTE(review): presumably a non-null pointer always refers to this
// thread's live `WorkerThread` -- confirm against `WorkerThread::current`.
unsafe {
581+
let registry = &(*worker_thread).registry;
582+
registry.sleep.mark_blocked(&registry.deadlock_handler)
583+
}
584+
}
585+
586+
/// Marks a previously blocked Rayon worker thread as unblocked.
///
/// The registry is passed in explicitly instead of being read from the
/// current worker thread, so this function performs no check that the caller
/// is itself a worker thread. It forwards to `Sleep::mark_unblocked`.
587+
#[inline]
588+
pub fn mark_unblocked(registry: &Registry) {
589+
registry.sleep.mark_unblocked()
590+
}
591+
569592
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
570593
pub(super) struct RegistryId {
571594
addr: usize,
@@ -720,7 +743,11 @@ impl WorkerThread {
720743
yields = self.registry.sleep.work_found(self.index, yields);
721744
self.execute(job);
722745
} else {
723-
yields = self.registry.sleep.no_work_found(self.index, yields);
746+
yields = self.registry.sleep.no_work_found(
747+
self.index,
748+
yields,
749+
&self.registry.deadlock_handler,
750+
);
724751
}
725752
}
726753

rayon-core/src/sleep/README.md

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,3 +386,36 @@ some of them were hit hard:
386386
- 8-10% overhead on nbody-parreduce
387387
- 35% overhead on increment-all
388388
- 245% overhead on join-recursively
389+
390+
# Deadlock detection
391+
392+
This module tracks a number of variables in order to detect deadlocks due to user code blocking.
393+
These variables are stored in the `SleepData` struct which itself is kept behind a mutex.
394+
It contains the following fields:
395+
- `worker_count` - The number of threads in the thread pool.
396+
- `active_threads` - The number of threads in the thread pool which are running
397+
and aren't blocked in user code or sleeping.
398+
- `blocked_threads` - The number of threads which are blocked in user code.
399+
This doesn't include threads blocked by Rayon.
400+
401+
User code can indicate blocking by calling `mark_blocked` before blocking and
402+
calling `mark_unblocked` before unblocking a thread.
403+
This will adjust `active_threads` and `blocked_threads` accordingly.
404+
405+
When we tickle the thread pool in `Sleep::tickle_cold`, we set `active_threads` to
406+
`worker_count` - `blocked_threads` since we wake up all Rayon threads, but not threads blocked
407+
by user code.
408+
409+
A deadlock is detected by checking if `active_threads` is 0 and `blocked_threads` is above 0.
410+
If we ignored `blocked_threads` we would report a spurious deadlock
411+
immediately when creating the thread pool.
412+
We would also report a spurious deadlock once the thread pool ran out of work.
413+
It is not possible for Rayon itself to deadlock.
414+
Deadlocks can only be caused by user code blocking, so this condition doesn't miss any deadlocks.
415+
416+
We check for the deadlock condition when
417+
threads fall asleep in `mark_blocked` and in `Sleep::sleep`.
418+
If there's a deadlock detected we call the user provided deadlock handler while we hold the
419+
lock to `SleepData`. This means the deadlock handler cannot call `mark_blocked` and
420+
`mark_unblocked`. The user is expected to handle the deadlock in some non-Rayon thread.
421+
Once the deadlock handler returns, the thread which called the deadlock handler will go to sleep.

rayon-core/src/sleep/mod.rs

Lines changed: 73 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,34 @@ use std::sync::atomic::{AtomicUsize, Ordering};
66
use std::sync::{Condvar, Mutex};
77
use std::thread;
88
use std::usize;
9+
use DeadlockHandler;
10+
11+
/// Counters used for deadlock detection. `Sleep` keeps this behind its
/// `data` mutex.
struct SleepData {
12+
/// The number of threads in the thread pool.
13+
worker_count: usize,
14+
15+
/// The number of threads in the thread pool which are running and
16+
/// aren't blocked in user code or sleeping.
17+
active_threads: usize,
18+
19+
/// The number of threads which are blocked in user code.
20+
/// This doesn't include threads blocked by this module.
21+
blocked_threads: usize,
22+
}
23+
24+
impl SleepData {
25+
/// Checks whether the conditions for a deadlock hold and, if so, calls the
/// deadlock handler.
///
/// A deadlock is reported when no thread is active and at least one thread
/// is blocked in user code.
///
/// # Panics
///
/// Panics if a deadlock is detected and `deadlock_handler` is `None`.
26+
#[inline]
27+
pub fn deadlock_check(&self, deadlock_handler: &Option<Box<DeadlockHandler>>) {
28+
if self.active_threads == 0 && self.blocked_threads > 0 {
29+
// NOTE(review): `unwrap` panics when no handler was configured.
(deadlock_handler.as_ref().unwrap())();
30+
}
31+
}
32+
}
933

1034
pub(super) struct Sleep {
1135
state: AtomicUsize,
12-
data: Mutex<()>,
36+
data: Mutex<SleepData>,
1337
tickle: Condvar,
1438
}
1539

@@ -20,14 +44,42 @@ const ROUNDS_UNTIL_SLEEPY: usize = 32;
2044
const ROUNDS_UNTIL_ASLEEP: usize = 64;
2145

2246
impl Sleep {
23-
pub(super) fn new() -> Sleep {
47+
pub(super) fn new(worker_count: usize) -> Sleep {
2448
Sleep {
2549
state: AtomicUsize::new(AWAKE),
26-
data: Mutex::new(()),
50+
data: Mutex::new(SleepData {
51+
worker_count,
52+
active_threads: worker_count,
53+
blocked_threads: 0,
54+
}),
2755
tickle: Condvar::new(),
2856
}
2957
}
3058

59+
/// Marks a Rayon worker thread as blocked. This triggers the deadlock handler
60+
/// if no other worker thread is active.
///
/// Moves one thread from the active count to the blocked count and then runs
/// the deadlock check, all while the `data` mutex is held. The handler is
/// therefore invoked with the lock held.
61+
#[inline]
62+
pub fn mark_blocked(&self, deadlock_handler: &Option<Box<DeadlockHandler>>) {
63+
let mut data = self.data.lock().unwrap();
64+
debug_assert!(data.active_threads > 0);
65+
debug_assert!(data.blocked_threads < data.worker_count);
66+
// NOTE(review): this repeats the first assertion above and is redundant.
debug_assert!(data.active_threads > 0);
67+
data.active_threads -= 1;
68+
data.blocked_threads += 1;
69+
70+
data.deadlock_check(deadlock_handler);
71+
}
72+
73+
/// Marks a previously blocked Rayon worker thread as unblocked.
///
/// Moves one thread from the blocked count back to the active count under
/// the `data` mutex. No deadlock check is performed here.
74+
#[inline]
75+
pub fn mark_unblocked(&self) {
76+
let mut data = self.data.lock().unwrap();
77+
debug_assert!(data.active_threads < data.worker_count);
78+
debug_assert!(data.blocked_threads > 0);
79+
data.active_threads += 1;
80+
data.blocked_threads -= 1;
81+
}
82+
3183
/// Returns `true` when `state` has any bit of the `SLEEPING` mask set.
fn anyone_sleeping(&self, state: usize) -> bool {
3284
state & SLEEPING != 0
3385
}
@@ -61,7 +113,12 @@ impl Sleep {
61113
}
62114

63115
#[inline]
64-
pub(super) fn no_work_found(&self, worker_index: usize, yields: usize) -> usize {
116+
pub(super) fn no_work_found(
117+
&self,
118+
worker_index: usize,
119+
yields: usize,
120+
deadlock_handler: &Option<Box<DeadlockHandler>>,
121+
) -> usize {
65122
log!(DidNotFindWork {
66123
worker: worker_index,
67124
yields: yields,
@@ -88,7 +145,7 @@ impl Sleep {
88145
}
89146
} else {
90147
debug_assert_eq!(yields, ROUNDS_UNTIL_ASLEEP);
91-
self.sleep(worker_index);
148+
self.sleep(worker_index, deadlock_handler);
92149
0
93150
}
94151
}
@@ -122,7 +179,10 @@ impl Sleep {
122179
old_state: old_state,
123180
});
124181
if self.anyone_sleeping(old_state) {
125-
let _data = self.data.lock().unwrap();
182+
let mut data = self.data.lock().unwrap();
183+
// Set the active threads to the number of workers,
184+
// excluding threads blocked by the user since we won't wake those up
185+
data.active_threads = data.worker_count - data.blocked_threads;
126186
self.tickle.notify_all();
127187
}
128188
}
@@ -188,7 +248,7 @@ impl Sleep {
188248
self.worker_is_sleepy(state, worker_index)
189249
}
190250

191-
fn sleep(&self, worker_index: usize) {
251+
fn sleep(&self, worker_index: usize, deadlock_handler: &Option<Box<DeadlockHandler>>) {
192252
loop {
193253
// Acquire here suffices. If we observe that the current worker is still
194254
// sleepy, then in fact we know that no writes have occurred, and anyhow
@@ -235,7 +295,7 @@ impl Sleep {
235295
// reason for the `compare_exchange` to fail is if an
236296
// awaken comes, in which case the next cycle around
237297
// the loop will just return.
238-
let data = self.data.lock().unwrap();
298+
let mut data = self.data.lock().unwrap();
239299

240300
// This must be SeqCst on success because we want to
241301
// ensure:
@@ -264,6 +324,11 @@ impl Sleep {
264324
log!(FellAsleep {
265325
worker: worker_index
266326
});
327+
328+
// Decrement the number of active threads and check for a deadlock
329+
data.active_threads -= 1;
330+
data.deadlock_check(deadlock_handler);
331+
267332
let _ = self.tickle.wait(data).unwrap();
268333
log!(GotAwoken {
269334
worker: worker_index

0 commit comments

Comments
 (0)