@@ -45,9 +45,10 @@ struct ptr_ring {
 };
 
 /* Note: callers invoking this in a loop must use a compiler barrier,
- * for example cpu_relax(). If ring is ever resized, callers must hold
- * producer_lock - see e.g. ptr_ring_full. Otherwise, if callers don't hold
- * producer_lock, the next call to __ptr_ring_produce may fail.
+ * for example cpu_relax().
+ *
+ * NB: this is unlike __ptr_ring_empty in that callers must hold producer_lock:
+ * see e.g. ptr_ring_full.
  */
 static inline bool __ptr_ring_full(struct ptr_ring *r)
 {
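A hedged sketch of how a producer-side caller might follow the updated rule: hold producer_lock across the full test and the produce, and use cpu_relax() as the compiler barrier when spinning. The helper name and the spin-until-space policy below are illustrative assumptions, not part of the patch.

/* Illustrative only: produce under producer_lock, spinning while full. */
static int example_produce(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock(&r->producer_lock);
	/* __ptr_ring_full() is only meaningful while producer_lock is held. */
	while (__ptr_ring_full(r))
		cpu_relax();	/* compiler barrier between retries */
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock(&r->producer_lock);
	return ret;
}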
@@ -113,7 +114,7 @@ static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
 	/* Pairs with smp_read_barrier_depends in __ptr_ring_consume. */
 	smp_wmb();
 
-	r->queue[r->producer++] = ptr;
+	WRITE_ONCE(r->queue[r->producer++], ptr);
 	if (unlikely(r->producer >= r->size))
 		r->producer = 0;
 	return 0;
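The new WRITE_ONCE() pairs with the READ_ONCE() added further down in __ptr_ring_peek() and __ptr_ring_empty(): the slot is now also read without the consumer lock, so both sides mark the access to keep the compiler from tearing or refetching it. A minimal sketch of that pairing; both helper names are assumptions for illustration.

/* Illustrative only: one CPU publishes into a slot, another tests it
 * locklessly; the marked accesses prevent load/store tearing. */
static void example_publish(struct ptr_ring *r, int i, void *ptr)
{
	WRITE_ONCE(r->queue[i], ptr);	/* pairs with example_slot_busy() */
}

static bool example_slot_busy(struct ptr_ring *r, int i)
{
	return READ_ONCE(r->queue[i]) != NULL;
}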
@@ -169,32 +170,36 @@ static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
 	return ret;
 }
 
-/* Note: callers invoking this in a loop must use a compiler barrier,
- * for example cpu_relax(). Callers must take consumer_lock
- * if they dereference the pointer - see e.g. PTR_RING_PEEK_CALL.
- * If ring is never resized, and if the pointer is merely
- * tested, there's no need to take the lock - see e.g. __ptr_ring_empty.
- * However, if called outside the lock, and if some other CPU
- * consumes ring entries at the same time, the value returned
- * is not guaranteed to be correct.
- * In this case - to avoid incorrectly detecting the ring
- * as empty - the CPU consuming the ring entries is responsible
- * for either consuming all ring entries until the ring is empty,
- * or synchronizing with some other CPU and causing it to
- * execute __ptr_ring_peek and/or consume the ring enteries
- * after the synchronization point.
- */
 static inline void *__ptr_ring_peek(struct ptr_ring *r)
 {
 	if (likely(r->size))
-		return r->queue[r->consumer_head];
+		return READ_ONCE(r->queue[r->consumer_head]);
 	return NULL;
 }
 
-/* See __ptr_ring_peek above for locking rules. */
+/*
+ * Test ring empty status without taking any locks.
+ *
+ * NB: This is only safe to call if ring is never resized.
+ *
+ * However, if some other CPU consumes ring entries at the same time, the value
+ * returned is not guaranteed to be correct.
+ *
+ * In this case - to avoid incorrectly detecting the ring
+ * as empty - the CPU consuming the ring entries is responsible
+ * for either consuming all ring entries until the ring is empty,
+ * or synchronizing with some other CPU and causing it to
+ * re-test __ptr_ring_empty and/or consume the ring entries
+ * after the synchronization point.
+ *
+ * Note: callers invoking this in a loop must use a compiler barrier,
+ * for example cpu_relax().
+ */
 static inline bool __ptr_ring_empty(struct ptr_ring *r)
 {
-	return !__ptr_ring_peek(r);
+	if (likely(r->size))
+		return !r->queue[READ_ONCE(r->consumer_head)];
+	return true;
 }
 
 static inline bool ptr_ring_empty(struct ptr_ring *r)
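A hedged sketch of the rule spelled out in the new comment: a consumer that drains the ring to empty under consumer_lock ensures that a concurrent, lockless __ptr_ring_empty() on another CPU can only be transiently stale, never stuck reporting a non-empty ring. The helper and its destroy callback are assumptions for illustration.

/* Illustrative only: consume everything under consumer_lock so that a
 * lockless emptiness test elsewhere eventually observes the empty ring. */
static void example_drain(struct ptr_ring *r, void (*destroy)(void *))
{
	void *ptr;

	spin_lock(&r->consumer_lock);
	while ((ptr = __ptr_ring_consume(r)))
		destroy(ptr);
	spin_unlock(&r->consumer_lock);
}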
@@ -248,35 +253,43 @@ static inline void __ptr_ring_discard_one(struct ptr_ring *r)
 	/* Fundamentally, what we want to do is update consumer
 	 * index and zero out the entry so producer can reuse it.
 	 * Doing it naively at each consume would be as simple as:
-	 *       r->queue[r->consumer++] = NULL;
-	 *       if (unlikely(r->consumer >= r->size))
-	 *               r->consumer = 0;
+	 *       consumer = r->consumer;
+	 *       r->queue[consumer++] = NULL;
+	 *       if (unlikely(consumer >= r->size))
+	 *               consumer = 0;
+	 *       r->consumer = consumer;
 	 * but that is suboptimal when the ring is full as producer is writing
 	 * out new entries in the same cache line. Defer these updates until a
 	 * batch of entries has been consumed.
 	 */
-	int head = r->consumer_head++;
+	/* Note: we must keep consumer_head valid at all times for __ptr_ring_empty
+	 * to work correctly.
+	 */
+	int consumer_head = r->consumer_head;
+	int head = consumer_head++;
 
 	/* Once we have processed enough entries invalidate them in
 	 * the ring all at once so producer can reuse their space in the ring.
 	 * We also do this when we reach end of the ring - not mandatory
 	 * but helps keep the implementation simple.
 	 */
-	if (unlikely(r->consumer_head - r->consumer_tail >= r->batch ||
-		     r->consumer_head >= r->size)) {
+	if (unlikely(consumer_head - r->consumer_tail >= r->batch ||
+		     consumer_head >= r->size)) {
 		/* Zero out entries in the reverse order: this way we touch the
 		 * cache line that producer might currently be reading the last;
 		 * producer won't make progress and touch other cache lines
 		 * besides the first one until we write out all entries.
 		 */
 		while (likely(head >= r->consumer_tail))
			r->queue[head--] = NULL;
-		r->consumer_tail = r->consumer_head;
+		r->consumer_tail = consumer_head;
 	}
-	if (unlikely(r->consumer_head >= r->size)) {
-		r->consumer_head = 0;
+	if (unlikely(consumer_head >= r->size)) {
+		consumer_head = 0;
 		r->consumer_tail = 0;
 	}
+	/* matching READ_ONCE in __ptr_ring_empty for lockless tests */
+	WRITE_ONCE(r->consumer_head, consumer_head);
 }
 
 static inline void *__ptr_ring_consume(struct ptr_ring *r)
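The invariant the rewritten function maintains: consumer_head is published with WRITE_ONCE() only once it again holds a valid in-range index, so the READ_ONCE(r->consumer_head) in __ptr_ring_empty() always lands on a real slot whose NULL/non-NULL state is meaningful. A hypothetical debug check along those lines, purely as an illustration and not part of the patch:

/* Illustrative only: the published consumer_head must never point past
 * the end of the ring. */
static inline void example_check_consumer_head(struct ptr_ring *r)
{
	int head = READ_ONCE(r->consumer_head);

	WARN_ON_ONCE(r->size && (head < 0 || head >= r->size));
}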
@@ -453,12 +466,7 @@ static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r,
 
 static inline void **__ptr_ring_init_queue_alloc(unsigned int size, gfp_t gfp)
 {
-	/* Allocate an extra dummy element at end of ring to avoid consumer head
-	 * or produce head access past the end of the array. Possible when
-	 * producer/consumer operations and __ptr_ring_peek operations run in
-	 * parallel.
-	 */
-	return kcalloc(size + 1, sizeof(void *), gfp);
+	return kcalloc(size, sizeof(void *), gfp);
 }
 
 static inline void __ptr_ring_set_size(struct ptr_ring *r, int size)
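With consumer_head now kept inside [0, size) at all times, the extra dummy slot at the end of the array is no longer needed, so the queue shrinks to exactly size entries. A hedged usage sketch of the normal init path, which reaches this allocator internally; the ring size and GFP flags are arbitrary assumptions:

/* Illustrative only: ptr_ring_init() calls __ptr_ring_init_queue_alloc()
 * for the queue allocation. */
static int example_init(struct ptr_ring *r)
{
	return ptr_ring_init(r, 256, GFP_KERNEL);
}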
@@ -532,7 +540,9 @@ static inline void ptr_ring_unconsume(struct ptr_ring *r, void **batch, int n,
 			goto done;
 		}
 		r->queue[head] = batch[--n];
-		r->consumer_tail = r->consumer_head = head;
+		r->consumer_tail = head;
+		/* matching READ_ONCE in __ptr_ring_empty for lockless tests */
+		WRITE_ONCE(r->consumer_head, head);
 	}
 
 done:
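ptr_ring_unconsume() rewinds the consumer side to give back entries that were consumed but not used, and since a lockless __ptr_ring_empty() may run concurrently it now publishes consumer_head with WRITE_ONCE() as well. A hedged usage sketch; the batch size, the use() predicate and the destroy() callback are assumptions for illustration.

/* Illustrative only: consume up to 16 entries, use some of them, then
 * return the unused tail of the batch to the ring. */
static void example_consume_some(struct ptr_ring *r, bool (*use)(void *),
				 void (*destroy)(void *))
{
	void *batch[16];
	int n, used = 0;

	n = ptr_ring_consume_batched(r, batch, 16);
	while (used < n && use(batch[used]))
		used++;
	ptr_ring_unconsume(r, batch + used, n - used, destroy);
}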