Commit b7f72a3

tirthendu-intel authored and Alexei Starovoitov committed
xsk: introduce wrappers and helpers for supporting multi-buffer in Tx path
In the Tx path, the xsk core reserves space in the completion queue for each descriptor to be transmitted, and the address contained in the descriptor is stored in the skb destructor arg. After successful transmission, the skb destructor submits the address, marking the completion.

To handle multiple descriptors per packet, the corresponding address is now stored in the completion queue along with the space reserved for each descriptor. The number of pending descriptors is stored in the skb destructor arg instead and is used by the skb destructor to update the completions.

Introduce 'skb' in xdp_sock to store a partially built packet when __xsk_generic_xmit() must return before it sees the EOP descriptor for the current packet, so that packet building can resume in the next call of __xsk_generic_xmit().

Helper functions are introduced to set and get the pending descriptor count in the skb destructor arg. Wrappers are also introduced for storing descriptor addresses and for submitting and cancelling (for unsuccessful transmissions) completions.

Signed-off-by: Tirthendu Sarkar <[email protected]>
Link: https://lore.kernel.org/r/[email protected]
Signed-off-by: Alexei Starovoitov <[email protected]>
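The counting trick above fits in one pointer-sized field: destructor_arg normally carries a pointer, but the Tx path now stores a plain integer count in it. Below is a minimal self-contained sketch of that encoding, compilable as user-space C; fake_skb, get_num_desc and inc_num_desc are illustrative stand-ins for the kernel structures and for the xsk_get_num_desc()/xsk_set_destructor_arg() helpers added in the diff further down, not kernel code.

#include <stdio.h>

/* Stand-in for the skb and its destructor_arg slot; not kernel code. */
struct fake_skb {
	void *destructor_arg;
};

/* Mirrors xsk_get_num_desc() below: a NULL skb means no pending
 * descriptors, otherwise the pointer field is read back as a count.
 */
static long get_num_desc(const struct fake_skb *skb)
{
	return skb ? (long)skb->destructor_arg : 0;
}

/* Mirrors xsk_set_destructor_arg() below: bump the count once per
 * descriptor folded into the packet.
 */
static void inc_num_desc(struct fake_skb *skb)
{
	skb->destructor_arg = (void *)(get_num_desc(skb) + 1);
}

int main(void)
{
	struct fake_skb skb = { NULL };

	/* A multi-buffer packet built from three descriptors. */
	inc_num_desc(&skb);
	inc_num_desc(&skb);
	inc_num_desc(&skb);

	/* The destructor would submit this many completions. */
	printf("completions to submit: %ld\n", get_num_desc(&skb));
	return 0;
}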
1 parent 8046277 commit b7f72a3

3 files changed, +67 -32 lines changed

include/net/xdp_sock.h

6 additions, 0 deletions

@@ -68,6 +68,12 @@ struct xdp_sock {
 	u64 rx_dropped;
 	u64 rx_queue_full;
 
+	/* When __xsk_generic_xmit() must return before it sees the EOP descriptor for the current
+	 * packet, the partially built skb is saved here so that packet building can resume in next
+	 * call of __xsk_generic_xmit().
+	 */
+	struct sk_buff *skb;
+
 	struct list_head map_list;
 	/* Protects map_list */
 	spinlock_t map_list_lock;

net/xdp/xsk.c

54 additions, 20 deletions

@@ -480,19 +480,65 @@ static int xsk_wakeup(struct xdp_sock *xs, u8 flags)
 	return dev->netdev_ops->ndo_xsk_wakeup(dev, xs->queue_id, flags);
 }
 
-static void xsk_destruct_skb(struct sk_buff *skb)
+static int xsk_cq_reserve_addr_locked(struct xdp_sock *xs, u64 addr)
+{
+	unsigned long flags;
+	int ret;
+
+	spin_lock_irqsave(&xs->pool->cq_lock, flags);
+	ret = xskq_prod_reserve_addr(xs->pool->cq, addr);
+	spin_unlock_irqrestore(&xs->pool->cq_lock, flags);
+
+	return ret;
+}
+
+static void xsk_cq_submit_locked(struct xdp_sock *xs, u32 n)
+{
+	unsigned long flags;
+
+	spin_lock_irqsave(&xs->pool->cq_lock, flags);
+	xskq_prod_submit_n(xs->pool->cq, n);
+	spin_unlock_irqrestore(&xs->pool->cq_lock, flags);
+}
+
+static void xsk_cq_cancel_locked(struct xdp_sock *xs, u32 n)
 {
-	u64 addr = (u64)(long)skb_shinfo(skb)->destructor_arg;
-	struct xdp_sock *xs = xdp_sk(skb->sk);
 	unsigned long flags;
 
 	spin_lock_irqsave(&xs->pool->cq_lock, flags);
-	xskq_prod_submit_addr(xs->pool->cq, addr);
+	xskq_prod_cancel_n(xs->pool->cq, n);
 	spin_unlock_irqrestore(&xs->pool->cq_lock, flags);
+}
+
+static u32 xsk_get_num_desc(struct sk_buff *skb)
+{
+	return skb ? (long)skb_shinfo(skb)->destructor_arg : 0;
+}
 
+static void xsk_destruct_skb(struct sk_buff *skb)
+{
+	xsk_cq_submit_locked(xdp_sk(skb->sk), xsk_get_num_desc(skb));
 	sock_wfree(skb);
 }
 
+static void xsk_set_destructor_arg(struct sk_buff *skb)
+{
+	long num = xsk_get_num_desc(xdp_sk(skb->sk)->skb) + 1;
+
+	skb_shinfo(skb)->destructor_arg = (void *)num;
+}
+
+static void xsk_consume_skb(struct sk_buff *skb)
+{
+	struct xdp_sock *xs = xdp_sk(skb->sk);
+
+	skb->destructor = sock_wfree;
+	xsk_cq_cancel_locked(xs, xsk_get_num_desc(skb));
+	/* Free skb without triggering the perf drop trace */
+	consume_skb(skb);
+	xs->skb = NULL;
+}
+
 static struct sk_buff *xsk_build_skb_zerocopy(struct xdp_sock *xs,
 					      struct xdp_desc *desc)
 {
@@ -578,8 +624,8 @@ static struct sk_buff *xsk_build_skb(struct xdp_sock *xs,
 	skb->dev = dev;
 	skb->priority = xs->sk.sk_priority;
 	skb->mark = xs->sk.sk_mark;
-	skb_shinfo(skb)->destructor_arg = (void *)(long)desc->addr;
 	skb->destructor = xsk_destruct_skb;
+	xsk_set_destructor_arg(skb);
 
 	return skb;
 }
@@ -591,7 +637,6 @@ static int __xsk_generic_xmit(struct sock *sk)
 	bool sent_frame = false;
 	struct xdp_desc desc;
 	struct sk_buff *skb;
-	unsigned long flags;
 	int err = 0;
 
 	mutex_lock(&xs->mutex);
@@ -616,31 +661,20 @@ static int __xsk_generic_xmit(struct sock *sk)
 		 * if there is space in it. This avoids having to implement
 		 * any buffering in the Tx path.
 		 */
-		spin_lock_irqsave(&xs->pool->cq_lock, flags);
-		if (xskq_prod_reserve(xs->pool->cq)) {
-			spin_unlock_irqrestore(&xs->pool->cq_lock, flags);
+		if (xsk_cq_reserve_addr_locked(xs, desc.addr))
 			goto out;
-		}
-		spin_unlock_irqrestore(&xs->pool->cq_lock, flags);
 
 		skb = xsk_build_skb(xs, &desc);
 		if (IS_ERR(skb)) {
 			err = PTR_ERR(skb);
-			spin_lock_irqsave(&xs->pool->cq_lock, flags);
-			xskq_prod_cancel(xs->pool->cq);
-			spin_unlock_irqrestore(&xs->pool->cq_lock, flags);
+			xsk_cq_cancel_locked(xs, 1);
 			goto out;
 		}
 
 		err = __dev_direct_xmit(skb, xs->queue_id);
 		if (err == NETDEV_TX_BUSY) {
 			/* Tell user-space to retry the send */
-			skb->destructor = sock_wfree;
-			spin_lock_irqsave(&xs->pool->cq_lock, flags);
-			xskq_prod_cancel(xs->pool->cq);
-			spin_unlock_irqrestore(&xs->pool->cq_lock, flags);
-			/* Free skb without triggering the perf drop trace */
-			consume_skb(skb);
+			xsk_consume_skb(skb);
 			err = -EAGAIN;
 			goto out;
 		}
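For orientation, here is a hedged sketch of the accounting invariant these wrappers maintain in __xsk_generic_xmit(): each descriptor reserves one completion-queue slot up front, and that slot is later either published by xsk_destruct_skb() or rolled back by xsk_consume_skb(). The model below is plain user-space C, not kernel code; toy_cq and its functions are illustrative stand-ins that omit locking and the ring itself.

#include <stdio.h>

/* Toy completion queue: cached_prod counts private reservations,
 * producer is what has been published to user space.
 */
struct toy_cq {
	unsigned int cached_prod;
	unsigned int producer;
};

/* Rough analogue of xsk_cq_reserve_addr_locked(), minus the lock and
 * the address write.
 */
static void cq_reserve(struct toy_cq *cq) { cq->cached_prod++; }

/* Rough analogue of xsk_cq_submit_locked(): publish n completions. */
static void cq_submit(struct toy_cq *cq, unsigned int n) { cq->producer += n; }

/* Rough analogue of xsk_cq_cancel_locked(): drop n reservations. */
static void cq_cancel(struct toy_cq *cq, unsigned int n) { cq->cached_prod -= n; }

int main(void)
{
	struct toy_cq cq = { 0, 0 };

	/* Packet A: two descriptors, transmitted successfully. */
	cq_reserve(&cq);
	cq_reserve(&cq);
	cq_submit(&cq, 2);	/* done by xsk_destruct_skb() */

	/* Packet B: one descriptor, driver returned NETDEV_TX_BUSY. */
	cq_reserve(&cq);
	cq_cancel(&cq, 1);	/* done by xsk_consume_skb() */

	printf("published=%u pending=%u\n",
	       cq.producer, cq.cached_prod - cq.producer);
	return 0;
}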

net/xdp/xsk_queue.h

7 additions, 12 deletions

@@ -297,6 +297,11 @@ static inline void xskq_cons_release(struct xsk_queue *q)
 	q->cached_cons++;
 }
 
+static inline void xskq_cons_cancel_n(struct xsk_queue *q, u32 cnt)
+{
+	q->cached_cons -= cnt;
+}
+
 static inline u32 xskq_cons_present_entries(struct xsk_queue *q)
 {
 	/* No barriers needed since data is not accessed */
@@ -324,9 +329,9 @@ static inline bool xskq_prod_is_full(struct xsk_queue *q)
 	return xskq_prod_nb_free(q, 1) ? false : true;
 }
 
-static inline void xskq_prod_cancel(struct xsk_queue *q)
+static inline void xskq_prod_cancel_n(struct xsk_queue *q, u32 cnt)
 {
-	q->cached_prod--;
+	q->cached_prod -= cnt;
 }
 
 static inline int xskq_prod_reserve(struct xsk_queue *q)
@@ -392,16 +397,6 @@ static inline void xskq_prod_submit(struct xsk_queue *q)
 	__xskq_prod_submit(q, q->cached_prod);
 }
 
-static inline void xskq_prod_submit_addr(struct xsk_queue *q, u64 addr)
-{
-	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
-	u32 idx = q->ring->producer;
-
-	ring->desc[idx++ & q->ring_mask] = addr;
-
-	__xskq_prod_submit(q, idx);
-}
-
 static inline void xskq_prod_submit_n(struct xsk_queue *q, u32 nb_entries)
 {
 	__xskq_prod_submit(q, q->ring->producer + nb_entries);
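Dropping xskq_prod_submit_addr() works because xskq_prod_reserve_addr() already writes the address into the ring at reservation time, so submission only needs to publish a count. A self-contained toy ring sketching that split is below (plain C; the names are illustrative, not the kernel's struct xsk_queue).

#include <stdint.h>
#include <stdio.h>

#define RING_SIZE 8	/* power of two so the mask trick works */

/* Toy single-producer ring: addresses land in the ring at reserve
 * time, so submit and cancel only move counters.
 */
struct toy_ring {
	uint64_t desc[RING_SIZE];
	uint32_t ring_mask;	/* RING_SIZE - 1 */
	uint32_t cached_prod;	/* private reservations */
	uint32_t producer;	/* published index */
};

/* Analogue of xskq_prod_reserve_addr(): write addr, bump private count. */
static void reserve_addr(struct toy_ring *q, uint64_t addr)
{
	q->desc[q->cached_prod++ & q->ring_mask] = addr;
}

/* Analogue of xskq_prod_cancel_n(): roll back n unpublished entries. */
static void cancel_n(struct toy_ring *q, uint32_t n)
{
	q->cached_prod -= n;
}

/* Analogue of xskq_prod_submit_n(): make n more entries visible. */
static void submit_n(struct toy_ring *q, uint32_t n)
{
	q->producer += n;
}

int main(void)
{
	struct toy_ring q = { .ring_mask = RING_SIZE - 1 };

	reserve_addr(&q, 0x1000);	/* first desc of a packet */
	reserve_addr(&q, 0x2000);	/* second desc, EOP */
	submit_n(&q, 2);		/* destructor: both complete */

	reserve_addr(&q, 0x3000);	/* next packet ... */
	cancel_n(&q, 1);		/* ... but its skb build failed */

	printf("producer=%u cached_prod=%u\n", q.producer, q.cached_prod);
	return 0;
}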
