Skip to content

Commit 983e702

Browse files
committed
---
yaml --- r: 64509 b: refs/heads/snap-stage3 c: 9ad1997 h: refs/heads/master i: 64507: f288685 v: v3
1 parent 3bef764 commit 983e702

File tree

8 files changed

+103
-61
lines changed

8 files changed

+103
-61
lines changed

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
---
22
refs/heads/master: 2d28d645422c1617be58c8ca7ad9a457264ca850
33
refs/heads/snap-stage1: e33de59e47c5076a89eadeb38f4934f58a3618a6
4-
refs/heads/snap-stage3: 0101f35f276d0ef1ab841a179d01d0c66a18b38a
4+
refs/heads/snap-stage3: 9ad199754923e6d0ce8a004087036bf5bd347fbf
55
refs/heads/try: 7b78b52e602bb3ea8174f9b2006bff3315f03ef9
66
refs/tags/release-0.1: 1f5c5126e96c79d22cb7862f75304136e204f105
77
refs/heads/ndm: f3868061cd7988080c30d6d5bf352a5a5fe2460b

branches/snap-stage3/src/libstd/rt/comm.rs

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use option::*;
1919
use cast;
2020
use util;
2121
use ops::Drop;
22-
use rt::task::Task;
22+
use rt::kill::BlockedTask;
2323
use kinds::Send;
2424
use rt::sched::Scheduler;
2525
use rt::local::Local;
@@ -30,13 +30,13 @@ use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable};
3030
use cell::Cell;
3131
use clone::Clone;
3232

33-
/// A combined refcount / ~Task pointer.
33+
/// A combined refcount / BlockedTask-as-uint pointer.
3434
///
3535
/// Can be equal to the following values:
3636
///
3737
/// * 2 - both endpoints are alive
3838
/// * 1 - either the sender or the receiver is dead, determined by context
39-
/// * <ptr> - A pointer to a blocked Task that can be transmuted to ~Task
39+
/// * <ptr> - A pointer to a blocked Task (see BlockedTask::cast_{to,from}_uint)
4040
type State = uint;
4141

4242
static STATE_BOTH: State = 2;
@@ -137,11 +137,13 @@ impl<T> ChanOne<T> {
137137
}
138138
task_as_state => {
139139
// Port is blocked. Wake it up.
140-
let recvr: ~Task = cast::transmute(task_as_state);
141-
let mut sched = Local::take::<Scheduler>();
142-
rtdebug!("rendezvous send");
143-
sched.metrics.rendezvous_sends += 1;
144-
sched.schedule_task(recvr);
140+
let recvr = BlockedTask::cast_from_uint(task_as_state);
141+
do recvr.wake().map_consume |woken_task| {
142+
let mut sched = Local::take::<Scheduler>();
143+
rtdebug!("rendezvous send");
144+
sched.metrics.rendezvous_sends += 1;
145+
sched.schedule_task(woken_task);
146+
};
145147
}
146148
}
147149
}
@@ -177,7 +179,7 @@ impl<T> PortOne<T> {
177179
// an acquire barrier to prevent reordering of the subsequent read
178180
// of the payload. Also issues a release barrier to prevent reordering
179181
// of any previous writes to the task structure.
180-
let task_as_state: State = cast::transmute(task);
182+
let task_as_state = task.cast_to_uint();
181183
let oldstate = (*packet).state.swap(task_as_state, SeqCst);
182184
match oldstate {
183185
STATE_BOTH => {
@@ -193,8 +195,8 @@ impl<T> PortOne<T> {
193195
// NB: We have to drop back into the scheduler event loop here
194196
// instead of switching immediately back or we could end up
195197
// triggering infinite recursion on the scheduler's stack.
196-
let task: ~Task = cast::transmute(task_as_state);
197-
sched.enqueue_task(task);
198+
let recvr = BlockedTask::cast_from_uint(task_as_state);
199+
sched.enqueue_blocked_task(recvr);
198200
}
199201
_ => util::unreachable()
200202
}
@@ -258,9 +260,11 @@ impl<T> Drop for ChanOneHack<T> {
258260
task_as_state => {
259261
// The port is blocked waiting for a message we will never send. Wake it.
260262
assert!((*this.packet()).payload.is_none());
261-
let recvr: ~Task = cast::transmute(task_as_state);
262-
let sched = Local::take::<Scheduler>();
263-
sched.schedule_task(recvr);
263+
let recvr = BlockedTask::cast_from_uint(task_as_state);
264+
do recvr.wake().map_consume |woken_task| {
265+
let sched = Local::take::<Scheduler>();
266+
sched.schedule_task(woken_task);
267+
};
264268
}
265269
}
266270
}

branches/snap-stage3/src/libstd/rt/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,7 @@ fn test_context() {
367367
let sched = Local::take::<Scheduler>();
368368
do sched.deschedule_running_task_and_then() |sched, task| {
369369
assert_eq!(context(), SchedulerContext);
370-
sched.enqueue_task(task);
370+
sched.enqueue_blocked_task(task);
371371
}
372372
};
373373
sched.enqueue_task(task);

branches/snap-stage3/src/libstd/rt/sched.rs

Lines changed: 60 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@
88
// option. This file may not be copied, modified, or distributed
99
// except according to those terms.
1010

11-
use option::*;
11+
use either::{Left, Right};
12+
use option::{Option, Some, None};
1213
use sys;
1314
use cast::transmute;
1415
use clone::Clone;
@@ -20,6 +21,7 @@ use super::rtio::{EventLoop, EventLoopObject, RemoteCallbackObject};
2021
use super::context::Context;
2122
use super::task::{Task, AnySched, Sched};
2223
use super::message_queue::MessageQueue;
24+
use rt::kill::BlockedTask;
2325
use rt::local_ptr;
2426
use rt::local::Local;
2527
use rt::rtio::RemoteCallback;
@@ -271,6 +273,14 @@ impl Scheduler {
271273
};
272274
}
273275

276+
/// As enqueue_task, but with the possibility for the blocked task to
277+
/// already have been killed.
278+
pub fn enqueue_blocked_task(&mut self, blocked_task: BlockedTask) {
279+
do blocked_task.wake().map_consume |task| {
280+
self.enqueue_task(task);
281+
};
282+
}
283+
274284
// * Scheduler-context operations
275285

276286
fn interpret_message_queue(~self) -> bool {
@@ -412,14 +422,26 @@ impl Scheduler {
412422
/// Called by a running task to end execution, after which it will
413423
/// be recycled by the scheduler for reuse in a new task.
414424
pub fn terminate_current_task(~self) {
415-
assert!(self.in_task_context());
425+
let mut this = self;
426+
assert!(this.in_task_context());
416427

417428
rtdebug!("ending running task");
418429

419-
do self.deschedule_running_task_and_then |sched, dead_task| {
420-
let mut dead_task = dead_task;
421-
let coroutine = dead_task.coroutine.take_unwrap();
422-
coroutine.recycle(&mut sched.stack_pool);
430+
// This task is post-cleanup, so it must be unkillable. This sequence
431+
// of descheduling and recycling must not get interrupted by a kill.
432+
// FIXME(#7544): Make this use an inner descheduler, like yield should.
433+
this.current_task.get_mut_ref().death.unkillable += 1;
434+
435+
do this.deschedule_running_task_and_then |sched, dead_task| {
436+
match dead_task.wake() {
437+
Some(dead_task) => {
438+
let mut dead_task = dead_task;
439+
dead_task.death.unkillable -= 1; // FIXME(#7544) ugh
440+
let coroutine = dead_task.coroutine.take_unwrap();
441+
coroutine.recycle(&mut sched.stack_pool);
442+
}
443+
None => rtabort!("dead task killed before recycle"),
444+
}
423445
}
424446

425447
rtabort!("control reached end of task");
@@ -440,7 +462,7 @@ impl Scheduler {
440462
// here we know we are home, execute now OR we know we
441463
// aren't homed, and that this sched doesn't care
442464
do this.switch_running_tasks_and_then(task) |sched, last_task| {
443-
sched.enqueue_task(last_task);
465+
sched.enqueue_blocked_task(last_task);
444466
}
445467
} else if !homed && !this.run_anything {
446468
// the task isn't homed, but it can't be run here
@@ -491,6 +513,13 @@ impl Scheduler {
491513
}
492514
}
493515

516+
pub fn resume_blocked_task_immediately(~self, blocked_task: BlockedTask) {
517+
match blocked_task.wake() {
518+
Some(task) => self.resume_task_immediately(task),
519+
None => Local::put(self),
520+
};
521+
}
522+
494523
/// Block a running task, context switch to the scheduler, then pass the
495524
/// blocked task to a closure.
496525
///
@@ -503,7 +532,7 @@ impl Scheduler {
503532
/// This passes a Scheduler pointer to the fn after the context switch
504533
/// in order to prevent that fn from performing further scheduling operations.
505534
/// Doing further scheduling could easily result in infinite recursion.
506-
pub fn deschedule_running_task_and_then(~self, f: &fn(&mut Scheduler, ~Task)) {
535+
pub fn deschedule_running_task_and_then(~self, f: &fn(&mut Scheduler, BlockedTask)) {
507536
let mut this = self;
508537
assert!(this.in_task_context());
509538

@@ -512,8 +541,8 @@ impl Scheduler {
512541

513542
unsafe {
514543
let blocked_task = this.current_task.take_unwrap();
515-
let f_fake_region = transmute::<&fn(&mut Scheduler, ~Task),
516-
&fn(&mut Scheduler, ~Task)>(f);
544+
let f_fake_region = transmute::<&fn(&mut Scheduler, BlockedTask),
545+
&fn(&mut Scheduler, BlockedTask)>(f);
517546
let f_opaque = ClosureConverter::from_fn(f_fake_region);
518547
this.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque));
519548
}
@@ -539,7 +568,7 @@ impl Scheduler {
539568
/// You would want to think hard about doing this, e.g. if there are
540569
/// pending I/O events it would be a bad idea.
541570
pub fn switch_running_tasks_and_then(~self, next_task: ~Task,
542-
f: &fn(&mut Scheduler, ~Task)) {
571+
f: &fn(&mut Scheduler, BlockedTask)) {
543572
let mut this = self;
544573
assert!(this.in_task_context());
545574

@@ -548,8 +577,8 @@ impl Scheduler {
548577

549578
let old_running_task = this.current_task.take_unwrap();
550579
let f_fake_region = unsafe {
551-
transmute::<&fn(&mut Scheduler, ~Task),
552-
&fn(&mut Scheduler, ~Task)>(f)
580+
transmute::<&fn(&mut Scheduler, BlockedTask),
581+
&fn(&mut Scheduler, BlockedTask)>(f)
553582
};
554583
let f_opaque = ClosureConverter::from_fn(f_fake_region);
555584
this.enqueue_cleanup_job(GiveTask(old_running_task, f_opaque));
@@ -590,7 +619,15 @@ impl Scheduler {
590619
let cleanup_job = self.cleanup_job.take_unwrap();
591620
match cleanup_job {
592621
DoNothing => { }
593-
GiveTask(task, f) => (f.to_fn())(self, task)
622+
GiveTask(task, f) => {
623+
let f = f.to_fn();
624+
// Task might need to receive a kill signal instead of blocking.
625+
// We can call the "and_then" only if it blocks successfully.
626+
match BlockedTask::try_block(task) {
627+
Left(killed_task) => self.enqueue_task(killed_task),
628+
Right(blocked_task) => f(self, blocked_task),
629+
}
630+
}
594631
}
595632
}
596633

@@ -663,12 +700,14 @@ impl SchedHandle {
663700
// complaining
664701
type UnsafeTaskReceiver = sys::Closure;
665702
trait ClosureConverter {
666-
fn from_fn(&fn(&mut Scheduler, ~Task)) -> Self;
667-
fn to_fn(self) -> &fn(&mut Scheduler, ~Task);
703+
fn from_fn(&fn(&mut Scheduler, BlockedTask)) -> Self;
704+
fn to_fn(self) -> &fn(&mut Scheduler, BlockedTask);
668705
}
669706
impl ClosureConverter for UnsafeTaskReceiver {
670-
fn from_fn(f: &fn(&mut Scheduler, ~Task)) -> UnsafeTaskReceiver { unsafe { transmute(f) } }
671-
fn to_fn(self) -> &fn(&mut Scheduler, ~Task) { unsafe { transmute(self) } }
707+
fn from_fn(f: &fn(&mut Scheduler, BlockedTask)) -> UnsafeTaskReceiver {
708+
unsafe { transmute(f) }
709+
}
710+
fn to_fn(self) -> &fn(&mut Scheduler, BlockedTask) { unsafe { transmute(self) } }
672711
}
673712

674713

@@ -928,8 +967,7 @@ mod test {
928967
};
929968
// Context switch directly to the new task
930969
do sched.switch_running_tasks_and_then(task2) |sched, task1| {
931-
let task1 = Cell::new(task1);
932-
sched.enqueue_task(task1.take());
970+
sched.enqueue_blocked_task(task1);
933971
}
934972
unsafe { *count_ptr = *count_ptr + 1; }
935973
};
@@ -980,9 +1018,8 @@ mod test {
9801018
let sched = Local::take::<Scheduler>();
9811019
assert!(sched.in_task_context());
9821020
do sched.deschedule_running_task_and_then() |sched, task| {
983-
let task = Cell::new(task);
9841021
assert!(!sched.in_task_context());
985-
sched.enqueue_task(task.take());
1022+
sched.enqueue_blocked_task(task);
9861023
}
9871024
};
9881025
sched.enqueue_task(task);
@@ -1004,7 +1041,7 @@ mod test {
10041041
do sched.event_loop.callback_ms(10) {
10051042
rtdebug!("in callback");
10061043
let mut sched = Local::take::<Scheduler>();
1007-
sched.enqueue_task(task.take());
1044+
sched.enqueue_blocked_task(task.take());
10081045
Local::put(sched);
10091046
}
10101047
}

branches/snap-stage3/src/libstd/rt/test.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ pub fn spawntask_immediately(f: ~fn()) {
170170

171171
let sched = Local::take::<Scheduler>();
172172
do sched.switch_running_tasks_and_then(task) |sched, task| {
173-
sched.enqueue_task(task);
173+
sched.enqueue_blocked_task(task);
174174
}
175175
}
176176

@@ -214,7 +214,7 @@ pub fn spawntask_random(f: ~fn()) {
214214

215215
if run_now {
216216
do sched.switch_running_tasks_and_then(task) |sched, task| {
217-
sched.enqueue_task(task);
217+
sched.enqueue_blocked_task(task);
218218
}
219219
} else {
220220
sched.enqueue_task(task);
@@ -284,7 +284,7 @@ pub fn spawntask_try(f: ~fn()) -> Result<(), ()> {
284284

285285
let sched = Local::take::<Scheduler>();
286286
do sched.switch_running_tasks_and_then(new_task) |sched, old_task| {
287-
sched.enqueue_task(old_task);
287+
sched.enqueue_blocked_task(old_task);
288288
}
289289

290290
rtdebug!("enqueued the new task, now waiting on exit_status");

branches/snap-stage3/src/libstd/rt/tube.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,13 @@ use clone::Clone;
1818
use super::rc::RC;
1919
use rt::sched::Scheduler;
2020
use rt::{context, TaskContext, SchedulerContext};
21+
use rt::kill::BlockedTask;
2122
use rt::local::Local;
22-
use rt::task::Task;
2323
use vec::OwnedVector;
2424
use container::Container;
2525

2626
struct TubeState<T> {
27-
blocked_task: Option<~Task>,
27+
blocked_task: Option<BlockedTask>,
2828
buf: ~[T]
2929
}
3030

@@ -55,7 +55,7 @@ impl<T> Tube<T> {
5555
rtdebug!("waking blocked tube");
5656
let task = (*state).blocked_task.take_unwrap();
5757
let sched = Local::take::<Scheduler>();
58-
sched.resume_task_immediately(task);
58+
sched.resume_blocked_task_immediately(task);
5959
}
6060
}
6161
}
@@ -111,7 +111,7 @@ mod test {
111111
do sched.deschedule_running_task_and_then |sched, task| {
112112
let mut tube_clone = tube_clone_cell.take();
113113
tube_clone.send(1);
114-
sched.enqueue_task(task);
114+
sched.enqueue_blocked_task(task);
115115
}
116116

117117
assert!(tube.recv() == 1);
@@ -133,7 +133,7 @@ mod test {
133133
// sending will wake it up.
134134
tube_clone.send(1);
135135
}
136-
sched.enqueue_task(task);
136+
sched.enqueue_blocked_task(task);
137137
}
138138

139139
assert!(tube.recv() == 1);
@@ -168,7 +168,7 @@ mod test {
168168
}
169169
}
170170

171-
sched.enqueue_task(task);
171+
sched.enqueue_blocked_task(task);
172172
}
173173

174174
for int::range(0, MAX) |i| {

0 commit comments

Comments
 (0)