Skip to content

Commit 5d49de5

Browse files
mstsirkin and davem330
authored and committed
ptr_ring: resize support
This adds ring resize support. Seems to be necessary as users such as tun allow userspace control over queue size. If resize is used, this costs us the ability to peek at the queue without the consumer lock - should not be a big deal as peek and consume usually run on the same CPU. If the ring is made bigger, the ring contents are preserved. If the ring is made smaller, extra pointers are passed to an optional destructor callback. The cleanup function also gains a destructor callback so that all pointers in the queue can be cleaned up. This changes some APIs but we don't have any users yet, so it won't break bisect. Signed-off-by: Michael S. Tsirkin <[email protected]> Acked-by: Jesper Dangaard Brouer <[email protected]> Signed-off-by: David S. Miller <[email protected]>
1 parent ad69f35 commit 5d49de5

File tree

1 file changed

+143
-14
lines changed

1 file changed

+143
-14
lines changed

include/linux/ptr_ring.h

Lines changed: 143 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,9 @@ struct ptr_ring {
4343
};
4444

4545
/* Note: callers invoking this in a loop must use a compiler barrier,
46-
* for example cpu_relax().
47-
* Callers don't need to take producer lock - if they don't
48-
* the next call to __ptr_ring_produce may fail.
46+
* for example cpu_relax(). If ring is ever resized, callers must hold
47+
* producer_lock - see e.g. ptr_ring_full. Otherwise, if callers don't hold
48+
* producer_lock, the next call to __ptr_ring_produce may fail.
4949
*/
5050
static inline bool __ptr_ring_full(struct ptr_ring *r)
5151
{
@@ -54,16 +54,55 @@ static inline bool __ptr_ring_full(struct ptr_ring *r)
5454

5555
static inline bool ptr_ring_full(struct ptr_ring *r)
5656
{
57-
barrier();
58-
return __ptr_ring_full(r);
57+
bool ret;
58+
59+
spin_lock(&r->producer_lock);
60+
ret = __ptr_ring_full(r);
61+
spin_unlock(&r->producer_lock);
62+
63+
return ret;
64+
}
65+
66+
static inline bool ptr_ring_full_irq(struct ptr_ring *r)
67+
{
68+
bool ret;
69+
70+
spin_lock_irq(&r->producer_lock);
71+
ret = __ptr_ring_full(r);
72+
spin_unlock_irq(&r->producer_lock);
73+
74+
return ret;
75+
}
76+
77+
static inline bool ptr_ring_full_any(struct ptr_ring *r)
78+
{
79+
unsigned long flags;
80+
bool ret;
81+
82+
spin_lock_irqsave(&r->producer_lock, flags);
83+
ret = __ptr_ring_full(r);
84+
spin_unlock_irqrestore(&r->producer_lock, flags);
85+
86+
return ret;
87+
}
88+
89+
static inline bool ptr_ring_full_bh(struct ptr_ring *r)
90+
{
91+
bool ret;
92+
93+
spin_lock_bh(&r->producer_lock);
94+
ret = __ptr_ring_full(r);
95+
spin_unlock_bh(&r->producer_lock);
96+
97+
return ret;
5998
}
6099

61100
/* Note: callers invoking this in a loop must use a compiler barrier,
62-
* for example cpu_relax().
101+
* for example cpu_relax(). Callers must hold producer_lock.
63102
*/
64103
static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
65104
{
66-
if (__ptr_ring_full(r))
105+
if (r->queue[r->producer])
67106
return -ENOSPC;
68107

69108
r->queue[r->producer++] = ptr;
@@ -120,20 +159,68 @@ static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
120159
/* Note: callers invoking this in a loop must use a compiler barrier,
121160
* for example cpu_relax(). Callers must take consumer_lock
122161
* if they dereference the pointer - see e.g. PTR_RING_PEEK_CALL.
123-
* There's no need for a lock if pointer is merely tested - see e.g.
124-
* ptr_ring_empty.
162+
* If ring is never resized, and if the pointer is merely
163+
* tested, there's no need to take the lock - see e.g. __ptr_ring_empty.
125164
*/
126165
static inline void *__ptr_ring_peek(struct ptr_ring *r)
127166
{
128167
return r->queue[r->consumer];
129168
}
130169

131-
static inline bool ptr_ring_empty(struct ptr_ring *r)
170+
/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must take consumer_lock
 * if the ring is ever resized - see e.g. ptr_ring_empty.
 */
static inline bool __ptr_ring_empty(struct ptr_ring *r)
{
	/* Empty iff the slot at the consumer index holds no pointer. */
	return !__ptr_ring_peek(r);
}
136178

179+
static inline bool ptr_ring_empty(struct ptr_ring *r)
180+
{
181+
bool ret;
182+
183+
spin_lock(&r->consumer_lock);
184+
ret = __ptr_ring_empty(r);
185+
spin_unlock(&r->consumer_lock);
186+
187+
return ret;
188+
}
189+
190+
static inline bool ptr_ring_empty_irq(struct ptr_ring *r)
191+
{
192+
bool ret;
193+
194+
spin_lock_irq(&r->consumer_lock);
195+
ret = __ptr_ring_empty(r);
196+
spin_unlock_irq(&r->consumer_lock);
197+
198+
return ret;
199+
}
200+
201+
static inline bool ptr_ring_empty_any(struct ptr_ring *r)
202+
{
203+
unsigned long flags;
204+
bool ret;
205+
206+
spin_lock_irqsave(&r->consumer_lock, flags);
207+
ret = __ptr_ring_empty(r);
208+
spin_unlock_irqrestore(&r->consumer_lock, flags);
209+
210+
return ret;
211+
}
212+
213+
static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
214+
{
215+
bool ret;
216+
217+
spin_lock_bh(&r->consumer_lock);
218+
ret = __ptr_ring_empty(r);
219+
spin_unlock_bh(&r->consumer_lock);
220+
221+
return ret;
222+
}
223+
137224
/* Must only be called after __ptr_ring_peek returned !NULL */
138225
static inline void __ptr_ring_discard_one(struct ptr_ring *r)
139226
{
@@ -241,10 +328,14 @@ static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
241328
__PTR_RING_PEEK_CALL_v; \
242329
})
243330

331+
/*
 * Allocate a zeroed queue array of @size pointer slots, padded up to a
 * cache-line multiple.  Returns NULL on allocation failure or invalid size.
 *
 * NOTE(review): the original computed size * sizeof(void *) unguarded; with
 * int @size that multiplication can overflow size_t on 32-bit, and a
 * negative @size would be converted to a huge unsigned value.  Guard both
 * explicitly so callers get a clean NULL (-> -ENOMEM) instead.
 */
static inline void **__ptr_ring_init_queue_alloc(int size, gfp_t gfp)
{
	if (size < 0 ||
	    (size_t)size > (SIZE_MAX - SMP_CACHE_BYTES) / sizeof(void *))
		return NULL;
	return kzalloc(ALIGN(size * sizeof(void *), SMP_CACHE_BYTES), gfp);
}
335+
244336
static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
245337
{
246-
r->queue = kzalloc(ALIGN(size * sizeof *(r->queue), SMP_CACHE_BYTES),
247-
gfp);
338+
r->queue = __ptr_ring_init_queue_alloc(size, gfp);
248339
if (!r->queue)
249340
return -ENOMEM;
250341

@@ -256,8 +347,46 @@ static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
256347
return 0;
257348
}
258349

259-
static inline void ptr_ring_cleanup(struct ptr_ring *r)
350+
/*
 * Resize the ring to @size slots.  Queued pointers are preserved in order;
 * if the new ring is smaller, overflow pointers are handed to @destroy
 * (when non-NULL).  Returns 0 on success, -ENOMEM if allocation fails.
 *
 * FIX(review): the original held only producer_lock while it reset
 * r->consumer and swapped r->queue, racing with a concurrent consumer; it
 * also drained via ptr_ring_consume(), which takes consumer_lock per
 * element.  Take BOTH locks for the whole swap (consumer outer, with irqs
 * saved) and drain with the unlocked helpers.
 */
static inline int ptr_ring_resize(struct ptr_ring *r, int size, gfp_t gfp,
				  void (*destroy)(void *))
{
	unsigned long flags;
	int producer = 0;
	void **queue = __ptr_ring_init_queue_alloc(size, gfp);
	void **old;
	void *ptr;

	if (!queue)
		return -ENOMEM;

	spin_lock_irqsave(&r->consumer_lock, flags);
	spin_lock(&r->producer_lock);

	/* We hold consumer_lock, so use the unlocked peek/discard helpers -
	 * calling ptr_ring_consume() here would self-deadlock.
	 */
	while ((ptr = __ptr_ring_peek(r))) {
		__ptr_ring_discard_one(r);
		if (producer < size)
			queue[producer++] = ptr;
		else if (destroy)
			destroy(ptr);
	}

	r->size = size;
	r->producer = producer;
	r->consumer = 0;
	old = r->queue;
	r->queue = queue;

	spin_unlock(&r->producer_lock);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	/* Free the old array outside the locks. */
	kfree(old);

	return 0;
}
382+
383+
static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
260384
{
385+
void *ptr;
386+
387+
if (destroy)
388+
while ((ptr = ptr_ring_consume(r)))
389+
destroy(ptr);
261390
kfree(r->queue);
262391
}
263392

0 commit comments

Comments
 (0)