@@ -23,7 +23,33 @@ type signal_end = pipes::chan<()>;
23
23
type waitqueue = { head : pipes:: port < signal_end > ,
24
24
tail : pipes:: chan < signal_end > } ;
25
25
26
- // The building-block used to make semaphores, lock-and-signals, and rwlocks.
26
+ // Signals one live task from the queue.
27
+ fn signal_waitqueue ( q : & waitqueue ) -> bool {
28
+ // The peek is mandatory to make sure recv doesn't block.
29
+ if q. head . peek ( ) {
30
+ // Pop and send a wakeup signal. If the waiter was killed, its port
31
+ // will have closed. Keep trying until we get a live task.
32
+ if q. head . recv ( ) . try_send ( ( ) ) {
33
+ true
34
+ } else {
35
+ signal_waitqueue ( q)
36
+ }
37
+ } else {
38
+ false
39
+ }
40
+ }
41
+
42
+ fn broadcast_waitqueue ( q : & waitqueue ) -> uint {
43
+ let mut count = 0 ;
44
+ while q. head . peek ( ) {
45
+ if q. head . recv ( ) . try_send ( ( ) ) {
46
+ count += 1 ;
47
+ }
48
+ }
49
+ count
50
+ }
51
+
52
+ // The building-block used to make semaphores, mutexes, and rwlocks.
27
53
enum sem < Q : send > = exclusive<{
28
54
mut count: int,
29
55
waiters: waitqueue,
@@ -39,6 +65,7 @@ impl<Q: send> &sem<Q> {
39
65
do ( * * self ) . with |state| {
40
66
state. count -= 1 ;
41
67
if state. count < 0 {
68
+ // Create waiter nobe.
42
69
let ( signal_end, wait_end) = pipes:: stream ( ) ;
43
70
// Tell outer scope we need to block.
44
71
waiter_nobe = some ( wait_end) ;
@@ -58,14 +85,8 @@ impl<Q: send> &sem<Q> {
58
85
unsafe {
59
86
do ( * * self ) . with |state| {
60
87
state. count += 1 ;
61
- // The peek is mandatory to make sure recv doesn't block.
62
- if state. count <= 0 && state. waiters . head . peek ( ) {
63
- // Pop off the waitqueue and send a wakeup signal. If the
64
- // waiter was killed, its port will have closed, and send
65
- // will fail. Keep trying until we get a live task.
66
- state. waiters . head . recv ( ) . send ( ( ) ) ;
67
- // FIXME(#3145) use kill-friendly version when ready
68
- // while !state.waiters.head.recv().try_send(()) { }
88
+ if state. count <= 0 {
89
+ signal_waitqueue ( & state. waiters ) ;
69
90
}
70
91
}
71
92
}
@@ -74,15 +95,25 @@ impl<Q: send> &sem<Q> {
74
95
// FIXME(#3154) move both copies of this into sem<Q>, and unify the 2 structs
75
96
impl & sem < ( ) > {
76
97
fn access < U > ( blk : fn ( ) -> U ) -> U {
77
- self . acquire ( ) ;
78
- let _x = sem_release ( self ) ;
98
+ let mut release = none;
99
+ unsafe {
100
+ do task:: unkillable {
101
+ self. acquire ( ) ;
102
+ release = some ( sem_release ( self ) ) ;
103
+ }
104
+ }
79
105
blk( )
80
106
}
81
107
}
82
108
impl & sem < waitqueue > {
83
109
fn access < U > ( blk : fn ( ) -> U ) -> U {
84
- self . acquire ( ) ;
85
- let _x = sem_and_signal_release ( self ) ;
110
+ let mut release = none;
111
+ unsafe {
112
+ do task:: unkillable {
113
+ self. acquire ( ) ;
114
+ release = some ( sem_and_signal_release ( self ) ) ;
115
+ }
116
+ }
86
117
blk( )
87
118
}
88
119
}
@@ -105,39 +136,58 @@ enum condvar = &sem<waitqueue>;
105
136
impl condvar {
106
137
/// Atomically drop the associated lock, and block until a signal is sent.
107
138
fn wait ( ) {
139
+ // This is needed for a failing condition variable to reacquire the
140
+ // mutex during unwinding. As long as the wrapper (mutex, etc) is
141
+ // bounded in when it gets released, this shouldn't hang forever.
142
+ struct sem_and_signal_reacquire {
143
+ sem : & sem < waitqueue > ;
144
+ new ( sem : & sem < waitqueue > ) { self . sem = sem; }
145
+ drop unsafe {
146
+ do task:: unkillable {
147
+ self. sem . acquire ( ) ;
148
+ }
149
+ }
150
+ }
151
+
152
+ // Create waiter nobe.
108
153
let ( signal_end, wait_end) = pipes:: stream ( ) ;
109
154
let mut signal_end = some ( signal_end) ;
155
+ let mut reacquire = none;
110
156
unsafe {
111
- do ( * * * self ) . with |state| {
112
- // Drop the lock.
113
- // FIXME(#3145) investigate why factoring doesn't compile.
114
- state. count += 1 ;
115
- if state. count <= 0 && state. waiters . head . peek ( ) {
116
- state. waiters . head . recv ( ) . send ( ( ) ) ;
117
- // FIXME(#3145) use kill-friendly version when ready
157
+ do task:: unkillable {
158
+ // If yield checks start getting inserted anywhere, we can be
159
+ // killed before or after enqueueing. Deciding whether to
160
+ // unkillably reacquire the lock needs to happen atomically
161
+ // wrt enqueuing.
162
+ reacquire = some ( sem_and_signal_reacquire ( * self ) ) ;
163
+
164
+ // Release lock, 'atomically' enqueuing ourselves in so doing.
165
+ do ( * * * self ) . with |state| {
166
+ // Drop the lock.
167
+ // FIXME(#3145) investigate why factoring doesn't compile.
168
+ state. count += 1 ;
169
+ if state. count <= 0 {
170
+ signal_waitqueue ( & state. waiters ) ;
171
+ }
172
+ // Enqueue ourself to be woken up by a signaller.
173
+ let signal_end = option:: swap_unwrap ( & mut signal_end) ;
174
+ state. blocked . tail . send ( signal_end) ;
118
175
}
119
- // Enqueue ourself to be woken up by a signaller.
120
- state. blocked . tail . send ( option:: swap_unwrap ( & mut signal_end) ) ;
121
176
}
122
177
}
123
178
// Unconditionally "block". (Might not actually block if a signaller
124
179
// did send -- I mean 'unconditionally' in contrast with acquire().)
125
180
let _ = wait_end. recv ( ) ;
126
- // Pick up the lock again. FIXME(#3145): unkillable? destructor?
127
- ( * self ) . acquire ( ) ;
181
+ // 'reacquire' will pick up the lock again in its destructor - it must
182
+ // happen whether or not we are killed, and it needs to succeed at
183
+ // reacquiring instead of itself dying.
128
184
}
129
185
130
186
/// Wake up a blocked task. Returns false if there was no blocked task.
131
187
fn signal ( ) -> bool {
132
188
unsafe {
133
189
do ( * * * self ) . with |state| {
134
- if state. blocked . head . peek ( ) {
135
- state. blocked . head . recv ( ) . send ( ( ) ) ;
136
- // FIXME(#3145) use kill-friendly version when ready
137
- true
138
- } else {
139
- false
140
- }
190
+ signal_waitqueue ( & state. blocked )
141
191
}
142
192
}
143
193
}
@@ -146,13 +196,8 @@ impl condvar {
146
196
fn broadcast ( ) -> uint {
147
197
unsafe {
148
198
do ( * * * self ) . with |state| {
149
- let mut count = 0 ;
150
- while state. blocked . head . peek ( ) {
151
- // This is already kill-friendly.
152
- state. blocked . head . recv ( ) . send ( ( ) ) ;
153
- count += 1 ;
154
- }
155
- count
199
+ // FIXME(#3145) fix :broadcast_heavy
200
+ broadcast_waitqueue ( & state. blocked )
156
201
}
157
202
}
158
203
}
@@ -191,11 +236,12 @@ impl &semaphore {
191
236
192
237
/**
193
238
* Release a held resource represented by the semaphore. Wakes a blocked
194
- * contending task, if any exist.
239
+ * contending task, if any exist. Won't block the caller.
195
240
*/
196
241
fn release ( ) { ( & * * self ) . release ( ) }
197
242
198
243
/// Run a function with ownership of one of the semaphore's resources.
244
+ // FIXME(#3145): figure out whether or not this should get exported.
199
245
fn access < U > ( blk : fn ( ) -> U ) -> U { ( & * * self ) . access ( blk) }
200
246
}
201
247
@@ -206,6 +252,7 @@ impl &semaphore {
206
252
/**
207
253
* A blocking, bounded-waiting, mutual exclusion lock with an associated
208
254
* FIFO condition variable.
255
+ * FIXME(#3145): document killability
209
256
*/
210
257
enum mutex = sem< waitqueue > ;
211
258
@@ -243,17 +290,29 @@ impl &mutex {
243
290
244
291
#[ cfg( test) ]
245
292
mod tests {
293
+ #[ test]
294
+ fn test_sem_acquire_release ( ) {
295
+ let s = ~new_semaphore ( 1 ) ;
296
+ s. acquire ( ) ;
297
+ s. release ( ) ;
298
+ s. acquire ( ) ;
299
+ }
300
+ #[ test]
301
+ fn test_sem_basic ( ) {
302
+ let s = ~new_semaphore ( 1 ) ;
303
+ do s. access { }
304
+ }
246
305
#[ test]
247
306
fn test_sem_as_mutex ( ) {
248
307
let s = ~new_semaphore ( 1 ) ;
249
308
let s2 = ~s. clone ( ) ;
250
309
do task:: spawn {
251
310
do s2. access {
252
- for 10 . times { task : : yield ( ) ; }
311
+ for 5 . times { task : : yield ( ) ; }
253
312
}
254
313
}
255
314
do s. access {
256
- for 10 . times { task : : yield( ) ; }
315
+ for 5 . times { task: : yield ( ) ; }
257
316
}
258
317
}
259
318
#[ test]
@@ -266,7 +325,7 @@ mod tests {
266
325
s2. acquire ( ) ;
267
326
c. send ( ( ) ) ;
268
327
}
269
- for 10 . times { task : : yield( ) ; }
328
+ for 5 . times { task : : yield( ) ; }
270
329
s. release( ) ;
271
330
let _ = p. recv( ) ;
272
331
@@ -275,7 +334,7 @@ mod tests {
275
334
let s = ~new_semaphore ( 0 ) ;
276
335
let s2 = ~s. clone ( ) ;
277
336
do task:: spawn {
278
- for 10 . times { task : : yield ( ) ; }
337
+ for 5 . times { task : : yield ( ) ; }
279
338
s2. release ( ) ;
280
339
let _ = p. recv ( ) ;
281
340
}
@@ -324,7 +383,7 @@ mod tests {
324
383
}
325
384
}
326
385
#[ test]
327
- fn test_mutex ( ) {
386
+ fn test_mutex_lock ( ) {
328
387
// Unsafely achieve shared state, and do the textbook
329
388
// "load tmp <- ptr; inc tmp; store ptr <- tmp" dance.
330
389
let ( c, p) = pipes:: stream ( ) ;
@@ -342,9 +401,9 @@ mod tests {
342
401
343
402
assert * sharedstate == 20 ;
344
403
345
- fn access_shared ( sharedstate : & mut int , sem : & mutex , n : uint ) {
404
+ fn access_shared ( sharedstate : & mut int , m : & mutex , n : uint ) {
346
405
for n. times {
347
- do sem . lock {
406
+ do m . lock {
348
407
let oldval = * sharedstate;
349
408
task : : yield( ) ;
350
409
* sharedstate = oldval + 1 ;
@@ -355,13 +414,15 @@ mod tests {
355
414
#[ test]
356
415
fn test_mutex_cond_wait ( ) {
357
416
let m = ~new_mutex ( ) ;
358
- let mut m2 = some ( ~m. clone ( ) ) ;
359
417
360
418
// Child wakes up parent
361
419
do m. lock_cond |cond| {
362
- let m2 = option :: swap_unwrap ( & mut m2 ) ;
420
+ let m2 = ~m . clone ( ) ;
363
421
do task:: spawn {
364
- do m2. lock_cond |cond| { cond. signal ( ) ; }
422
+ do m2. lock_cond |cond| {
423
+ let woken = cond. signal ( ) ;
424
+ assert woken;
425
+ }
365
426
}
366
427
cond. wait ( ) ;
367
428
}
@@ -377,7 +438,8 @@ mod tests {
377
438
}
378
439
let _ = port. recv ( ) ; // Wait until child gets in the mutex
379
440
do m. lock_cond |cond| {
380
- cond. signal ( ) ;
441
+ let woken = cond. signal ( ) ;
442
+ assert woken;
381
443
}
382
444
let _ = port. recv ( ) ; // Wait until child wakes up
383
445
}
@@ -409,4 +471,48 @@ mod tests {
409
471
// wait until all children wake up
410
472
for ports. each |port| { let _ = port. recv ( ) ; }
411
473
}
474
+ #[ test] #[ ignore ( cfg ( windows) ) ]
475
+ fn test_mutex_killed_simple( ) {
476
+ // Mutex must get automatically unlocked if failed/killed within.
477
+ let m = ~new_mutex ( ) ;
478
+ let m2 = ~m. clone ( ) ;
479
+
480
+ let result: result:: result < ( ) , ( ) > = do task:: try {
481
+ do m2. lock {
482
+ fail;
483
+ }
484
+ } ;
485
+ assert result. is_err ( ) ;
486
+ // child task must have finished by the time try returns
487
+ do m. lock { }
488
+ }
489
+ #[ test] #[ ignore( cfg( windows) ) ]
490
+ fn test_mutex_killed_cond ( ) {
491
+ // Getting killed during cond wait must not corrupt the mutex while
492
+ // unwinding (e.g. double unlock).
493
+ let m = ~new_mutex ( ) ;
494
+ let m2 = ~m. clone ( ) ;
495
+
496
+ let result: result:: result < ( ) , ( ) > = do task:: try {
497
+ let ( c, p) = pipes:: stream ( ) ;
498
+ do task:: spawn { // linked
499
+ let _ = p. recv( ) ; // wait for sibling to get in the mutex
500
+ task:: yield ( ) ;
501
+ fail;
502
+ }
503
+ do m2. lock_cond |cond| {
504
+ c. send ( ( ) ) ; // tell sibling go ahead
505
+ cond. wait ( ) ; // block forever
506
+ }
507
+ } ;
508
+ assert result. is_err ( ) ;
509
+ // child task must have finished by the time try returns
510
+ do m. lock_cond |cond| {
511
+ let woken = cond. signal ( ) ;
512
+ // FIXME(#3145) - The semantics of pipes are not quite what I want
513
+ // here - the pipe doesn't get 'terminated' if the child was
514
+ // punted awake during failure.
515
+ // assert !woken;
516
+ }
517
+ }
412
518
}
0 commit comments