Skip to content

Commit 3fcd74d

Browse files
committed
---
yaml --- r: 139836 b: refs/heads/try2 c: 68583a2 h: refs/heads/master v: v3
1 parent 28dff0a commit 3fcd74d

File tree

3 files changed

+95
-63
lines changed

3 files changed

+95
-63
lines changed

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ refs/heads/snap-stage3: 78a7676898d9f80ab540c6df5d4c9ce35bb50463
55
refs/heads/try: 519addf6277dbafccbb4159db4b710c37eaa2ec5
66
refs/tags/release-0.1: 1f5c5126e96c79d22cb7862f75304136e204f105
77
refs/heads/ndm: f3868061cd7988080c30d6d5bf352a5a5fe2460b
8-
refs/heads/try2: ebefe07792caf17c03c6f90fb1979d4e6c935001
8+
refs/heads/try2: 68583a25a0b31bc113cf1f4ec479339cbf876e4d
99
refs/heads/dist-snap: ba4081a5a8573875fed17545846f6f6902c8ba8d
1010
refs/tags/release-0.2: c870d2dffb391e14efb05aa27898f1f6333a9596
1111
refs/tags/release-0.3: b5f0d0f648d9a6153664837026ba1be43d3e2503

branches/try2/src/libcore/rt/sched/mod.rs

Lines changed: 70 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use super::context::Context;
2020
#[cfg(test)] use super::uvio::UvEventLoop;
2121
#[cfg(test)] use unstable::run_in_bare_thread;
2222
#[cfg(test)] use int;
23+
#[cfg(test)] use cell::Cell;
2324

2425
mod local;
2526

@@ -46,14 +47,14 @@ pub struct Scheduler {
4647
// complaining
4748
type UnsafeTaskReceiver = sys::Closure;
4849
trait HackAroundBorrowCk {
49-
fn from_fn(&fn(&mut Scheduler, ~Task)) -> Self;
50-
fn to_fn(self) -> &fn(&mut Scheduler, ~Task);
50+
fn from_fn(&fn(~Task)) -> Self;
51+
fn to_fn(self) -> &fn(~Task);
5152
}
5253
impl HackAroundBorrowCk for UnsafeTaskReceiver {
53-
fn from_fn(f: &fn(&mut Scheduler, ~Task)) -> UnsafeTaskReceiver {
54+
fn from_fn(f: &fn(~Task)) -> UnsafeTaskReceiver {
5455
unsafe { transmute(f) }
5556
}
56-
fn to_fn(self) -> &fn(&mut Scheduler, ~Task) {
57+
fn to_fn(self) -> &fn(~Task) {
5758
unsafe { transmute(self) }
5859
}
5960
}
@@ -97,10 +98,12 @@ pub impl Scheduler {
9798

9899
let scheduler = Scheduler::unsafe_local_borrow();
99100
fn run_scheduler_once() {
100-
let scheduler = Scheduler::unsafe_local_borrow();
101+
let scheduler = Scheduler::local_take();
101102
if scheduler.resume_task_from_queue() {
102103
// Ok, a task ran. Nice! We'll do it again later
103-
scheduler.event_loop.callback(run_scheduler_once);
104+
do Scheduler::local_borrow |scheduler| {
105+
scheduler.event_loop.callback(run_scheduler_once);
106+
}
104107
}
105108
}
106109

@@ -124,9 +127,13 @@ pub impl Scheduler {
124127
local::put(sched);
125128
}
126129

130+
fn local_take() -> ~Scheduler {
131+
local::take()
132+
}
133+
127134
// * Scheduler-context operations
128135

129-
fn resume_task_from_queue(&mut self) -> bool {
136+
fn resume_task_from_queue(~self) -> bool {
130137
assert!(!self.in_task_context());
131138

132139
let mut self = self;
@@ -137,12 +144,14 @@ pub impl Scheduler {
137144
}
138145
None => {
139146
rtdebug!("no tasks in queue");
147+
local::put(self);
140148
return false;
141149
}
142150
}
143151
}
144152

145-
fn resume_task_immediately(&mut self, task: ~Task) {
153+
fn resume_task_immediately(~self, task: ~Task) {
154+
let mut self = self;
146155
assert!(!self.in_task_context());
147156

148157
rtdebug!("scheduling a task");
@@ -151,39 +160,46 @@ pub impl Scheduler {
151160
self.current_task = Some(task);
152161
self.enqueue_cleanup_job(DoNothing);
153162

163+
local::put(self);
164+
154165
// Take pointers to both the task and scheduler's saved registers.
155-
{
156-
let (sched_context, _, next_task_context) = self.get_contexts();
157-
let next_task_context = next_task_context.unwrap();
158-
// Context switch to the task, restoring it's registers
159-
// and saving the scheduler's
160-
Context::swap(sched_context, next_task_context);
161-
}
166+
let sched = Scheduler::unsafe_local_borrow();
167+
let (sched_context, _, next_task_context) = sched.get_contexts();
168+
let next_task_context = next_task_context.unwrap();
169+
// Context switch to the task, restoring it's registers
170+
// and saving the scheduler's
171+
Context::swap(sched_context, next_task_context);
162172

173+
let sched = Scheduler::unsafe_local_borrow();
163174
// The running task should have passed ownership elsewhere
164-
assert!(self.current_task.is_none());
175+
assert!(sched.current_task.is_none());
165176

166177
// Running tasks may have asked us to do some cleanup
167-
self.run_cleanup_job();
178+
sched.run_cleanup_job();
168179
}
169180

170181

171182
// * Task-context operations
172183

173184
/// Called by a running task to end execution, after which it will
174185
/// be recycled by the scheduler for reuse in a new task.
175-
fn terminate_current_task(&mut self) {
186+
fn terminate_current_task(~self) {
187+
let mut self = self;
176188
assert!(self.in_task_context());
177189

178190
rtdebug!("ending running task");
179191

180192
let dead_task = self.current_task.swap_unwrap();
181193
self.enqueue_cleanup_job(RecycleTask(dead_task));
182-
{
183-
let (sched_context, last_task_context, _) = self.get_contexts();
184-
let last_task_context = last_task_context.unwrap();
185-
Context::swap(last_task_context, sched_context);
186-
}
194+
195+
local::put(self);
196+
197+
let sched = Scheduler::unsafe_local_borrow();
198+
let (sched_context, last_task_context, _) = sched.get_contexts();
199+
let last_task_context = last_task_context.unwrap();
200+
Context::swap(last_task_context, sched_context);
201+
202+
// Control never reaches here
187203
}
188204

189205
/// Block a running task, context switch to the scheduler, then pass the
@@ -194,22 +210,25 @@ pub impl Scheduler {
194210
/// The closure here is a *stack* closure that lives in the
195211
/// running task. It gets transmuted to the scheduler's lifetime
196212
/// and called while the task is blocked.
197-
fn deschedule_running_task_and_then(&mut self, f: &fn(&mut Scheduler, ~Task)) {
213+
fn deschedule_running_task_and_then(~self, f: &fn(~Task)) {
214+
let mut self = self;
198215
assert!(self.in_task_context());
199216

200217
rtdebug!("blocking task");
201218

202219
let blocked_task = self.current_task.swap_unwrap();
203220
let f_fake_region = unsafe {
204-
transmute::<&fn(&mut Scheduler, ~Task), &fn(&mut Scheduler, ~Task)>(f)
221+
transmute::<&fn(~Task), &fn(~Task)>(f)
205222
};
206223
let f_opaque = HackAroundBorrowCk::from_fn(f_fake_region);
207224
self.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque));
208-
{
209-
let (sched_context, last_task_context, _) = self.get_contexts();
210-
let last_task_context = last_task_context.unwrap();
211-
Context::swap(last_task_context, sched_context);
212-
}
225+
226+
local::put(self);
227+
228+
let sched = Scheduler::unsafe_local_borrow();
229+
let (sched_context, last_task_context, _) = sched.get_contexts();
230+
let last_task_context = last_task_context.unwrap();
231+
Context::swap(last_task_context, sched_context);
213232

214233
// We could be executing in a different thread now
215234
let sched = Scheduler::unsafe_local_borrow();
@@ -219,20 +238,23 @@ pub impl Scheduler {
219238
/// Switch directly to another task, without going through the scheduler.
220239
/// You would want to think hard about doing this, e.g. if there are
221240
/// pending I/O events it would be a bad idea.
222-
fn resume_task_from_running_task_direct(&mut self, next_task: ~Task) {
241+
fn resume_task_from_running_task_direct(~self, next_task: ~Task) {
242+
let mut self = self;
223243
assert!(self.in_task_context());
224244

225245
rtdebug!("switching tasks");
226246

227247
let old_running_task = self.current_task.swap_unwrap();
228248
self.enqueue_cleanup_job(RescheduleTask(old_running_task));
229249
self.current_task = Some(next_task);
230-
{
231-
let (_, last_task_context, next_task_context) = self.get_contexts();
232-
let last_task_context = last_task_context.unwrap();
233-
let next_task_context = next_task_context.unwrap();
234-
Context::swap(last_task_context, next_task_context);
235-
}
250+
251+
local::put(self);
252+
253+
let sched = Scheduler::unsafe_local_borrow();
254+
let (_, last_task_context, next_task_context) = sched.get_contexts();
255+
let last_task_context = last_task_context.unwrap();
256+
let next_task_context = next_task_context.unwrap();
257+
Context::swap(last_task_context, next_task_context);
236258

237259
// We could be executing in a different thread now
238260
let sched = Scheduler::unsafe_local_borrow();
@@ -261,7 +283,7 @@ pub impl Scheduler {
261283
self.task_queue.push_front(task);
262284
}
263285
RecycleTask(task) => task.recycle(&mut self.stack_pool),
264-
GiveTask(task, f) => (f.to_fn())(self, task)
286+
GiveTask(task, f) => (f.to_fn())(task)
265287
}
266288
}
267289

@@ -338,7 +360,7 @@ pub impl Task {
338360

339361
start();
340362

341-
let sched = Scheduler::unsafe_local_borrow();
363+
let sched = Scheduler::local_take();
342364
sched.terminate_current_task();
343365
};
344366
return wrapper;
@@ -398,7 +420,7 @@ fn test_swap_tasks() {
398420
let mut sched = ~UvEventLoop::new_scheduler();
399421
let task1 = ~do Task::new(&mut sched.stack_pool) {
400422
unsafe { *count_ptr = *count_ptr + 1; }
401-
let sched = Scheduler::unsafe_local_borrow();
423+
let mut sched = Scheduler::local_take();
402424
let task2 = ~do Task::new(&mut sched.stack_pool) {
403425
unsafe { *count_ptr = *count_ptr + 1; }
404426
};
@@ -463,7 +485,7 @@ fn test_run_a_lot_of_tasks_direct() {
463485
assert!(count == MAX);
464486

465487
fn run_task(count_ptr: *mut int) {
466-
let sched = Scheduler::unsafe_local_borrow();
488+
let mut sched = Scheduler::local_take();
467489
let task = ~do Task::new(&mut sched.stack_pool) {
468490
unsafe {
469491
*count_ptr = *count_ptr + 1;
@@ -483,11 +505,14 @@ fn test_block_task() {
483505
do run_in_bare_thread {
484506
let mut sched = ~UvEventLoop::new_scheduler();
485507
let task = ~do Task::new(&mut sched.stack_pool) {
486-
let sched = Scheduler::unsafe_local_borrow();
508+
let sched = Scheduler::local_take();
487509
assert!(sched.in_task_context());
488-
do sched.deschedule_running_task_and_then() |sched, task| {
489-
assert!(!sched.in_task_context());
490-
sched.task_queue.push_back(task);
510+
do sched.deschedule_running_task_and_then() |task| {
511+
let task = Cell(task);
512+
do Scheduler::local_borrow |sched| {
513+
assert!(!sched.in_task_context());
514+
sched.task_queue.push_back(task.take());
515+
}
491516
}
492517
};
493518
sched.task_queue.push_back(task);

branches/try2/src/libcore/rt/uvio.rs

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -104,14 +104,16 @@ impl IoFactory for UvIoFactory {
104104
let result_cell = empty_cell();
105105
let result_cell_ptr: *Cell<Option<~StreamObject>> = &result_cell;
106106

107-
let scheduler = Scheduler::unsafe_local_borrow();
107+
let scheduler = Scheduler::local_take();
108108
assert!(scheduler.in_task_context());
109109

110110
// Block this task and take ownership, switch to scheduler context
111-
do scheduler.deschedule_running_task_and_then |scheduler, task| {
111+
do scheduler.deschedule_running_task_and_then |task| {
112112

113113
rtdebug!("connect: entered scheduler context");
114-
assert!(!scheduler.in_task_context());
114+
do Scheduler::local_borrow |scheduler| {
115+
assert!(!scheduler.in_task_context());
116+
}
115117
let mut tcp_watcher = TcpWatcher::new(self.uv_loop());
116118
let task_cell = Cell(task);
117119

@@ -131,7 +133,7 @@ impl IoFactory for UvIoFactory {
131133
unsafe { (*result_cell_ptr).put_back(maybe_stream); }
132134

133135
// Context switch
134-
let scheduler = Scheduler::unsafe_local_borrow();
136+
let scheduler = Scheduler::local_take();
135137
scheduler.resume_task_immediately(task_cell.take());
136138
}
137139
}
@@ -176,10 +178,10 @@ impl TcpListener for UvTcpListener {
176178

177179
let server_tcp_watcher = self.watcher();
178180

179-
let scheduler = Scheduler::unsafe_local_borrow();
181+
let scheduler = Scheduler::local_take();
180182
assert!(scheduler.in_task_context());
181183

182-
do scheduler.deschedule_running_task_and_then |_, task| {
184+
do scheduler.deschedule_running_task_and_then |task| {
183185
let task_cell = Cell(task);
184186
let mut server_tcp_watcher = server_tcp_watcher;
185187
do server_tcp_watcher.listen |server_stream_watcher, status| {
@@ -199,7 +201,7 @@ impl TcpListener for UvTcpListener {
199201

200202
rtdebug!("resuming task from listen");
201203
// Context switch
202-
let scheduler = Scheduler::unsafe_local_borrow();
204+
let scheduler = Scheduler::local_take();
203205
scheduler.resume_task_immediately(task_cell.take());
204206
}
205207
}
@@ -239,13 +241,15 @@ impl Stream for UvStream {
239241
let result_cell = empty_cell();
240242
let result_cell_ptr: *Cell<Result<uint, ()>> = &result_cell;
241243

242-
let scheduler = Scheduler::unsafe_local_borrow();
244+
let scheduler = Scheduler::local_take();
243245
assert!(scheduler.in_task_context());
244246
let watcher = self.watcher();
245247
let buf_ptr: *&mut [u8] = &buf;
246-
do scheduler.deschedule_running_task_and_then |scheduler, task| {
248+
do scheduler.deschedule_running_task_and_then |task| {
247249
rtdebug!("read: entered scheduler context");
248-
assert!(!scheduler.in_task_context());
250+
do Scheduler::local_borrow |scheduler| {
251+
assert!(!scheduler.in_task_context());
252+
}
249253
let mut watcher = watcher;
250254
let task_cell = Cell(task);
251255
// XXX: We shouldn't reallocate these callbacks every
@@ -271,7 +275,7 @@ impl Stream for UvStream {
271275

272276
unsafe { (*result_cell_ptr).put_back(result); }
273277

274-
let scheduler = Scheduler::unsafe_local_borrow();
278+
let scheduler = Scheduler::local_take();
275279
scheduler.resume_task_immediately(task_cell.take());
276280
}
277281
}
@@ -283,11 +287,11 @@ impl Stream for UvStream {
283287
fn write(&mut self, buf: &[u8]) -> Result<(), ()> {
284288
let result_cell = empty_cell();
285289
let result_cell_ptr: *Cell<Result<(), ()>> = &result_cell;
286-
let scheduler = Scheduler::unsafe_local_borrow();
290+
let scheduler = Scheduler::local_take();
287291
assert!(scheduler.in_task_context());
288292
let watcher = self.watcher();
289293
let buf_ptr: *&[u8] = &buf;
290-
do scheduler.deschedule_running_task_and_then |_, task| {
294+
do scheduler.deschedule_running_task_and_then |task| {
291295
let mut watcher = watcher;
292296
let task_cell = Cell(task);
293297
let buf = unsafe { &*buf_ptr };
@@ -302,7 +306,7 @@ impl Stream for UvStream {
302306

303307
unsafe { (*result_cell_ptr).put_back(result); }
304308

305-
let scheduler = Scheduler::unsafe_local_borrow();
309+
let scheduler = Scheduler::local_take();
306310
scheduler.resume_task_immediately(task_cell.take());
307311
}
308312
}
@@ -404,12 +408,15 @@ fn test_read_and_block() {
404408
}
405409
reads += 1;
406410

407-
let scheduler = Scheduler::unsafe_local_borrow();
411+
let scheduler = Scheduler::local_take();
408412
// Yield to the other task in hopes that it
409413
// will trigger a read callback while we are
410414
// not ready for it
411-
do scheduler.deschedule_running_task_and_then |scheduler, task| {
412-
scheduler.task_queue.push_back(task);
415+
do scheduler.deschedule_running_task_and_then |task| {
416+
let task = Cell(task);
417+
do Scheduler::local_borrow |scheduler| {
418+
scheduler.task_queue.push_back(task.take());
419+
}
413420
}
414421
}
415422

0 commit comments

Comments
 (0)