Skip to content

Commit a275da6

Browse files
committed
rxrpc: Create a per-local endpoint receive queue and I/O thread
Create a per-local receive queue to which, in a future patch, all incoming packets will be directed and an I/O thread that will process those packets and perform all transmission of packets. Destruction of the local endpoint is also moved from the local processor work item (which will be absorbed) to the thread. Signed-off-by: David Howells <[email protected]> cc: Marc Dionne <[email protected]> cc: [email protected]
1 parent 96b2d69 commit a275da6

File tree

4 files changed

+91
-21
lines changed

4 files changed

+91
-21
lines changed

net/rxrpc/ar-internal.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,8 @@ struct rxrpc_net {
110110
atomic_t stat_rx_acks[256];
111111

112112
atomic_t stat_why_req_ack[8];
113+
114+
atomic_t stat_io_loop;
113115
};
114116

115117
/*
@@ -280,12 +282,14 @@ struct rxrpc_local {
280282
struct hlist_node link;
281283
struct socket *socket; /* my UDP socket */
282284
struct work_struct processor;
285+
struct task_struct *io_thread;
283286
struct list_head ack_tx_queue; /* List of ACKs that need sending */
284287
spinlock_t ack_tx_lock; /* ACK list lock */
285288
struct rxrpc_sock __rcu *service; /* Service(s) listening on this endpoint */
286289
struct rw_semaphore defrag_sem; /* control re-enablement of IP DF bit */
287290
struct sk_buff_head reject_queue; /* packets awaiting rejection */
288291
struct sk_buff_head event_queue; /* endpoint event packets awaiting processing */
292+
struct sk_buff_head rx_queue; /* Received packets */
289293
struct rb_root client_bundles; /* Client connection bundles by socket params */
290294
spinlock_t client_bundles_lock; /* Lock for client_bundles */
291295
spinlock_t lock; /* access lock */
@@ -954,6 +958,11 @@ void rxrpc_input_implicit_end_call(struct rxrpc_sock *, struct rxrpc_connection
954958
* io_thread.c
955959
*/
956960
int rxrpc_input_packet(struct sock *, struct sk_buff *);
961+
int rxrpc_io_thread(void *data);
962+
static inline void rxrpc_wake_up_io_thread(struct rxrpc_local *local)
963+
{
964+
wake_up_process(local->io_thread);
965+
}
957966

958967
/*
959968
* insecure.c
@@ -984,6 +993,7 @@ void rxrpc_put_local(struct rxrpc_local *, enum rxrpc_local_trace);
984993
struct rxrpc_local *rxrpc_use_local(struct rxrpc_local *, enum rxrpc_local_trace);
985994
void rxrpc_unuse_local(struct rxrpc_local *, enum rxrpc_local_trace);
986995
void rxrpc_queue_local(struct rxrpc_local *);
996+
void rxrpc_destroy_local(struct rxrpc_local *local);
987997
void rxrpc_destroy_all_locals(struct rxrpc_net *);
988998

989999
static inline bool __rxrpc_unuse_local(struct rxrpc_local *local,

net/rxrpc/io_thread.c

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// SPDX-License-Identifier: GPL-2.0-or-later
22
/* RxRPC packet reception
33
*
4-
* Copyright (C) 2007, 2016 Red Hat, Inc. All Rights Reserved.
4+
* Copyright (C) 2007, 2016, 2022 Red Hat, Inc. All Rights Reserved.
55
* Written by David Howells ([email protected])
66
*/
77

@@ -368,3 +368,52 @@ int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
368368
_leave(" [badmsg]");
369369
return 0;
370370
}
371+
372+
/*
373+
* I/O and event handling thread.
374+
*/
375+
int rxrpc_io_thread(void *data)
376+
{
377+
struct sk_buff_head rx_queue;
378+
struct rxrpc_local *local = data;
379+
struct sk_buff *skb;
380+
381+
skb_queue_head_init(&rx_queue);
382+
383+
set_user_nice(current, MIN_NICE);
384+
385+
for (;;) {
386+
rxrpc_inc_stat(local->rxnet, stat_io_loop);
387+
388+
/* Process received packets and errors. */
389+
if ((skb = __skb_dequeue(&rx_queue))) {
390+
// TODO: Input packet
391+
rxrpc_free_skb(skb, rxrpc_skb_put_input);
392+
continue;
393+
}
394+
395+
if (!skb_queue_empty(&local->rx_queue)) {
396+
spin_lock_irq(&local->rx_queue.lock);
397+
skb_queue_splice_tail_init(&local->rx_queue, &rx_queue);
398+
spin_unlock_irq(&local->rx_queue.lock);
399+
continue;
400+
}
401+
402+
set_current_state(TASK_INTERRUPTIBLE);
403+
if (!skb_queue_empty(&local->rx_queue)) {
404+
__set_current_state(TASK_RUNNING);
405+
continue;
406+
}
407+
408+
if (kthread_should_stop())
409+
break;
410+
schedule();
411+
}
412+
413+
__set_current_state(TASK_RUNNING);
414+
rxrpc_see_local(local, rxrpc_local_stop);
415+
rxrpc_destroy_local(local);
416+
local->io_thread = NULL;
417+
rxrpc_see_local(local, rxrpc_local_stopped);
418+
return 0;
419+
}

net/rxrpc/local_object.c

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ static struct rxrpc_local *rxrpc_alloc_local(struct rxrpc_net *rxnet,
103103
init_rwsem(&local->defrag_sem);
104104
skb_queue_head_init(&local->reject_queue);
105105
skb_queue_head_init(&local->event_queue);
106+
skb_queue_head_init(&local->rx_queue);
106107
local->client_bundles = RB_ROOT;
107108
spin_lock_init(&local->client_bundles_lock);
108109
spin_lock_init(&local->lock);
@@ -126,6 +127,7 @@ static int rxrpc_open_socket(struct rxrpc_local *local, struct net *net)
126127
struct udp_tunnel_sock_cfg tuncfg = {NULL};
127128
struct sockaddr_rxrpc *srx = &local->srx;
128129
struct udp_port_cfg udp_conf = {0};
130+
struct task_struct *io_thread;
129131
struct sock *usk;
130132
int ret;
131133

@@ -185,8 +187,23 @@ static int rxrpc_open_socket(struct rxrpc_local *local, struct net *net)
185187
BUG();
186188
}
187189

190+
io_thread = kthread_run(rxrpc_io_thread, local,
191+
"krxrpcio/%u", ntohs(udp_conf.local_udp_port));
192+
if (IS_ERR(io_thread)) {
193+
ret = PTR_ERR(io_thread);
194+
goto error_sock;
195+
}
196+
197+
local->io_thread = io_thread;
188198
_leave(" = 0");
189199
return 0;
200+
201+
error_sock:
202+
kernel_sock_shutdown(local->socket, SHUT_RDWR);
203+
local->socket->sk->sk_user_data = NULL;
204+
sock_release(local->socket);
205+
local->socket = NULL;
206+
return ret;
190207
}
191208

192209
/*
@@ -360,19 +377,8 @@ struct rxrpc_local *rxrpc_use_local(struct rxrpc_local *local,
360377
*/
361378
void rxrpc_unuse_local(struct rxrpc_local *local, enum rxrpc_local_trace why)
362379
{
363-
unsigned int debug_id;
364-
int r, u;
365-
366-
if (local) {
367-
debug_id = local->debug_id;
368-
r = refcount_read(&local->ref);
369-
u = atomic_dec_return(&local->active_users);
370-
trace_rxrpc_local(debug_id, why, r, u);
371-
if (u == 0) {
372-
rxrpc_get_local(local, rxrpc_local_get_queue);
373-
rxrpc_queue_local(local);
374-
}
375-
}
380+
if (local && __rxrpc_unuse_local(local, why))
381+
kthread_stop(local->io_thread);
376382
}
377383

378384
/*
@@ -382,7 +388,7 @@ void rxrpc_unuse_local(struct rxrpc_local *local, enum rxrpc_local_trace why)
382388
* Closing the socket cannot be done from bottom half context or RCU callback
383389
* context because it might sleep.
384390
*/
385-
static void rxrpc_local_destroyer(struct rxrpc_local *local)
391+
void rxrpc_destroy_local(struct rxrpc_local *local)
386392
{
387393
struct socket *socket = local->socket;
388394
struct rxrpc_net *rxnet = local->rxnet;
@@ -411,6 +417,7 @@ static void rxrpc_local_destroyer(struct rxrpc_local *local)
411417
*/
412418
rxrpc_purge_queue(&local->reject_queue);
413419
rxrpc_purge_queue(&local->event_queue);
420+
rxrpc_purge_queue(&local->rx_queue);
414421
}
415422

416423
/*
@@ -430,10 +437,8 @@ static void rxrpc_local_processor(struct work_struct *work)
430437

431438
do {
432439
again = false;
433-
if (!__rxrpc_use_local(local, rxrpc_local_use_work)) {
434-
rxrpc_local_destroyer(local);
440+
if (!__rxrpc_use_local(local, rxrpc_local_use_work))
435441
break;
436-
}
437442

438443
if (!list_empty(&local->ack_tx_queue)) {
439444
rxrpc_transmit_ack_packets(local);

net/rxrpc/proc.c

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,7 @@ static int rxrpc_local_seq_show(struct seq_file *seq, void *v)
342342
if (v == SEQ_START_TOKEN) {
343343
seq_puts(seq,
344344
"Proto Local "
345-
" Use Act\n");
345+
" Use Act RxQ\n");
346346
return 0;
347347
}
348348

@@ -351,10 +351,11 @@ static int rxrpc_local_seq_show(struct seq_file *seq, void *v)
351351
sprintf(lbuff, "%pISpc", &local->srx.transport);
352352

353353
seq_printf(seq,
354-
"UDP %-47.47s %3u %3u\n",
354+
"UDP %-47.47s %3u %3u %3u\n",
355355
lbuff,
356356
refcount_read(&local->ref),
357-
atomic_read(&local->active_users));
357+
atomic_read(&local->active_users),
358+
local->rx_queue.qlen);
358359

359360
return 0;
360361
}
@@ -463,6 +464,9 @@ int rxrpc_stats_show(struct seq_file *seq, void *v)
463464
"Buffers : txb=%u rxb=%u\n",
464465
atomic_read(&rxrpc_nr_txbuf),
465466
atomic_read(&rxrpc_n_rx_skbs));
467+
seq_printf(seq,
468+
"IO-thread: loops=%u\n",
469+
atomic_read(&rxnet->stat_io_loop));
466470
return 0;
467471
}
468472

@@ -492,5 +496,7 @@ int rxrpc_stats_clear(struct file *file, char *buf, size_t size)
492496
memset(&rxnet->stat_rx_acks, 0, sizeof(rxnet->stat_rx_acks));
493497

494498
memset(&rxnet->stat_why_req_ack, 0, sizeof(rxnet->stat_why_req_ack));
499+
500+
atomic_set(&rxnet->stat_io_loop, 0);
495501
return size;
496502
}

0 commit comments

Comments
 (0)