@@ -78,7 +78,13 @@ pub struct Scheduler {
78
78
/// A fast XorShift rng for scheduler use
79
79
rng : XorShiftRng ,
80
80
/// A toggleable idle callback
81
- idle_callback : Option < ~PausibleIdleCallback >
81
+ idle_callback : Option < ~PausibleIdleCallback > ,
82
+ /// A count of the number of times `maybe_yield` has been called without
83
+ /// actually yielding.
84
+ yield_check_count : uint ,
85
+ /// A flag to tell the scheduler loop it needs to do some stealing
86
+ /// in order to introduce randomness as part of a yield
87
+ steal_for_yield : bool
82
88
}
83
89
84
90
/// An indication of how hard to work on a given operation, the difference
@@ -127,7 +133,9 @@ impl Scheduler {
127
133
run_anything : run_anything,
128
134
friend_handle : friend,
129
135
rng : XorShiftRng :: new ( ) ,
130
- idle_callback : None
136
+ idle_callback : None ,
137
+ yield_check_count : 0 ,
138
+ steal_for_yield : false
131
139
}
132
140
}
133
141
@@ -373,27 +381,47 @@ impl Scheduler {
373
381
// there, trying to steal from the remote work queues.
374
382
fn find_work ( & mut self ) -> Option < ~Task > {
375
383
rtdebug ! ( "scheduler looking for work" ) ;
376
- match self . work_queue . pop ( ) {
377
- Some ( task) => {
378
- rtdebug ! ( "found a task locally" ) ;
379
- return Some ( task)
384
+ if !self . steal_for_yield {
385
+ match self . work_queue . pop ( ) {
386
+ Some ( task) => {
387
+ rtdebug ! ( "found a task locally" ) ;
388
+ return Some ( task)
389
+ }
390
+ None => {
391
+ rtdebug ! ( "scheduler trying to steal" ) ;
392
+ return self . try_steals ( ) ;
393
+ }
380
394
}
381
- None => {
382
- // Our naive stealing, try kinda hard.
383
- rtdebug ! ( "scheduler trying to steal" ) ;
384
- let len = self . work_queues . len ( ) ;
385
- return self . try_steals ( len/2 ) ;
395
+ } else {
396
+ // During execution of the last task, it performed a 'yield',
397
+ // so we're doing some work stealing in order to introduce some
398
+ // scheduling randomness. Otherwise we would just end up popping
399
+ // that same task again. This is pretty lame and is to work around
400
+ // the problem that work stealing is not designed for 'non-strict'
401
+ // (non-fork-join) task parallelism.
402
+ self . steal_for_yield = false ;
403
+ match self . try_steals ( ) {
404
+ Some ( task) => {
405
+ rtdebug ! ( "stole a task after yielding" ) ;
406
+ return Some ( task) ;
407
+ }
408
+ None => {
409
+ rtdebug ! ( "did not steal a task after yielding" ) ;
410
+ // Back to business
411
+ return self . find_work ( ) ;
412
+ }
386
413
}
387
414
}
388
415
}
389
416
390
- // With no backoff try stealing n times from the queues the
391
- // scheduler knows about. This naive implementation can steal from
392
- // our own queue or from other special schedulers.
393
- fn try_steals ( & mut self , n : uint ) -> Option < ~Task > {
394
- for _ in range ( 0 , n) {
395
- let index = self . rng . gen_uint_range ( 0 , self . work_queues . len ( ) ) ;
396
- let work_queues = & mut self . work_queues ;
417
+ // Try stealing from all queues the scheduler knows about. This
418
+ // naive implementation can steal from our own queue or from other
419
+ // special schedulers.
420
+ fn try_steals ( & mut self ) -> Option < ~Task > {
421
+ let work_queues = & mut self . work_queues ;
422
+ let len = work_queues. len ( ) ;
423
+ let start_index = self . rng . gen_uint_range ( 0 , len) ;
424
+ for index in range ( 0 , len) . map ( |i| ( i + start_index) % len) {
397
425
match work_queues[ index] . steal ( ) {
398
426
Some ( task) => {
399
427
rtdebug ! ( "found task by stealing" ) ;
@@ -697,6 +725,34 @@ impl Scheduler {
697
725
} ;
698
726
}
699
727
728
+ /// Yield control to the scheduler, executing another task. This is guaranteed
729
+ /// to introduce some amount of randomness to the scheduler. Currently the
730
+ /// randomness is a result of performing a round of work stealing (which
731
+ /// may end up stealing from the current scheduler).
732
+ pub fn yield_now ( ~self ) {
733
+ let mut this = self ;
734
+ this. yield_check_count = 0 ;
735
+ // Tell the scheduler to start stealing on the next iteration
736
+ this. steal_for_yield = true ;
737
+ do this. deschedule_running_task_and_then |sched, task| {
738
+ sched. enqueue_blocked_task ( task) ;
739
+ }
740
+ }
741
+
742
+ pub fn maybe_yield ( ~self ) {
743
+ // The number of times to do the yield check before yielding, chosen arbitrarily.
744
+ static YIELD_THRESHOLD : uint = 100 ;
745
+ let mut this = self ;
746
+ rtassert ! ( this. yield_check_count < YIELD_THRESHOLD ) ;
747
+ this. yield_check_count += 1 ;
748
+ if this. yield_check_count == YIELD_THRESHOLD {
749
+ this. yield_now ( ) ;
750
+ } else {
751
+ Local :: put ( this) ;
752
+ }
753
+ }
754
+
755
+
700
756
// * Utility Functions
701
757
702
758
pub fn sched_id ( & self ) -> uint { to_uint ( self ) }
@@ -1213,4 +1269,45 @@ mod test {
1213
1269
}
1214
1270
}
1215
1271
1272
+ #[ test]
1273
+ fn dont_starve_1 ( ) {
1274
+ use rt:: comm:: oneshot;
1275
+
1276
+ do stress_factor ( ) . times {
1277
+ do run_in_mt_newsched_task {
1278
+ let ( port, chan) = oneshot ( ) ;
1279
+
1280
+ // This task should not be able to starve the sender;
1281
+ // The sender should get stolen to another thread.
1282
+ do spawntask {
1283
+ while !port. peek ( ) { }
1284
+ }
1285
+
1286
+ chan. send ( ( ) ) ;
1287
+ }
1288
+ }
1289
+ }
1290
+
1291
+ #[ test]
1292
+ fn dont_starve_2 ( ) {
1293
+ use rt:: comm:: oneshot;
1294
+
1295
+ do stress_factor ( ) . times {
1296
+ do run_in_newsched_task {
1297
+ let ( port, chan) = oneshot ( ) ;
1298
+ let ( _port2, chan2) = stream ( ) ;
1299
+
1300
+ // This task should not be able to starve the other task.
1301
+ // The sends should eventually yield.
1302
+ do spawntask {
1303
+ while !port. peek ( ) {
1304
+ chan2. send ( ( ) ) ;
1305
+ }
1306
+ }
1307
+
1308
+ chan. send ( ( ) ) ;
1309
+ }
1310
+ }
1311
+ }
1312
+
1216
1313
}
0 commit comments