Skip to content

Commit 51c03c1

Browse files
committed
green: Properly wait for main before shutdown
There was a race in the code previously where schedulers would *immediately* shut down after spawning the main task (because the global task count would still be 0). This fixes the logic by blocking the sched pool task in receving on a port instead of spawning a task into the pool to receive on a port. The modifications necessary were to have a "simple task" running by the time the code is executing, but this is a simple enough thing to implement and I forsee this being necessary to have implemented in the future anyway.
1 parent 282f3d9 commit 51c03c1

File tree

5 files changed

+143
-45
lines changed

5 files changed

+143
-45
lines changed

src/libgreen/lib.rs

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -33,22 +33,24 @@
3333

3434
use std::os;
3535
use std::rt::crate_map;
36+
use std::rt::local::Local;
3637
use std::rt::rtio;
38+
use std::rt::task::Task;
3739
use std::rt::thread::Thread;
3840
use std::rt;
3941
use std::sync::atomics::{SeqCst, AtomicUint, INIT_ATOMIC_UINT};
4042
use std::sync::deque;
4143
use std::task::TaskOpts;
4244
use std::util;
4345
use std::vec;
44-
use stdtask = std::rt::task;
4546

4647
use sched::{Shutdown, Scheduler, SchedHandle, TaskFromFriend, NewNeighbor};
4748
use sleeper_list::SleeperList;
4849
use stack::StackPool;
4950
use task::GreenTask;
5051

5152
mod macros;
53+
mod simple;
5254

5355
pub mod basic;
5456
pub mod context;
@@ -61,16 +63,20 @@ pub mod task;
6163
#[lang = "start"]
6264
pub fn lang_start(main: *u8, argc: int, argv: **u8) -> int {
6365
use std::cast;
64-
do start(argc, argv) {
65-
let main: extern "Rust" fn() = unsafe { cast::transmute(main) };
66-
main();
67-
}
66+
let mut ret = None;
67+
simple::task().run(|| {
68+
ret = Some(do start(argc, argv) {
69+
let main: extern "Rust" fn() = unsafe { cast::transmute(main) };
70+
main();
71+
})
72+
});
73+
ret.unwrap()
6874
}
6975

7076
/// Set up a default runtime configuration, given compiler-supplied arguments.
7177
///
72-
/// This function will block the current thread of execution until the entire
73-
/// pool of M:N schedulers have exited.
78+
/// This function will block until the entire pool of M:N schedulers have
79+
/// exited. This function also requires a local task to be available.
7480
///
7581
/// # Arguments
7682
///
@@ -95,24 +101,37 @@ pub fn start(argc: int, argv: **u8, main: proc()) -> int {
95101

96102
/// Execute the main function in a pool of M:N schedulers.
97103
///
98-
/// Configures the runtime according to the environment, by default
99-
/// using a task scheduler with the same number of threads as cores.
100-
/// Returns a process exit code.
104+
/// Configures the runtime according to the environment, by default using a task
105+
/// scheduler with the same number of threads as cores. Returns a process exit
106+
/// code.
101107
///
102108
/// This function will not return until all schedulers in the associated pool
103109
/// have returned.
104110
pub fn run(main: proc()) -> int {
111+
// Create a scheduler pool and spawn the main task into this pool. We will
112+
// get notified over a channel when the main task exits.
105113
let mut pool = SchedPool::new(PoolConfig::new());
106114
let (port, chan) = Chan::new();
107115
let mut opts = TaskOpts::new();
108116
opts.notify_chan = Some(chan);
109117
pool.spawn(opts, main);
110-
do pool.spawn(TaskOpts::new()) {
111-
if port.recv().is_err() {
112-
os::set_exit_status(rt::DEFAULT_ERROR_CODE);
113-
}
118+
119+
// Wait for the main task to return, and set the process error code
120+
// appropriately.
121+
if port.recv().is_err() {
122+
os::set_exit_status(rt::DEFAULT_ERROR_CODE);
114123
}
115-
unsafe { stdtask::wait_for_completion(); }
124+
125+
// Once the main task has exited and we've set our exit code, wait for all
126+
// spawned sub-tasks to finish running. This is done to allow all schedulers
127+
// to remain active while there are still tasks possibly running.
128+
unsafe {
129+
let mut task = Local::borrow(None::<Task>);
130+
task.get().wait_for_other_tasks();
131+
}
132+
133+
// Now that we're sure all tasks are dead, shut down the pool of schedulers,
134+
// waiting for them all to return.
116135
pool.shutdown();
117136
os::get_exit_status()
118137
}

src/libgreen/simple.rs

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2+
// file at the top-level directory of this distribution and at
3+
// http://rust-lang.org/COPYRIGHT.
4+
//
5+
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6+
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7+
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8+
// option. This file may not be copied, modified, or distributed
9+
// except according to those terms.
10+
11+
//! A small module implementing a simple "runtime" used for bootstrapping a rust
12+
//! scheduler pool and then interacting with it.
13+
14+
use std::cast;
15+
use std::rt::Runtime;
16+
use std::task::TaskOpts;
17+
use std::rt::rtio;
18+
use std::rt::local::Local;
19+
use std::rt::task::{Task, BlockedTask};
20+
use std::unstable::sync::LittleLock;
21+
22+
struct SimpleTask {
23+
lock: LittleLock,
24+
}
25+
26+
impl Runtime for SimpleTask {
27+
// Implement the simple tasks of descheduling and rescheduling, but only in
28+
// a simple number of cases.
29+
fn deschedule(mut ~self, times: uint, mut cur_task: ~Task,
30+
f: |BlockedTask| -> Result<(), BlockedTask>) {
31+
assert!(times == 1);
32+
33+
let my_lock: *mut LittleLock = &mut self.lock;
34+
cur_task.put_runtime(self as ~Runtime);
35+
36+
unsafe {
37+
let cur_task_dupe = *cast::transmute::<&~Task, &uint>(&cur_task);
38+
let task = BlockedTask::block(cur_task);
39+
40+
let mut guard = (*my_lock).lock();
41+
match f(task) {
42+
Ok(()) => guard.wait(),
43+
Err(task) => { cast::forget(task.wake()); }
44+
}
45+
drop(guard);
46+
cur_task = cast::transmute::<uint, ~Task>(cur_task_dupe);
47+
}
48+
Local::put(cur_task);
49+
}
50+
fn reawaken(mut ~self, mut to_wake: ~Task) {
51+
let lock: *mut LittleLock = &mut self.lock;
52+
to_wake.put_runtime(self as ~Runtime);
53+
unsafe {
54+
cast::forget(to_wake);
55+
let _l = (*lock).lock();
56+
(*lock).signal();
57+
}
58+
}
59+
60+
// These functions are all unimplemented and fail as a result. This is on
61+
// purpose. A "simple task" is just that, a very simple task that can't
62+
// really do a whole lot. The only purpose of the task is to get us off our
63+
// feet and running.
64+
fn yield_now(~self, _cur_task: ~Task) { fail!() }
65+
fn maybe_yield(~self, _cur_task: ~Task) { fail!() }
66+
fn spawn_sibling(~self, _cur_task: ~Task, _opts: TaskOpts, _f: proc()) {
67+
fail!()
68+
}
69+
fn local_io<'a>(&'a mut self) -> Option<rtio::LocalIo<'a>> { None }
70+
fn wrap(~self) -> ~Any { fail!() }
71+
}
72+
73+
pub fn task() -> ~Task {
74+
let mut task = ~Task::new();
75+
task.put_runtime(~SimpleTask { lock: LittleLock::new() } as ~Runtime);
76+
return task;
77+
}

src/libnative/lib.rs

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,16 @@
3333
// answer is that you don't need them)
3434

3535
use std::os;
36+
use std::rt::local::Local;
37+
use std::rt::task::Task;
3638
use std::rt;
37-
use stdtask = std::rt::task;
3839

3940
pub mod io;
4041
pub mod task;
4142

4243

4344
// XXX: this should not exist here
44-
#[cfg(stage0, notready)]
45+
#[cfg(stage0)]
4546
#[lang = "start"]
4647
pub fn lang_start(main: *u8, argc: int, argv: **u8) -> int {
4748
use std::cast;
@@ -72,21 +73,25 @@ pub fn lang_start(main: *u8, argc: int, argv: **u8) -> int {
7273
/// exited.
7374
pub fn start(argc: int, argv: **u8, main: proc()) -> int {
7475
rt::init(argc, argv);
75-
let exit_code = run(main);
76+
let mut exit_code = None;
77+
let mut main = Some(main);
78+
task::new().run(|| {
79+
exit_code = Some(run(main.take_unwrap()));
80+
});
7681
unsafe { rt::cleanup(); }
77-
return exit_code;
82+
return exit_code.unwrap();
7883
}
7984

8085
/// Executes a procedure on the current thread in a Rust task context.
8186
///
8287
/// This function has all of the same details as `start` except for a different
8388
/// number of arguments.
8489
pub fn run(main: proc()) -> int {
85-
// Create a task, run the procedure in it, and then wait for everything.
86-
task::run(task::new(), main);
87-
88-
// Block this OS task waiting for everything to finish.
89-
unsafe { stdtask::wait_for_completion() }
90-
90+
// Run the main procedure and then wait for everything to finish
91+
main();
92+
unsafe {
93+
let mut task = Local::borrow(None::<Task>);
94+
task.get().wait_for_other_tasks();
95+
}
9196
os::get_exit_status()
9297
}

src/libnative/task.rs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -77,17 +77,11 @@ pub fn spawn_opts(opts: TaskOpts, f: proc()) {
7777
stack::record_stack_bounds(my_stack - stack + 1024, my_stack);
7878
}
7979

80-
run(task, f);
80+
let mut f = Some(f);
81+
task.run(|| { f.take_unwrap()() });
8182
})
8283
}
8384

84-
/// Runs a task once, consuming the task. The given procedure is run inside of
85-
/// the task.
86-
pub fn run(t: ~Task, f: proc()) {
87-
let mut f = Some(f);
88-
t.run(|| { f.take_unwrap()(); });
89-
}
90-
9185
// This structure is the glue between channels and the 1:1 scheduling mode. This
9286
// structure is allocated once per task.
9387
struct Ops {

src/libstd/rt/task.rs

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,21 @@ impl Task {
292292
pub fn local_io<'a>(&'a mut self) -> Option<LocalIo<'a>> {
293293
self.imp.get_mut_ref().local_io()
294294
}
295+
296+
/// The main function of all rust executables will by default use this
297+
/// function. This function will *block* the OS thread (hence the `unsafe`)
298+
/// waiting for all known tasks to complete. Once this function has
299+
/// returned, it is guaranteed that no more user-defined code is still
300+
/// running.
301+
pub unsafe fn wait_for_other_tasks(&mut self) {
302+
TASK_COUNT.fetch_sub(1, SeqCst); // don't count ourselves
303+
TASK_LOCK.lock();
304+
while TASK_COUNT.load(SeqCst) > 0 {
305+
TASK_LOCK.wait();
306+
}
307+
TASK_LOCK.unlock();
308+
TASK_COUNT.fetch_add(1, SeqCst); // add ourselves back in
309+
}
295310
}
296311

297312
impl Drop for Task {
@@ -396,18 +411,6 @@ impl Drop for Death {
396411
}
397412
}
398413

399-
/// The main function of all rust executables will by default use this function.
400-
/// This function will *block* the OS thread (hence the `unsafe`) waiting for
401-
/// all known tasks to complete. Once this function has returned, it is
402-
/// guaranteed that no more user-defined code is still running.
403-
pub unsafe fn wait_for_completion() {
404-
TASK_LOCK.lock();
405-
while TASK_COUNT.load(SeqCst) > 0 {
406-
TASK_LOCK.wait();
407-
}
408-
TASK_LOCK.unlock();
409-
}
410-
411414
#[cfg(test)]
412415
mod test {
413416
use super::*;

0 commit comments

Comments
 (0)