@@ -194,19 +194,25 @@ impl<'tcx> QueryLatch<'tcx> {
194
194
}
195
195
}
196
196
197
+ /// Awaits the caller on this latch by blocking the current thread.
197
198
fn await ( & self , waiter : & mut QueryWaiter < ' tcx > ) {
198
199
let mut info = self . info . lock ( ) ;
199
200
if !info. complete {
201
+ // We push the waiter on to the `waiters` list. It can be accessed inside
202
+ // the `wait` call below, by 1) the `set` method or 2) by deadlock detection.
203
+ // Both of these will remove it from the `waiters` list before resuming
204
+ // this thread.
200
205
info. waiters . push ( waiter) ;
201
- let condvar = & waiter . condvar ;
206
+
202
207
// If this detects a deadlock and the deadlock handler want to resume this thread
203
208
// we have to be in the `wait` call. This is ensured by the deadlock handler
204
209
// getting the self.info lock.
205
210
rayon_core:: mark_blocked ( ) ;
206
- condvar. wait ( & mut info) ;
211
+ waiter . condvar . wait ( & mut info) ;
207
212
}
208
213
}
209
214
215
+ /// Sets the latch and resumes all waiters on it
210
216
fn set ( & self ) {
211
217
let mut info = self . info . lock ( ) ;
212
218
debug_assert ! ( !info. complete) ;
@@ -219,46 +225,56 @@ impl<'tcx> QueryLatch<'tcx> {
219
225
}
220
226
}
221
227
222
- fn resume_waiter (
228
+ /// Remove a single waiter from the list of waiters.
229
+ /// This is used to break query cycles.
230
+ fn extract_waiter (
223
231
& self ,
224
232
waiter : usize ,
225
- error : CycleError < ' tcx >
226
233
) -> * mut QueryWaiter < ' tcx > {
227
234
let mut info = self . info . lock ( ) ;
228
235
debug_assert ! ( !info. complete) ;
229
236
// Remove the waiter from the list of waiters
230
- let waiter = info. waiters . remove ( waiter) ;
231
-
232
- // Set the cycle error it will be picked it up when resumed
233
- unsafe {
234
- ( * waiter) . cycle = Some ( error) ;
235
- }
236
-
237
- waiter
237
+ info. waiters . remove ( waiter)
238
238
}
239
239
}
240
240
241
+ /// A pointer to an active query job. This is used to give query jobs an identity.
241
242
#[ cfg( parallel_queries) ]
242
243
type Ref < ' tcx > = * const QueryJob < ' tcx > ;
243
244
245
+ /// A resumable waiter of a query. The usize is the index into waiters in the query's latch
244
246
#[ cfg( parallel_queries) ]
245
247
type Waiter < ' tcx > = ( Ref < ' tcx > , usize ) ;
246
248
249
+ /// Visits all the non-resumable and resumable waiters of a query.
250
+ /// Only waiters in a query are visited.
251
+ /// `visit` is called for every waiter and is passed a query waiting on `query_ref`
252
+ /// and a span indicating the reason the query waited on `query_ref`.
253
+ /// If `visit` returns Some, this function returns.
254
+ /// For visits of non-resumable waiters it returns the return value of `visit`.
255
+ /// For visits of resumable waiters it returns Some(Some(Waiter)) which has the
256
+ /// required information to resume the waiter.
257
+ /// If all `visit` calls returns None, this function also returns None.
247
258
#[ cfg( parallel_queries) ]
248
259
fn visit_waiters < ' tcx , F > ( query_ref : Ref < ' tcx > , mut visit : F ) -> Option < Option < Waiter < ' tcx > > >
249
260
where
250
261
F : FnMut ( Span , Ref < ' tcx > ) -> Option < Option < Waiter < ' tcx > > >
251
262
{
252
263
let query = unsafe { & * query_ref } ;
264
+
265
+ // Visit the parent query which is a non-resumable waiter since it's on the same stack
253
266
if let Some ( ref parent) = query. parent {
254
267
if let Some ( cycle) = visit ( query. info . span , & * * parent as Ref ) {
255
268
return Some ( cycle) ;
256
269
}
257
270
}
271
+
272
+ // Visit the explict waiters which use condvars and are resumable
258
273
for ( i, & waiter) in query. latch . info . lock ( ) . waiters . iter ( ) . enumerate ( ) {
259
274
unsafe {
260
275
if let Some ( ref waiter_query) = * ( * waiter) . query {
261
276
if visit ( ( * waiter) . span , & * * waiter_query as Ref ) . is_some ( ) {
277
+ // Return a value which indicates that this waiter can be resumed
262
278
return Some ( Some ( ( query_ref, i) ) ) ;
263
279
}
264
280
}
@@ -267,13 +283,19 @@ where
267
283
None
268
284
}
269
285
286
+ /// Look for query cycles by doing a depth first search starting at `query`.
287
+ /// `span` is the reason for the `query` to execute. This is initially DUMMY_SP.
288
+ /// If a cycle is detected, this initial value is replaced with the span causing
289
+ /// the cycle.
270
290
#[ cfg( parallel_queries) ]
271
291
fn cycle_check < ' tcx > ( query : Ref < ' tcx > ,
272
292
span : Span ,
273
293
stack : & mut Vec < ( Span , Ref < ' tcx > ) > ,
274
294
visited : & mut HashSet < Ref < ' tcx > > ) -> Option < Option < Waiter < ' tcx > > > {
275
295
if visited. contains ( & query) {
276
296
return if let Some ( p) = stack. iter ( ) . position ( |q| q. 1 == query) {
297
+ // We detected a query cycle, fix up the initial span and return Some
298
+
277
299
// Remove previous stack entries
278
300
stack. splice ( 0 ..p, iter:: empty ( ) ) ;
279
301
// Replace the span for the first query with the cycle cause
@@ -284,26 +306,34 @@ fn cycle_check<'tcx>(query: Ref<'tcx>,
284
306
}
285
307
}
286
308
309
+ // Mark this query is visited and add it to the stack
287
310
visited. insert ( query) ;
288
311
stack. push ( ( span, query) ) ;
289
312
313
+ // Visit all the waiters
290
314
let r = visit_waiters ( query, |span, successor| {
291
315
cycle_check ( successor, span, stack, visited)
292
316
} ) ;
293
317
318
+ // Remove the entry in our stack if we didn't find a cycle
294
319
if r. is_none ( ) {
295
320
stack. pop ( ) ;
296
321
}
297
322
298
323
r
299
324
}
300
325
326
+ /// Finds out if there's a path to the compiler root (aka. code which isn't in a query)
327
+ /// from `query` without going through any of the queries in `visited`.
328
+ /// This is achieved with a depth first search.
301
329
#[ cfg( parallel_queries) ]
302
330
fn connected_to_root < ' tcx > ( query : Ref < ' tcx > , visited : & mut HashSet < Ref < ' tcx > > ) -> bool {
331
+ // We already visited this or we're deliberately ignoring it
303
332
if visited. contains ( & query) {
304
333
return false ;
305
334
}
306
335
336
+ // This query is connected to the root (it has no query parent), return true
307
337
if unsafe { ( * query) . parent . is_none ( ) } {
308
338
return true ;
309
339
}
@@ -321,43 +351,43 @@ fn connected_to_root<'tcx>(query: Ref<'tcx>, visited: &mut HashSet<Ref<'tcx>>) -
321
351
} ) . is_some ( )
322
352
}
323
353
324
- # [ cfg ( parallel_queries ) ]
325
- fn query_entry < ' tcx > ( r : Ref < ' tcx > ) -> QueryInfo < ' tcx > {
326
- unsafe { ( * r ) . info . clone ( ) }
327
- }
328
-
354
+ /// Looks for query cycles starting from the last query in `jobs`.
355
+ /// If a cycle is found, all queries in the cycle is removed from `jobs` and
356
+ /// the function return true.
357
+ /// If a cycle was not found, the starting query is removed from `jobs` and
358
+ /// the function returns false.
329
359
#[ cfg( parallel_queries) ]
330
360
fn remove_cycle < ' tcx > (
331
361
jobs : & mut Vec < Ref < ' tcx > > ,
332
362
wakelist : & mut Vec < * mut QueryWaiter < ' tcx > > ,
333
363
tcx : TyCtxt < ' _ , ' tcx , ' _ >
334
- ) {
364
+ ) -> bool {
335
365
let mut visited = HashSet :: new ( ) ;
336
366
let mut stack = Vec :: new ( ) ;
367
+ // Look for a cycle starting with the last query in `jobs`
337
368
if let Some ( waiter) = cycle_check ( jobs. pop ( ) . unwrap ( ) ,
338
369
DUMMY_SP ,
339
370
& mut stack,
340
371
& mut visited) {
341
372
// Reverse the stack so earlier entries require later entries
342
373
stack. reverse ( ) ;
343
374
375
+ // Extract the spans and queries into separate arrays
344
376
let mut spans: Vec < _ > = stack. iter ( ) . map ( |e| e. 0 ) . collect ( ) ;
345
377
let queries = stack. iter ( ) . map ( |e| e. 1 ) ;
346
378
347
- // Shift the spans so that a query is matched the span for its waitee
379
+ // Shift the spans so that queries are matched with the span for their waitee
348
380
let last = spans. pop ( ) . unwrap ( ) ;
349
381
spans. insert ( 0 , last) ;
350
382
383
+ // Zip them back together
351
384
let mut stack: Vec < _ > = spans. into_iter ( ) . zip ( queries) . collect ( ) ;
352
385
353
386
// Remove the queries in our cycle from the list of jobs to look at
354
387
for r in & stack {
355
388
jobs. remove_item ( & r. 1 ) ;
356
389
}
357
390
358
- let ( waitee_query, waiter_idx) = waiter. unwrap ( ) ;
359
- let waitee_query = unsafe { & * waitee_query } ;
360
-
361
391
// Find the queries in the cycle which are
362
392
// connected to queries outside the cycle
363
393
let entry_points: Vec < Ref < ' _ > > = stack. iter ( ) . filter_map ( |query| {
@@ -392,6 +422,7 @@ fn remove_cycle<'tcx>(
392
422
stack. insert ( 0 , last) ;
393
423
}
394
424
425
+ // Create the cycle error
395
426
let mut error = CycleError {
396
427
usage : None ,
397
428
cycle : stack. iter ( ) . map ( |& ( s, q) | QueryInfo {
@@ -400,10 +431,30 @@ fn remove_cycle<'tcx>(
400
431
} ) . collect ( ) ,
401
432
} ;
402
433
403
- wakelist. push ( waitee_query. latch . resume_waiter ( waiter_idx, error) ) ;
434
+ // We unwrap `waiter` here since there must always be one
435
+ // edge which is resumeable / waited using a query latch
436
+ let ( waitee_query, waiter_idx) = waiter. unwrap ( ) ;
437
+ let waitee_query = unsafe { & * waitee_query } ;
438
+
439
+ // Extract the waiter we want to resume
440
+ let waiter = waitee_query. latch . extract_waiter ( waiter_idx) ;
441
+
442
+ // Set the cycle error it will be picked it up when resumed
443
+ unsafe {
444
+ ( * waiter) . cycle = Some ( error) ;
445
+ }
446
+
447
+ // Put the waiter on the list of things to resume
448
+ wakelist. push ( waiter) ;
449
+
450
+ true
451
+ } else {
452
+ false
404
453
}
405
454
}
406
455
456
+ /// Creates a new thread and forwards information in thread locals to it.
457
+ /// The new thread runs the deadlock handler.
407
458
#[ cfg( parallel_queries) ]
408
459
pub fn handle_deadlock ( ) {
409
460
use syntax;
@@ -440,6 +491,11 @@ pub fn handle_deadlock() {
440
491
} ) ;
441
492
}
442
493
494
+ /// Detects query cycles by using depth first search over all active query jobs.
495
+ /// If a query cycle is found it will break the cycle by finding an edge which
496
+ /// uses a query latch and then resuming that waiter.
497
+ /// There may be multiple cycles involved in a deadlock, so this searches
498
+ /// all active queries for cycles before finally resuming all the waiters at once.
443
499
#[ cfg( parallel_queries) ]
444
500
fn deadlock ( tcx : TyCtxt < ' _ , ' _ , ' _ > , registry : & rayon_core:: Registry ) {
445
501
let on_panic = OnDrop ( || {
@@ -450,13 +506,22 @@ fn deadlock(tcx: TyCtxt<'_, '_, '_>, registry: &rayon_core::Registry) {
450
506
let mut wakelist = Vec :: new ( ) ;
451
507
let mut jobs: Vec < _ > = tcx. maps . collect_active_jobs ( ) . iter ( ) . map ( |j| & * * j as Ref ) . collect ( ) ;
452
508
509
+ let mut found_cycle = false ;
510
+
453
511
while jobs. len ( ) > 0 {
454
- remove_cycle ( & mut jobs, & mut wakelist, tcx) ;
512
+ if remove_cycle ( & mut jobs, & mut wakelist, tcx) {
513
+ found_cycle = true ;
514
+ }
455
515
}
456
516
457
- // FIXME: Panic if no cycle is detected
458
-
459
- // FIXME: Write down the conditions when a deadlock happens without a cycle
517
+ // Check that a cycle was found. It is possible for a deadlock to occur without
518
+ // a query cycle if a query which can be waited on uses Rayon to do multithreading
519
+ // internally. Such a query (X) may be executing on 2 threads (A and B) and A may
520
+ // wait using Rayon on B. Rayon may then switch to executing another query (Y)
521
+ // which in turn will wait on X causing a deadlock. We have a false dependency from
522
+ // X to Y due to Rayon waiting and a true dependency from Y to X. The algorithm here
523
+ // only considers the true dependency and won't detect a cycle.
524
+ assert ! ( found_cycle) ;
460
525
461
526
// FIXME: Ensure this won't cause a deadlock before we return
462
527
for waiter in wakelist. into_iter ( ) {
0 commit comments