
Commit 94f58c5

chucklever authored and amschuma-ntap committed
xprtrdma: Allow Read list and Reply chunk simultaneously
rpcrdma_marshal_req() makes a simplifying assumption: that NFS
operations with large Call messages have small Reply messages, and
vice versa. Therefore with RPC-over-RDMA, only one chunk type is ever
needed for each Call/Reply pair, because one direction needs chunks,
the other direction will always fit inline.

In fact, this assumption is asserted in the code:

  if (rtype != rpcrdma_noch && wtype != rpcrdma_noch) {
          dprintk("RPC:       %s: cannot marshal multiple chunk lists\n",
                  __func__);
          return -EIO;
  }

But RPCGSS_SEC breaks this assumption. Because krb5i and krb5p perform
data transformation on RPC messages before they are transmitted,
direct data placement techniques cannot be used, thus RPC messages
must be sent via a Long call in both directions. All such calls are
sent with a Position Zero Read chunk, and all such replies are handled
with a Reply chunk. Thus the client must provide every Call/Reply pair
with both a Read list and a Reply chunk.

Without any special security in effect, NFSv4 WRITEs may now also use
the Read list and provide a Reply chunk. The marshal_req logic was
preventing that, meaning an NFSv4 WRITE with a large payload that
included a GETATTR result larger than the inline threshold would fail.

The code that encodes each chunk list is now completely contained in
its own function. There is some code duplication, but the trade-off is
that the overall logic should be more clear.

Note that all three chunk lists now share the rl_segments array. Some
additional per-req accounting is necessary to track this usage. For
the same reasons that the above simplifying assumption has held true
for so long, I don't expect more array elements are needed at this
time.

Signed-off-by: Chuck Lever <[email protected]>
Tested-by: Steve Wise <[email protected]>
Reviewed-by: Sagi Grimberg <[email protected]>
Signed-off-by: Anna Schumaker <[email protected]>
1 parent 88b18a1 commit 94f58c5
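To make the newly allowed combination concrete, here is a rough user-space sketch of the transport header this change lets the client build: a Long call carrying both a Position Zero Read chunk and a Reply chunk. The fixed header fields and the HLOO segment layout follow RPC-over-RDMA Version One; the helper names (put32, put_segment) and the example handle, length, and XID values are made up for illustration and are not part of this commit.

#include <stdio.h>
#include <stdint.h>
#include <arpa/inet.h>          /* htonl() */

/* Append one 32-bit XDR word in network byte order. */
static uint32_t *put32(uint32_t *p, uint32_t v)
{
        *p++ = htonl(v);
        return p;
}

/* Append an HLOO segment: handle(4), length(4), 64-bit offset. */
static uint32_t *put_segment(uint32_t *p, uint32_t handle,
                             uint32_t length, uint64_t offset)
{
        p = put32(p, handle);
        p = put32(p, length);
        p = put32(p, (uint32_t)(offset >> 32));
        return put32(p, (uint32_t)offset);
}

int main(void)
{
        uint32_t hdr[64], *p = hdr;

        /* Fixed fields: XID, version 1, credit request, proc. */
        p = put32(p, 0x12345678);
        p = put32(p, 1);
        p = put32(p, 32);
        p = put32(p, 1);                /* RDMA_NOMSG: a Long call */

        /* Read list: one Position Zero read chunk, then end of list. */
        p = put32(p, 1);                /* item present */
        p = put32(p, 0);                /* position zero */
        p = put_segment(p, 0xabc0, 8192, 0);
        p = put32(p, 0);                /* no more read chunks */

        /* Write list: not present. */
        p = put32(p, 0);

        /* Reply chunk: present, a counted array of one segment. */
        p = put32(p, 1);                /* item present */
        p = put32(p, 1);                /* segment count */
        p = put_segment(p, 0xdef0, 4096, 0);

        printf("header length: %zu bytes\n", (p - hdr) * sizeof(*p));
        return 0;
}

This mirrors what rpcrdma_encode_read_list(), rpcrdma_encode_write_list(), and rpcrdma_encode_reply_chunk() in the diff below emit in sequence for the rtype = rpcrdma_areadch, wtype = rpcrdma_replych case.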

File tree

2 files changed (+272, -60 lines)

net/sunrpc/xprtrdma/rpc_rdma.c

Lines changed: 268 additions & 59 deletions
@@ -62,17 +62,17 @@ enum rpcrdma_chunktype {
 };
 
 static const char transfertypes[][12] = {
-        "pure inline",  /* no chunks */
-        " read chunk",  /* some argument via rdma read */
-        "*read chunk",  /* entire request via rdma read */
-        "write chunk",  /* some result via rdma write */
+        "inline",       /* no chunks */
+        "read list",    /* some argument via rdma read */
+        "*read list",   /* entire request via rdma read */
+        "write list",   /* some result via rdma write */
         "reply chunk"   /* entire reply via rdma write */
 };
 
 /* Returns size of largest RPC-over-RDMA header in a Call message
  *
- * The client marshals only one chunk list per Call message.
- * The largest list is the Read list.
+ * The largest Call header contains a full-size Read list and a
+ * minimal Reply chunk.
  */
 static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs)
 {
@@ -85,6 +85,11 @@ static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs)
         maxsegs += 2;   /* segment for head and tail buffers */
         size = maxsegs * sizeof(struct rpcrdma_read_chunk);
 
+        /* Minimal Read chunk size */
+        size += sizeof(__be32); /* segment count */
+        size += sizeof(struct rpcrdma_segment);
+        size += sizeof(__be32); /* list discriminator */
+
         dprintk("RPC:       %s: max call header size = %u\n",
                 __func__, size);
         return size;
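As a quick aside before the next hunk: for a rough sense of the numbers behind that worst-case estimate, the sketch below redoes it in user space. The byte counts are assumptions based on the XDR wire layout (a read-chunk entry as 24 bytes of discriminator, position, and a 16-byte HLOO segment; a 28-byte fixed header), not values copied from the kernel headers, so treat it as a back-of-the-envelope check only.

#include <stdio.h>

enum {
        FIXED_HDR_BYTES  = 28,  /* xid, vers, credits, proc, 3 discriminators (assumed) */
        SEGMENT_BYTES    = 16,  /* HLOO: handle(4) + length(4) + offset(8) */
        READ_ENTRY_BYTES = 4 + 4 + SEGMENT_BYTES, /* discriminator + position + HLOO */
};

/* Worst-case Call header: a full-size Read list plus one minimal
 * one-segment chunk (segment count, one HLOO segment, and a trailing
 * list discriminator).
 */
static unsigned int max_call_header_size(unsigned int maxsegs)
{
        unsigned int size = FIXED_HDR_BYTES;

        maxsegs += 2;                           /* head and tail buffers */
        size += maxsegs * READ_ENTRY_BYTES;     /* full-size Read list */
        size += 4 + SEGMENT_BYTES + 4;          /* minimal extra chunk */
        return size;
}

int main(void)
{
        /* e.g. 8 data segments: 28 + 10 * 24 + 24 = 292 bytes */
        printf("%u\n", max_call_header_size(8));
        return 0;
}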
@@ -431,6 +436,209 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
         return n;
 }
 
+static inline __be32 *
+xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mr_seg *seg)
+{
+        *iptr++ = cpu_to_be32(seg->mr_rkey);
+        *iptr++ = cpu_to_be32(seg->mr_len);
+        return xdr_encode_hyper(iptr, seg->mr_base);
+}
+
+/* XDR-encode the Read list. Supports encoding a list of read
+ * segments that belong to a single read chunk.
+ *
+ * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
+ *
+ *  Read chunklist (a linked list):
+ *   N elements, position P (same P for all chunks of same arg!):
+ *    1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
+ *
+ * Returns a pointer to the XDR word in the RDMA header following
+ * the end of the Read list, or an error pointer.
+ */
+static __be32 *
+rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
+                         struct rpcrdma_req *req, struct rpc_rqst *rqst,
+                         __be32 *iptr, enum rpcrdma_chunktype rtype)
+{
+        struct rpcrdma_mr_seg *seg = req->rl_nextseg;
+        unsigned int pos;
+        int n, nsegs;
+
+        if (rtype == rpcrdma_noch) {
+                *iptr++ = xdr_zero;     /* item not present */
+                return iptr;
+        }
+
+        pos = rqst->rq_snd_buf.head[0].iov_len;
+        if (rtype == rpcrdma_areadch)
+                pos = 0;
+        nsegs = rpcrdma_convert_iovs(&rqst->rq_snd_buf, pos, rtype, seg,
+                                     RPCRDMA_MAX_SEGS - req->rl_nchunks);
+        if (nsegs < 0)
+                return ERR_PTR(nsegs);
+
+        do {
+                n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, false);
+                if (n <= 0)
+                        return ERR_PTR(n);
+
+                *iptr++ = xdr_one;      /* item present */
+
+                /* All read segments in this chunk
+                 * have the same "position".
+                 */
+                *iptr++ = cpu_to_be32(pos);
+                iptr = xdr_encode_rdma_segment(iptr, seg);
+
+                dprintk("RPC: %5u %s: read segment pos %u "
+                        "%d@0x%016llx:0x%08x (%s)\n",
+                        rqst->rq_task->tk_pid, __func__, pos,
+                        seg->mr_len, (unsigned long long)seg->mr_base,
+                        seg->mr_rkey, n < nsegs ? "more" : "last");
+
+                r_xprt->rx_stats.read_chunk_count++;
+                req->rl_nchunks++;
+                seg += n;
+                nsegs -= n;
+        } while (nsegs);
+        req->rl_nextseg = seg;
+
+        /* Finish Read list */
+        *iptr++ = xdr_zero;     /* Next item not present */
+        return iptr;
+}
+
+/* XDR-encode the Write list. Supports encoding a list containing
+ * one array of plain segments that belong to a single write chunk.
+ *
+ * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
+ *
+ *  Write chunklist (a list of (one) counted array):
+ *   N elements:
+ *    1 - N - HLOO - HLOO - ... - HLOO - 0
+ *
+ * Returns a pointer to the XDR word in the RDMA header following
+ * the end of the Write list, or an error pointer.
+ */
+static __be32 *
+rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
+                          struct rpc_rqst *rqst, __be32 *iptr,
+                          enum rpcrdma_chunktype wtype)
+{
+        struct rpcrdma_mr_seg *seg = req->rl_nextseg;
+        int n, nsegs, nchunks;
+        __be32 *segcount;
+
+        if (wtype != rpcrdma_writech) {
+                *iptr++ = xdr_zero;     /* no Write list present */
+                return iptr;
+        }
+
+        nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf,
+                                     rqst->rq_rcv_buf.head[0].iov_len,
+                                     wtype, seg,
+                                     RPCRDMA_MAX_SEGS - req->rl_nchunks);
+        if (nsegs < 0)
+                return ERR_PTR(nsegs);
+
+        *iptr++ = xdr_one;      /* Write list present */
+        segcount = iptr++;      /* save location of segment count */
+
+        nchunks = 0;
+        do {
+                n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, true);
+                if (n <= 0)
+                        return ERR_PTR(n);
+
+                iptr = xdr_encode_rdma_segment(iptr, seg);
+
+                dprintk("RPC: %5u %s: write segment "
+                        "%d@0x016%llx:0x%08x (%s)\n",
+                        rqst->rq_task->tk_pid, __func__,
+                        seg->mr_len, (unsigned long long)seg->mr_base,
+                        seg->mr_rkey, n < nsegs ? "more" : "last");
+
+                r_xprt->rx_stats.write_chunk_count++;
+                r_xprt->rx_stats.total_rdma_request += seg->mr_len;
+                req->rl_nchunks++;
+                nchunks++;
+                seg += n;
+                nsegs -= n;
+        } while (nsegs);
+        req->rl_nextseg = seg;
+
+        /* Update count of segments in this Write chunk */
+        *segcount = cpu_to_be32(nchunks);
+
+        /* Finish Write list */
+        *iptr++ = xdr_zero;     /* Next item not present */
+        return iptr;
+}
+
+/* XDR-encode the Reply chunk. Supports encoding an array of plain
+ * segments that belong to a single write (reply) chunk.
+ *
+ * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
+ *
+ *  Reply chunk (a counted array):
+ *   N elements:
+ *    1 - N - HLOO - HLOO - ... - HLOO
+ *
+ * Returns a pointer to the XDR word in the RDMA header following
+ * the end of the Reply chunk, or an error pointer.
+ */
+static __be32 *
+rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
+                           struct rpcrdma_req *req, struct rpc_rqst *rqst,
+                           __be32 *iptr, enum rpcrdma_chunktype wtype)
+{
+        struct rpcrdma_mr_seg *seg = req->rl_nextseg;
+        int n, nsegs, nchunks;
+        __be32 *segcount;
+
+        if (wtype != rpcrdma_replych) {
+                *iptr++ = xdr_zero;     /* no Reply chunk present */
+                return iptr;
+        }
+
+        nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, 0, wtype, seg,
+                                     RPCRDMA_MAX_SEGS - req->rl_nchunks);
+        if (nsegs < 0)
+                return ERR_PTR(nsegs);
+
+        *iptr++ = xdr_one;      /* Reply chunk present */
+        segcount = iptr++;      /* save location of segment count */
+
+        nchunks = 0;
+        do {
+                n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, true);
+                if (n <= 0)
+                        return ERR_PTR(n);
+
+                iptr = xdr_encode_rdma_segment(iptr, seg);
+
+                dprintk("RPC: %5u %s: reply segment "
+                        "%d@0x%016llx:0x%08x (%s)\n",
+                        rqst->rq_task->tk_pid, __func__,
+                        seg->mr_len, (unsigned long long)seg->mr_base,
+                        seg->mr_rkey, n < nsegs ? "more" : "last");
+
+                r_xprt->rx_stats.reply_chunk_count++;
+                r_xprt->rx_stats.total_rdma_request += seg->mr_len;
+                req->rl_nchunks++;
+                nchunks++;
+                seg += n;
+                nsegs -= n;
+        } while (nsegs);
+        req->rl_nextseg = seg;
+
+        /* Update count of segments in the Reply chunk */
+        *segcount = cpu_to_be32(nchunks);
+
+        return iptr;
+}
+
 /*
  * Copy write data inline.
  * This function is used for "small" requests. Data which is passed
@@ -508,24 +716,18 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
         struct rpc_xprt *xprt = rqst->rq_xprt;
         struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
         struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
-        char *base;
-        size_t rpclen;
-        ssize_t hdrlen;
         enum rpcrdma_chunktype rtype, wtype;
         struct rpcrdma_msg *headerp;
+        unsigned int pos;
+        ssize_t hdrlen;
+        size_t rpclen;
+        __be32 *iptr;
 
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
         if (test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state))
                 return rpcrdma_bc_marshal_reply(rqst);
 #endif
 
-        /*
-         * rpclen gets amount of data in first buffer, which is the
-         * pre-registered buffer.
-         */
-        base = rqst->rq_svec[0].iov_base;
-        rpclen = rqst->rq_svec[0].iov_len;
-
         headerp = rdmab_to_msg(req->rl_rdmabuf);
         /* don't byte-swap XID, it's already done in request */
         headerp->rm_xid = rqst->rq_xid;
@@ -565,61 +767,62 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
          */
         if (rpcrdma_args_inline(r_xprt, rqst)) {
                 rtype = rpcrdma_noch;
+                rpcrdma_inline_pullup(rqst);
+                rpclen = rqst->rq_svec[0].iov_len;
         } else if (rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
                 rtype = rpcrdma_readch;
+                rpclen = rqst->rq_svec[0].iov_len;
+                rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf);
         } else {
                 r_xprt->rx_stats.nomsg_call_count++;
                 headerp->rm_type = htonl(RDMA_NOMSG);
                 rtype = rpcrdma_areadch;
                 rpclen = 0;
         }
 
-        /* The following simplification is not true forever */
-        if (rtype != rpcrdma_noch && wtype == rpcrdma_replych)
-                wtype = rpcrdma_noch;
-        if (rtype != rpcrdma_noch && wtype != rpcrdma_noch) {
-                dprintk("RPC:       %s: cannot marshal multiple chunk lists\n",
-                        __func__);
-                return -EIO;
-        }
-
-        hdrlen = RPCRDMA_HDRLEN_MIN;
-
-        /*
-         * Pull up any extra send data into the preregistered buffer.
-         * When padding is in use and applies to the transfer, insert
-         * it and change the message type.
+        /* This implementation supports the following combinations
+         * of chunk lists in one RPC-over-RDMA Call message:
+         *
+         *   - Read list
+         *   - Write list
+         *   - Reply chunk
+         *   - Read list + Reply chunk
+         *
+         * It might not yet support the following combinations:
+         *
+         *   - Read list + Write list
+         *
+         * It does not support the following combinations:
+         *
+         *   - Write list + Reply chunk
+         *   - Read list + Write list + Reply chunk
+         *
+         * This implementation supports only a single chunk in each
+         * Read or Write list. Thus for example the client cannot
+         * send a Call message with a Position Zero Read chunk and a
+         * regular Read chunk at the same time.
          */
-        if (rtype == rpcrdma_noch) {
-
-                rpcrdma_inline_pullup(rqst);
-
-                headerp->rm_body.rm_nochunks.rm_empty[0] = xdr_zero;
-                headerp->rm_body.rm_nochunks.rm_empty[1] = xdr_zero;
-                headerp->rm_body.rm_nochunks.rm_empty[2] = xdr_zero;
-                /* new length after pullup */
-                rpclen = rqst->rq_svec[0].iov_len;
-        } else if (rtype == rpcrdma_readch)
-                rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf);
-        if (rtype != rpcrdma_noch) {
-                hdrlen = rpcrdma_create_chunks(rqst, &rqst->rq_snd_buf,
-                                               headerp, rtype);
-                wtype = rtype;  /* simplify dprintk */
-
-        } else if (wtype != rpcrdma_noch) {
-                hdrlen = rpcrdma_create_chunks(rqst, &rqst->rq_rcv_buf,
-                                               headerp, wtype);
-        }
-        if (hdrlen < 0)
-                return hdrlen;
+        req->rl_nchunks = 0;
+        req->rl_nextseg = req->rl_segments;
+        iptr = headerp->rm_body.rm_chunks;
+        iptr = rpcrdma_encode_read_list(r_xprt, req, rqst, iptr, rtype);
+        if (IS_ERR(iptr))
+                goto out_unmap;
+        iptr = rpcrdma_encode_write_list(r_xprt, req, rqst, iptr, wtype);
+        if (IS_ERR(iptr))
+                goto out_unmap;
+        iptr = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, iptr, wtype);
+        if (IS_ERR(iptr))
+                goto out_unmap;
+        hdrlen = (unsigned char *)iptr - (unsigned char *)headerp;
 
         if (hdrlen + rpclen > RPCRDMA_INLINE_WRITE_THRESHOLD(rqst))
                 goto out_overflow;
 
-        dprintk("RPC:       %s: %s: hdrlen %zd rpclen %zd"
-                " headerp 0x%p base 0x%p lkey 0x%x\n",
-                __func__, transfertypes[wtype], hdrlen, rpclen,
-                headerp, base, rdmab_lkey(req->rl_rdmabuf));
+        dprintk("RPC: %5u %s: %s/%s: hdrlen %zd rpclen %zd\n",
+                rqst->rq_task->tk_pid, __func__,
+                transfertypes[rtype], transfertypes[wtype],
+                hdrlen, rpclen);
 
         req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
         req->rl_send_iov[0].length = hdrlen;
@@ -637,12 +840,18 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
         return 0;
 
 out_overflow:
-        pr_err("rpcrdma: send overflow: hdrlen %zd rpclen %zu %s\n",
-               hdrlen, rpclen, transfertypes[wtype]);
+        pr_err("rpcrdma: send overflow: hdrlen %zd rpclen %zu %s/%s\n",
+               hdrlen, rpclen, transfertypes[rtype], transfertypes[wtype]);
         /* Terminate this RPC. Chunks registered above will be
          * released by xprt_release -> xprt_rmda_free .
          */
         return -EIO;
+
+out_unmap:
+        for (pos = 0; req->rl_nchunks--;)
+                pos += r_xprt->rx_ia.ri_ops->ro_unmap(r_xprt,
+                                                      &req->rl_segments[pos]);
+        return PTR_ERR(iptr);
 }
 
 /*
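One detail worth calling out from the hunks above: all three encoders now draw segments from the same req->rl_segments array, advancing req->rl_nextseg and incrementing req->rl_nchunks as they go. That shared accounting is what lets out_unmap walk back over everything registered before a failure, adding each unmap call's return value to its position in the array. The fragment below is a simplified, self-contained restatement of that unwind idea; the struct and the unmap helper are stand-ins, not the kernel's types or memreg ops.

#include <stddef.h>

struct seg { int entries_used; };

/* Stand-in for ri_ops->ro_unmap(): undoes one chunk's registration,
 * starting at *seg, and reports how many array entries it covered.
 */
static int unmap_chunk(struct seg *seg)
{
        return seg->entries_used ? seg->entries_used : 1;
}

/* Mirror of the out_unmap loop: nchunks chunks were registered into
 * one shared segment array, so walk them front to back, letting each
 * unmap call say how far to advance.
 */
static void unwind(struct seg *segments, int nchunks)
{
        size_t pos = 0;

        while (nchunks--)
                pos += unmap_chunk(&segments[pos]);
}

int main(void)
{
        struct seg segs[4] = { {2}, {0}, {1}, {0} };

        unwind(segs, 2);        /* chunk 1 used segs[0..1], chunk 2 used segs[2] */
        return 0;
}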
