Skip to content

Commit 41dde35

Browse files
committed
---
yaml --- r: 88849 b: refs/heads/snap-stage3 c: 14caf00 h: refs/heads/master i: 88847: 2f0ee9f v: v3
1 parent 1167575 commit 41dde35

File tree

5 files changed

+85
-147
lines changed

5 files changed

+85
-147
lines changed

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
---
22
refs/heads/master: deeca5d586bfaa4aa60246f671a8d611d38f6248
33
refs/heads/snap-stage1: e33de59e47c5076a89eadeb38f4934f58a3618a6
4-
refs/heads/snap-stage3: 3893716390f2c4857b7e8b1705a6344f96b85bb6
4+
refs/heads/snap-stage3: 14caf00c92b40e3f62094db54f325196c8a05d5a
55
refs/heads/try: b160761e35efcd1207112b3b782c06633cf441a8
66
refs/tags/release-0.1: 1f5c5126e96c79d22cb7862f75304136e204f105
77
refs/heads/ndm: f3868061cd7988080c30d6d5bf352a5a5fe2460b

branches/snap-stage3/src/libgreen/lib.rs

Lines changed: 27 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ use std::os;
3535
use std::rt::thread::Thread;
3636
use std::rt;
3737
use std::rt::crate_map;
38-
use std::rt::task::Task;
3938
use std::rt::rtio;
4039
use std::sync::deque;
4140
use std::sync::atomics::{SeqCst, AtomicUint, INIT_ATOMIC_UINT};
@@ -105,32 +104,32 @@ pub fn start(argc: int, argv: **u8, main: proc()) -> int {
105104
/// This function will not return until all schedulers in the associated pool
106105
/// have returned.
107106
pub fn run(main: proc()) -> int {
108-
let mut pool = Pool::new(Config::new());
107+
let mut pool = SchedPool::new(PoolConfig::new());
109108
pool.spawn(TaskOpts::new(), main);
110109
unsafe { stdtask::wait_for_completion(); }
111110
pool.shutdown();
112111
os::get_exit_status()
113112
}
114113

115114
/// Configuration of how an M:N pool of schedulers is spawned.
116-
pub struct Config {
115+
pub struct PoolConfig {
117116
/// The number of schedulers (OS threads) to spawn into this M:N pool.
118117
threads: uint,
119118
}
120119

121-
impl Config {
120+
impl PoolConfig {
122121
/// Returns the default configuration, as determined the the environment
123122
/// variables of this process.
124-
pub fn new() -> Config {
125-
Config {
123+
pub fn new() -> PoolConfig {
124+
PoolConfig {
126125
threads: rt::default_sched_threads(),
127126
}
128127
}
129128
}
130129

131130
/// A structure representing a handle to a pool of schedulers. This handle is
132131
/// used to keep the pool alive and also reap the status from the pool.
133-
pub struct Pool {
132+
pub struct SchedPool {
134133
priv id: uint,
135134
priv threads: ~[Thread<()>],
136135
priv handles: ~[SchedHandle],
@@ -141,19 +140,19 @@ pub struct Pool {
141140
priv sleepers: SleeperList,
142141
}
143142

144-
impl Pool {
143+
impl SchedPool {
145144
/// Execute the main function in a pool of M:N schedulers.
146145
///
147146
/// This will configure the pool according to the `config` parameter, and
148147
/// initially run `main` inside the pool of schedulers.
149-
pub fn new(config: Config) -> Pool {
148+
pub fn new(config: PoolConfig) -> SchedPool {
150149
static mut POOL_ID: AtomicUint = INIT_ATOMIC_UINT;
151150

152-
let Config { threads: nscheds } = config;
151+
let PoolConfig { threads: nscheds } = config;
153152
assert!(nscheds > 0);
154153

155154
// The pool of schedulers that will be returned from this function
156-
let mut pool = Pool {
155+
let mut pool = SchedPool {
157156
threads: ~[],
158157
handles: ~[],
159158
stealers: ~[],
@@ -185,30 +184,22 @@ impl Pool {
185184
let sched = sched;
186185
pool.threads.push(do Thread::start {
187186
let mut sched = sched;
188-
let mut task = do GreenTask::new(&mut sched.stack_pool, None) {
187+
let task = do GreenTask::new(&mut sched.stack_pool, None) {
189188
rtdebug!("boostraping a non-primary scheduler");
190189
};
191-
task.put_task(~Task::new());
192190
sched.bootstrap(task);
193191
});
194192
}
195193

196194
return pool;
197195
}
198196

199-
pub fn shutdown(mut self) {
200-
self.stealers = ~[];
201-
202-
for mut handle in util::replace(&mut self.handles, ~[]).move_iter() {
203-
handle.send(Shutdown);
204-
}
205-
for thread in util::replace(&mut self.threads, ~[]).move_iter() {
206-
thread.join();
207-
}
197+
pub fn task(&mut self, opts: TaskOpts, f: proc()) -> ~GreenTask {
198+
GreenTask::configure(&mut self.stack_pool, opts, f)
208199
}
209200

210201
pub fn spawn(&mut self, opts: TaskOpts, f: proc()) {
211-
let task = GreenTask::configure(&mut self.stack_pool, opts, f);
202+
let task = self.task(opts, f);
212203

213204
// Figure out someone to send this task to
214205
let idx = self.next_friend;
@@ -250,18 +241,28 @@ impl Pool {
250241
let sched = sched;
251242
self.threads.push(do Thread::start {
252243
let mut sched = sched;
253-
let mut task = do GreenTask::new(&mut sched.stack_pool, None) {
244+
let task = do GreenTask::new(&mut sched.stack_pool, None) {
254245
rtdebug!("boostraping a non-primary scheduler");
255246
};
256-
task.put_task(~Task::new());
257247
sched.bootstrap(task);
258248
});
259249

260250
return ret;
261251
}
252+
253+
pub fn shutdown(mut self) {
254+
self.stealers = ~[];
255+
256+
for mut handle in util::replace(&mut self.handles, ~[]).move_iter() {
257+
handle.send(Shutdown);
258+
}
259+
for thread in util::replace(&mut self.threads, ~[]).move_iter() {
260+
thread.join();
261+
}
262+
}
262263
}
263264

264-
impl Drop for Pool {
265+
impl Drop for SchedPool {
265266
fn drop(&mut self) {
266267
if self.threads.len() > 0 {
267268
fail!("dropping a M:N scheduler pool that wasn't shut down");

branches/snap-stage3/src/libgreen/sched.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -178,9 +178,8 @@ impl Scheduler {
178178
self.idle_callback = Some(self.event_loop.pausable_idle_callback(cb));
179179

180180
// Create a task for the scheduler with an empty context.
181-
let mut sched_task = GreenTask::new_typed(Some(Coroutine::empty()),
182-
TypeSched);
183-
sched_task.put_task(~Task::new());
181+
let sched_task = GreenTask::new_typed(Some(Coroutine::empty()),
182+
TypeSched);
184183

185184
// Before starting our first task, make sure the idle callback
186185
// is active. As we do not start in the sleep state this is

branches/snap-stage3/src/libgreen/task.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ impl GreenTask {
8585
sched: None,
8686
handle: None,
8787
nasty_deschedule_lock: unsafe { Mutex::new() },
88-
task: None,
88+
task: Some(~Task::new()),
8989
}
9090
}
9191

@@ -101,16 +101,17 @@ impl GreenTask {
101101
} = opts;
102102

103103
let mut green = GreenTask::new(pool, stack_size, f);
104-
let mut task = ~Task::new();
105-
task.name = name;
106-
match notify_chan {
107-
Some(chan) => {
108-
let on_exit = proc(task_result) { chan.send(task_result) };
109-
task.death.on_exit = Some(on_exit);
104+
{
105+
let task = green.task.get_mut_ref();
106+
task.name = name;
107+
match notify_chan {
108+
Some(chan) => {
109+
let on_exit = proc(task_result) { chan.send(task_result) };
110+
task.death.on_exit = Some(on_exit);
111+
}
112+
None => {}
110113
}
111-
None => {}
112114
}
113-
green.put_task(task);
114115
return green;
115116
}
116117

branches/snap-stage3/src/librustuv/homing.rs

Lines changed: 45 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -145,121 +145,58 @@ impl Drop for HomingMissile {
145145

146146
#[cfg(test)]
147147
mod test {
148+
use green::sched;
149+
use green::{SchedPool, PoolConfig};
150+
use std::rt::rtio::RtioUdpSocket;
151+
use std::io::test::next_test_ip4;
152+
use std::task::TaskOpts;
153+
154+
use net::UdpWatcher;
155+
use super::super::local_loop;
156+
148157
// On one thread, create a udp socket. Then send that socket to another
149158
// thread and destroy the socket on the remote thread. This should make sure
150159
// that homing kicks in for the socket to go back home to the original
151160
// thread, close itself, and then come back to the last thread.
152-
//#[test]
153-
//fn test_homing_closes_correctly() {
154-
// let (port, chan) = Chan::new();
155-
156-
// do task::spawn_sched(task::SingleThreaded) {
157-
// let listener = UdpWatcher::bind(local_loop(), next_test_ip4()).unwrap();
158-
// chan.send(listener);
159-
// }
160-
161-
// do task::spawn_sched(task::SingleThreaded) {
162-
// port.recv();
163-
// }
164-
//}
165-
166-
// This is a bit of a crufty old test, but it has its uses.
167-
//#[test]
168-
//fn test_simple_homed_udp_io_bind_then_move_task_then_home_and_close() {
169-
// use std::cast;
170-
// use std::rt::local::Local;
171-
// use std::rt::rtio::{EventLoop, IoFactory};
172-
// use std::rt::sched::Scheduler;
173-
// use std::rt::sched::{Shutdown, TaskFromFriend};
174-
// use std::rt::sleeper_list::SleeperList;
175-
// use std::rt::task::Task;
176-
// use std::rt::task::UnwindResult;
177-
// use std::rt::thread::Thread;
178-
// use std::rt::deque::BufferPool;
179-
// use std::unstable::run_in_bare_thread;
180-
// use uvio::UvEventLoop;
181-
182-
// do run_in_bare_thread {
183-
// let sleepers = SleeperList::new();
184-
// let mut pool = BufferPool::new();
185-
// let (worker1, stealer1) = pool.deque();
186-
// let (worker2, stealer2) = pool.deque();
187-
// let queues = ~[stealer1, stealer2];
188-
189-
// let loop1 = ~UvEventLoop::new() as ~EventLoop;
190-
// let mut sched1 = ~Scheduler::new(loop1, worker1, queues.clone(),
191-
// sleepers.clone());
192-
// let loop2 = ~UvEventLoop::new() as ~EventLoop;
193-
// let mut sched2 = ~Scheduler::new(loop2, worker2, queues.clone(),
194-
// sleepers.clone());
195-
196-
// let handle1 = sched1.make_handle();
197-
// let handle2 = sched2.make_handle();
198-
// let tasksFriendHandle = sched2.make_handle();
199-
200-
// let on_exit: proc(UnwindResult) = proc(exit_status) {
201-
// let mut handle1 = handle1;
202-
// let mut handle2 = handle2;
203-
// handle1.send(Shutdown);
204-
// handle2.send(Shutdown);
205-
// assert!(exit_status.is_success());
206-
// };
207-
208-
// unsafe fn local_io() -> &'static mut IoFactory {
209-
// let mut sched = Local::borrow(None::<Scheduler>);
210-
// let io = sched.get().event_loop.io();
211-
// cast::transmute(io.unwrap())
212-
// }
213-
214-
// let test_function: proc() = proc() {
215-
// let io = unsafe { local_io() };
216-
// let addr = next_test_ip4();
217-
// let maybe_socket = io.udp_bind(addr);
218-
// // this socket is bound to this event loop
219-
// assert!(maybe_socket.is_ok());
220-
221-
// // block self on sched1
222-
// let scheduler: ~Scheduler = Local::take();
223-
// let mut tasksFriendHandle = Some(tasksFriendHandle);
224-
// scheduler.deschedule_running_task_and_then(|_, task| {
225-
// // unblock task
226-
// task.wake().map(|task| {
227-
// // send self to sched2
228-
// tasksFriendHandle.take_unwrap()
229-
// .send(TaskFromFriend(task));
230-
// });
231-
// // sched1 should now sleep since it has nothing else to do
232-
// })
233-
// // sched2 will wake up and get the task as we do nothing else,
234-
// // the function ends and the socket goes out of scope sched2
235-
// // will start to run the destructor the destructor will first
236-
// // block the task, set it's home as sched1, then enqueue it
237-
// // sched2 will dequeue the task, see that it has a home, and
238-
// // send it to sched1 sched1 will wake up, exec the close
239-
// // function on the correct loop, and then we're done
240-
// };
161+
#[test]
162+
fn test_homing_closes_correctly() {
163+
let (port, chan) = Chan::new();
164+
let mut pool = SchedPool::new(PoolConfig { threads: 1 });
165+
166+
do pool.spawn(TaskOpts::new()) {
167+
let listener = UdpWatcher::bind(local_loop(), next_test_ip4());
168+
chan.send(listener.unwrap());
169+
}
241170

242-
// let mut main_task = ~Task::new_root(&mut sched1.stack_pool, None,
243-
// test_function);
244-
// main_task.death.on_exit = Some(on_exit);
171+
let task = do pool.task(TaskOpts::new()) {
172+
port.recv();
173+
};
174+
pool.spawn_sched().send(sched::TaskFromFriend(task));
245175

246-
// let null_task = ~do Task::new_root(&mut sched2.stack_pool, None) {
247-
// // nothing
248-
// };
176+
pool.shutdown();
177+
}
249178

250-
// let main_task = main_task;
251-
// let sched1 = sched1;
252-
// let thread1 = do Thread::start {
253-
// sched1.bootstrap(main_task);
254-
// };
179+
#[test]
180+
fn test_homing_read() {
181+
let (port, chan) = Chan::new();
182+
let mut pool = SchedPool::new(PoolConfig { threads: 1 });
183+
184+
do pool.spawn(TaskOpts::new()) {
185+
let addr1 = next_test_ip4();
186+
let addr2 = next_test_ip4();
187+
let listener = UdpWatcher::bind(local_loop(), addr2);
188+
chan.send((listener.unwrap(), addr1));
189+
let mut listener = UdpWatcher::bind(local_loop(), addr1).unwrap();
190+
listener.sendto([1, 2, 3, 4], addr2);
191+
}
255192

256-
// let sched2 = sched2;
257-
// let thread2 = do Thread::start {
258-
// sched2.bootstrap(null_task);
259-
// };
193+
let task = do pool.task(TaskOpts::new()) {
194+
let (mut watcher, addr) = port.recv();
195+
let mut buf = [0, ..10];
196+
assert_eq!(watcher.recvfrom(buf).unwrap(), (4, addr));
197+
};
198+
pool.spawn_sched().send(sched::TaskFromFriend(task));
260199

261-
// thread1.join();
262-
// thread2.join();
263-
// }
264-
//}
200+
pool.shutdown();
201+
}
265202
}

0 commit comments

Comments
 (0)