@@ -181,6 +181,7 @@ struct Node {
181
181
write : bool ,
182
182
thread : OnceCell < Thread > ,
183
183
completed : AtomicBool ,
184
+ is_reading : AtomicBool ,
184
185
}
185
186
186
187
impl Node {
@@ -193,6 +194,7 @@ impl Node {
193
194
write,
194
195
thread : OnceCell :: new ( ) ,
195
196
completed : AtomicBool :: new ( false ) ,
197
+ is_reading : AtomicBool :: new ( false ) ,
196
198
}
197
199
}
198
200
@@ -317,24 +319,20 @@ impl RwLock {
317
319
let mut count = 0 ;
318
320
let mut has_slept = false ;
319
321
loop {
320
- if let Some ( next) = update ( state) {
322
+ if node. is_reading . load ( Acquire ) {
323
+ // This node was awaken by a call to `downgrade`, which means we are already in read
324
+ // mode and we can return.
325
+ debug_assert ! (
326
+ has_slept,
327
+ "Somehow `is_reading` is `true` before the node has gone to sleep"
328
+ ) ;
329
+ return ;
330
+ } else if let Some ( next) = update ( state) {
321
331
// The lock is available, try locking it.
322
332
match self . state . compare_exchange_weak ( state, next, Acquire , Relaxed ) {
323
333
Ok ( _) => return ,
324
334
Err ( new) => state = new,
325
335
}
326
- } else if !write && has_slept {
327
- // If we are trying to read and we have already gone to sleep, first check if the
328
- // lock is in read mode before going to sleep again.
329
- let tail = unsafe { add_backlinks_and_find_tail ( to_node ( state) ) . as_ref ( ) } ;
330
- let _ = tail. next . 0 . fetch_update ( Release , Acquire , |count : * mut Node | {
331
- if count. mask ( MASK ) . addr ( ) > 0 {
332
- Some ( without_provenance_mut ( state. addr ( ) . checked_add ( SINGLE ) ? | LOCKED ) )
333
- } else {
334
- None
335
- }
336
- } ) ;
337
- todo ! ( "This is very wrong" ) ;
338
336
} else if state. addr ( ) & QUEUED == 0 && count < SPIN_COUNT {
339
337
// If the lock is not available and no threads are queued, spin
340
338
// for a while, using exponential backoff to decrease cache
@@ -468,56 +466,82 @@ impl RwLock {
468
466
pub unsafe fn downgrade ( & self ) {
469
467
// Atomically attempt to go from a single writer without any waiting threads to a single
470
468
// reader without any waiting threads.
471
- if let Err ( mut state ) = self . state . compare_exchange (
469
+ match self . state . compare_exchange (
472
470
without_provenance_mut ( LOCKED ) ,
473
471
without_provenance_mut ( LOCKED | SINGLE ) ,
474
472
Release ,
475
473
Relaxed ,
476
474
) {
477
- debug_assert ! (
475
+ Ok ( _) => return ,
476
+ Err ( state) => debug_assert ! (
478
477
state. mask( LOCKED ) . addr( ) != 0 && state. mask( QUEUED ) . addr( ) != 0 ,
479
478
"RwLock should be LOCKED and QUEUED"
480
- ) ;
481
- // 1. Attempt to grab the queue lock
482
- // 2. Find the tail of the queue
483
- // 3. While the current tail is not a writer:
484
- // 4. The current tail must be a reader
485
- // 5. Remove the current tail from the queue and update the tail to be the previous
486
- // node if possible
487
- // 6. Set the flag (somewhere on the node) to notify the reader thread that it has
488
- // been woken up by a `downgrade` call
489
- // 7. `complete` the node
490
- // 8. Go back to step 3 with the updated tail if it exists, otherwise break
491
- // 9. Once we find a writer or there are no more `prev` links, we write the correct
492
- // number of readers into the current node state or the head state.
493
-
494
- // Attempt to grab the queue lock.
495
- state = loop {
496
- match self . state . fetch_update ( Release , Acquire , |ptr : State | {
497
- // Go from not queue locked to being queue locked.
498
- if ptr. mask ( QUEUE_LOCKED ) . addr ( ) != 0 {
499
- Some ( without_provenance_mut ( ptr. addr ( ) | QUEUE_LOCKED ) )
500
- } else {
501
- None
502
- }
503
- } ) {
504
- Ok ( state) => break state,
505
- Err ( _) => { }
479
+ ) ,
480
+ }
481
+
482
+ // Attempt to grab the queue lock.
483
+ let state = loop {
484
+ match self . state . fetch_update ( Release , Acquire , |ptr : State | {
485
+ // Go from not queue locked to being queue locked.
486
+ if ptr. mask ( QUEUE_LOCKED ) . addr ( ) == 0 {
487
+ Some ( without_provenance_mut ( ptr. addr ( ) | QUEUE_LOCKED ) )
488
+ } else {
489
+ None
506
490
}
507
- } ;
491
+ } ) {
492
+ Ok ( state) => break state,
493
+ Err ( _) => { }
494
+ }
495
+ } ;
496
+
497
+ // SAFETY: By Invariant 2 we know that this tail is valid.
498
+ let mut tail_ptr = unsafe { add_backlinks_and_find_tail ( to_node ( state) ) } ;
499
+ let mut prev_ptr;
500
+
501
+ // We start with 1 reader, which is the current thread.
502
+ let mut readers = 1 ;
508
503
509
- // SAFETY: FIXME
510
- let _head = unsafe { to_node ( state) . as_ref ( ) } ;
504
+ // Wake up all readers at the top of the queue.
505
+ loop {
506
+ // SAFETY: We have the queue lock so nobody else can be modifying the queue or the tail.
507
+ let tail = unsafe { tail_ptr. as_mut ( ) } ;
508
+
509
+ // If the current `tail` is a writer thread, then we have no readers left to wake.
510
+ // If there is no valid `prev` backlink, then we don't want to remove this node either
511
+ // because we will have no `tail` that we can replace this node with.
512
+ if tail. write || tail. prev . get ( ) . is_none ( ) {
513
+ // Store the reader count in the `next` field.
514
+ tail. next . 0 = AtomicPtr :: new ( without_provenance_mut ( readers) ) ;
515
+ break ;
516
+ }
517
+
518
+ prev_ptr = tail. prev . get ( ) . expect ( "Cannot be a `None` variant by the above" ) ;
519
+
520
+ // SAFETY: By Invariant 2, we know that all `prev` backlinks are valid, and since we
521
+ // have the queue lock, nobody can be modifying the queue or this node.
522
+ let prev = unsafe { prev_ptr. as_mut ( ) } ;
511
523
512
- // FIXME Is this correct?
513
- // SAFETY: Since we have the write lock, nobody else can be modifying state, and since
514
- // we got `state` from the `compare_exchange`, we know it is a valid head of the queue.
515
- let tail = unsafe { add_backlinks_and_find_tail ( to_node ( state ) ) . as_ref ( ) } ;
524
+ // Notify the reader thread that they are now in read mode.
525
+ tail . is_reading . store ( true , Release ) ;
526
+ readers += 1 ;
527
+ prev . next . 0 = AtomicPtr :: new ( without_provenance_mut ( readers ) ) ;
516
528
517
- while !tail. write {
518
- todo ! ( )
529
+ // SAFETY: Because we observed that there were waiting threads from the first CAS, we
530
+ // know that the state must have been a valid head to the wait queue as nobody can be
531
+ // modifying the node itself.
532
+ let head = unsafe { to_node ( self . state . load ( Acquire ) ) . as_ref ( ) } ;
533
+
534
+ // Split off the last node and update the `tail` field of `state`.
535
+ head. tail . set ( Some ( prev_ptr) ) ;
536
+ tail_ptr = prev_ptr;
537
+
538
+ unsafe {
539
+ Node :: complete ( tail_ptr) ;
519
540
}
520
541
}
542
+
543
+ // Once we are done updating the queue, release the queue lock.
544
+ self . state . fetch_byte_sub ( QUEUE_LOCKED , Release ) ;
521
545
}
522
546
523
547
/// # Safety
0 commit comments