
Commit c497176

Björn Töpel authored and Alexei Starovoitov committed
xsk: add Rx receive functions and poll support
Here the actual receive functions of AF_XDP are implemented; in a later commit they will be called from the XDP layers. There is one set of functions for the XDP_DRV side and another for XDP_SKB (generic).

A new XDP API, xdp_return_buff, is also introduced. It is analogous to xdp_return_frame, but acts upon a struct xdp_buff rather than a struct xdp_frame. The API will be used by AF_XDP in future commits.

Support for the poll syscall is also implemented.

v2:
  - xskq_validate_id did not update cons_tail.
  - The entries variable was calculated twice in xskq_nb_avail.
  - Squashed the xdp_return_buff commit.

Signed-off-by: Björn Töpel <[email protected]>
Signed-off-by: Alexei Starovoitov <[email protected]>
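For context, a hedged sketch of how the XDP layers might hand a frame to an AF_XDP socket once the hooks land in that later commit. Only xsk_rcv() and xsk_flush() come from this patch; the function name and per-packet flushing are assumptions for illustration.

/* Hypothetical caller sketch, not part of this commit. */
static int example_deliver_to_xsk(struct xdp_sock *xs, struct xdp_buff *xdp)
{
	int err;

	/* Copies the frame into a umem frame taken from the fill queue
	 * and posts a descriptor on the socket's Rx ring; recycles the
	 * xdp_buff via xdp_return_buff() on success, bumps rx_dropped
	 * on failure. */
	err = xsk_rcv(xs, xdp);
	if (err)
		return err;

	/* Publish the new Rx descriptors and wake any poll() sleeper.
	 * A real driver would typically batch and flush once per NAPI
	 * poll rather than once per packet. */
	xsk_flush(xs);
	return 0;
}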
Parent: 965a990

File tree: 6 files changed, +238 −5 lines

include/net/xdp.h

Lines changed: 1 addition & 0 deletions

@@ -104,6 +104,7 @@ struct xdp_frame *convert_to_xdp_frame(struct xdp_buff *xdp)
 }
 
 void xdp_return_frame(struct xdp_frame *xdpf);
+void xdp_return_buff(struct xdp_buff *xdp);
 
 int xdp_rxq_info_reg(struct xdp_rxq_info *xdp_rxq,
 		     struct net_device *dev, u32 queue_index);

include/net/xdp_sock.h

Lines changed: 22 additions & 0 deletions

@@ -31,6 +31,28 @@ struct xdp_sock {
 	u16 queue_id;
 	/* Protects multiple processes in the control path */
 	struct mutex mutex;
+	u64 rx_dropped;
 };
 
+struct xdp_buff;
+#ifdef CONFIG_XDP_SOCKETS
+int xsk_generic_rcv(struct xdp_sock *xs, struct xdp_buff *xdp);
+int xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp);
+void xsk_flush(struct xdp_sock *xs);
+#else
+static inline int xsk_generic_rcv(struct xdp_sock *xs, struct xdp_buff *xdp)
+{
+	return -ENOTSUPP;
+}
+
+static inline int xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp)
+{
+	return -ENOTSUPP;
+}
+
+static inline void xsk_flush(struct xdp_sock *xs)
+{
+}
+#endif /* CONFIG_XDP_SOCKETS */
+
 #endif /* _LINUX_XDP_SOCK_H */
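A note on the header's shape: the #else stubs let generic networking code call into AF_XDP unconditionally, with the disabled case collapsing to a plain -ENOTSUPP return. A minimal sketch of a hypothetical caller (illustrative, not from this commit):

/* Illustrative only: with the stubs above, a caller needs no #ifdef
 * of its own; when AF_XDP is compiled out it simply sees -ENOTSUPP. */
static int example_try_xsk_deliver(struct xdp_sock *xs, struct xdp_buff *xdp)
{
	return xsk_generic_rcv(xs, xdp);
}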

net/core/xdp.c

Lines changed: 12 additions & 3 deletions

@@ -308,11 +308,9 @@ int xdp_rxq_info_reg_mem_model(struct xdp_rxq_info *xdp_rxq,
 }
 EXPORT_SYMBOL_GPL(xdp_rxq_info_reg_mem_model);
 
-void xdp_return_frame(struct xdp_frame *xdpf)
+static void xdp_return(void *data, struct xdp_mem_info *mem)
 {
-	struct xdp_mem_info *mem = &xdpf->mem;
 	struct xdp_mem_allocator *xa;
-	void *data = xdpf->data;
 	struct page *page;
 
 	switch (mem->type) {
@@ -339,4 +337,15 @@ void xdp_return_frame(struct xdp_frame *xdpf)
 		break;
 	}
 }
+
+void xdp_return_frame(struct xdp_frame *xdpf)
+{
+	xdp_return(xdpf->data, &xdpf->mem);
+}
 EXPORT_SYMBOL_GPL(xdp_return_frame);
+
+void xdp_return_buff(struct xdp_buff *xdp)
+{
+	xdp_return(xdp->data, &xdp->rxq->mem);
+}
+EXPORT_SYMBOL_GPL(xdp_return_buff);
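The refactor above deserves a remark: xdp_return_buff() reads the memory-model info from xdp->rxq->mem, while xdp_return_frame() reads it from the frame itself, because convert_to_xdp_frame() snapshots the mem info so a frame can outlive its Rx queue. A hedged sketch of the two free paths (the wrapper function is illustrative, not from this commit):

/* Illustrative sketch of which return helper a path would use. */
static void example_free_paths(struct xdp_buff *xdp, bool convert)
{
	struct xdp_frame *xdpf;

	if (!convert) {
		xdp_return_buff(xdp);	/* mem info from xdp->rxq->mem */
		return;
	}

	xdpf = convert_to_xdp_frame(xdp);
	if (xdpf)
		xdp_return_frame(xdpf);	/* mem info carried in the frame */
}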

net/xdp/xdp_umem.h

Lines changed: 18 additions & 0 deletions

@@ -39,6 +39,24 @@ struct xdp_umem {
 	struct work_struct work;
 };
 
+static inline char *xdp_umem_get_data(struct xdp_umem *umem, u32 idx)
+{
+	u64 pg, off;
+	char *data;
+
+	pg = idx >> umem->nfpplog2;
+	off = (idx & umem->nfpp_mask) << umem->frame_size_log2;
+
+	data = page_address(umem->pgs[pg]);
+	return data + off;
+}
+
+static inline char *xdp_umem_get_data_with_headroom(struct xdp_umem *umem,
+						    u32 idx)
+{
+	return xdp_umem_get_data(umem, idx) + umem->frame_headroom;
+}
+
 bool xdp_umem_validate_queues(struct xdp_umem *umem);
 int xdp_umem_reg(struct xdp_umem *umem, struct xdp_umem_reg *mr);
 void xdp_get_umem(struct xdp_umem *umem);
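To make the shift arithmetic in xdp_umem_get_data() concrete, here is a small standalone demonstration. The sizes are assumptions for illustration (4 KiB pages, 2 KiB frames, i.e. two frames per page); the variable names mirror the umem fields above.

#include <stdio.h>

/* Standalone demo of the index math in xdp_umem_get_data(), under
 * assumed sizes: 4 KiB pages holding two 2 KiB frames each. */
int main(void)
{
	unsigned int nfpplog2 = 1;         /* log2(frames per page) */
	unsigned int nfpp_mask = 0x1;      /* frames per page - 1   */
	unsigned int frame_size_log2 = 11; /* log2(2048)            */
	unsigned int idx;

	for (idx = 0; idx < 6; idx++) {
		unsigned long pg = idx >> nfpplog2;
		unsigned long off =
			(unsigned long)(idx & nfpp_mask) << frame_size_log2;

		printf("frame %u -> pgs[%lu] + %lu\n", idx, pg, off);
	}
	/* frame 0 -> pgs[0] + 0
	 * frame 1 -> pgs[0] + 2048
	 * frame 2 -> pgs[1] + 0
	 * ...
	 * frame 5 -> pgs[2] + 2048 */
	return 0;
}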

net/xdp/xsk.c

Lines changed: 72 additions & 1 deletion

@@ -41,6 +41,74 @@ static struct xdp_sock *xdp_sk(struct sock *sk)
 	return (struct xdp_sock *)sk;
 }
 
+static int __xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp)
+{
+	u32 *id, len = xdp->data_end - xdp->data;
+	void *buffer;
+	int err = 0;
+
+	if (xs->dev != xdp->rxq->dev || xs->queue_id != xdp->rxq->queue_index)
+		return -EINVAL;
+
+	id = xskq_peek_id(xs->umem->fq);
+	if (!id)
+		return -ENOSPC;
+
+	buffer = xdp_umem_get_data_with_headroom(xs->umem, *id);
+	memcpy(buffer, xdp->data, len);
+	err = xskq_produce_batch_desc(xs->rx, *id, len,
+				      xs->umem->frame_headroom);
+	if (!err)
+		xskq_discard_id(xs->umem->fq);
+
+	return err;
+}
+
+int xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp)
+{
+	int err;
+
+	err = __xsk_rcv(xs, xdp);
+	if (likely(!err))
+		xdp_return_buff(xdp);
+	else
+		xs->rx_dropped++;
+
+	return err;
+}
+
+void xsk_flush(struct xdp_sock *xs)
+{
+	xskq_produce_flush_desc(xs->rx);
+	xs->sk.sk_data_ready(&xs->sk);
+}
+
+int xsk_generic_rcv(struct xdp_sock *xs, struct xdp_buff *xdp)
+{
+	int err;
+
+	err = __xsk_rcv(xs, xdp);
+	if (!err)
+		xsk_flush(xs);
+	else
+		xs->rx_dropped++;
+
+	return err;
+}
+
+static unsigned int xsk_poll(struct file *file, struct socket *sock,
+			     struct poll_table_struct *wait)
+{
+	unsigned int mask = datagram_poll(file, sock, wait);
+	struct sock *sk = sock->sk;
+	struct xdp_sock *xs = xdp_sk(sk);
+
+	if (xs->rx && !xskq_empty_desc(xs->rx))
+		mask |= POLLIN | POLLRDNORM;
+
+	return mask;
+}
+
 static int xsk_init_queue(u32 entries, struct xsk_queue **queue,
 			  bool umem_queue)
 {
@@ -179,6 +247,9 @@ static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
 	} else if (!xs->umem || !xdp_umem_validate_queues(xs->umem)) {
 		err = -EINVAL;
 		goto out_unlock;
+	} else {
+		/* This xsk has its own umem. */
+		xskq_set_umem(xs->umem->fq, &xs->umem->props);
 	}
 
 	/* Rebind? */
@@ -330,7 +401,7 @@ static const struct proto_ops xsk_proto_ops = {
 	.socketpair	= sock_no_socketpair,
 	.accept		= sock_no_accept,
 	.getname	= sock_no_getname,
-	.poll		= sock_no_poll,
+	.poll		= xsk_poll,
 	.ioctl		= sock_no_ioctl,
 	.listen		= sock_no_listen,
 	.shutdown	= sock_no_shutdown,
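With .poll now wired to xsk_poll(), a userspace consumer can block on the socket. A minimal sketch under stated assumptions (an already-created and bound AF_XDP socket fd; error handling trimmed):

#include <poll.h>

/* Sketch: block until the AF_XDP Rx ring has descriptors.  xsk_poll()
 * above raises POLLIN when the Rx descriptor ring is non-empty, and
 * xsk_flush()'s sk_data_ready() call wakes the sleeper. */
static int wait_for_rx(int xsk_fd)
{
	struct pollfd pfd = {
		.fd	= xsk_fd,
		.events	= POLLIN,
	};

	if (poll(&pfd, 1, -1) < 0)
		return -1;

	return (pfd.revents & POLLIN) ? 0 : -1;
}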

net/xdp/xsk_queue.h

Lines changed: 113 additions & 1 deletion

@@ -20,6 +20,8 @@
 
 #include "xdp_umem_props.h"
 
+#define RX_BATCH_SIZE 16
+
 struct xsk_queue {
 	struct xdp_umem_props umem_props;
 	u32 ring_mask;
@@ -32,8 +34,118 @@ struct xsk_queue {
 	u64 invalid_descs;
 };
 
+/* Common functions operating for both RXTX and umem queues */
+
+static inline u32 xskq_nb_avail(struct xsk_queue *q, u32 dcnt)
+{
+	u32 entries = q->prod_tail - q->cons_tail;
+
+	if (entries == 0) {
+		/* Refresh the local pointer */
+		q->prod_tail = READ_ONCE(q->ring->producer);
+		entries = q->prod_tail - q->cons_tail;
+	}
+
+	return (entries > dcnt) ? dcnt : entries;
+}
+
+static inline u32 xskq_nb_free(struct xsk_queue *q, u32 producer, u32 dcnt)
+{
+	u32 free_entries = q->nentries - (producer - q->cons_tail);
+
+	if (free_entries >= dcnt)
+		return free_entries;
+
+	/* Refresh the local tail pointer */
+	q->cons_tail = READ_ONCE(q->ring->consumer);
+	return q->nentries - (producer - q->cons_tail);
+}
+
+/* UMEM queue */
+
+static inline bool xskq_is_valid_id(struct xsk_queue *q, u32 idx)
+{
+	if (unlikely(idx >= q->umem_props.nframes)) {
+		q->invalid_descs++;
+		return false;
+	}
+	return true;
+}
+
+static inline u32 *xskq_validate_id(struct xsk_queue *q)
+{
+	while (q->cons_tail != q->cons_head) {
+		struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
+		unsigned int idx = q->cons_tail & q->ring_mask;
+
+		if (xskq_is_valid_id(q, ring->desc[idx]))
+			return &ring->desc[idx];
+
+		q->cons_tail++;
+	}
+
+	return NULL;
+}
+
+static inline u32 *xskq_peek_id(struct xsk_queue *q)
+{
+	struct xdp_umem_ring *ring;
+
+	if (q->cons_tail == q->cons_head) {
+		WRITE_ONCE(q->ring->consumer, q->cons_tail);
+		q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE);
+
+		/* Order consumer and data */
+		smp_rmb();
+
+		return xskq_validate_id(q);
+	}
+
+	ring = (struct xdp_umem_ring *)q->ring;
+	return &ring->desc[q->cons_tail & q->ring_mask];
+}
+
+static inline void xskq_discard_id(struct xsk_queue *q)
+{
+	q->cons_tail++;
+	(void)xskq_validate_id(q);
+}
+
+/* Rx queue */
+
+static inline int xskq_produce_batch_desc(struct xsk_queue *q,
+					  u32 id, u32 len, u16 offset)
+{
+	struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
+	unsigned int idx;
+
+	if (xskq_nb_free(q, q->prod_head, 1) == 0)
+		return -ENOSPC;
+
+	idx = (q->prod_head++) & q->ring_mask;
+	ring->desc[idx].idx = id;
+	ring->desc[idx].len = len;
+	ring->desc[idx].offset = offset;
+
+	return 0;
+}
+
+static inline void xskq_produce_flush_desc(struct xsk_queue *q)
+{
+	/* Order producer and data */
+	smp_wmb();
+
+	q->prod_tail = q->prod_head,
+	WRITE_ONCE(q->ring->producer, q->prod_tail);
+}
+
+static inline bool xskq_empty_desc(struct xsk_queue *q)
+{
+	return (xskq_nb_free(q, q->prod_tail, 1) == q->nentries);
+}
+
 void xskq_set_umem(struct xsk_queue *q, struct xdp_umem_props *umem_props);
 struct xsk_queue *xskq_create(u32 nentries, bool umem_queue);
-void xskq_destroy(struct xsk_queue *q);
+void xskq_destroy(struct xsk_queue *q_ops);
 
 #endif /* _LINUX_XSK_QUEUE_H */
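The helpers above implement a cached-pointer single-producer/single-consumer scheme: cons_head is advanced in batches of up to RX_BATCH_SIZE, and the shared ring->consumer word is only rewritten when the cached batch is exhausted. A hedged sketch of the peek/discard pattern that __xsk_rcv() follows (the wrapper function is illustrative, not from this commit):

/* Illustrative consumer step: take one frame id from the umem fill
 * queue using the helpers above.  The write to the shared
 * ring->consumer index happens at most once per RX_BATCH_SIZE
 * entries, inside xskq_peek_id(). */
static int example_take_fill_id(struct xsk_queue *fq, u32 *id_out)
{
	u32 *id = xskq_peek_id(fq);	/* may refresh cons_head and
					 * validate the next id */
	if (!id)
		return -ENOSPC;		/* fill queue is empty */

	*id_out = *id;
	xskq_discard_id(fq);		/* advance cons_tail past it */
	return 0;
}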
