@@ -148,19 +148,6 @@ enum condvar = &sem<waitqueue>;
148
148
impl condvar {
149
149
/// Atomically drop the associated lock, and block until a signal is sent.
150
150
fn wait ( ) {
151
- // This is needed for a failing condition variable to reacquire the
152
- // mutex during unwinding. As long as the wrapper (mutex, etc) is
153
- // bounded in when it gets released, this shouldn't hang forever.
154
- struct sem_and_signal_reacquire {
155
- sem : & sem < waitqueue > ;
156
- new ( sem : & sem < waitqueue > ) { self . sem = sem; }
157
- drop unsafe {
158
- do task:: unkillable {
159
- self. sem . acquire ( ) ;
160
- }
161
- }
162
- }
163
-
164
151
// Create waiter nobe.
165
152
let ( signal_end, wait_end) = pipes:: stream ( ) ;
166
153
let mut signal_end = some ( signal_end) ;
@@ -190,9 +177,20 @@ impl condvar {
190
177
// Unconditionally "block". (Might not actually block if a signaller
191
178
// did send -- I mean 'unconditionally' in contrast with acquire().)
192
179
let _ = wait_end. recv ( ) ;
193
- // 'reacquire' will pick up the lock again in its destructor - it must
194
- // happen whether or not we are killed, and it needs to succeed at
195
- // reacquiring instead of itself dying.
180
+
181
+ // This is needed for a failing condition variable to reacquire the
182
+ // mutex during unwinding. As long as the wrapper (mutex, etc) is
183
+ // bounded in when it gets released, this shouldn't hang forever.
184
+ struct sem_and_signal_reacquire {
185
+ sem : & sem < waitqueue > ;
186
+ new ( sem : & sem < waitqueue > ) { self . sem = sem ; }
187
+ drop unsafe {
188
+ // Needs to succeed, instead of itself dying.
189
+ do task:: unkillable {
190
+ self . sem. acquire( ) ;
191
+ }
192
+ }
193
+ }
196
194
}
197
195
198
196
/// Wake up a blocked task. Returns false if there was no blocked task.
@@ -297,14 +295,15 @@ struct rwlock {
297
295
state: arc:: exclusive<rwlock_inner>;
298
296
}
299
297
298
+ /// Create a new rwlock.
300
299
fn rwlock ( ) -> rwlock {
301
300
rwlock { order_lock : new_semaphore ( 1 ) , access_lock : new_sem_and_signal ( 1 ) ,
302
301
state : arc:: exclusive ( rwlock_inner { read_mode : false ,
303
302
read_count : 0 } ) }
304
303
}
305
304
306
305
impl & rwlock {
307
- // Create a new handle to the rwlock.
306
+ /// Create a new handle to the rwlock.
308
307
fn clone ( ) -> rwlock {
309
308
rwlock { order_lock : ( & ( self . order_lock ) ) . clone ( ) ,
310
309
access_lock : sem ( ( * self . access_lock ) . clone ( ) ) ,
@@ -316,40 +315,39 @@ impl &rwlock {
316
315
* tasks may run concurrently with this one.
317
316
*/
318
317
fn read < U > ( blk : fn ( ) -> U ) -> U {
319
- do ( & self . order_lock ) . access {
320
- let mut first_reader = false ;
321
- do self. state . with |state| {
322
- state. read_mode = true ;
323
- first_reader = ( state. read_count == 0 ) ;
324
- state. read_count += 1 ;
325
- }
326
- if first_reader {
327
- ( & self . access_lock ) . acquire ( ) ;
318
+ unsafe {
319
+ do task:: unkillable {
320
+ do ( & self . order_lock ) . access {
321
+ let mut first_reader = false ;
322
+ do self. state . with |state| {
323
+ state. read_mode = true ;
324
+ first_reader = ( state. read_count == 0 ) ;
325
+ state. read_count += 1 ;
326
+ }
327
+ if first_reader {
328
+ ( & self . access_lock ) . acquire ( ) ;
329
+ }
330
+ }
328
331
}
329
332
}
330
- let result = blk ( ) ;
331
- let mut last_reader = false ;
332
- do self. state . with |state| {
333
- assert state. read_mode ;
334
- state. read_count -= 1 ;
335
- last_reader = ( state. read_count == 0 ) ;
336
- }
337
- if last_reader {
338
- ( & self . access_lock ) . release ( ) ;
339
- }
340
- result
333
+ let _z = rwlock_release_read ( self ) ;
334
+ blk ( )
341
335
}
342
336
343
337
/**
344
338
* Run a function with the rwlock in write mode. No calls to 'read' or
345
339
* 'write' from other tasks will run concurrently with this one.
346
340
*/
347
341
fn write < U > ( blk : fn ( ) -> U ) -> U {
348
- ( & self . order_lock ) . acquire ( ) ;
349
- do ( & self . access_lock ) . access {
350
- ( & self . order_lock ) . release ( ) ;
351
- blk ( )
342
+ unsafe {
343
+ do task:: unkillable {
344
+ ( & self . order_lock ) . acquire ( ) ;
345
+ ( & self . access_lock ) . acquire ( ) ;
346
+ ( & self . order_lock ) . release ( ) ;
347
+ }
352
348
}
349
+ let _z = rwlock_release_write ( self ) ;
350
+ blk( )
353
351
}
354
352
355
353
/**
@@ -364,12 +362,41 @@ impl &rwlock {
364
362
// to-do implement downgrade
365
363
}
366
364
365
+ // FIXME(#3136) should go inside of write() and read() respectively
366
+ struct rwlock_release_write {
367
+ lock : & rwlock ;
368
+ new ( lock : & rwlock ) { self . lock = lock; }
369
+ drop unsafe {
370
+ do task:: unkillable { ( & self . lock . access_lock ) . release ( ) ; }
371
+ }
372
+ }
373
+ struct rwlock_release_read {
374
+ lock : & rwlock ;
375
+ new ( lock : & rwlock ) { self . lock = lock; }
376
+ drop unsafe {
377
+ do task:: unkillable {
378
+ let mut last_reader = false ;
379
+ do self. lock . state . with |state| {
380
+ assert state. read_mode ;
381
+ state. read_count -= 1 ;
382
+ last_reader = ( state. read_count == 0 ) ;
383
+ }
384
+ if last_reader {
385
+ ( & self . lock . access_lock ) . release ( ) ;
386
+ }
387
+ }
388
+ }
389
+ }
390
+
367
391
/****************************************************************************
368
392
* Tests
369
393
****************************************************************************/
370
394
371
395
#[ cfg( test) ]
372
396
mod tests {
397
+ /************************************************************************
398
+ * Semaphore tests
399
+ ************************************************************************/
373
400
#[ test]
374
401
fn test_sem_acquire_release ( ) {
375
402
let s = ~new_semaphore ( 1 ) ;
@@ -462,6 +489,9 @@ mod tests {
462
489
let _ = p. recv ( ) ; // wait for child to be done
463
490
}
464
491
}
492
+ /************************************************************************
493
+ * Mutex tests
494
+ ************************************************************************/
465
495
#[ test]
466
496
fn test_mutex_lock ( ) {
467
497
// Unsafely achieve shared state, and do the textbook
@@ -613,32 +643,36 @@ mod tests {
613
643
// assert !woken;
614
644
}
615
645
}
646
+ /************************************************************************
647
+ * Reader/writer lock tests
648
+ ************************************************************************/
649
+ #[ cfg( test) ]
650
+ fn lock_rwlock_in_mode ( x : & rwlock , reader : bool , blk : fn ( ) ) {
651
+ if reader { x. read ( blk) ; } else { x. write ( blk) ; }
652
+ }
616
653
#[ cfg( test) ]
617
654
fn test_rwlock_exclusion ( reader1 : bool , reader2 : bool ) {
618
655
// Test mutual exclusion between readers and writers. Just like the
619
656
// mutex mutual exclusion test, a ways above.
620
657
let ( c, p) = pipes:: stream ( ) ;
621
- let m = ~rwlock ( ) ;
622
- let m2 = ~m . clone ( ) ;
658
+ let x = ~rwlock ( ) ;
659
+ let x2 = ~x . clone ( ) ;
623
660
let sharedstate = ~0 ;
624
661
let ptr = ptr:: addr_of ( * sharedstate) ;
625
662
do task:: spawn {
626
663
let sharedstate = unsafe { unsafe : : reinterpret_cast ( ptr) } ;
627
- access_shared ( sharedstate, m2 , reader1, 10 ) ;
664
+ access_shared ( sharedstate, x2 , reader1, 10 ) ;
628
665
c. send ( ( ) ) ;
629
666
}
630
- access_shared ( sharedstate, m , reader2, 10 ) ;
667
+ access_shared ( sharedstate, x , reader2, 10 ) ;
631
668
let _ = p. recv ( ) ;
632
669
633
670
assert * sharedstate == 20 ;
634
671
635
- fn access_shared ( sharedstate : & mut int , m : & rwlock , reader : bool ,
672
+ fn access_shared ( sharedstate : & mut int , x : & rwlock , reader : bool ,
636
673
n : uint ) {
637
- let lock_fn = fn @( m: & rwlock, blk : fn ( ) ) {
638
- if reader { m. read ( blk) ; } else { m. write ( blk) ; }
639
- } ;
640
674
for n. times {
641
- do lock_fn ( m ) {
675
+ do lock_rwlock_in_mode ( x , reader ) {
642
676
let oldval = * sharedstate;
643
677
task:: yield ( ) ;
644
678
* sharedstate = oldval + 1 ;
@@ -672,6 +706,28 @@ mod tests {
672
706
c2. send ( ( ) ) ;
673
707
let _ = p1. recv ( ) ;
674
708
}
709
+ }
710
+ #[ cfg( test) ] #[ ignore( cfg( windows) ) ]
711
+ fn rwlock_kill_helper ( reader1 : bool , reader2 : bool ) {
712
+ // Mutex must get automatically unlocked if failed/killed within.
713
+ let x = ~rwlock ( ) ;
714
+ let x2 = ~x. clone ( ) ;
675
715
716
+ let result: result:: result < ( ) , ( ) > = do task:: try {
717
+ do lock_rwlock_in_mode( x2, reader1) {
718
+ fail;
719
+ }
720
+ } ;
721
+ assert result. is_err ( ) ;
722
+ // child task must have finished by the time try returns
723
+ do lock_rwlock_in_mode ( x, reader2) { }
676
724
}
725
+ #[ test] #[ ignore( cfg( windows) ) ]
726
+ fn test_rwlock_reader_killed_writer ( ) { rwlock_kill_helper ( true , false ) ; }
727
+ #[ test] #[ ignore( cfg( windows) ) ]
728
+ fn test_rwlock_writer_killed_reader ( ) { rwlock_kill_helper ( false , true ) ; }
729
+ #[ test] #[ ignore( cfg( windows) ) ]
730
+ fn test_rwlock_reader_killed_reader ( ) { rwlock_kill_helper ( true , true ) ; }
731
+ #[ test] #[ ignore( cfg( windows) ) ]
732
+ fn test_rwlock_writer_killed_writer ( ) { rwlock_kill_helper ( false , false ) ; }
677
733
}
0 commit comments