
Commit fc66448

chucklever authored and amschuma-ntap committed
xprtrdma: Split the completion queue
The current CQ handler uses the ib_wc.opcode field to distinguish between
event types. However, the contents of that field are not reliable if the
completion status is not IB_WC_SUCCESS.

When an error completion occurs on a send event, the CQ handler schedules
a tasklet with something that is not a struct rpcrdma_rep. This is never
correct behavior, and sometimes it results in a panic.

To resolve this issue, split the completion queue into a send CQ and a
receive CQ. The send CQ handler now handles only struct rpcrdma_mw
wr_id's, and the receive CQ handler now handles only struct rpcrdma_rep
wr_id's.

Fix suggested by Shirley Ma <[email protected]>

Reported-by: Rafael Reiter <[email protected]>
Fixes: 5c635e0
BugLink: https://bugzilla.kernel.org/show_bug.cgi?id=73211
Signed-off-by: Chuck Lever <[email protected]>
Tested-by: Klemens Senn <[email protected]>
Tested-by: Steve Wise <[email protected]>
Signed-off-by: Anna Schumaker <[email protected]>
1 parent 7f1d541 commit fc66448
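The crux of the bug, restated: when a completion fails, ib_wc.opcode is
undefined, so the old combined handler could take a send-side wr_id (which
points to a struct rpcrdma_mw) and schedule the reply tasklet on it as if it
were a struct rpcrdma_rep. Once each wr_id type has its own CQ, the handler's
cast is always valid and no opcode check is needed to recover the context.
Below is a minimal user-space C sketch of that dispatch rule, assuming
nothing beyond libc; the fake_* types are simplified stand-ins invented for
illustration, not the kernel structures.

#include <stdio.h>
#include <stdint.h>

struct fake_wc  { uint64_t wr_id; int status; };  /* stand-in for ib_wc */
struct fake_mw  { int frmr_valid; };              /* stand-in for rpcrdma_mw */
struct fake_rep { unsigned int rr_len; };         /* stand-in for rpcrdma_rep */

/* Send-CQ handler: every nonzero wr_id on this CQ is a fake_mw. */
static void sendcq_process_wc(struct fake_wc *wc)
{
	struct fake_mw *mw = (struct fake_mw *)(uintptr_t)wc->wr_id;

	if (wc->wr_id == 0)     /* unsignaled send we don't track */
		return;
	if (wc->status != 0)    /* error: opcode would be unreliable here */
		return;
	mw->frmr_valid = 1;     /* safe cast: this CQ carries only mw's */
}

/* Receive-CQ handler: every wr_id on this CQ is a fake_rep. */
static void recvcq_process_wc(struct fake_wc *wc)
{
	struct fake_rep *rep = (struct fake_rep *)(uintptr_t)wc->wr_id;

	/* Even on error the cast is valid, so flagging the reply
	 * (rr_len = ~0U) can never corrupt a send-side object. */
	rep->rr_len = (wc->status == 0) ? 42 : ~0U;
}

int main(void)
{
	struct fake_mw mw = { 0 };
	struct fake_rep rep = { 0 };
	struct fake_wc send_wc = { (uintptr_t)&mw, 0 };
	struct fake_wc recv_wc = { (uintptr_t)&rep, -1 };  /* flushed recv */

	sendcq_process_wc(&send_wc);
	recvcq_process_wc(&recv_wc);
	printf("mw valid=%d rep len=%u\n", mw.frmr_valid, rep.rr_len);
	return 0;
}

Compiled and run, the sketch prints "mw valid=1 rep len=4294967295": the
flushed receive is flagged via rr_len = ~0U without any risk of scribbling
on a send-side object.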

File tree

2 files changed: +137 -92 lines changed

net/sunrpc/xprtrdma/verbs.c

Lines changed: 137 additions & 91 deletions
@@ -142,96 +142,115 @@ rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
 	}
 }
 
-static inline
-void rpcrdma_event_process(struct ib_wc *wc)
+static void
+rpcrdma_sendcq_process_wc(struct ib_wc *wc)
 {
-	struct rpcrdma_mw *frmr;
-	struct rpcrdma_rep *rep =
-			(struct rpcrdma_rep *)(unsigned long) wc->wr_id;
+	struct rpcrdma_mw *frmr = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
 
-	dprintk("RPC: %s: event rep %p status %X opcode %X length %u\n",
-		__func__, rep, wc->status, wc->opcode, wc->byte_len);
+	dprintk("RPC: %s: frmr %p status %X opcode %d\n",
+		__func__, frmr, wc->status, wc->opcode);
 
-	if (!rep) /* send completion that we don't care about */
+	if (wc->wr_id == 0ULL)
 		return;
-
-	if (IB_WC_SUCCESS != wc->status) {
-		dprintk("RPC: %s: WC opcode %d status %X, connection lost\n",
-			__func__, wc->opcode, wc->status);
-		rep->rr_len = ~0U;
-		if (wc->opcode != IB_WC_FAST_REG_MR && wc->opcode != IB_WC_LOCAL_INV)
-			rpcrdma_schedule_tasklet(rep);
+	if (wc->status != IB_WC_SUCCESS)
 		return;
-	}
 
-	switch (wc->opcode) {
-	case IB_WC_FAST_REG_MR:
-		frmr = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
+	if (wc->opcode == IB_WC_FAST_REG_MR)
 		frmr->r.frmr.state = FRMR_IS_VALID;
-		break;
-	case IB_WC_LOCAL_INV:
-		frmr = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
+	else if (wc->opcode == IB_WC_LOCAL_INV)
 		frmr->r.frmr.state = FRMR_IS_INVALID;
-		break;
-	case IB_WC_RECV:
-		rep->rr_len = wc->byte_len;
-		ib_dma_sync_single_for_cpu(
-			rdmab_to_ia(rep->rr_buffer)->ri_id->device,
-			rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
-		/* Keep (only) the most recent credits, after check validity */
-		if (rep->rr_len >= 16) {
-			struct rpcrdma_msg *p =
-					(struct rpcrdma_msg *) rep->rr_base;
-			unsigned int credits = ntohl(p->rm_credit);
-			if (credits == 0) {
-				dprintk("RPC: %s: server"
-					" dropped credits to 0!\n", __func__);
-				/* don't deadlock */
-				credits = 1;
-			} else if (credits > rep->rr_buffer->rb_max_requests) {
-				dprintk("RPC: %s: server"
-					" over-crediting: %d (%d)\n",
-					__func__, credits,
-					rep->rr_buffer->rb_max_requests);
-				credits = rep->rr_buffer->rb_max_requests;
-			}
-			atomic_set(&rep->rr_buffer->rb_credits, credits);
-		}
-		rpcrdma_schedule_tasklet(rep);
-		break;
-	default:
-		dprintk("RPC: %s: unexpected WC event %X\n",
-			__func__, wc->opcode);
-		break;
-	}
 }
 
-static inline int
-rpcrdma_cq_poll(struct ib_cq *cq)
+static int
+rpcrdma_sendcq_poll(struct ib_cq *cq)
 {
 	struct ib_wc wc;
 	int rc;
 
-	for (;;) {
-		rc = ib_poll_cq(cq, 1, &wc);
-		if (rc < 0) {
-			dprintk("RPC: %s: ib_poll_cq failed %i\n",
-				__func__, rc);
-			return rc;
-		}
-		if (rc == 0)
-			break;
+	while ((rc = ib_poll_cq(cq, 1, &wc)) == 1)
+		rpcrdma_sendcq_process_wc(&wc);
+	return rc;
+}
 
-		rpcrdma_event_process(&wc);
+/*
+ * Handle send, fast_reg_mr, and local_inv completions.
+ *
+ * Send events are typically suppressed and thus do not result
+ * in an upcall. Occasionally one is signaled, however. This
+ * prevents the provider's completion queue from wrapping and
+ * losing a completion.
+ */
+static void
+rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
+{
+	int rc;
+
+	rc = rpcrdma_sendcq_poll(cq);
+	if (rc) {
+		dprintk("RPC: %s: ib_poll_cq failed: %i\n",
+			__func__, rc);
+		return;
 	}
 
-	return 0;
+	rc = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
+	if (rc) {
+		dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
+			__func__, rc);
+		return;
+	}
+
+	rpcrdma_sendcq_poll(cq);
+}
+
+static void
+rpcrdma_recvcq_process_wc(struct ib_wc *wc)
+{
+	struct rpcrdma_rep *rep =
+			(struct rpcrdma_rep *)(unsigned long)wc->wr_id;
+
+	dprintk("RPC: %s: rep %p status %X opcode %X length %u\n",
+		__func__, rep, wc->status, wc->opcode, wc->byte_len);
+
+	if (wc->status != IB_WC_SUCCESS) {
+		rep->rr_len = ~0U;
+		goto out_schedule;
+	}
+	if (wc->opcode != IB_WC_RECV)
+		return;
+
+	rep->rr_len = wc->byte_len;
+	ib_dma_sync_single_for_cpu(rdmab_to_ia(rep->rr_buffer)->ri_id->device,
+				   rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
+
+	if (rep->rr_len >= 16) {
+		struct rpcrdma_msg *p = (struct rpcrdma_msg *)rep->rr_base;
+		unsigned int credits = ntohl(p->rm_credit);
+
+		if (credits == 0)
+			credits = 1;	/* don't deadlock */
+		else if (credits > rep->rr_buffer->rb_max_requests)
+			credits = rep->rr_buffer->rb_max_requests;
+		atomic_set(&rep->rr_buffer->rb_credits, credits);
+	}
+
+out_schedule:
+	rpcrdma_schedule_tasklet(rep);
+}
+
+static int
+rpcrdma_recvcq_poll(struct ib_cq *cq)
+{
+	struct ib_wc wc;
+	int rc;
+
+	while ((rc = ib_poll_cq(cq, 1, &wc)) == 1)
+		rpcrdma_recvcq_process_wc(&wc);
+	return rc;
 }
 
 /*
- * rpcrdma_cq_event_upcall
+ * Handle receive completions.
  *
- * This upcall handles recv and send events.
  * It is reentrant but processes single events in order to maintain
 * ordering of receives to keep server credits.
 *
@@ -240,26 +259,27 @@ rpcrdma_cq_poll(struct ib_cq *cq)
 * connection shutdown. That is, the structures required for
 * the completion of the reply handler must remain intact until
 * all memory has been reclaimed.
- *
- * Note that send events are suppressed and do not result in an upcall.
 */
 static void
-rpcrdma_cq_event_upcall(struct ib_cq *cq, void *context)
+rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
 {
 	int rc;
 
-	rc = rpcrdma_cq_poll(cq);
-	if (rc)
+	rc = rpcrdma_recvcq_poll(cq);
+	if (rc) {
+		dprintk("RPC: %s: ib_poll_cq failed: %i\n",
+			__func__, rc);
 		return;
+	}
 
 	rc = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
 	if (rc) {
-		dprintk("RPC: %s: ib_req_notify_cq failed %i\n",
+		dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
 			__func__, rc);
 		return;
 	}
 
-	rpcrdma_cq_poll(cq);
+	rpcrdma_recvcq_poll(cq);
 }
 
 #ifdef RPC_DEBUG
@@ -610,6 +630,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 				struct rpcrdma_create_data_internal *cdata)
 {
 	struct ib_device_attr devattr;
+	struct ib_cq *sendcq, *recvcq;
 	int rc, err;
 
 	rc = ib_query_device(ia->ri_id->device, &devattr);
@@ -685,34 +706,51 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 		ep->rep_attr.cap.max_recv_sge);
 
 	/* set trigger for requesting send completion */
-	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 /* - 1*/;
+	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
 	if (ep->rep_cqinit <= 2)
 		ep->rep_cqinit = 0;
 	INIT_CQCOUNT(ep);
 	ep->rep_ia = ia;
 	init_waitqueue_head(&ep->rep_connect_wait);
 	INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
 
-	ep->rep_cq = ib_create_cq(ia->ri_id->device, rpcrdma_cq_event_upcall,
+	sendcq = ib_create_cq(ia->ri_id->device, rpcrdma_sendcq_upcall,
 				  rpcrdma_cq_async_error_upcall, NULL,
-				  ep->rep_attr.cap.max_recv_wr +
 				  ep->rep_attr.cap.max_send_wr + 1, 0);
-	if (IS_ERR(ep->rep_cq)) {
-		rc = PTR_ERR(ep->rep_cq);
-		dprintk("RPC: %s: ib_create_cq failed: %i\n",
+	if (IS_ERR(sendcq)) {
+		rc = PTR_ERR(sendcq);
+		dprintk("RPC: %s: failed to create send CQ: %i\n",
 			__func__, rc);
 		goto out1;
 	}
 
-	rc = ib_req_notify_cq(ep->rep_cq, IB_CQ_NEXT_COMP);
+	rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP);
 	if (rc) {
 		dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
 			__func__, rc);
 		goto out2;
 	}
 
-	ep->rep_attr.send_cq = ep->rep_cq;
-	ep->rep_attr.recv_cq = ep->rep_cq;
+	recvcq = ib_create_cq(ia->ri_id->device, rpcrdma_recvcq_upcall,
+			      rpcrdma_cq_async_error_upcall, NULL,
+			      ep->rep_attr.cap.max_recv_wr + 1, 0);
+	if (IS_ERR(recvcq)) {
+		rc = PTR_ERR(recvcq);
+		dprintk("RPC: %s: failed to create recv CQ: %i\n",
+			__func__, rc);
+		goto out2;
+	}
+
+	rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP);
+	if (rc) {
+		dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
+			__func__, rc);
+		ib_destroy_cq(recvcq);
+		goto out2;
+	}
+
+	ep->rep_attr.send_cq = sendcq;
+	ep->rep_attr.recv_cq = recvcq;
 
 	/* Initialize cma parameters */
 
@@ -734,7 +772,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 	return 0;
 
 out2:
-	err = ib_destroy_cq(ep->rep_cq);
+	err = ib_destroy_cq(sendcq);
 	if (err)
 		dprintk("RPC: %s: ib_destroy_cq returned %i\n",
 			__func__, err);
@@ -774,8 +812,14 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 		ep->rep_pad_mr = NULL;
 	}
 
-	rpcrdma_clean_cq(ep->rep_cq);
-	rc = ib_destroy_cq(ep->rep_cq);
+	rpcrdma_clean_cq(ep->rep_attr.recv_cq);
+	rc = ib_destroy_cq(ep->rep_attr.recv_cq);
+	if (rc)
+		dprintk("RPC: %s: ib_destroy_cq returned %i\n",
+			__func__, rc);
+
+	rpcrdma_clean_cq(ep->rep_attr.send_cq);
+	rc = ib_destroy_cq(ep->rep_attr.send_cq);
 	if (rc)
 		dprintk("RPC: %s: ib_destroy_cq returned %i\n",
 			__func__, rc);
@@ -798,7 +842,9 @@ rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 	if (rc && rc != -ENOTCONN)
 		dprintk("RPC: %s: rpcrdma_ep_disconnect"
 			" status %i\n", __func__, rc);
-	rpcrdma_clean_cq(ep->rep_cq);
+
+	rpcrdma_clean_cq(ep->rep_attr.recv_cq);
+	rpcrdma_clean_cq(ep->rep_attr.send_cq);
 
 	xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
 	id = rpcrdma_create_id(xprt, ia,
@@ -907,7 +953,8 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 {
 	int rc;
 
-	rpcrdma_clean_cq(ep->rep_cq);
+	rpcrdma_clean_cq(ep->rep_attr.recv_cq);
+	rpcrdma_clean_cq(ep->rep_attr.send_cq);
 	rc = rdma_disconnect(ia->ri_id);
 	if (!rc) {
 		/* returns without wait if not connected */
@@ -1727,7 +1774,6 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
 	ib_dma_sync_single_for_cpu(ia->ri_id->device,
 		rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL);
 
-	DECR_CQCOUNT(ep);
 	rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
 
 	if (rc)
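A note on the shape of the two new upcalls: both follow the drain, re-arm,
drain-again idiom. The second poll closes a race where a completion arrives
after the first drain but before ib_req_notify_cq() re-arms the CQ; such a
completion would generate no event and could sit unprocessed until the next
unrelated completion. Here is a sketch of the idiom, written against the
user-space libibverbs API (ibv_poll_cq/ibv_req_notify_cq) as a stand-in for
the kernel verbs used above; the process callback is a hypothetical parameter
standing in for the two process_wc routines.

/* Sketch only: user-space analogue of the poll/re-arm/poll pattern
 * in rpcrdma_sendcq_upcall() and rpcrdma_recvcq_upcall() above.
 * Compile with -libverbs.
 */
#include <infiniband/verbs.h>

static void drain_cq(struct ibv_cq *cq, void (*process)(struct ibv_wc *))
{
	struct ibv_wc wc;

	/* ibv_poll_cq returns the number of completions reaped
	 * (at most 1 here), or a negative value on error. */
	while (ibv_poll_cq(cq, 1, &wc) == 1)
		process(&wc);
}

static void cq_upcall(struct ibv_cq *cq, void (*process)(struct ibv_wc *))
{
	drain_cq(cq, process);		/* 1. reap everything queued */

	if (ibv_req_notify_cq(cq, 0))	/* 2. re-arm for the next event */
		return;

	drain_cq(cq, process);		/* 3. catch completions that raced
					 *    in between steps 1 and 2 */
}

Splitting the queues also lets each CQ be sized exactly for its traffic:
max_send_wr + 1 entries for the send CQ and max_recv_wr + 1 for the receive
CQ, rather than one CQ sized for the sum of both.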

net/sunrpc/xprtrdma/xprt_rdma.h

Lines changed: 0 additions & 1 deletion
@@ -79,7 +79,6 @@ struct rpcrdma_ep {
 	int			rep_cqinit;
 	int			rep_connected;
 	struct rpcrdma_ia	*rep_ia;
-	struct ib_cq		*rep_cq;
 	struct ib_qp_init_attr	rep_attr;
 	wait_queue_head_t	rep_connect_wait;
 	struct ib_sge		rep_pad;	/* holds zeroed pad */
