Skip to content

Commit 1d82fe5

Browse files
committed
fixed incorrect handling of returned scheduler option and restructed scheduler functions slightly
1 parent a5f55b3 commit 1d82fe5

File tree

6 files changed

+64
-75
lines changed

6 files changed

+64
-75
lines changed

src/libstd/rt/comm.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -131,10 +131,7 @@ impl<T> ChanOne<T> {
131131
// Port is blocked. Wake it up.
132132
let recvr = BlockedTask::cast_from_uint(task_as_state);
133133
do recvr.wake().map_consume |woken_task| {
134-
let mut sched = Local::take::<Scheduler>();
135-
rtdebug!("rendezvous send");
136-
sched.metrics.rendezvous_sends += 1;
137-
sched.schedule_task(woken_task);
134+
Scheduler::run_task(woken_task);
138135
};
139136
}
140137
}
@@ -350,8 +347,7 @@ impl<T> Drop for ChanOne<T> {
350347
assert!((*this.packet()).payload.is_none());
351348
let recvr = BlockedTask::cast_from_uint(task_as_state);
352349
do recvr.wake().map_consume |woken_task| {
353-
let sched = Local::take::<Scheduler>();
354-
sched.schedule_task(woken_task);
350+
Scheduler::run_task(woken_task);
355351
};
356352
}
357353
}

src/libstd/rt/mod.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,8 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {
259259
let mut handles = ~[];
260260

261261
do nscheds.times {
262+
rtdebug!("inserting a regular scheduler");
263+
262264
// Every scheduler is driven by an I/O event loop.
263265
let loop_ = ~UvEventLoop::new();
264266
let mut sched = ~Scheduler::new(loop_, work_queue.clone(), sleepers.clone());
@@ -344,6 +346,7 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {
344346

345347
// Run each remaining scheduler in a thread.
346348
while !scheds.is_empty() {
349+
rtdebug!("creating regular schedulers");
347350
let sched = scheds.pop();
348351
let sched_cell = Cell::new(sched);
349352
let thread = do Thread::start {
@@ -360,15 +363,21 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {
360363

361364
if use_main_sched {
362365

366+
rtdebug!("about to create the main scheduler task");
367+
363368
let mut main_sched = main_sched.get();
364369

365370
let home = Sched(main_sched.make_handle());
366-
let mut main_task = ~Task::new_root_homed(&mut scheds[0].stack_pool,
371+
let mut main_task = ~Task::new_root_homed(&mut main_sched.stack_pool,
367372
home, main.take());
368373
main_task.death.on_exit = Some(on_exit.take());
374+
rtdebug!("boostrapping main_task");
375+
369376
main_sched.bootstrap(main_task);
370377
}
371378

379+
rtdebug!("waiting for threads");
380+
372381
// Wait for schedulers
373382
foreach thread in threads.consume_iter() {
374383
thread.join();
@@ -404,7 +413,6 @@ pub fn context() -> RuntimeContext {
404413
if unsafe { rust_try_get_task().is_not_null() } {
405414
return OldTaskContext;
406415
} else if Local::exists::<Task>() {
407-
rtdebug!("either task or scheduler context in newrt");
408416
// In this case we know it is a new runtime context, but we
409417
// need to check which one. Going to try borrowing task to
410418
// check. Task should always be in TLS, so hopefully this

src/libstd/rt/sched.rs

Lines changed: 46 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -142,11 +142,11 @@ impl Scheduler {
142142
local_ptr::init_tls_key();
143143

144144
// Create a task for the scheduler with an empty context.
145-
let sched_task = Task::new_sched_task();
145+
let sched_task = ~Task::new_sched_task();
146146

147147
// Now that we have an empty task struct for the scheduler
148148
// task, put it in TLS.
149-
Local::put::(~sched_task);
149+
Local::put::(sched_task);
150150

151151
// Now, as far as all the scheduler state is concerned, we are
152152
// inside the "scheduler" context. So we can act like the
@@ -165,8 +165,6 @@ impl Scheduler {
165165
// cleaning up the memory it uses. As we didn't actually call
166166
// task.run() on the scheduler task we never get through all
167167
// the cleanup code it runs.
168-
169-
rtdebug!("post sched.run(), cleaning up scheduler task");
170168
let mut stask = Local::take::<Task>();
171169
stask.destroyed = true;
172170
}
@@ -224,6 +222,8 @@ impl Scheduler {
224222
// 2) A shutdown is also easy, shutdown.
225223
// 3) A pinned task - we resume immediately and do not return
226224
// here.
225+
// 4) A message from another scheduler with a non-homed task
226+
// to run here.
227227

228228
let result = sched.interpret_message_queue();
229229
let sched = match result {
@@ -236,6 +236,8 @@ impl Scheduler {
236236
}
237237
};
238238

239+
// Second activity is to try resuming a task from the queue.
240+
239241
let result = sched.resume_task_from_queue();
240242
let mut sched = match result {
241243
Some(sched) => {
@@ -333,8 +335,7 @@ impl Scheduler {
333335
return None;
334336
}
335337
Some(TaskFromFriend(task)) => {
336-
this.schedule_task_sched_context(task);
337-
return None;
338+
return this.sched_schedule_task(task);
338339
}
339340
Some(Wake) => {
340341
this.sleepy = false;
@@ -442,8 +443,6 @@ impl Scheduler {
442443
}
443444
}
444445

445-
// * Task-context operations
446-
447446
/// Called by a running task to end execution, after which it will
448447
/// be recycled by the scheduler for reuse in a new task.
449448
pub fn terminate_current_task(~self) {
@@ -457,10 +456,17 @@ impl Scheduler {
457456
}
458457
}
459458

460-
// If a scheduling action is performed, return None. If not,
461-
// return Some(sched).
459+
// Scheduling a task requires a few checks to make sure the task
460+
// ends up in the appropriate location. The run_anything flag on
461+
// the scheduler and the home on the task need to be checked. This
462+
// helper performs that check. It takes a function that specifies
463+
// how to queue the the provided task if that is the correct
464+
// action. This is a "core" function that requires handling the
465+
// returned Option correctly.
462466

463-
pub fn schedule_task(~self, task: ~Task) -> Option<~Scheduler> {
467+
pub fn schedule_task(~self, task: ~Task,
468+
schedule_fn: ~fn(sched: ~Scheduler, task: ~Task))
469+
-> Option<~Scheduler> {
464470

465471
// is the task home?
466472
let is_home = task.is_home_no_tls(&self);
@@ -474,9 +480,7 @@ impl Scheduler {
474480
// here we know we are home, execute now OR we know we
475481
// aren't homed, and that this sched doesn't care
476482
rtdebug!("task: %u is on ok sched, executing", to_uint(task));
477-
do this.switch_running_tasks_and_then(task) |sched, last_task| {
478-
sched.enqueue_blocked_task(last_task);
479-
}
483+
schedule_fn(this, task);
480484
return None;
481485
} else if !homed && !this.run_anything {
482486
// the task isn't homed, but it can't be run here
@@ -489,35 +493,30 @@ impl Scheduler {
489493
}
490494
}
491495

492-
// BAD BAD BAD BAD BAD
493-
// Do something instead of just copy-pasting this.
494-
pub fn schedule_task_sched_context(~self, task: ~Task) -> Option<~Scheduler> {
495-
496-
// is the task home?
497-
let is_home = task.is_home_no_tls(&self);
498-
499-
// does the task have a home?
500-
let homed = task.homed();
501-
502-
let mut this = self;
503-
504-
if is_home || (!homed && this.run_anything) {
505-
// here we know we are home, execute now OR we know we
506-
// aren't homed, and that this sched doesn't care
507-
rtdebug!("task: %u is on ok sched, executing", to_uint(task));
508-
this.resume_task_immediately(task);
509-
return None;
510-
} else if !homed && !this.run_anything {
511-
// the task isn't homed, but it can't be run here
512-
this.enqueue_task(task);
513-
return Some(this);
514-
} else {
515-
// task isn't home, so don't run it here, send it home
516-
Scheduler::send_task_home(task);
517-
return Some(this);
496+
// There are two contexts in which schedule_task can be called:
497+
// inside the scheduler, and inside a task. These contexts handle
498+
// executing the task slightly differently. In the scheduler
499+
// context case we want to receive the scheduler as an input, and
500+
// manually deal with the option. In the task context case we want
501+
// to use TLS to find the scheduler, and deal with the option
502+
// inside the helper.
503+
504+
pub fn sched_schedule_task(~self, task: ~Task) -> Option<~Scheduler> {
505+
do self.schedule_task(task) |sched, next_task| {
506+
sched.resume_task_immediately(next_task);
518507
}
519508
}
520509

510+
// Task context case - use TLS.
511+
pub fn run_task(task: ~Task) {
512+
let sched = Local::take::<Scheduler>();
513+
let opt = do sched.schedule_task(task) |sched, next_task| {
514+
do sched.switch_running_tasks_and_then(next_task) |sched, last_task| {
515+
sched.enqueue_blocked_task(last_task);
516+
}
517+
};
518+
opt.map_consume(Local::put);
519+
}
521520

522521
// The primary function for changing contexts. In the current
523522
// design the scheduler is just a slightly modified GreenTask, so
@@ -586,7 +585,7 @@ impl Scheduler {
586585
Context::swap(current_task_context, next_task_context);
587586
}
588587

589-
// When the context swaps back to the scheduler we immediately
588+
// When the context swaps back to this task we immediately
590589
// run the cleanup job, as expected by the previously called
591590
// swap_contexts function.
592591
unsafe {
@@ -599,15 +598,8 @@ impl Scheduler {
599598
}
600599
}
601600

602-
// There are a variety of "obvious" functions to be passed to
603-
// change_task_context, so we can make a few "named cases".
604-
605-
// Enqueue the old task on the current scheduler.
606-
pub fn enqueue_old(sched: &mut Scheduler, task: ~Task) {
607-
sched.enqueue_task(task);
608-
}
609-
610-
// Sometimes we just want the old API though.
601+
// Old API for task manipulation implemented over the new core
602+
// function.
611603

612604
pub fn resume_task_immediately(~self, task: ~Task) {
613605
do self.change_task_context(task) |sched, stask| {
@@ -668,13 +660,6 @@ impl Scheduler {
668660
};
669661
}
670662

671-
// A helper that looks up the scheduler and runs a task. If it can
672-
// be run now it is run now.
673-
pub fn run_task(new_task: ~Task) {
674-
let sched = Local::take::<Scheduler>();
675-
sched.schedule_task(new_task).map_consume(Local::put);
676-
}
677-
678663
// Returns a mutable reference to both contexts involved in this
679664
// swap. This is unsafe - we are getting mutable internal
680665
// references to keep even when we don't own the tasks. It looks
@@ -692,8 +677,6 @@ impl Scheduler {
692677
}
693678
}
694679

695-
// * Other stuff
696-
697680
pub fn enqueue_cleanup_job(&mut self, job: CleanupJob) {
698681
self.cleanup_job = Some(job);
699682
}
@@ -1004,22 +987,22 @@ mod test {
1004987
let port = Cell::new(port);
1005988
let chan = Cell::new(chan);
1006989

1007-
let _thread_one = do Thread::start {
990+
let thread_one = do Thread::start {
1008991
let chan = Cell::new(chan.take());
1009992
do run_in_newsched_task_core {
1010993
chan.take().send(());
1011994
}
1012995
};
1013996

1014-
let _thread_two = do Thread::start {
997+
let thread_two = do Thread::start {
1015998
let port = Cell::new(port.take());
1016999
do run_in_newsched_task_core {
10171000
port.take().recv();
10181001
}
10191002
};
10201003

1021-
thread1.join();
1022-
thread2.join();
1004+
thread_two.join();
1005+
thread_one.join();
10231006
}
10241007
}
10251008

src/libstd/rt/task.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ impl Task {
209209
}
210210

211211
pub fn run(&mut self, f: &fn()) {
212-
212+
rtdebug!("run called on task: %u", borrow::to_uint(self));
213213
self.unwinder.try(f);
214214
{ let _ = self.taskgroup.take(); }
215215
self.death.collect_failure(!self.unwinder.unwinding);
@@ -301,7 +301,7 @@ impl Task {
301301

302302
impl Drop for Task {
303303
fn drop(&self) {
304-
rtdebug!("called drop for a task");
304+
rtdebug!("called drop for a task: %u", borrow::to_uint(self));
305305
assert!(self.destroyed)
306306
}
307307
}

src/libstd/rt/test.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,9 @@ pub fn run_in_mt_newsched_task(f: ~fn()) {
130130

131131
while !scheds.is_empty() {
132132
let mut sched = scheds.pop();
133-
let bootstrap_task = ~do Task::new_root(&mut sched.stack_pool) || {};
133+
let bootstrap_task = ~do Task::new_root(&mut sched.stack_pool) || {
134+
rtdebug!("bootstrapping non-primary scheduler");
135+
};
134136
let bootstrap_task_cell = Cell::new(bootstrap_task);
135137
let sched_cell = Cell::new(sched);
136138
let thread = do Thread::start {

src/libstd/rt/uv/uvio.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ impl IoFactory for UvIoFactory {
253253
let scheduler = Local::take::<Scheduler>();
254254

255255
// Block this task and take ownership, switch to scheduler context
256-
do scheduler.deschedule_running_task_and_then |_sched, task| {
256+
do scheduler.deschedule_running_task_and_then |_, task| {
257257

258258
rtdebug!("connect: entered scheduler context");
259259
let mut tcp_watcher = TcpWatcher::new(self.uv_loop());

0 commit comments

Comments
 (0)