Skip to content

Commit bcb6a68

Browse files
committed
sync: Add rwlocks (half-done) and test cases
1 parent ef32a99 commit bcb6a68

File tree

1 file changed

+177
-18
lines changed

1 file changed

+177
-18
lines changed

src/libcore/sync.rs

Lines changed: 177 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
export condvar;
99
export semaphore, new_semaphore;
1010
export mutex, new_mutex;
11+
export rwlock;
1112

1213
// FIXME (#3119) This shouldn't be a thing exported from core.
1314
import arc::exclusive;
@@ -58,6 +59,17 @@ enum sem<Q: send> = exclusive<{
5859
blocked: Q,
5960
}>;
6061

62+
fn new_sem<Q: send>(count: int, +q: Q) -> sem<Q> {
63+
let (wait_tail, wait_head) = pipes::stream();
64+
sem(exclusive({ mut count: count,
65+
waiters: { head: wait_head, tail: wait_tail },
66+
blocked: q }))
67+
}
68+
fn new_sem_and_signal(count: int) -> sem<waitqueue> {
69+
let (block_tail, block_head) = pipes::stream();
70+
new_sem(count, { head: block_head, tail: block_tail })
71+
}
72+
6173
impl<Q: send> &sem<Q> {
6274
fn acquire() {
6375
let mut waiter_nobe = none;
@@ -217,12 +229,7 @@ impl &sem<waitqueue> {
217229
enum semaphore = sem<()>;
218230

219231
/// Create a new semaphore with the specified count.
220-
fn new_semaphore(count: int) -> semaphore {
221-
let (wait_tail, wait_head) = pipes::stream();
222-
semaphore(sem(exclusive({ mut count: count,
223-
waiters: { head: wait_head, tail: wait_tail },
224-
blocked: () })))
225-
}
232+
fn new_semaphore(count: int) -> semaphore { semaphore(new_sem(count, ())) }
226233

227234
impl &semaphore {
228235
/// Create a new handle to the semaphore.
@@ -257,13 +264,7 @@ impl &semaphore {
257264
enum mutex = sem<waitqueue>;
258265

259266
/// Create a new mutex.
260-
fn new_mutex() -> mutex {
261-
let (wait_tail, wait_head) = pipes::stream();
262-
let (block_tail, block_head) = pipes::stream();
263-
mutex(sem(exclusive({ mut count: 1,
264-
waiters: { head: wait_head, tail: wait_tail },
265-
blocked: { head: block_head, tail: block_tail } })))
266-
}
267+
fn new_mutex() -> mutex { mutex(new_sem_and_signal(1)) }
267268

268269
impl &mutex {
269270
/// Create a new handle to the mutex.
@@ -282,7 +283,86 @@ impl &mutex {
282283
* Reader-writer locks
283284
****************************************************************************/
284285

285-
// FIXME(#3145) implement
286+
// NB: Uncyclopedia - Readers-writers_problem#The_third_readers-writers_problem
287+
288+
struct rwlock_inner {
289+
read_mode: bool;
290+
read_count: uint;
291+
}
292+
293+
/// A blocking, no-starvation, reader-writer lock with an associated condvar.
294+
struct rwlock {
295+
order_lock: semaphore;
296+
access_lock: sem<waitqueue>;
297+
state: arc::exclusive<rwlock_inner>;
298+
}
299+
300+
fn rwlock() -> rwlock {
301+
rwlock { order_lock: new_semaphore(1), access_lock: new_sem_and_signal(1),
302+
state: arc::exclusive(rwlock_inner { read_mode: false,
303+
read_count: 0 }) }
304+
}
305+
306+
impl &rwlock {
307+
// Create a new handle to the rwlock.
308+
fn clone() -> rwlock {
309+
rwlock { order_lock: (&(self.order_lock)).clone(),
310+
access_lock: sem((*self.access_lock).clone()),
311+
state: self.state.clone() }
312+
}
313+
314+
/**
315+
* Run a function with the rwlock in read mode. Calls to 'read' from other
316+
* tasks may run concurrently with this one.
317+
*/
318+
fn read<U>(blk: fn() -> U) -> U {
319+
do (&self.order_lock).access {
320+
let mut first_reader = false;
321+
do self.state.with |state| {
322+
state.read_mode = true;
323+
first_reader = (state.read_count == 0);
324+
state.read_count += 1;
325+
}
326+
if first_reader {
327+
(&self.access_lock).acquire();
328+
}
329+
}
330+
let result = blk();
331+
let mut last_reader = false;
332+
do self.state.with |state| {
333+
assert state.read_mode;
334+
state.read_count -= 1;
335+
last_reader = (state.read_count == 0);
336+
}
337+
if last_reader {
338+
(&self.access_lock).release();
339+
}
340+
result
341+
}
342+
343+
/**
344+
* Run a function with the rwlock in write mode. No calls to 'read' or
345+
* 'write' from other tasks will run concurrently with this one.
346+
*/
347+
fn write<U>(blk: fn() -> U) -> U {
348+
(&self.order_lock).acquire();
349+
do (&self.access_lock).access {
350+
(&self.order_lock).release();
351+
blk()
352+
}
353+
}
354+
355+
/**
356+
* As write(), but also with a handle to a condvar. Waiting on this
357+
* condvar will allow readers and writers alike to take the rwlock before
358+
* the waiting task is signalled.
359+
*/
360+
fn write_cond<U>(_blk: fn(condvar) -> U) -> U {
361+
fail ~"Need implement lock order lock before access lock";
362+
}
363+
364+
// to-do implement downgrade
365+
}
286366

287367
/****************************************************************************
288368
* Tests
@@ -443,9 +523,8 @@ mod tests {
443523
}
444524
let _ = port.recv(); // Wait until child wakes up
445525
}
446-
#[test]
447-
fn test_mutex_cond_broadcast() {
448-
let num_waiters: uint = 12;
526+
#[cfg(test)]
527+
fn test_mutex_cond_broadcast_helper(num_waiters: uint) {
449528
let m = ~new_mutex();
450529
let mut ports = ~[];
451530

@@ -471,6 +550,25 @@ mod tests {
471550
// wait until all children wake up
472551
for ports.each |port| { let _ = port.recv(); }
473552
}
553+
#[test]
554+
fn test_mutex_cond_broadcast() {
555+
test_mutex_cond_broadcast_helper(12);
556+
}
557+
#[test]
558+
fn test_mutex_cond_broadcast_none() {
559+
test_mutex_cond_broadcast_helper(0);
560+
}
561+
#[test]
562+
fn test_mutex_cond_no_waiter() {
563+
let m = ~new_mutex();
564+
let m2 = ~m.clone();
565+
do task::try {
566+
do m.lock_cond |_x| { }
567+
};
568+
do m2.lock_cond |cond| {
569+
assert !cond.signal();
570+
}
571+
}
474572
#[test] #[ignore(cfg(windows))]
475573
fn test_mutex_killed_simple() {
476574
// Mutex must get automatically unlocked if failed/killed within.
@@ -508,11 +606,72 @@ mod tests {
508606
assert result.is_err();
509607
// child task must have finished by the time try returns
510608
do m.lock_cond |cond| {
511-
let woken = cond.signal();
609+
let _woken = cond.signal();
512610
// FIXME(#3145) - The semantics of pipes are not quite what I want
513611
// here - the pipe doesn't get 'terminated' if the child was
514612
// punted awake during failure.
515613
// assert !woken;
516614
}
517615
}
616+
#[cfg(test)]
617+
fn test_rwlock_exclusion(reader1: bool, reader2: bool) {
618+
// Test mutual exclusion between readers and writers. Just like the
619+
// mutex mutual exclusion test, a ways above.
620+
let (c,p) = pipes::stream();
621+
let m = ~rwlock();
622+
let m2 = ~m.clone();
623+
let sharedstate = ~0;
624+
let ptr = ptr::addr_of(*sharedstate);
625+
do task::spawn {
626+
let sharedstate = unsafe { unsafe::reinterpret_cast(ptr) };
627+
access_shared(sharedstate, m2, reader1, 10);
628+
c.send(());
629+
}
630+
access_shared(sharedstate, m, reader2, 10);
631+
let _ = p.recv();
632+
633+
assert *sharedstate == 20;
634+
635+
fn access_shared(sharedstate: &mut int, m: &rwlock, reader: bool,
636+
n: uint) {
637+
let lock_fn = fn@(m: &rwlock, blk: fn()) {
638+
if reader { m.read(blk); } else { m.write(blk); }
639+
};
640+
for n.times {
641+
do lock_fn(m) {
642+
let oldval = *sharedstate;
643+
task::yield();
644+
*sharedstate = oldval + 1;
645+
}
646+
}
647+
}
648+
}
649+
#[test]
650+
fn test_rwlock_readers_wont_modify_the_data() {
651+
test_rwlock_exclusion(true, false);
652+
test_rwlock_exclusion(false, true);
653+
}
654+
#[test]
655+
fn test_rwlock_writers_and_writers() {
656+
test_rwlock_exclusion(false, false);
657+
}
658+
#[test]
659+
fn test_rwlock_readers_and_readers() {
660+
// Much like sem_multi_resource.
661+
let x = ~rwlock();
662+
let x2 = ~x.clone();
663+
let (c1,p1) = pipes::stream();
664+
let (c2,p2) = pipes::stream();
665+
do task::spawn {
666+
do x2.read {
667+
let _ = p2.recv();
668+
c1.send(());
669+
}
670+
}
671+
do x.read {
672+
c2.send(());
673+
let _ = p1.recv();
674+
}
675+
676+
}
518677
}

0 commit comments

Comments
 (0)