Skip to content

Commit c42b03d

Browse files
committed
core::rt: Fix scheduling logic for enqueued tasks
1 parent 4724966 commit c42b03d

File tree

7 files changed

+188
-126
lines changed

7 files changed

+188
-126
lines changed

src/libcore/macros.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
#[macro_escape];
1212

1313
// Some basic logging
14-
macro_rules! rtdebug (
14+
macro_rules! rtdebug_ (
1515
($( $arg:expr),+) => ( {
1616
dumb_println(fmt!( $($arg),+ ));
1717

@@ -26,7 +26,7 @@ macro_rules! rtdebug (
2626
)
2727

2828
// An alternate version with no output, for turning off logging
29-
macro_rules! rtdebug_ (
29+
macro_rules! rtdebug (
3030
($( $arg:expr),+) => ( $(let _ = $arg)*; )
3131
)
3232

src/libcore/rt/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ pub fn start(_argc: int, _argv: **u8, crate_map: *u8, main: ~fn()) -> int {
147147
let mut sched = ~Scheduler::new(loop_);
148148
let main_task = ~Task::new(&mut sched.stack_pool, main);
149149

150-
sched.task_queue.push_back(main_task);
150+
sched.enqueue_task(main_task);
151151
sched.run();
152152

153153
return 0;
@@ -225,11 +225,11 @@ fn test_context() {
225225
assert!(context() == SchedulerContext);
226226
let task = Cell(task);
227227
do local_sched::borrow |sched| {
228-
sched.task_queue.push_back(task.take());
228+
sched.enqueue_task(task.take());
229229
}
230230
}
231231
};
232-
sched.task_queue.push_back(task);
232+
sched.enqueue_task(task);
233233
sched.run();
234234
}
235235
}

src/libcore/rt/rtio.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ pub type RtioTcpListenerObject = uvio::UvTcpListener;
2525
pub trait EventLoop {
2626
fn run(&mut self);
2727
fn callback(&mut self, ~fn());
28+
fn callback_ms(&mut self, ms: u64, ~fn());
2829
/// The asynchronous I/O services. Not all event loops may provide one
2930
fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject>;
3031
}

src/libcore/rt/sched.rs

Lines changed: 161 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ pub mod local_sched;
3131
/// thread local storage and the running task is owned by the
3232
/// scheduler.
3333
pub struct Scheduler {
34-
task_queue: WorkQueue<~Task>,
34+
priv task_queue: WorkQueue<~Task>,
3535
stack_pool: StackPool,
3636
/// The event loop used to drive the scheduler and perform I/O
3737
event_loop: ~EventLoopObject,
@@ -91,44 +91,56 @@ pub impl Scheduler {
9191
fn run(~self) -> ~Scheduler {
9292
assert!(!self.in_task_context());
9393

94-
// Give ownership of the scheduler (self) to the thread
95-
local_sched::put(self);
94+
let mut self_sched = self;
9695

9796
unsafe {
98-
let scheduler = local_sched::unsafe_borrow();
99-
fn run_scheduler_once() {
100-
let scheduler = local_sched::take();
101-
if scheduler.resume_task_from_queue() {
102-
// Ok, a task ran. Nice! We'll do it again later
103-
do local_sched::borrow |scheduler| {
104-
scheduler.event_loop.callback(run_scheduler_once);
105-
}
106-
}
107-
}
97+
let event_loop: *mut ~EventLoopObject = {
98+
let event_loop: *mut ~EventLoopObject = &mut self_sched.event_loop;
99+
event_loop
100+
};
108101

109-
let scheduler = &mut *scheduler;
110-
scheduler.event_loop.callback(run_scheduler_once);
111-
scheduler.event_loop.run();
102+
// Give ownership of the scheduler (self) to the thread
103+
local_sched::put(self_sched);
104+
105+
(*event_loop).run();
112106
}
113107

114-
return local_sched::take();
108+
let sched = local_sched::take();
109+
assert!(sched.task_queue.is_empty());
110+
return sched;
111+
}
112+
113+
/// Schedule a task to be executed later.
114+
///
115+
/// Pushes the task onto the work stealing queue and tells the event loop
116+
/// to run it later. Always use this instead of pushing to the work queue
117+
/// directly.
118+
fn enqueue_task(&mut self, task: ~Task) {
119+
self.task_queue.push_front(task);
120+
self.event_loop.callback(resume_task_from_queue);
121+
122+
fn resume_task_from_queue() {
123+
let scheduler = local_sched::take();
124+
scheduler.resume_task_from_queue();
125+
}
115126
}
116127

117128
// * Scheduler-context operations
118129

119-
fn resume_task_from_queue(~self) -> bool {
130+
fn resume_task_from_queue(~self) {
120131
assert!(!self.in_task_context());
121132

133+
rtdebug!("looking in work queue for task to schedule");
134+
122135
let mut this = self;
123136
match this.task_queue.pop_front() {
124137
Some(task) => {
138+
rtdebug!("resuming task from work queue");
125139
this.resume_task_immediately(task);
126-
return true;
127140
}
128141
None => {
129142
rtdebug!("no tasks in queue");
130143
local_sched::put(this);
131-
return false;
132144
}
133145
}
134146
}
@@ -158,7 +170,7 @@ pub impl Scheduler {
158170
do self.switch_running_tasks_and_then(task) |last_task| {
159171
let last_task = Cell(last_task);
160172
do local_sched::borrow |sched| {
161-
sched.task_queue.push_front(last_task.take());
173+
sched.enqueue_task(last_task.take());
162174
}
163175
}
164176
}
@@ -385,118 +397,153 @@ pub impl Task {
385397
}
386398
}
387399

388-
#[test]
389-
fn test_simple_scheduling() {
390-
do run_in_bare_thread {
391-
let mut task_ran = false;
392-
let task_ran_ptr: *mut bool = &mut task_ran;
393-
394-
let mut sched = ~UvEventLoop::new_scheduler();
395-
let task = ~do Task::new(&mut sched.stack_pool) {
396-
unsafe { *task_ran_ptr = true; }
397-
};
398-
sched.task_queue.push_back(task);
399-
sched.run();
400-
assert!(task_ran);
400+
#[cfg(test)]
401+
mod test {
402+
use int;
403+
use cell::Cell;
404+
use rt::uv::uvio::UvEventLoop;
405+
use unstable::run_in_bare_thread;
406+
use task::spawn;
407+
use rt::test::*;
408+
use super::*;
409+
410+
#[test]
411+
fn test_simple_scheduling() {
412+
do run_in_bare_thread {
413+
let mut task_ran = false;
414+
let task_ran_ptr: *mut bool = &mut task_ran;
415+
416+
let mut sched = ~UvEventLoop::new_scheduler();
417+
let task = ~do Task::new(&mut sched.stack_pool) {
418+
unsafe { *task_ran_ptr = true; }
419+
};
420+
sched.enqueue_task(task);
421+
sched.run();
422+
assert!(task_ran);
423+
}
401424
}
402-
}
403425

404-
#[test]
405-
fn test_several_tasks() {
406-
do run_in_bare_thread {
407-
let total = 10;
408-
let mut task_count = 0;
409-
let task_count_ptr: *mut int = &mut task_count;
426+
#[test]
427+
fn test_several_tasks() {
428+
do run_in_bare_thread {
429+
let total = 10;
430+
let mut task_count = 0;
431+
let task_count_ptr: *mut int = &mut task_count;
410432

411-
let mut sched = ~UvEventLoop::new_scheduler();
412-
for int::range(0, total) |_| {
413-
let task = ~do Task::new(&mut sched.stack_pool) {
414-
unsafe { *task_count_ptr = *task_count_ptr + 1; }
415-
};
416-
sched.task_queue.push_back(task);
433+
let mut sched = ~UvEventLoop::new_scheduler();
434+
for int::range(0, total) |_| {
435+
let task = ~do Task::new(&mut sched.stack_pool) {
436+
unsafe { *task_count_ptr = *task_count_ptr + 1; }
437+
};
438+
sched.enqueue_task(task);
439+
}
440+
sched.run();
441+
assert!(task_count == total);
417442
}
418-
sched.run();
419-
assert!(task_count == total);
420443
}
421-
}
422444

423-
#[test]
424-
fn test_swap_tasks_then() {
425-
do run_in_bare_thread {
426-
let mut count = 0;
427-
let count_ptr: *mut int = &mut count;
428-
429-
let mut sched = ~UvEventLoop::new_scheduler();
430-
let task1 = ~do Task::new(&mut sched.stack_pool) {
431-
unsafe { *count_ptr = *count_ptr + 1; }
432-
let mut sched = local_sched::take();
433-
let task2 = ~do Task::new(&mut sched.stack_pool) {
445+
#[test]
446+
fn test_swap_tasks_then() {
447+
do run_in_bare_thread {
448+
let mut count = 0;
449+
let count_ptr: *mut int = &mut count;
450+
451+
let mut sched = ~UvEventLoop::new_scheduler();
452+
let task1 = ~do Task::new(&mut sched.stack_pool) {
434453
unsafe { *count_ptr = *count_ptr + 1; }
435-
};
436-
// Context switch directly to the new task
437-
do sched.switch_running_tasks_and_then(task2) |task1| {
438-
let task1 = Cell(task1);
439-
do local_sched::borrow |sched| {
440-
sched.task_queue.push_front(task1.take());
454+
let mut sched = local_sched::take();
455+
let task2 = ~do Task::new(&mut sched.stack_pool) {
456+
unsafe { *count_ptr = *count_ptr + 1; }
457+
};
458+
// Context switch directly to the new task
459+
do sched.switch_running_tasks_and_then(task2) |task1| {
460+
let task1 = Cell(task1);
461+
do local_sched::borrow |sched| {
462+
sched.enqueue_task(task1.take());
463+
}
441464
}
442-
}
443-
unsafe { *count_ptr = *count_ptr + 1; }
444-
};
445-
sched.task_queue.push_back(task1);
446-
sched.run();
447-
assert!(count == 3);
465+
unsafe { *count_ptr = *count_ptr + 1; }
466+
};
467+
sched.enqueue_task(task1);
468+
sched.run();
469+
assert!(count == 3);
470+
}
448471
}
449-
}
450472

451-
#[bench] #[test] #[ignore(reason = "long test")]
452-
fn test_run_a_lot_of_tasks_queued() {
453-
do run_in_bare_thread {
454-
static MAX: int = 1000000;
455-
let mut count = 0;
456-
let count_ptr: *mut int = &mut count;
473+
#[bench] #[test] #[ignore(reason = "long test")]
474+
fn test_run_a_lot_of_tasks_queued() {
475+
do run_in_bare_thread {
476+
static MAX: int = 1000000;
477+
let mut count = 0;
478+
let count_ptr: *mut int = &mut count;
457479

458-
let mut sched = ~UvEventLoop::new_scheduler();
480+
let mut sched = ~UvEventLoop::new_scheduler();
459481

460-
let start_task = ~do Task::new(&mut sched.stack_pool) {
461-
run_task(count_ptr);
462-
};
463-
sched.task_queue.push_back(start_task);
464-
sched.run();
482+
let start_task = ~do Task::new(&mut sched.stack_pool) {
483+
run_task(count_ptr);
484+
};
485+
sched.enqueue_task(start_task);
486+
sched.run();
465487

466-
assert!(count == MAX);
488+
assert!(count == MAX);
467489

468-
fn run_task(count_ptr: *mut int) {
469-
do local_sched::borrow |sched| {
470-
let task = ~do Task::new(&mut sched.stack_pool) {
471-
unsafe {
472-
*count_ptr = *count_ptr + 1;
473-
if *count_ptr != MAX {
474-
run_task(count_ptr);
490+
fn run_task(count_ptr: *mut int) {
491+
do local_sched::borrow |sched| {
492+
let task = ~do Task::new(&mut sched.stack_pool) {
493+
unsafe {
494+
*count_ptr = *count_ptr + 1;
495+
if *count_ptr != MAX {
496+
run_task(count_ptr);
497+
}
475498
}
499+
};
500+
sched.enqueue_task(task);
501+
}
502+
};
503+
}
504+
}
505+
506+
#[test]
507+
fn test_block_task() {
508+
do run_in_bare_thread {
509+
let mut sched = ~UvEventLoop::new_scheduler();
510+
let task = ~do Task::new(&mut sched.stack_pool) {
511+
let sched = local_sched::take();
512+
assert!(sched.in_task_context());
513+
do sched.deschedule_running_task_and_then() |task| {
514+
let task = Cell(task);
515+
do local_sched::borrow |sched| {
516+
assert!(!sched.in_task_context());
517+
sched.enqueue_task(task.take());
476518
}
477-
};
478-
sched.task_queue.push_back(task);
479-
}
480-
};
519+
}
520+
};
521+
sched.enqueue_task(task);
522+
sched.run();
523+
}
481524
}
482-
}
483525

484-
#[test]
485-
fn test_block_task() {
486-
do run_in_bare_thread {
487-
let mut sched = ~UvEventLoop::new_scheduler();
488-
let task = ~do Task::new(&mut sched.stack_pool) {
489-
let sched = local_sched::take();
490-
assert!(sched.in_task_context());
491-
do sched.deschedule_running_task_and_then() |task| {
492-
let task = Cell(task);
493-
do local_sched::borrow |sched| {
494-
assert!(!sched.in_task_context());
495-
sched.task_queue.push_back(task.take());
526+
#[test]
527+
fn test_io_callback() {
528+
// This is a regression test that when there are no schedulable tasks
529+
// in the work queue, but we are performing I/O, that once we do put
530+
// something in the work queue again the scheduler picks it up and doesn't
531+
// exit before emptying the work queue
532+
do run_in_newsched_task {
533+
do spawn {
534+
let sched = local_sched::take();
535+
do sched.deschedule_running_task_and_then |task| {
536+
let mut sched = local_sched::take();
537+
let task = Cell(task);
538+
do sched.event_loop.callback_ms(10) {
539+
rtdebug!("in callback");
540+
let mut sched = local_sched::take();
541+
sched.enqueue_task(task.take());
542+
local_sched::put(sched);
543+
}
544+
local_sched::put(sched);
496545
}
497546
}
498-
};
499-
sched.task_queue.push_back(task);
500-
sched.run();
547+
}
501548
}
502549
}

0 commit comments

Comments
 (0)