@@ -329,11 +329,17 @@ static void poll_cq(struct rds_ib_connection *ic, struct ib_cq *cq,
 
 	while ((nr = ib_poll_cq(cq, RDS_WC_MAX, wcs)) > 0) {
 		for (i = 0; i < nr; i++) {
-
-			if (rx &&
-			    (++ic->i_rx_poll_cq % RDS_IB_RX_LIMIT) == 0)
-				cond_resched();
-
+			if (rx) {
+				if ((++ic->i_rx_poll_cq % RDS_IB_RX_LIMIT) == 0) {
+					rdsdebug("connection "
+						 "<%u.%u.%u.%u,%u.%u.%u.%u,%d> "
+						 "RX poll_cq processed %d\n",
+						 NIPQUAD(ic->conn->c_laddr),
+						 NIPQUAD(ic->conn->c_faddr),
+						 ic->conn->c_tos,
+						 ic->i_rx_poll_cq);
+				}
+			}
 			wc = wcs + i;
 			rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
 				 (unsigned long long)wc->wr_id, wc->status, wc->byte_len,
@@ -344,6 +350,10 @@ static void poll_cq(struct rds_ib_connection *ic, struct ib_cq *cq,
 			else
 				rds_ib_recv_cqe_handler(ic, wc, ack_state);
 		}
+
+		if (rx && ic->i_rx_poll_cq >= RDS_IB_RX_LIMIT)
+			break;
+
 	}
 }
 
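These two hunks replace the old cond_resched() throttling with a hard cap: i_rx_poll_cq counts reaped RX completions, a debug line fires each time another RDS_IB_RX_LIMIT entries have been processed, and once the counter reaches the limit the polling loop breaks out so one busy connection cannot keep reaping forever. A rough, self-contained sketch of that bounded-drain shape follows; every name in it (rx_queue, RX_LIMIT, drain_bounded) is hypothetical, and it illustrates the pattern rather than the kernel code.

/*
 * Sketch of a bounded drain: reap at most RX_LIMIT entries per pass and
 * tell the caller whether work is left over. Hypothetical names only.
 */
#include <stdbool.h>
#include <stddef.h>

#define RX_LIMIT 512

struct rx_queue {
	bool (*pop)(struct rx_queue *q, void **entry);	/* false when empty */
	void (*handle)(void *entry);
};

/* Returns true if the budget ran out, i.e. the caller should reschedule. */
static bool drain_bounded(struct rx_queue *q, unsigned long *total)
{
	void *entry;
	size_t n;

	for (n = 0; n < RX_LIMIT; n++) {
		if (!q->pop(q, &entry))
			return false;		/* queue fully drained */
		q->handle(entry);
		(*total)++;
	}
	return true;				/* more may still be pending */
}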
@@ -370,9 +380,14 @@ void rds_ib_tasklet_fn_send(unsigned long data)
 		rds_send_xmit(ic->conn);
 	}
 
-void rds_ib_tasklet_fn_recv(unsigned long data)
+/*
+ * Note: rds_ib_rx(): don't call with irqs disabled.
+ * It calls rds_send_drop_acked() which calls other
+ * routines that reach into rds_rdma_free_op()
+ * where irqs_disabled() warning is asserted!
+ */
+static void rds_ib_rx(struct rds_ib_connection *ic)
 {
-	struct rds_ib_connection *ic = (struct rds_ib_connection *) data;
 	struct rds_connection *conn = ic->conn;
 	struct rds_ib_ack_state ack_state;
 	struct rds_ib_device *rds_ibdev = ic->rds_ibdev;
@@ -390,22 +405,52 @@ void rds_ib_tasklet_fn_recv(unsigned long data)
 
 	if (ack_state.ack_next_valid)
 		rds_ib_set_ack(ic, ack_state.ack_next, ack_state.ack_required);
-
 	if (ack_state.ack_recv_valid && ack_state.ack_recv > ic->i_ack_recv) {
 		rds_send_drop_acked(conn, ack_state.ack_recv, NULL);
 		ic->i_ack_recv = ack_state.ack_recv;
 	}
-
 	if (rds_conn_up(conn))
 		rds_ib_attempt_ack(ic);
 
 	if (rds_ib_srq_enabled)
 		if ((atomic_read(&rds_ibdev->srq->s_num_posted) <
-				rds_ib_srq_hwm_refill) &&
-			!test_and_set_bit(0, &rds_ibdev->srq->s_refill_gate))
-				queue_delayed_work(rds_wq, &rds_ibdev->srq->s_refill_w, 0);
+		     rds_ib_srq_hwm_refill) &&
+		    !test_and_set_bit(0, &rds_ibdev->srq->s_refill_gate))
+			queue_delayed_work(rds_wq,
+					   &rds_ibdev->srq->s_refill_w, 0);
+
+	if (ic->i_rx_poll_cq >= RDS_IB_RX_LIMIT) {
+		ic->i_rx_w.ic = ic;
+		/* Delay 10 msecs until the RX worker starts reaping again */
+		queue_delayed_work(rds_aux_wq, &ic->i_rx_w,
+				   msecs_to_jiffies(10));
+		ic->i_rx_wait_for_handler = 1;
+	}
+}
+
+void rds_ib_tasklet_fn_recv(unsigned long data)
+{
+	struct rds_ib_connection *ic = (struct rds_ib_connection *) data;
+
+	spin_lock_bh(&ic->i_rx_lock);
+	if (ic->i_rx_wait_for_handler)
+		goto out;
+	rds_ib_rx(ic);
+out:
+	spin_unlock_bh(&ic->i_rx_lock);
 }
 
+static void rds_ib_rx_handler(struct work_struct *_work)
+{
+	struct rds_ib_rx_work *work =
+		container_of(_work, struct rds_ib_rx_work, work.work);
+	struct rds_ib_connection *ic = work->ic;
+
+	spin_lock_bh(&ic->i_rx_lock);
+	ic->i_rx_wait_for_handler = 0;
+	rds_ib_rx(ic);
+	spin_unlock_bh(&ic->i_rx_lock);
+}
 
 static void rds_ib_qp_event_handler(struct ib_event *event, void *data)
 {
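Taken together, the new rds_ib_rx(), the slimmed-down rds_ib_tasklet_fn_recv() and rds_ib_rx_handler() form a tasklet-to-workqueue handoff: when the reap limit is hit, the tasklet arms a delayed work item on rds_aux_wq and sets i_rx_wait_for_handler, later tasklet runs back off until the worker clears the flag, and i_rx_lock (taken with spin_lock_bh) keeps the two reapers from running concurrently. Below is a compressed, hedged sketch of that handoff pattern; every name in it (my_conn, my_reap, my_rx_tasklet, my_rx_worker) is hypothetical, and it uses the generic system_wq rather than a private workqueue.

/*
 * Sketch of the tasklet -> delayed-work handoff; hypothetical names,
 * not the RDS implementation. Setup (not shown) would do
 * spin_lock_init(&c->lock), INIT_DELAYED_WORK(&c->rx_work, my_rx_worker)
 * and tasklet_init(&c->rx_tasklet, my_rx_tasklet, (unsigned long)c).
 */
#include <linux/interrupt.h>
#include <linux/jiffies.h>
#include <linux/kernel.h>
#include <linux/spinlock.h>
#include <linux/workqueue.h>

struct my_conn {
	spinlock_t		lock;		/* serializes tasklet vs. worker */
	int			wait_for_handler;
	struct delayed_work	rx_work;
	struct tasklet_struct	rx_tasklet;
};

/* Drain up to a fixed budget of completions; true means the budget ran out. */
static bool my_reap(struct my_conn *c)
{
	/* ... poll the CQ here ... */
	return false;
}

static void my_rx_worker(struct work_struct *w)
{
	struct my_conn *c = container_of(to_delayed_work(w), struct my_conn, rx_work);

	spin_lock_bh(&c->lock);
	c->wait_for_handler = 0;	/* let the tasklet reap again */
	my_reap(c);
	spin_unlock_bh(&c->lock);
}

static void my_rx_tasklet(unsigned long data)
{
	struct my_conn *c = (struct my_conn *)data;

	spin_lock_bh(&c->lock);
	if (!c->wait_for_handler && my_reap(c)) {
		/* Budget exhausted: defer the rest to process context shortly. */
		c->wait_for_handler = 1;
		queue_delayed_work(system_wq, &c->rx_work, msecs_to_jiffies(10));
	}
	spin_unlock_bh(&c->lock);
}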
@@ -1064,9 +1109,18 @@ void rds_ib_conn_shutdown(struct rds_connection *conn)
 	}
 
 	/* quiesce tx and rx completion before tearing down */
-	wait_event(rds_ib_ring_empty_wait,
-		   rds_ib_ring_empty(&ic->i_recv_ring) &&
-		   (atomic_read(&ic->i_signaled_sends) == 0));
+	while (!wait_event_timeout(rds_ib_ring_empty_wait,
+				   rds_ib_ring_empty(&ic->i_recv_ring) &&
+				   (atomic_read(&ic->i_signaled_sends) == 0),
+				   msecs_to_jiffies(5000))) {
+
+		/* Try to reap pending RX completions every 5 secs */
+		if (!rds_ib_ring_empty(&ic->i_recv_ring)) {
+			spin_lock_bh(&ic->i_rx_lock);
+			rds_ib_rx(ic);
+			spin_unlock_bh(&ic->i_rx_lock);
+		}
+	}
 
 	tasklet_kill(&ic->i_stasklet);
 	tasklet_kill(&ic->i_rtasklet);
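The shutdown change swaps the unbounded wait_event() for a wait_event_timeout() loop: every 5 seconds the shutdown path wakes up and, if the receive ring is still not empty, grabs i_rx_lock and reaps completions itself via rds_ib_rx(), so a deferred or stalled RX handler can no longer wedge rds_ib_conn_shutdown() indefinitely. A small sketch of that wait-with-periodic-nudge idiom, under the assumption of a caller-provided wait queue; done() and try_to_make_progress() are stand-ins, not real helpers.

#include <linux/jiffies.h>
#include <linux/types.h>
#include <linux/wait.h>

static DECLARE_WAIT_QUEUE_HEAD(my_waitq);

/* Stand-in for the real completion condition (e.g. "ring is empty"). */
static bool done(void)
{
	return false;
}

/* Stand-in for whatever might unstick things (e.g. reaping completions). */
static void try_to_make_progress(void)
{
}

static void wait_until_done_or_nudge(void)
{
	/*
	 * wait_event_timeout() returns 0 when the timeout elapsed with the
	 * condition still false; in that case lend a hand and wait again.
	 */
	while (!wait_event_timeout(my_waitq, done(), msecs_to_jiffies(5000)))
		try_to_make_progress();
}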
@@ -1199,6 +1253,7 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp)
 	spin_lock_init(&ic->i_ack_lock);
 #endif
 	atomic_set(&ic->i_signaled_sends, 0);
+	spin_lock_init(&ic->i_rx_lock);
 
 	/*
 	 * rds_ib_conn_shutdown() waits for these to be emptied so they
@@ -1213,6 +1268,7 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp)
 	init_completion(&ic->i_last_wqe_complete);
 
 	INIT_DELAYED_WORK(&ic->i_migrate_w.work, rds_ib_migrate);
+	INIT_DELAYED_WORK(&ic->i_rx_w.work, rds_ib_rx_handler);
 
 	spin_lock_irqsave(&ib_nodev_conns_lock, flags);
 	list_add_tail(&ic->ib_node, &ib_nodev_conns);