  * The risk of writer starvation is there, but the pathological use cases
  * which trigger it are not necessarily the typical RT workloads.
  *
+ * Fast-path orderings:
+ * The lock/unlock of readers can run in fast paths: lock and unlock are only
+ * atomic ops, and there is no inner lock to provide ACQUIRE and RELEASE
+ * semantics of rwbase_rt. Atomic ops should thus provide _acquire()
+ * and _release() (or stronger).
+ *
  * Common code shared between RT rw_semaphore and rwlock
  */
 
@@ -53,6 +59,7 @@ static __always_inline int rwbase_read_trylock(struct rwbase_rt *rwb)
 	 * set.
 	 */
 	for (r = atomic_read(&rwb->readers); r < 0;) {
+		/* Fully-ordered if cmpxchg() succeeds, provides ACQUIRE */
 		if (likely(atomic_try_cmpxchg(&rwb->readers, &r, r + 1)))
 			return 1;
 	}
@@ -162,6 +169,8 @@ static __always_inline void rwbase_read_unlock(struct rwbase_rt *rwb,
 	/*
 	 * rwb->readers can only hit 0 when a writer is waiting for the
 	 * active readers to leave the critical section.
+	 *
+	 * dec_and_test() is fully ordered, provides RELEASE.
 	 */
 	if (unlikely(atomic_dec_and_test(&rwb->readers)))
 		__rwbase_read_unlock(rwb, state);
@@ -172,7 +181,11 @@ static inline void __rwbase_write_unlock(struct rwbase_rt *rwb, int bias,
 {
 	struct rt_mutex_base *rtm = &rwb->rtmutex;
 
-	atomic_add(READER_BIAS - bias, &rwb->readers);
+	/*
+	 * _release() is needed in case that reader is in fast path, pairing
+	 * with atomic_try_cmpxchg() in rwbase_read_trylock(), provides RELEASE
+	 */
+	(void)atomic_add_return_release(READER_BIAS - bias, &rwb->readers);
 	raw_spin_unlock_irqrestore(&rtm->wait_lock, flags);
 	rwbase_rtmutex_unlock(rtm);
 }
@@ -196,6 +209,23 @@ static inline void rwbase_write_downgrade(struct rwbase_rt *rwb)
 	__rwbase_write_unlock(rwb, WRITER_BIAS - 1, flags);
 }
 
+static inline bool __rwbase_write_trylock(struct rwbase_rt *rwb)
+{
+	/* Can do without CAS because we're serialized by wait_lock. */
+	lockdep_assert_held(&rwb->rtmutex.wait_lock);
+
+	/*
+	 * _acquire is needed in case the reader is in the fast path, pairing
+	 * with rwbase_read_unlock(), provides ACQUIRE.
+	 */
+	if (!atomic_read_acquire(&rwb->readers)) {
+		atomic_set(&rwb->readers, WRITER_BIAS);
+		return 1;
+	}
+
+	return 0;
+}
+
 static int __sched rwbase_write_lock(struct rwbase_rt *rwb,
 				     unsigned int state)
 {
@@ -210,34 +240,30 @@ static int __sched rwbase_write_lock(struct rwbase_rt *rwb,
 	atomic_sub(READER_BIAS, &rwb->readers);
 
 	raw_spin_lock_irqsave(&rtm->wait_lock, flags);
-	/*
-	 * set_current_state() for rw_semaphore
-	 * current_save_and_set_rtlock_wait_state() for rwlock
-	 */
-	rwbase_set_and_save_current_state(state);
+	if (__rwbase_write_trylock(rwb))
+		goto out_unlock;
 
-	/* Block until all readers have left the critical section. */
-	for (; atomic_read(&rwb->readers);) {
+	rwbase_set_and_save_current_state(state);
+	for (;;) {
 		/* Optimized out for rwlocks */
 		if (rwbase_signal_pending_state(state, current)) {
-			__set_current_state(TASK_RUNNING);
+			rwbase_restore_current_state();
 			__rwbase_write_unlock(rwb, 0, flags);
 			return -EINTR;
 		}
+
+		if (__rwbase_write_trylock(rwb))
+			break;
+
 		raw_spin_unlock_irqrestore(&rtm->wait_lock, flags);
+		rwbase_schedule();
+		raw_spin_lock_irqsave(&rtm->wait_lock, flags);
 
-		/*
-		 * Schedule and wait for the readers to leave the critical
-		 * section. The last reader leaving it wakes the waiter.
-		 */
-		if (atomic_read(&rwb->readers) != 0)
-			rwbase_schedule();
 		set_current_state(state);
-		raw_spin_lock_irqsave(&rtm->wait_lock, flags);
 	}
-
-	atomic_set(&rwb->readers, WRITER_BIAS);
 	rwbase_restore_current_state();
+
+out_unlock:
 	raw_spin_unlock_irqrestore(&rtm->wait_lock, flags);
 	return 0;
 }
@@ -253,8 +279,7 @@ static inline int rwbase_write_trylock(struct rwbase_rt *rwb)
 	atomic_sub(READER_BIAS, &rwb->readers);
 
 	raw_spin_lock_irqsave(&rtm->wait_lock, flags);
-	if (!atomic_read(&rwb->readers)) {
-		atomic_set(&rwb->readers, WRITER_BIAS);
+	if (__rwbase_write_trylock(rwb)) {
 		raw_spin_unlock_irqrestore(&rtm->wait_lock, flags);
 		return 1;
 	}
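
As an aside for readers following the ordering argument in the new comments: the sketch below models the same ACQUIRE/RELEASE pairing in plain C11 <stdatomic.h>, outside the kernel. The names (demo_rw, demo_read_trylock(), ...) and the simplified counter protocol are invented for illustration; the real rwbase_rt code uses the kernel atomic_t API, READER_BIAS/WRITER_BIAS arithmetic, and an rtmutex/wait_lock slow path instead of the bare CAS writer shown here.

/*
 * Illustration only, not kernel code: a toy reader/writer counter showing
 * how a reader fast path and a writer path pair ACQUIRE with RELEASE.
 * readers >= 0: free or that many active readers; readers == -1: writer.
 */
#include <stdatomic.h>
#include <stdbool.h>

struct demo_rw {
	atomic_int readers;	/* initialize to 0 */
};

/* Reader fast path: the successful CAS provides ACQUIRE, pairing with the
 * RELEASE in demo_write_unlock(), so the reader cannot observe data from
 * before the writer's critical section as stale. */
static bool demo_read_trylock(struct demo_rw *rw)
{
	int r = atomic_load_explicit(&rw->readers, memory_order_relaxed);

	while (r >= 0) {
		if (atomic_compare_exchange_weak_explicit(&rw->readers, &r, r + 1,
							  memory_order_acquire,
							  memory_order_relaxed))
			return true;
	}
	return false;				/* writer holds the lock */
}

/* Reader unlock: RELEASE publishes the reader's critical section to the
 * writer that later observes the count dropping to zero. */
static void demo_read_unlock(struct demo_rw *rw)
{
	atomic_fetch_sub_explicit(&rw->readers, 1, memory_order_release);
}

/* Writer lock attempt: the ACQUIRE on success pairs with the readers'
 * RELEASE in demo_read_unlock(); it only succeeds when no reader is active. */
static bool demo_write_trylock(struct demo_rw *rw)
{
	int expected = 0;

	return atomic_compare_exchange_strong_explicit(&rw->readers, &expected, -1,
						       memory_order_acquire,
						       memory_order_relaxed);
}

/* Writer unlock: RELEASE pairs with the ACQUIRE CAS in demo_read_trylock(). */
static void demo_write_unlock(struct demo_rw *rw)
{
	atomic_store_explicit(&rw->readers, 0, memory_order_release);
}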