Skip to content

Commit 06ea2c9

Browse files
dhowellskuba-moo
authored andcommitted
rxrpc: Fix alteration of headers whilst zerocopy pending
rxrpc: Fix alteration of headers whilst zerocopy pending AF_RXRPC now uses MSG_SPLICE_PAGES to do zerocopy of the DATA packets when it transmits them, but to reduce the number of descriptors required in the DMA ring, it allocates a space for the protocol header in the memory immediately before the data content so that it can include both in a single descriptor. This is used for either the main RX header or the smaller jumbo subpacket header as appropriate: +----+------+ | RX | | +-+--+DATA | |JH| | +--+------+ Now, when it stitches a large jumbo packet together from a number of individual DATA packets (each of which is 1412 bytes of data), it uses the full RX header from the first and then the jumbo subpacket header for the rest of the components: +---+--+------+--+------+--+------+--+------+--+------+--+------+ |UDP|RX|DATA |JH|DATA |JH|DATA |JH|DATA |JH|DATA |JH|DATA | +---+--+------+--+------+--+------+--+------+--+------+--+------+ As mentioned, the main RX header and the jumbo header overlay one another in memory and the formats don't match, so switching from one to the other means rearranging the fields and adjusting the flags. However, now that TLP has been included, it wants to retransmit the last subpacket as a new data packet on its own, which means switching between the header formats... and if the transmission is still pending, because of the MSG_SPLICE_PAGES, we end up corrupting the jumbo subheader. This has a variety of effects, with the RX service number overwriting the jumbo checksum/key number field and the RX checksum overwriting the jumbo flags - resulting in, at the very least, a confused connection-level abort from the peer. Fix this by leaving the jumbo header in the allocation with the data, but allocating the RX header from the page frag allocator and concocting it on the fly at the point of transmission as it does for ACK packets. Fixes: 7c48266 ("rxrpc: Implement RACK/TLP to deal with transmission stalls [RFC8985]") Signed-off-by: David Howells <[email protected]> cc: Marc Dionne <[email protected]> cc: Chuck Lever <[email protected]> cc: Simon Horman <[email protected]> cc: [email protected] Link: https://patch.msgid.link/[email protected] Signed-off-by: Jakub Kicinski <[email protected]>
1 parent 1942b1c commit 06ea2c9

File tree

5 files changed

+54
-57
lines changed

5 files changed

+54
-57
lines changed

net/rxrpc/ar-internal.h

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -327,8 +327,8 @@ struct rxrpc_local {
327327
* packet with a maximum set of jumbo subpackets or a PING ACK padded
328328
* out to 64K with zeropages for PMTUD.
329329
*/
330-
struct kvec kvec[RXRPC_MAX_NR_JUMBO > 3 + 16 ?
331-
RXRPC_MAX_NR_JUMBO : 3 + 16];
330+
struct kvec kvec[1 + RXRPC_MAX_NR_JUMBO > 3 + 16 ?
331+
1 + RXRPC_MAX_NR_JUMBO : 3 + 16];
332332
};
333333

334334
/*
@@ -874,8 +874,7 @@ struct rxrpc_txbuf {
874874
#define RXRPC_TXBUF_RESENT 0x100 /* Set if has been resent */
875875
__be16 cksum; /* Checksum to go in header */
876876
bool jumboable; /* Can be non-terminal jumbo subpacket */
877-
u8 nr_kvec; /* Amount of kvec[] used */
878-
struct kvec kvec[1];
877+
void *data; /* Data with preceding jumbo header */
879878
};
880879

881880
static inline bool rxrpc_sending_to_server(const struct rxrpc_txbuf *txb)

net/rxrpc/output.c

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -428,13 +428,13 @@ int rxrpc_send_abort_packet(struct rxrpc_call *call)
428428
static size_t rxrpc_prepare_data_subpacket(struct rxrpc_call *call,
429429
struct rxrpc_send_data_req *req,
430430
struct rxrpc_txbuf *txb,
431+
struct rxrpc_wire_header *whdr,
431432
rxrpc_serial_t serial, int subpkt)
432433
{
433-
struct rxrpc_wire_header *whdr = txb->kvec[0].iov_base;
434-
struct rxrpc_jumbo_header *jumbo = (void *)(whdr + 1) - sizeof(*jumbo);
434+
struct rxrpc_jumbo_header *jumbo = txb->data - sizeof(*jumbo);
435435
enum rxrpc_req_ack_trace why;
436436
struct rxrpc_connection *conn = call->conn;
437-
struct kvec *kv = &call->local->kvec[subpkt];
437+
struct kvec *kv = &call->local->kvec[1 + subpkt];
438438
size_t len = txb->pkt_len;
439439
bool last;
440440
u8 flags;
@@ -491,18 +491,15 @@ static size_t rxrpc_prepare_data_subpacket(struct rxrpc_call *call,
491491
}
492492
dont_set_request_ack:
493493

494-
/* The jumbo header overlays the wire header in the txbuf. */
494+
/* There's a jumbo header prepended to the data if we need it. */
495495
if (subpkt < req->n - 1)
496496
flags |= RXRPC_JUMBO_PACKET;
497497
else
498498
flags &= ~RXRPC_JUMBO_PACKET;
499499
if (subpkt == 0) {
500500
whdr->flags = flags;
501-
whdr->serial = htonl(txb->serial);
502501
whdr->cksum = txb->cksum;
503-
whdr->serviceId = htons(conn->service_id);
504-
kv->iov_base = whdr;
505-
len += sizeof(*whdr);
502+
kv->iov_base = txb->data;
506503
} else {
507504
jumbo->flags = flags;
508505
jumbo->pad = 0;
@@ -535,7 +532,9 @@ static unsigned int rxrpc_prepare_txqueue(struct rxrpc_txqueue *tq,
535532
/*
536533
* Prepare a (jumbo) packet for transmission.
537534
*/
538-
static size_t rxrpc_prepare_data_packet(struct rxrpc_call *call, struct rxrpc_send_data_req *req)
535+
static size_t rxrpc_prepare_data_packet(struct rxrpc_call *call,
536+
struct rxrpc_send_data_req *req,
537+
struct rxrpc_wire_header *whdr)
539538
{
540539
struct rxrpc_txqueue *tq = req->tq;
541540
rxrpc_serial_t serial;
@@ -549,6 +548,18 @@ static size_t rxrpc_prepare_data_packet(struct rxrpc_call *call, struct rxrpc_se
549548
/* Each transmission of a Tx packet needs a new serial number */
550549
serial = rxrpc_get_next_serials(call->conn, req->n);
551550

551+
whdr->epoch = htonl(call->conn->proto.epoch);
552+
whdr->cid = htonl(call->cid);
553+
whdr->callNumber = htonl(call->call_id);
554+
whdr->seq = htonl(seq);
555+
whdr->serial = htonl(serial);
556+
whdr->type = RXRPC_PACKET_TYPE_DATA;
557+
whdr->flags = 0;
558+
whdr->userStatus = 0;
559+
whdr->securityIndex = call->security_ix;
560+
whdr->_rsvd = 0;
561+
whdr->serviceId = htons(call->conn->service_id);
562+
552563
call->tx_last_serial = serial + req->n - 1;
553564
call->tx_last_sent = req->now;
554565
xmit_ts = rxrpc_prepare_txqueue(tq, req);
@@ -576,7 +587,7 @@ static size_t rxrpc_prepare_data_packet(struct rxrpc_call *call, struct rxrpc_se
576587
if (i + 1 == req->n)
577588
/* Only sample the last subpacket in a jumbo. */
578589
__set_bit(ix, &tq->rtt_samples);
579-
len += rxrpc_prepare_data_subpacket(call, req, txb, serial, i);
590+
len += rxrpc_prepare_data_subpacket(call, req, txb, whdr, serial, i);
580591
serial++;
581592
seq++;
582593
i++;
@@ -618,6 +629,7 @@ static size_t rxrpc_prepare_data_packet(struct rxrpc_call *call, struct rxrpc_se
618629
}
619630

620631
rxrpc_set_keepalive(call, req->now);
632+
page_frag_free(whdr);
621633
return len;
622634
}
623635

@@ -626,25 +638,33 @@ static size_t rxrpc_prepare_data_packet(struct rxrpc_call *call, struct rxrpc_se
626638
*/
627639
void rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_send_data_req *req)
628640
{
641+
struct rxrpc_wire_header *whdr;
629642
struct rxrpc_connection *conn = call->conn;
630643
enum rxrpc_tx_point frag;
631644
struct rxrpc_txqueue *tq = req->tq;
632645
struct rxrpc_txbuf *txb;
633646
struct msghdr msg;
634647
rxrpc_seq_t seq = req->seq;
635-
size_t len;
648+
size_t len = sizeof(*whdr);
636649
bool new_call = test_bit(RXRPC_CALL_BEGAN_RX_TIMER, &call->flags);
637650
int ret, stat_ix;
638651

639652
_enter("%x,%x-%x", tq->qbase, seq, seq + req->n - 1);
640653

654+
whdr = page_frag_alloc(&call->local->tx_alloc, sizeof(*whdr), GFP_NOFS);
655+
if (!whdr)
656+
return; /* Drop the packet if no memory. */
657+
658+
call->local->kvec[0].iov_base = whdr;
659+
call->local->kvec[0].iov_len = sizeof(*whdr);
660+
641661
stat_ix = umin(req->n, ARRAY_SIZE(call->rxnet->stat_tx_jumbo)) - 1;
642662
atomic_inc(&call->rxnet->stat_tx_jumbo[stat_ix]);
643663

644-
len = rxrpc_prepare_data_packet(call, req);
664+
len += rxrpc_prepare_data_packet(call, req, whdr);
645665
txb = tq->bufs[seq & RXRPC_TXQ_MASK];
646666

647-
iov_iter_kvec(&msg.msg_iter, WRITE, call->local->kvec, req->n, len);
667+
iov_iter_kvec(&msg.msg_iter, WRITE, call->local->kvec, 1 + req->n, len);
648668

649669
msg.msg_name = &call->peer->srx.transport;
650670
msg.msg_namelen = call->peer->srx.transport_len;
@@ -695,13 +715,13 @@ void rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_send_data_req
695715

696716
if (ret == -EMSGSIZE) {
697717
rxrpc_inc_stat(call->rxnet, stat_tx_data_send_msgsize);
698-
trace_rxrpc_tx_packet(call->debug_id, call->local->kvec[0].iov_base, frag);
718+
trace_rxrpc_tx_packet(call->debug_id, whdr, frag);
699719
ret = 0;
700720
} else if (ret < 0) {
701721
rxrpc_inc_stat(call->rxnet, stat_tx_data_send_fail);
702722
trace_rxrpc_tx_fail(call->debug_id, txb->serial, ret, frag);
703723
} else {
704-
trace_rxrpc_tx_packet(call->debug_id, call->local->kvec[0].iov_base, frag);
724+
trace_rxrpc_tx_packet(call->debug_id, whdr, frag);
705725
}
706726

707727
rxrpc_tx_backoff(call, ret);

net/rxrpc/rxkad.c

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -257,8 +257,7 @@ static int rxkad_secure_packet_auth(const struct rxrpc_call *call,
257257
struct rxrpc_txbuf *txb,
258258
struct skcipher_request *req)
259259
{
260-
struct rxrpc_wire_header *whdr = txb->kvec[0].iov_base;
261-
struct rxkad_level1_hdr *hdr = (void *)(whdr + 1);
260+
struct rxkad_level1_hdr *hdr = txb->data;
262261
struct rxrpc_crypt iv;
263262
struct scatterlist sg;
264263
size_t pad;
@@ -274,7 +273,7 @@ static int rxkad_secure_packet_auth(const struct rxrpc_call *call,
274273
pad = RXKAD_ALIGN - pad;
275274
pad &= RXKAD_ALIGN - 1;
276275
if (pad) {
277-
memset(txb->kvec[0].iov_base + txb->offset, 0, pad);
276+
memset(txb->data + txb->offset, 0, pad);
278277
txb->pkt_len += pad;
279278
}
280279

@@ -300,8 +299,7 @@ static int rxkad_secure_packet_encrypt(const struct rxrpc_call *call,
300299
struct skcipher_request *req)
301300
{
302301
const struct rxrpc_key_token *token;
303-
struct rxrpc_wire_header *whdr = txb->kvec[0].iov_base;
304-
struct rxkad_level2_hdr *rxkhdr = (void *)(whdr + 1);
302+
struct rxkad_level2_hdr *rxkhdr = txb->data;
305303
struct rxrpc_crypt iv;
306304
struct scatterlist sg;
307305
size_t content, pad;
@@ -319,7 +317,7 @@ static int rxkad_secure_packet_encrypt(const struct rxrpc_call *call,
319317
txb->pkt_len = round_up(content, RXKAD_ALIGN);
320318
pad = txb->pkt_len - content;
321319
if (pad)
322-
memset(txb->kvec[0].iov_base + txb->offset, 0, pad);
320+
memset(txb->data + txb->offset, 0, pad);
323321

324322
/* encrypt from the session key */
325323
token = call->conn->key->payload.data[0];
@@ -407,9 +405,8 @@ static int rxkad_secure_packet(struct rxrpc_call *call, struct rxrpc_txbuf *txb)
407405

408406
/* Clear excess space in the packet */
409407
if (txb->pkt_len < txb->alloc_size) {
410-
struct rxrpc_wire_header *whdr = txb->kvec[0].iov_base;
411408
size_t gap = txb->alloc_size - txb->pkt_len;
412-
void *p = whdr + 1;
409+
void *p = txb->data;
413410

414411
memset(p + txb->pkt_len, 0, gap);
415412
}

net/rxrpc/sendmsg.c

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -419,7 +419,7 @@ static int rxrpc_send_data(struct rxrpc_sock *rx,
419419
size_t copy = umin(txb->space, msg_data_left(msg));
420420

421421
_debug("add %zu", copy);
422-
if (!copy_from_iter_full(txb->kvec[0].iov_base + txb->offset,
422+
if (!copy_from_iter_full(txb->data + txb->offset,
423423
copy, &msg->msg_iter))
424424
goto efault;
425425
_debug("added");
@@ -445,8 +445,6 @@ static int rxrpc_send_data(struct rxrpc_sock *rx,
445445
ret = call->security->secure_packet(call, txb);
446446
if (ret < 0)
447447
goto out;
448-
449-
txb->kvec[0].iov_len += txb->len;
450448
rxrpc_queue_packet(rx, call, txb, notify_end_tx);
451449
txb = NULL;
452450
}

net/rxrpc/txbuf.c

Lines changed: 10 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,19 @@ atomic_t rxrpc_nr_txbuf;
1919
struct rxrpc_txbuf *rxrpc_alloc_data_txbuf(struct rxrpc_call *call, size_t data_size,
2020
size_t data_align, gfp_t gfp)
2121
{
22-
struct rxrpc_wire_header *whdr;
2322
struct rxrpc_txbuf *txb;
24-
size_t total, hoff;
23+
size_t total, doff, jsize = sizeof(struct rxrpc_jumbo_header);
2524
void *buf;
2625

2726
txb = kzalloc(sizeof(*txb), gfp);
2827
if (!txb)
2928
return NULL;
3029

31-
hoff = round_up(sizeof(*whdr), data_align) - sizeof(*whdr);
32-
total = hoff + sizeof(*whdr) + data_size;
30+
/* We put a jumbo header in the buffer, but not a full wire header to
31+
* avoid delayed-corruption problems with zerocopy.
32+
*/
33+
doff = round_up(jsize, data_align);
34+
total = doff + data_size;
3335

3436
data_align = umax(data_align, L1_CACHE_BYTES);
3537
mutex_lock(&call->conn->tx_data_alloc_lock);
@@ -41,30 +43,15 @@ struct rxrpc_txbuf *rxrpc_alloc_data_txbuf(struct rxrpc_call *call, size_t data_
4143
return NULL;
4244
}
4345

44-
whdr = buf + hoff;
45-
4646
refcount_set(&txb->ref, 1);
4747
txb->call_debug_id = call->debug_id;
4848
txb->debug_id = atomic_inc_return(&rxrpc_txbuf_debug_ids);
4949
txb->alloc_size = data_size;
5050
txb->space = data_size;
51-
txb->offset = sizeof(*whdr);
51+
txb->offset = 0;
5252
txb->flags = call->conn->out_clientflag;
5353
txb->seq = call->send_top + 1;
54-
txb->nr_kvec = 1;
55-
txb->kvec[0].iov_base = whdr;
56-
txb->kvec[0].iov_len = sizeof(*whdr);
57-
58-
whdr->epoch = htonl(call->conn->proto.epoch);
59-
whdr->cid = htonl(call->cid);
60-
whdr->callNumber = htonl(call->call_id);
61-
whdr->seq = htonl(txb->seq);
62-
whdr->type = RXRPC_PACKET_TYPE_DATA;
63-
whdr->flags = 0;
64-
whdr->userStatus = 0;
65-
whdr->securityIndex = call->security_ix;
66-
whdr->_rsvd = 0;
67-
whdr->serviceId = htons(call->dest_srx.srx_service);
54+
txb->data = buf + doff;
6855

6956
trace_rxrpc_txbuf(txb->debug_id, txb->call_debug_id, txb->seq, 1,
7057
rxrpc_txbuf_alloc_data);
@@ -90,14 +77,10 @@ void rxrpc_see_txbuf(struct rxrpc_txbuf *txb, enum rxrpc_txbuf_trace what)
9077

9178
static void rxrpc_free_txbuf(struct rxrpc_txbuf *txb)
9279
{
93-
int i;
94-
9580
trace_rxrpc_txbuf(txb->debug_id, txb->call_debug_id, txb->seq, 0,
9681
rxrpc_txbuf_free);
97-
for (i = 0; i < txb->nr_kvec; i++)
98-
if (txb->kvec[i].iov_base &&
99-
!is_zero_pfn(page_to_pfn(virt_to_page(txb->kvec[i].iov_base))))
100-
page_frag_free(txb->kvec[i].iov_base);
82+
if (txb->data)
83+
page_frag_free(txb->data);
10184
kfree(txb);
10285
atomic_dec(&rxrpc_nr_txbuf);
10386
}

0 commit comments

Comments
 (0)