Skip to content

Commit 540b1c4

Browse files
dhowells authored and davem330 committed
rxrpc: Fix deadlock between call creation and sendmsg/recvmsg
All the routines by which rxrpc is accessed from the outside are serialised by means of the socket lock (sendmsg, recvmsg, bind, rxrpc_kernel_begin_call(), ...) and this presents a problem: (1) If a number of calls on the same socket are in the process of connection to the same peer, a maximum of four concurrent live calls are permitted before further calls need to wait for a slot. (2) If a call is waiting for a slot, it is deep inside sendmsg() or rxrpc_kernel_begin_call() and the entry function is holding the socket lock. (3) sendmsg() and recvmsg() or the in-kernel equivalents are prevented from servicing the other calls as they need to take the socket lock to do so. (4) The socket is stuck until a call is aborted and makes its slot available to the waiter. Fix this by: (1) Provide each call with a mutex ('user_mutex') that arbitrates access by the users of rxrpc separately for each specific call. (2) Make rxrpc_sendmsg() and rxrpc_recvmsg() unlock the socket as soon as they've got a call and taken its mutex. Note that I'm returning EWOULDBLOCK from recvmsg() if MSG_DONTWAIT is set but someone else has the lock. Should I instead only return EWOULDBLOCK if there's nothing currently to be done on a socket, and sleep in this particular instance because there is something to be done, but we appear to be blocked by the interrupt handler doing its ping? (3) Make rxrpc_new_client_call() unlock the socket after allocating a new call, locking its user mutex and adding it to the socket's call tree. The call is returned locked so that sendmsg() can add data to it immediately. From the moment the call is in the socket tree, it is subject to access by sendmsg() and recvmsg() - even if it isn't connected yet. (4) Lock new service calls in the UDP data_ready handler (in rxrpc_new_incoming_call()) because they may already be in the socket's tree and the data_ready handler makes them live immediately if a user ID has already been preassigned. 
     Note that the new call is locked before any notifications are sent that it is live, so doing mutex_trylock() *ought* to always succeed.  Userspace is prevented from doing sendmsg() on calls that are in a too-early state in rxrpc_do_sendmsg().

 (5) Make rxrpc_new_incoming_call() return the call with the user mutex held so that a ping can be scheduled immediately under it.

     Note that it might be worth moving the ping call into rxrpc_new_incoming_call() and then we can drop the mutex there.

 (6) Make rxrpc_accept_call() take the lock on the call it is accepting and release the socket after adding the call to the socket's tree.  This is slightly tricky as we've dequeued the call by that point and have to requeue it.

     Note that requeuing emits a trace event.

 (7) Make rxrpc_kernel_send_data() and rxrpc_kernel_recv_data() take the new mutex immediately and don't bother with the socket mutex at all.

This patch has the nice bonus that calls on the same socket are now to some extent parallelisable.

Note that we might want to move rxrpc_service_prealloc() calls out from the socket lock and give it its own lock, so that we don't hang progress in other calls because we're waiting for the allocator.  We probably also want to avoid calling rxrpc_notify_socket() from within the socket lock (rxrpc_accept_call()).

Signed-off-by: David Howells <[email protected]>
Tested-by: Marc Dionne <[email protected]>
Signed-off-by: David S. Miller <[email protected]>
1 parent 2d6be4a commit 540b1c4

File tree

8 files changed

+156
-22
lines changed

8 files changed

+156
-22
lines changed

include/trace/events/rxrpc.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ enum rxrpc_recvmsg_trace {
119119
rxrpc_recvmsg_full,
120120
rxrpc_recvmsg_hole,
121121
rxrpc_recvmsg_next,
122+
rxrpc_recvmsg_requeue,
122123
rxrpc_recvmsg_return,
123124
rxrpc_recvmsg_terminal,
124125
rxrpc_recvmsg_to_be_accepted,
@@ -277,6 +278,7 @@ enum rxrpc_congest_change {
277278
EM(rxrpc_recvmsg_full, "FULL") \
278279
EM(rxrpc_recvmsg_hole, "HOLE") \
279280
EM(rxrpc_recvmsg_next, "NEXT") \
281+
EM(rxrpc_recvmsg_requeue, "REQU") \
280282
EM(rxrpc_recvmsg_return, "RETN") \
281283
EM(rxrpc_recvmsg_terminal, "TERM") \
282284
EM(rxrpc_recvmsg_to_be_accepted, "TBAC") \

net/rxrpc/af_rxrpc.c

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -290,10 +290,11 @@ struct rxrpc_call *rxrpc_kernel_begin_call(struct socket *sock,
290290
cp.exclusive = false;
291291
cp.service_id = srx->srx_service;
292292
call = rxrpc_new_client_call(rx, &cp, srx, user_call_ID, gfp);
293+
/* The socket has been unlocked. */
293294
if (!IS_ERR(call))
294295
call->notify_rx = notify_rx;
295296

296-
release_sock(&rx->sk);
297+
mutex_unlock(&call->user_mutex);
297298
_leave(" = %p", call);
298299
return call;
299300
}
@@ -310,7 +311,10 @@ EXPORT_SYMBOL(rxrpc_kernel_begin_call);
310311
void rxrpc_kernel_end_call(struct socket *sock, struct rxrpc_call *call)
311312
{
312313
_enter("%d{%d}", call->debug_id, atomic_read(&call->usage));
314+
315+
mutex_lock(&call->user_mutex);
313316
rxrpc_release_call(rxrpc_sk(sock->sk), call);
317+
mutex_unlock(&call->user_mutex);
314318
rxrpc_put_call(call, rxrpc_call_put_kernel);
315319
}
316320
EXPORT_SYMBOL(rxrpc_kernel_end_call);
@@ -450,14 +454,16 @@ static int rxrpc_sendmsg(struct socket *sock, struct msghdr *m, size_t len)
450454
case RXRPC_SERVER_BOUND:
451455
case RXRPC_SERVER_LISTENING:
452456
ret = rxrpc_do_sendmsg(rx, m, len);
453-
break;
457+
/* The socket has been unlocked */
458+
goto out;
454459
default:
455460
ret = -EINVAL;
456-
break;
461+
goto error_unlock;
457462
}
458463

459464
error_unlock:
460465
release_sock(&rx->sk);
466+
out:
461467
_leave(" = %d", ret);
462468
return ret;
463469
}

net/rxrpc/ar-internal.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,7 @@ struct rxrpc_call {
467467
struct rxrpc_connection *conn; /* connection carrying call */
468468
struct rxrpc_peer *peer; /* Peer record for remote address */
469469
struct rxrpc_sock __rcu *socket; /* socket responsible */
470+
struct mutex user_mutex; /* User access mutex */
470471
ktime_t ack_at; /* When deferred ACK needs to happen */
471472
ktime_t resend_at; /* When next resend needs to happen */
472473
ktime_t ping_at; /* When next to send a ping */

net/rxrpc/call_accept.c

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,8 @@ static struct rxrpc_call *rxrpc_alloc_incoming_call(struct rxrpc_sock *rx,
323323
*
324324
* If we want to report an error, we mark the skb with the packet type and
325325
* abort code and return NULL.
326+
*
327+
* The call is returned with the user access mutex held.
326328
*/
327329
struct rxrpc_call *rxrpc_new_incoming_call(struct rxrpc_local *local,
328330
struct rxrpc_connection *conn,
@@ -371,6 +373,18 @@ struct rxrpc_call *rxrpc_new_incoming_call(struct rxrpc_local *local,
371373
trace_rxrpc_receive(call, rxrpc_receive_incoming,
372374
sp->hdr.serial, sp->hdr.seq);
373375

376+
/* Lock the call to prevent rxrpc_kernel_send/recv_data() and
377+
* sendmsg()/recvmsg() inconveniently stealing the mutex once the
378+
* notification is generated.
379+
*
380+
* The BUG should never happen because the kernel should be well
381+
* behaved enough not to access the call before the first notification
382+
* event and userspace is prevented from doing so until the state is
383+
* appropriate.
384+
*/
385+
if (!mutex_trylock(&call->user_mutex))
386+
BUG();
387+
374388
/* Make the call live. */
375389
rxrpc_incoming_call(rx, call, skb);
376390
conn = call->conn;
@@ -429,10 +443,12 @@ struct rxrpc_call *rxrpc_new_incoming_call(struct rxrpc_local *local,
429443
/*
430444
* handle acceptance of a call by userspace
431445
* - assign the user call ID to the call at the front of the queue
446+
* - called with the socket locked.
432447
*/
433448
struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *rx,
434449
unsigned long user_call_ID,
435450
rxrpc_notify_rx_t notify_rx)
451+
__releases(&rx->sk.sk_lock.slock)
436452
{
437453
struct rxrpc_call *call;
438454
struct rb_node *parent, **pp;
@@ -446,6 +462,7 @@ struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *rx,
446462

447463
if (list_empty(&rx->to_be_accepted)) {
448464
write_unlock(&rx->call_lock);
465+
release_sock(&rx->sk);
449466
kleave(" = -ENODATA [empty]");
450467
return ERR_PTR(-ENODATA);
451468
}
@@ -470,10 +487,39 @@ struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *rx,
470487
*/
471488
call = list_entry(rx->to_be_accepted.next,
472489
struct rxrpc_call, accept_link);
490+
write_unlock(&rx->call_lock);
491+
492+
/* We need to gain the mutex from the interrupt handler without
493+
* upsetting lockdep, so we have to release it there and take it here.
494+
* We are, however, still holding the socket lock, so other accepts
495+
* must wait for us and no one can add the user ID behind our backs.
496+
*/
497+
if (mutex_lock_interruptible(&call->user_mutex) < 0) {
498+
release_sock(&rx->sk);
499+
kleave(" = -ERESTARTSYS");
500+
return ERR_PTR(-ERESTARTSYS);
501+
}
502+
503+
write_lock(&rx->call_lock);
473504
list_del_init(&call->accept_link);
474505
sk_acceptq_removed(&rx->sk);
475506
rxrpc_see_call(call);
476507

508+
/* Find the user ID insertion point. */
509+
pp = &rx->calls.rb_node;
510+
parent = NULL;
511+
while (*pp) {
512+
parent = *pp;
513+
call = rb_entry(parent, struct rxrpc_call, sock_node);
514+
515+
if (user_call_ID < call->user_call_ID)
516+
pp = &(*pp)->rb_left;
517+
else if (user_call_ID > call->user_call_ID)
518+
pp = &(*pp)->rb_right;
519+
else
520+
BUG();
521+
}
522+
477523
write_lock_bh(&call->state_lock);
478524
switch (call->state) {
479525
case RXRPC_CALL_SERVER_ACCEPTING:
@@ -499,6 +545,7 @@ struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *rx,
499545
write_unlock(&rx->call_lock);
500546
rxrpc_notify_socket(call);
501547
rxrpc_service_prealloc(rx, GFP_KERNEL);
548+
release_sock(&rx->sk);
502549
_leave(" = %p{%d}", call, call->debug_id);
503550
return call;
504551

@@ -515,6 +562,7 @@ struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *rx,
515562
write_unlock(&rx->call_lock);
516563
out:
517564
rxrpc_service_prealloc(rx, GFP_KERNEL);
565+
release_sock(&rx->sk);
518566
_leave(" = %d", ret);
519567
return ERR_PTR(ret);
520568
}

net/rxrpc/call_object.c

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ struct rxrpc_call *rxrpc_alloc_call(gfp_t gfp)
115115
if (!call->rxtx_annotations)
116116
goto nomem_2;
117117

118+
mutex_init(&call->user_mutex);
118119
setup_timer(&call->timer, rxrpc_call_timer_expired,
119120
(unsigned long)call);
120121
INIT_WORK(&call->processor, &rxrpc_process_call);
@@ -194,14 +195,16 @@ static void rxrpc_start_call_timer(struct rxrpc_call *call)
194195
}
195196

196197
/*
197-
* set up a call for the given data
198-
* - called in process context with IRQs enabled
198+
* Set up a call for the given parameters.
199+
* - Called with the socket lock held, which it must release.
200+
* - If it returns a call, the call's lock will need releasing by the caller.
199201
*/
200202
struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
201203
struct rxrpc_conn_parameters *cp,
202204
struct sockaddr_rxrpc *srx,
203205
unsigned long user_call_ID,
204206
gfp_t gfp)
207+
__releases(&rx->sk.sk_lock.slock)
205208
{
206209
struct rxrpc_call *call, *xcall;
207210
struct rb_node *parent, **pp;
@@ -212,13 +215,19 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
212215

213216
call = rxrpc_alloc_client_call(srx, gfp);
214217
if (IS_ERR(call)) {
218+
release_sock(&rx->sk);
215219
_leave(" = %ld", PTR_ERR(call));
216220
return call;
217221
}
218222

219223
trace_rxrpc_call(call, rxrpc_call_new_client, atomic_read(&call->usage),
220224
here, (const void *)user_call_ID);
221225

226+
/* We need to protect a partially set up call against the user as we
227+
* will be acting outside the socket lock.
228+
*/
229+
mutex_lock(&call->user_mutex);
230+
222231
/* Publish the call, even though it is incompletely set up as yet */
223232
write_lock(&rx->call_lock);
224233

@@ -250,6 +259,9 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
250259
list_add_tail(&call->link, &rxrpc_calls);
251260
write_unlock(&rxrpc_call_lock);
252261

262+
/* From this point on, the call is protected by its own lock. */
263+
release_sock(&rx->sk);
264+
253265
/* Set up or get a connection record and set the protocol parameters,
254266
* including channel number and call ID.
255267
*/
@@ -279,6 +291,7 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
279291
*/
280292
error_dup_user_ID:
281293
write_unlock(&rx->call_lock);
294+
release_sock(&rx->sk);
282295
ret = -EEXIST;
283296

284297
error:
@@ -287,6 +300,7 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
287300
trace_rxrpc_call(call, rxrpc_call_error, atomic_read(&call->usage),
288301
here, ERR_PTR(ret));
289302
rxrpc_release_call(rx, call);
303+
mutex_unlock(&call->user_mutex);
290304
rxrpc_put_call(call, rxrpc_call_put);
291305
_leave(" = %d", ret);
292306
return ERR_PTR(ret);

net/rxrpc/input.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1194,6 +1194,7 @@ void rxrpc_data_ready(struct sock *udp_sk)
11941194
goto reject_packet;
11951195
}
11961196
rxrpc_send_ping(call, skb, skew);
1197+
mutex_unlock(&call->user_mutex);
11971198
}
11981199

11991200
rxrpc_input_call_packet(call, skb, skew);

net/rxrpc/recvmsg.c

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -487,6 +487,20 @@ int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
487487

488488
trace_rxrpc_recvmsg(call, rxrpc_recvmsg_dequeue, 0, 0, 0, 0);
489489

490+
/* We're going to drop the socket lock, so we need to lock the call
491+
* against interference by sendmsg.
492+
*/
493+
if (!mutex_trylock(&call->user_mutex)) {
494+
ret = -EWOULDBLOCK;
495+
if (flags & MSG_DONTWAIT)
496+
goto error_requeue_call;
497+
ret = -ERESTARTSYS;
498+
if (mutex_lock_interruptible(&call->user_mutex) < 0)
499+
goto error_requeue_call;
500+
}
501+
502+
release_sock(&rx->sk);
503+
490504
if (test_bit(RXRPC_CALL_RELEASED, &call->flags))
491505
BUG();
492506

@@ -502,7 +516,7 @@ int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
502516
&call->user_call_ID);
503517
}
504518
if (ret < 0)
505-
goto error;
519+
goto error_unlock_call;
506520
}
507521

508522
if (msg->msg_name) {
@@ -533,12 +547,12 @@ int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
533547
}
534548

535549
if (ret < 0)
536-
goto error;
550+
goto error_unlock_call;
537551

538552
if (call->state == RXRPC_CALL_COMPLETE) {
539553
ret = rxrpc_recvmsg_term(call, msg);
540554
if (ret < 0)
541-
goto error;
555+
goto error_unlock_call;
542556
if (!(flags & MSG_PEEK))
543557
rxrpc_release_call(rx, call);
544558
msg->msg_flags |= MSG_EOR;
@@ -551,8 +565,21 @@ int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
551565
msg->msg_flags &= ~MSG_MORE;
552566
ret = copied;
553567

554-
error:
568+
error_unlock_call:
569+
mutex_unlock(&call->user_mutex);
555570
rxrpc_put_call(call, rxrpc_call_put);
571+
trace_rxrpc_recvmsg(call, rxrpc_recvmsg_return, 0, 0, 0, ret);
572+
return ret;
573+
574+
error_requeue_call:
575+
if (!(flags & MSG_PEEK)) {
576+
write_lock_bh(&rx->recvmsg_lock);
577+
list_add(&call->recvmsg_link, &rx->recvmsg_q);
578+
write_unlock_bh(&rx->recvmsg_lock);
579+
trace_rxrpc_recvmsg(call, rxrpc_recvmsg_requeue, 0, 0, 0, 0);
580+
} else {
581+
rxrpc_put_call(call, rxrpc_call_put);
582+
}
556583
error_no_call:
557584
release_sock(&rx->sk);
558585
trace_rxrpc_recvmsg(call, rxrpc_recvmsg_return, 0, 0, 0, ret);
@@ -609,7 +636,7 @@ int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
609636
iov.iov_len = size - *_offset;
610637
iov_iter_kvec(&iter, ITER_KVEC | READ, &iov, 1, size - *_offset);
611638

612-
lock_sock(sock->sk);
639+
mutex_lock(&call->user_mutex);
613640

614641
switch (call->state) {
615642
case RXRPC_CALL_CLIENT_RECV_REPLY:
@@ -648,7 +675,7 @@ int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
648675
read_phase_complete:
649676
ret = 1;
650677
out:
651-
release_sock(sock->sk);
678+
mutex_unlock(&call->user_mutex);
652679
_leave(" = %d [%zu,%d]", ret, *_offset, *_abort);
653680
return ret;
654681

0 commit comments

Comments
 (0)