Skip to content

Commit 26e6eb3

Browse files
committed
Handle failure conditions correctly in pipes.
1 parent d07e537 commit 26e6eb3

File tree

6 files changed

+48
-19
lines changed

6 files changed

+48
-19
lines changed

src/libcore/pipes.rs

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ extern mod rustrt {
4747
#[rust_stack]
4848
fn task_clear_event_reject(task: *rust_task);
4949

50-
fn task_wait_event(this: *rust_task) -> *libc::c_void;
50+
fn task_wait_event(this: *rust_task, killed: &mut bool) -> *libc::c_void;
5151
fn task_signal_event(target: *rust_task, event: *libc::c_void);
5252
}
5353

@@ -57,6 +57,16 @@ unsafe fn uniquify<T>(x: *T) -> ~T {
5757
unsafe { unsafe::reinterpret_cast(x) }
5858
}
5959

60+
fn wait_event(this: *rust_task) -> *libc::c_void {
61+
let mut killed = false;
62+
63+
let res = rustrt::task_wait_event(this, &mut killed);
64+
if killed && !task::failing() {
65+
fail "killed"
66+
}
67+
res
68+
}
69+
6070
fn swap_state_acq(&dst: state, src: state) -> state {
6171
unsafe {
6272
reinterpret_cast(rusti::atomic_xchng_acq(
@@ -113,23 +123,23 @@ fn recv<T: send>(-p: recv_packet<T>) -> option<T> {
113123
let this = rustrt::rust_get_task();
114124
rustrt::task_clear_event_reject(this);
115125
p.header.blocked_task = some(this);
126+
let mut first = true;
116127
loop {
128+
rustrt::task_clear_event_reject(this);
117129
let old_state = swap_state_acq(p.header.state,
118130
blocked);
119131
#debug("%?", old_state);
120132
alt old_state {
121133
empty {
122134
#debug("no data available on %?, going to sleep.", p_);
123-
rustrt::task_wait_event(this);
135+
wait_event(this);
124136
#debug("woke up, p.state = %?", p.header.state);
125-
if p.header.state == full {
126-
let mut payload = none;
127-
payload <-> (*p).payload;
128-
p.header.state = terminated;
129-
ret some(option::unwrap(payload))
137+
}
138+
blocked {
139+
if first {
140+
fail "blocking on already blocked packet"
130141
}
131142
}
132-
blocked { fail "blocking on already blocked packet" }
133143
full {
134144
let mut payload = none;
135145
payload <-> (*p).payload;
@@ -141,11 +151,12 @@ fn recv<T: send>(-p: recv_packet<T>) -> option<T> {
141151
ret none;
142152
}
143153
}
154+
first = false;
144155
}
145156
}
146157

147158
/// Returns true if messages are available.
148-
fn peek<T: send>(p: recv_packet<T>) -> bool {
159+
pure fn peek<T: send>(p: recv_packet<T>) -> bool {
149160
alt p.header().state {
150161
empty { false }
151162
blocked { fail "peeking on blocked packet" }
@@ -236,7 +247,7 @@ fn wait_many(pkts: ~[&a.packet_header]) -> uint {
236247

237248
while !data_avail {
238249
#debug("sleeping on %? packets", pkts.len());
239-
let event = rustrt::task_wait_event(this) as *packet_header;
250+
let event = wait_event(this) as *packet_header;
240251
let pos = vec::position(pkts, |p| ptr::addr_of(*p) == event);
241252

242253
alt pos {
@@ -356,7 +367,7 @@ class recv_packet<T: send> {
356367
option::unwrap(p)
357368
}
358369

359-
fn header() -> &self.packet_header {
370+
pure fn header() -> &self.packet_header {
360371
alt self.p {
361372
some(packet) {
362373
unsafe {

src/libcore/task.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ export future_result;
4646
export future_task;
4747
export unsupervise;
4848
export run_listener;
49+
export run_with;
4950

5051
export spawn;
5152
export spawn_with;

src/rt/rust_builtin.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -930,11 +930,11 @@ task_clear_event_reject(rust_task *task) {
930930
// Waits on an event, returning the pointer to the event that unblocked this
931931
// task.
932932
extern "C" void *
933-
task_wait_event(rust_task *task) {
933+
task_wait_event(rust_task *task, bool *killed) {
934934
// TODO: we should assert that the passed in task is the currently running
935935
// task. We wouldn't want to wait some other task.
936936

937-
return task->wait_event();
937+
return task->wait_event(killed);
938938
}
939939

940940
extern "C" void

src/rt/rust_task.cpp

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -713,16 +713,14 @@ rust_task::allow_kill() {
713713
}
714714

715715
void *
716-
rust_task::wait_event() {
716+
rust_task::wait_event(bool *killed) {
717717
scoped_lock with(state_lock);
718718

719719
if(!event_reject) {
720720
block_locked(&event_cond, "waiting on event");
721-
bool killed = false;
722721
state_lock.unlock();
723-
yield(&killed);
722+
yield(killed);
724723
state_lock.lock();
725-
// TODO: what is the right thing to do if we are killed?
726724
}
727725

728726
event_reject = false;

src/rt/rust_task.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,7 @@ rust_task : public kernel_owned<rust_task>, rust_cond
316316
this->event_reject = false;
317317
}
318318

319-
void *wait_event();
319+
void *wait_event(bool *killed);
320320
void signal_event(void *event);
321321

322322
void cleanup_after_turn();

src/test/run-pass/pipe-detect-term.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,5 +26,24 @@ fn main() {
2626
}
2727
});
2828

29-
sleep(iotask, 1000);
29+
sleep(iotask, 100);
30+
31+
let b = task::builder();
32+
task::unsupervise(b);
33+
task::run(b, failtest);
34+
}
35+
36+
// Make sure the right thing happens during failure.
37+
fn failtest() {
38+
let iotask = uv::global_loop::get();
39+
40+
let (c, p) = oneshot::init();
41+
42+
do task::spawn_with(c) |_c| {
43+
fail;
44+
}
45+
46+
#error("%?", recv(p));
47+
// make sure we get killed if we missed it in the receive.
48+
loop { task::yield() }
3049
}

0 commit comments

Comments
 (0)