Skip to content

Commit 6b2344b

Browse files
committed
Add futex-based RwLock on Linux.
1 parent f1a4041 commit 6b2344b

File tree

2 files changed

+230
-2
lines changed

2 files changed

+230
-2
lines changed
Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
use crate::sync::atomic::{
2+
AtomicI32,
3+
Ordering::{Acquire, Relaxed, Release},
4+
};
5+
use crate::sys::futex::{futex_wait, futex_wake, futex_wake_all};
6+
7+
pub type MovableRwLock = RwLock;
8+
9+
pub struct RwLock {
10+
// The state consists of a 30-bit reader counter, a 'readers waiting' flag, and a 'writers waiting' flag.
11+
// All bits of the reader counter set means write locked.
12+
// A reader count of zero means the lock is unlocked.
13+
// See the constants below.
14+
// Readers wait on this futex.
15+
state: AtomicI32,
16+
// The 'condition variable' to notify writers through.
17+
// Incremented on every signal.
18+
// Writers wait on this futex.
19+
writer_notify: AtomicI32,
20+
}
21+
22+
const READ_LOCKED: i32 = 1;
23+
const MAX_READERS: i32 = (1 << 30) - 2;
24+
const WRITE_LOCKED: i32 = (1 << 30) - 1;
25+
const READERS_WAITING: i32 = 1 << 30;
26+
const WRITERS_WAITING: i32 = 1 << 31;
27+
28+
fn readers(state: i32) -> i32 {
29+
state & !(READERS_WAITING + WRITERS_WAITING)
30+
}
31+
32+
fn readers_waiting(state: i32) -> bool {
33+
state & READERS_WAITING != 0
34+
}
35+
36+
fn writers_waiting(state: i32) -> bool {
37+
state & WRITERS_WAITING != 0
38+
}
39+
40+
fn read_lockable(state: i32) -> bool {
41+
readers(state) < MAX_READERS && !readers_waiting(state) && !writers_waiting(state)
42+
}
43+
44+
impl RwLock {
45+
#[inline]
46+
pub const fn new() -> Self {
47+
Self { state: AtomicI32::new(0), writer_notify: AtomicI32::new(0) }
48+
}
49+
50+
#[inline]
51+
pub unsafe fn destroy(&self) {}
52+
53+
#[inline]
54+
pub unsafe fn try_read(&self) -> bool {
55+
self.state
56+
.fetch_update(Acquire, Relaxed, |s| read_lockable(s).then(|| s + READ_LOCKED))
57+
.is_ok()
58+
}
59+
60+
#[inline]
61+
pub unsafe fn read(&self) {
62+
if let Err(s) =
63+
self.state.fetch_update(Acquire, Relaxed, |s| read_lockable(s).then(|| s + READ_LOCKED))
64+
{
65+
self.read_contended(s);
66+
}
67+
}
68+
69+
#[inline]
70+
pub unsafe fn try_write(&self) -> bool {
71+
self.state
72+
.fetch_update(Acquire, Relaxed, |s| (readers(s) == 0).then(|| s + WRITE_LOCKED))
73+
.is_ok()
74+
}
75+
76+
#[inline]
77+
pub unsafe fn write(&self) {
78+
if let Err(s) = self
79+
.state
80+
.fetch_update(Acquire, Relaxed, |s| (readers(s) == 0).then(|| s + WRITE_LOCKED))
81+
{
82+
self.write_contended(s);
83+
}
84+
}
85+
86+
#[inline]
87+
pub unsafe fn read_unlock(&self) {
88+
if self.state.fetch_sub(READ_LOCKED, Release) == READ_LOCKED + WRITERS_WAITING {
89+
self.wake_after_read_unlock();
90+
}
91+
}
92+
93+
#[inline]
94+
pub unsafe fn write_unlock(&self) {
95+
if let Err(e) = self.state.compare_exchange(WRITE_LOCKED, 0, Release, Relaxed) {
96+
self.write_unlock_contended(e);
97+
}
98+
}
99+
100+
#[cold]
101+
fn read_contended(&self, mut state: i32) {
102+
loop {
103+
if read_lockable(state) {
104+
match self.state.compare_exchange(state, state + READ_LOCKED, Acquire, Relaxed) {
105+
Ok(_) => return, // Locked!
106+
Err(s) => {
107+
state = s;
108+
continue;
109+
}
110+
}
111+
}
112+
113+
if readers(state) == MAX_READERS {
114+
panic!("too many active read locks on RwLock");
115+
}
116+
117+
// Make sure the readers waiting bit is set before we go to sleep.
118+
if !readers_waiting(state) {
119+
if let Err(s) =
120+
self.state.compare_exchange(state, state | READERS_WAITING, Relaxed, Relaxed)
121+
{
122+
state = s;
123+
continue;
124+
}
125+
}
126+
127+
// Wait for the state to change.
128+
futex_wait(&self.state, state | READERS_WAITING, None);
129+
130+
state = self.state.load(Relaxed);
131+
}
132+
}
133+
134+
#[cold]
135+
fn write_contended(&self, mut state: i32) {
136+
loop {
137+
// If it's unlocked, we try to lock it.
138+
if readers(state) == 0 {
139+
match self.state.compare_exchange(
140+
state,
141+
state | WRITE_LOCKED | WRITERS_WAITING, // Other threads might be waiting.
142+
Acquire,
143+
Relaxed,
144+
) {
145+
Ok(_) => return, // Locked!
146+
Err(s) => {
147+
state = s;
148+
continue;
149+
}
150+
}
151+
}
152+
153+
// Set the waiting bit indicating that we're waiting on it.
154+
if !writers_waiting(state) {
155+
match self.state.compare_exchange(state, state | WRITERS_WAITING, Relaxed, Relaxed)
156+
{
157+
Ok(_) => state |= WRITERS_WAITING,
158+
Err(s) => {
159+
state = s;
160+
continue;
161+
}
162+
}
163+
}
164+
165+
// Examine the notification counter before we check if `state` has changed,
166+
// to make sure we don't miss any notifications.
167+
let seq = self.writer_notify.load(Acquire);
168+
169+
// Don't go to sleep if the state has already changed.
170+
let s = self.state.load(Relaxed);
171+
if state != s {
172+
state = s;
173+
continue;
174+
}
175+
176+
// Wait for the state to change.
177+
futex_wait(&self.writer_notify, seq, None);
178+
179+
// Check out the new state.
180+
state = self.state.load(Relaxed);
181+
}
182+
}
183+
184+
#[cold]
185+
fn wake_after_read_unlock(&self) {
186+
// If this compare_exchange fails, another writer already locked, which
187+
// will take care of waking up the next waiting writer.
188+
if self.state.compare_exchange(WRITERS_WAITING, 0, Relaxed, Relaxed).is_ok() {
189+
self.writer_notify.fetch_add(1, Release);
190+
futex_wake(&self.writer_notify);
191+
}
192+
}
193+
194+
#[cold]
195+
fn write_unlock_contended(&self, mut state: i32) {
196+
// If there are any waiting writers _or_ waiting readers, but not both (!),
197+
// we turn off that bit while unlocking.
198+
if readers_waiting(state) != writers_waiting(state) {
199+
if self.state.compare_exchange(state, 0, Release, Relaxed).is_err() {
200+
// The only way this can fail is if the other waiting bit was set too.
201+
state |= READERS_WAITING | WRITERS_WAITING;
202+
}
203+
}
204+
205+
// If both readers and writers are waiting, unlock but leave the readers waiting.
206+
if readers_waiting(state) && writers_waiting(state) {
207+
self.state.store(READERS_WAITING, Release);
208+
}
209+
210+
if writers_waiting(state) {
211+
// Notify one writer, if any writer was waiting.
212+
self.writer_notify.fetch_add(1, Release);
213+
if !futex_wake(&self.writer_notify) {
214+
// If there was no writer to wake up, maybe there's readers to wake up instead.
215+
if readers_waiting(state) {
216+
// If this compare_exchange fails, another writer already locked, which
217+
// will take care of waking up the next waiting writer.
218+
if self.state.compare_exchange(READERS_WAITING, 0, Relaxed, Relaxed).is_ok() {
219+
futex_wake_all(&self.state);
220+
}
221+
}
222+
}
223+
} else if readers_waiting(state) {
224+
// Notify all readers, if any reader was waiting.
225+
futex_wake_all(&self.state);
226+
}
227+
}
228+
}

library/std/src/sys/unix/locks/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@ cfg_if::cfg_if! {
44
target_os = "android",
55
))] {
66
mod futex;
7+
mod futex_rwlock;
78
#[allow(dead_code)]
89
mod pthread_mutex; // Only used for PthreadMutexAttr, needed by pthread_remutex.
910
mod pthread_remutex; // FIXME: Implement this using a futex
10-
mod pthread_rwlock; // FIXME: Implement this using a futex
1111
pub use futex::{Mutex, MovableMutex, Condvar, MovableCondvar};
1212
pub use pthread_remutex::ReentrantMutex;
13-
pub use pthread_rwlock::{RwLock, MovableRwLock};
13+
pub use futex_rwlock::{RwLock, MovableRwLock};
1414
} else {
1515
mod pthread_mutex;
1616
mod pthread_remutex;

0 commit comments

Comments
 (0)