Skip to content

Commit 190ecc2

Browse files
committed
Make sync primitives fail-unwind-friendly
1 parent 6a10e3a commit 190ecc2

File tree

1 file changed

+156
-50
lines changed

1 file changed

+156
-50
lines changed

src/libcore/sync.rs

Lines changed: 156 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,33 @@ type signal_end = pipes::chan<()>;
2323
type waitqueue = { head: pipes::port<signal_end>,
2424
tail: pipes::chan<signal_end> };
2525

26-
// The building-block used to make semaphores, lock-and-signals, and rwlocks.
26+
// Signals one live task from the queue.
27+
fn signal_waitqueue(q: &waitqueue) -> bool {
28+
// The peek is mandatory to make sure recv doesn't block.
29+
if q.head.peek() {
30+
// Pop and send a wakeup signal. If the waiter was killed, its port
31+
// will have closed. Keep trying until we get a live task.
32+
if q.head.recv().try_send(()) {
33+
true
34+
} else {
35+
signal_waitqueue(q)
36+
}
37+
} else {
38+
false
39+
}
40+
}
41+
42+
fn broadcast_waitqueue(q: &waitqueue) -> uint {
43+
let mut count = 0;
44+
while q.head.peek() {
45+
if q.head.recv().try_send(()) {
46+
count += 1;
47+
}
48+
}
49+
count
50+
}
51+
52+
// The building-block used to make semaphores, mutexes, and rwlocks.
2753
enum sem<Q: send> = exclusive<{
2854
mut count: int,
2955
waiters: waitqueue,
@@ -39,6 +65,7 @@ impl<Q: send> &sem<Q> {
3965
do (**self).with |state| {
4066
state.count -= 1;
4167
if state.count < 0 {
68+
// Create waiter nobe.
4269
let (signal_end, wait_end) = pipes::stream();
4370
// Tell outer scope we need to block.
4471
waiter_nobe = some(wait_end);
@@ -58,14 +85,8 @@ impl<Q: send> &sem<Q> {
5885
unsafe {
5986
do (**self).with |state| {
6087
state.count += 1;
61-
// The peek is mandatory to make sure recv doesn't block.
62-
if state.count <= 0 && state.waiters.head.peek() {
63-
// Pop off the waitqueue and send a wakeup signal. If the
64-
// waiter was killed, its port will have closed, and send
65-
// will fail. Keep trying until we get a live task.
66-
state.waiters.head.recv().send(());
67-
// FIXME(#3145) use kill-friendly version when ready
68-
// while !state.waiters.head.recv().try_send(()) { }
88+
if state.count <= 0 {
89+
signal_waitqueue(&state.waiters);
6990
}
7091
}
7192
}
@@ -74,15 +95,25 @@ impl<Q: send> &sem<Q> {
7495
// FIXME(#3154) move both copies of this into sem<Q>, and unify the 2 structs
7596
impl &sem<()> {
7697
fn access<U>(blk: fn() -> U) -> U {
77-
self.acquire();
78-
let _x = sem_release(self);
98+
let mut release = none;
99+
unsafe {
100+
do task::unkillable {
101+
self.acquire();
102+
release = some(sem_release(self));
103+
}
104+
}
79105
blk()
80106
}
81107
}
82108
impl &sem<waitqueue> {
83109
fn access<U>(blk: fn() -> U) -> U {
84-
self.acquire();
85-
let _x = sem_and_signal_release(self);
110+
let mut release = none;
111+
unsafe {
112+
do task::unkillable {
113+
self.acquire();
114+
release = some(sem_and_signal_release(self));
115+
}
116+
}
86117
blk()
87118
}
88119
}
@@ -105,39 +136,58 @@ enum condvar = &sem<waitqueue>;
105136
impl condvar {
106137
/// Atomically drop the associated lock, and block until a signal is sent.
107138
fn wait() {
139+
// This is needed for a failing condition variable to reacquire the
140+
// mutex during unwinding. As long as the wrapper (mutex, etc) is
141+
// bounded in when it gets released, this shouldn't hang forever.
142+
struct sem_and_signal_reacquire {
143+
sem: &sem<waitqueue>;
144+
new(sem: &sem<waitqueue>) { self.sem = sem; }
145+
drop unsafe {
146+
do task::unkillable {
147+
self.sem.acquire();
148+
}
149+
}
150+
}
151+
152+
// Create waiter nobe.
108153
let (signal_end, wait_end) = pipes::stream();
109154
let mut signal_end = some(signal_end);
155+
let mut reacquire = none;
110156
unsafe {
111-
do (***self).with |state| {
112-
// Drop the lock.
113-
// FIXME(#3145) investigate why factoring doesn't compile.
114-
state.count += 1;
115-
if state.count <= 0 && state.waiters.head.peek() {
116-
state.waiters.head.recv().send(());
117-
// FIXME(#3145) use kill-friendly version when ready
157+
do task::unkillable {
158+
// If yield checks start getting inserted anywhere, we can be
159+
// killed before or after enqueueing. Deciding whether to
160+
// unkillably reacquire the lock needs to happen atomically
161+
// wrt enqueuing.
162+
reacquire = some(sem_and_signal_reacquire(*self));
163+
164+
// Release lock, 'atomically' enqueuing ourselves in so doing.
165+
do (***self).with |state| {
166+
// Drop the lock.
167+
// FIXME(#3145) investigate why factoring doesn't compile.
168+
state.count += 1;
169+
if state.count <= 0 {
170+
signal_waitqueue(&state.waiters);
171+
}
172+
// Enqueue ourself to be woken up by a signaller.
173+
let signal_end = option::swap_unwrap(&mut signal_end);
174+
state.blocked.tail.send(signal_end);
118175
}
119-
// Enqueue ourself to be woken up by a signaller.
120-
state.blocked.tail.send(option::swap_unwrap(&mut signal_end));
121176
}
122177
}
123178
// Unconditionally "block". (Might not actually block if a signaller
124179
// did send -- I mean 'unconditionally' in contrast with acquire().)
125180
let _ = wait_end.recv();
126-
// Pick up the lock again. FIXME(#3145): unkillable? destructor?
127-
(*self).acquire();
181+
// 'reacquire' will pick up the lock again in its destructor - it must
182+
// happen whether or not we are killed, and it needs to succeed at
183+
// reacquiring instead of itself dying.
128184
}
129185

130186
/// Wake up a blocked task. Returns false if there was no blocked task.
131187
fn signal() -> bool {
132188
unsafe {
133189
do (***self).with |state| {
134-
if state.blocked.head.peek() {
135-
state.blocked.head.recv().send(());
136-
// FIXME(#3145) use kill-friendly version when ready
137-
true
138-
} else {
139-
false
140-
}
190+
signal_waitqueue(&state.blocked)
141191
}
142192
}
143193
}
@@ -146,13 +196,8 @@ impl condvar {
146196
fn broadcast() -> uint {
147197
unsafe {
148198
do (***self).with |state| {
149-
let mut count = 0;
150-
while state.blocked.head.peek() {
151-
// This is already kill-friendly.
152-
state.blocked.head.recv().send(());
153-
count += 1;
154-
}
155-
count
199+
// FIXME(#3145) fix :broadcast_heavy
200+
broadcast_waitqueue(&state.blocked)
156201
}
157202
}
158203
}
@@ -191,11 +236,12 @@ impl &semaphore {
191236

192237
/**
193238
* Release a held resource represented by the semaphore. Wakes a blocked
194-
* contending task, if any exist.
239+
* contending task, if any exist. Won't block the caller.
195240
*/
196241
fn release() { (&**self).release() }
197242

198243
/// Run a function with ownership of one of the semaphore's resources.
244+
// FIXME(#3145): figure out whether or not this should get exported.
199245
fn access<U>(blk: fn() -> U) -> U { (&**self).access(blk) }
200246
}
201247

@@ -206,6 +252,7 @@ impl &semaphore {
206252
/**
207253
* A blocking, bounded-waiting, mutual exclusion lock with an associated
208254
* FIFO condition variable.
255+
* FIXME(#3145): document killability
209256
*/
210257
enum mutex = sem<waitqueue>;
211258

@@ -243,17 +290,29 @@ impl &mutex {
243290

244291
#[cfg(test)]
245292
mod tests {
293+
#[test]
294+
fn test_sem_acquire_release() {
295+
let s = ~new_semaphore(1);
296+
s.acquire();
297+
s.release();
298+
s.acquire();
299+
}
300+
#[test]
301+
fn test_sem_basic() {
302+
let s = ~new_semaphore(1);
303+
do s.access { }
304+
}
246305
#[test]
247306
fn test_sem_as_mutex() {
248307
let s = ~new_semaphore(1);
249308
let s2 = ~s.clone();
250309
do task::spawn {
251310
do s2.access {
252-
for 10.times { task::yield(); }
311+
for 5.times { task::yield(); }
253312
}
254313
}
255314
do s.access {
256-
for 10.times { task::yield(); }
315+
for 5.times { task::yield(); }
257316
}
258317
}
259318
#[test]
@@ -266,7 +325,7 @@ mod tests {
266325
s2.acquire();
267326
c.send(());
268327
}
269-
for 10.times { task::yield(); }
328+
for 5.times { task::yield(); }
270329
s.release();
271330
let _ = p.recv();
272331

@@ -275,7 +334,7 @@ mod tests {
275334
let s = ~new_semaphore(0);
276335
let s2 = ~s.clone();
277336
do task::spawn {
278-
for 10.times { task::yield(); }
337+
for 5.times { task::yield(); }
279338
s2.release();
280339
let _ = p.recv();
281340
}
@@ -324,7 +383,7 @@ mod tests {
324383
}
325384
}
326385
#[test]
327-
fn test_mutex() {
386+
fn test_mutex_lock() {
328387
// Unsafely achieve shared state, and do the textbook
329388
// "load tmp <- ptr; inc tmp; store ptr <- tmp" dance.
330389
let (c,p) = pipes::stream();
@@ -342,9 +401,9 @@ mod tests {
342401

343402
assert *sharedstate == 20;
344403

345-
fn access_shared(sharedstate: &mut int, sem: &mutex, n: uint) {
404+
fn access_shared(sharedstate: &mut int, m: &mutex, n: uint) {
346405
for n.times {
347-
do sem.lock {
406+
do m.lock {
348407
let oldval = *sharedstate;
349408
task::yield();
350409
*sharedstate = oldval + 1;
@@ -355,13 +414,15 @@ mod tests {
355414
#[test]
356415
fn test_mutex_cond_wait() {
357416
let m = ~new_mutex();
358-
let mut m2 = some(~m.clone());
359417

360418
// Child wakes up parent
361419
do m.lock_cond |cond| {
362-
let m2 = option::swap_unwrap(&mut m2);
420+
let m2 = ~m.clone();
363421
do task::spawn {
364-
do m2.lock_cond |cond| { cond.signal(); }
422+
do m2.lock_cond |cond| {
423+
let woken = cond.signal();
424+
assert woken;
425+
}
365426
}
366427
cond.wait();
367428
}
@@ -377,7 +438,8 @@ mod tests {
377438
}
378439
let _ = port.recv(); // Wait until child gets in the mutex
379440
do m.lock_cond |cond| {
380-
cond.signal();
441+
let woken = cond.signal();
442+
assert woken;
381443
}
382444
let _ = port.recv(); // Wait until child wakes up
383445
}
@@ -409,4 +471,48 @@ mod tests {
409471
// wait until all children wake up
410472
for ports.each |port| { let _ = port.recv(); }
411473
}
474+
#[test] #[ignore(cfg(windows))]
475+
fn test_mutex_killed_simple() {
476+
// Mutex must get automatically unlocked if failed/killed within.
477+
let m = ~new_mutex();
478+
let m2 = ~m.clone();
479+
480+
let result: result::result<(),()> = do task::try {
481+
do m2.lock {
482+
fail;
483+
}
484+
};
485+
assert result.is_err();
486+
// child task must have finished by the time try returns
487+
do m.lock { }
488+
}
489+
#[test] #[ignore(cfg(windows))]
490+
fn test_mutex_killed_cond() {
491+
// Getting killed during cond wait must not corrupt the mutex while
492+
// unwinding (e.g. double unlock).
493+
let m = ~new_mutex();
494+
let m2 = ~m.clone();
495+
496+
let result: result::result<(),()> = do task::try {
497+
let (c,p) = pipes::stream();
498+
do task::spawn { // linked
499+
let _ = p.recv(); // wait for sibling to get in the mutex
500+
task::yield();
501+
fail;
502+
}
503+
do m2.lock_cond |cond| {
504+
c.send(()); // tell sibling go ahead
505+
cond.wait(); // block forever
506+
}
507+
};
508+
assert result.is_err();
509+
// child task must have finished by the time try returns
510+
do m.lock_cond |cond| {
511+
let woken = cond.signal();
512+
// FIXME(#3145) - The semantics of pipes are not quite what I want
513+
// here - the pipe doesn't get 'terminated' if the child was
514+
// punted awake during failure.
515+
// assert !woken;
516+
}
517+
}
412518
}

0 commit comments

Comments
 (0)