@@ -333,11 +333,17 @@ static void poll_cq(struct rds_ib_connection *ic, struct ib_cq *cq,
 
         while ((nr = ib_poll_cq(cq, RDS_WC_MAX, wcs)) > 0) {
                 for (i = 0; i < nr; i++) {
-
-                        if (rx &&
-                            (++ic->i_rx_poll_cq % RDS_IB_RX_LIMIT) == 0)
-                                cond_resched();
-
+                        if (rx) {
+                                if ((++ic->i_rx_poll_cq % RDS_IB_RX_LIMIT) == 0) {
+                                        rdsdebug("connection "
+                                                 "<%u.%u.%u.%u,%u.%u.%u.%u,%d> "
+                                                 "RX poll_cq processed %d\n",
+                                                 NIPQUAD(ic->conn->c_laddr),
+                                                 NIPQUAD(ic->conn->c_faddr),
+                                                 ic->conn->c_tos,
+                                                 ic->i_rx_poll_cq);
+                                }
+                        }
 
                         wc = wcs + i;
                         rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
                                  (unsigned long long)wc->wr_id, wc->status, wc->byte_len,
@@ -348,6 +354,10 @@ static void poll_cq(struct rds_ib_connection *ic, struct ib_cq *cq,
                         else
                                 rds_ib_recv_cqe_handler(ic, wc, ack_state);
                 }
+
+                if (rx && ic->i_rx_poll_cq >= RDS_IB_RX_LIMIT)
+                        break;
+
         }
 }
 
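The two poll_cq() hunks above replace the old per-completion cond_resched() throttle with a per-connection counter: ic->i_rx_poll_cq is bumped for every RX completion, a debug line is emitted each time RDS_IB_RX_LIMIT completions have been processed, and the polling loop now breaks out once the limit is reached so the caller can decide how to continue. The sketch below only illustrates this bounded-batch CQ reaping pattern under assumed names (my_conn, MY_WC_BATCH, MY_RX_LIMIT); it is not code from the patch.

#include <linux/kernel.h>
#include <linux/types.h>
#include <rdma/ib_verbs.h>

#define MY_WC_BATCH     32      /* completions fetched per ib_poll_cq() call */
#define MY_RX_LIMIT     512     /* budget for one reaping pass */

struct my_conn {
        struct ib_cq    *rx_cq;
        unsigned int    rx_polled;      /* completions reaped in this pass */
};

/* Returns true when the budget is spent and completions may remain. */
static bool my_poll_rx_cq(struct my_conn *c)
{
        struct ib_wc wcs[MY_WC_BATCH];
        int nr, i;

        while ((nr = ib_poll_cq(c->rx_cq, MY_WC_BATCH, wcs)) > 0) {
                for (i = 0; i < nr; i++) {
                        /* ...hand wcs[i] to the completion handler... */
                        c->rx_polled++;
                }
                /* Stop reaping once the per-pass budget is exhausted. */
                if (c->rx_polled >= MY_RX_LIMIT)
                        return true;
        }
        return false;
}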
@@ -374,9 +384,14 @@ void rds_ib_tasklet_fn_send(unsigned long data)
                 rds_send_xmit(ic->conn);
 }
 
-void rds_ib_tasklet_fn_recv(unsigned long data)
+/*
+ * Note: rds_ib_rx(): don't call with irqs disabled.
+ * It calls rds_send_drop_acked() which calls other
+ * routines that reach into rds_rdma_free_op()
+ * where irqs_disabled() warning is asserted!
+ */
+static void rds_ib_rx(struct rds_ib_connection *ic)
 {
-        struct rds_ib_connection *ic = (struct rds_ib_connection *) data;
         struct rds_connection *conn = ic->conn;
         struct rds_ib_ack_state ack_state;
         struct rds_ib_device *rds_ibdev = ic->rds_ibdev;
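The block comment added above rds_ib_rx() records why the function must not run with hard interrupts disabled: rds_send_drop_acked() eventually reaches rds_rdma_free_op(), which warns if irqs_disabled() is true. The callers introduced below therefore take i_rx_lock with spin_lock_bh(), which masks bottom halves but leaves hard IRQs on. The fragment below is only an illustration of that constraint with made-up my_* names, not code taken from RDS.

#include <linux/interrupt.h>
#include <linux/spinlock.h>

static DEFINE_SPINLOCK(my_rx_lock);

/* Deep in the free path: complain if called with hard IRQs off. */
static void my_free_op(void)
{
        WARN_ON(irqs_disabled());
        /* ...release pages and MRs... */
}

static void my_rx(void)
{
        /*
         * spin_lock_bh() only disables bottom halves, so the WARN_ON()
         * above stays quiet; spin_lock_irqsave() here would trip it.
         */
        spin_lock_bh(&my_rx_lock);
        my_free_op();
        spin_unlock_bh(&my_rx_lock);
}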
@@ -394,22 +409,52 @@ void rds_ib_tasklet_fn_recv(unsigned long data)
 
         if (ack_state.ack_next_valid)
                 rds_ib_set_ack(ic, ack_state.ack_next, ack_state.ack_required);
-
         if (ack_state.ack_recv_valid && ack_state.ack_recv > ic->i_ack_recv) {
                 rds_send_drop_acked(conn, ack_state.ack_recv, NULL);
                 ic->i_ack_recv = ack_state.ack_recv;
         }
-
         if (rds_conn_up(conn))
                 rds_ib_attempt_ack(ic);
 
         if (rds_ib_srq_enabled)
                 if ((atomic_read(&rds_ibdev->srq->s_num_posted) <
-                        rds_ib_srq_hwm_refill) &&
-                        !test_and_set_bit(0, &rds_ibdev->srq->s_refill_gate))
-                        queue_delayed_work(rds_wq, &rds_ibdev->srq->s_refill_w, 0);
+                     rds_ib_srq_hwm_refill) &&
+                    !test_and_set_bit(0, &rds_ibdev->srq->s_refill_gate))
+                        queue_delayed_work(rds_wq,
+                                           &rds_ibdev->srq->s_refill_w, 0);
+
+        if (ic->i_rx_poll_cq >= RDS_IB_RX_LIMIT) {
+                ic->i_rx_w.ic = ic;
+                /* Delay 10 msecs until the RX worker starts reaping again */
+                queue_delayed_work(rds_aux_wq, &ic->i_rx_w,
+                                   msecs_to_jiffies(10));
+                ic->i_rx_wait_for_handler = 1;
+        }
+}
+
+void rds_ib_tasklet_fn_recv(unsigned long data)
+{
+        struct rds_ib_connection *ic = (struct rds_ib_connection *) data;
+
+        spin_lock_bh(&ic->i_rx_lock);
+        if (ic->i_rx_wait_for_handler)
+                goto out;
+        rds_ib_rx(ic);
+out:
+        spin_unlock_bh(&ic->i_rx_lock);
 }
 
+static void rds_ib_rx_handler(struct work_struct *_work)
+{
+        struct rds_ib_rx_work *work =
+                        container_of(_work, struct rds_ib_rx_work, work.work);
+        struct rds_ib_connection *ic = work->ic;
+
+        spin_lock_bh(&ic->i_rx_lock);
+        ic->i_rx_wait_for_handler = 0;
+        rds_ib_rx(ic);
+        spin_unlock_bh(&ic->i_rx_lock);
+}
 
 
 static void rds_ib_qp_event_handler(struct ib_event *event, void *data)
 {
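With this hunk the receive path is split in two: rds_ib_tasklet_fn_recv() only wraps rds_ib_rx() in i_rx_lock and skips reaping while i_rx_wait_for_handler is set, and rds_ib_rx() defers to the rds_aux_wq worker (rds_ib_rx_handler(), which clears the flag) whenever a single pass hits RDS_IB_RX_LIMIT. A minimal sketch of this tasklet-to-workqueue handoff follows; the my_* names, the counter reset point, and the 10 ms back-off constant are stand-ins rather than the actual RDS structures.

#include <linux/interrupt.h>
#include <linux/jiffies.h>
#include <linux/spinlock.h>
#include <linux/types.h>
#include <linux/workqueue.h>

#define MY_RX_LIMIT     512

struct my_conn {
        spinlock_t              rx_lock;
        bool                    wait_for_worker;
        unsigned int            rx_polled;
        struct delayed_work     rx_work;
        struct tasklet_struct   rx_tasklet;
};

static void my_rx(struct my_conn *c)
{
        /* ...reap completions, counting them in c->rx_polled... */
        if (c->rx_polled >= MY_RX_LIMIT) {
                /* Too much backlog: let process context take over. */
                c->wait_for_worker = true;
                schedule_delayed_work(&c->rx_work, msecs_to_jiffies(10));
        }
}

/* Softirq side: back off while the worker owns the reaping. */
static void my_rx_tasklet(unsigned long data)
{
        struct my_conn *c = (struct my_conn *)data;

        spin_lock_bh(&c->rx_lock);
        if (!c->wait_for_worker)
                my_rx(c);
        spin_unlock_bh(&c->rx_lock);
}

/* Process-context side: clear the flag and resume reaping. */
static void my_rx_worker(struct work_struct *w)
{
        struct my_conn *c = container_of(w, struct my_conn, rx_work.work);

        spin_lock_bh(&c->rx_lock);
        c->wait_for_worker = false;
        c->rx_polled = 0;
        my_rx(c);
        spin_unlock_bh(&c->rx_lock);
}

Deferring the heavy reaping to a workqueue keeps the softirq pass short and appears to be the intent of the i_rx_wait_for_handler gate in the patch.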
@@ -1083,9 +1128,18 @@ void rds_ib_conn_shutdown(struct rds_connection *conn)
         }
 
         /* quiesce tx and rx completion before tearing down */
-        wait_event(rds_ib_ring_empty_wait,
-                   rds_ib_ring_empty(&ic->i_recv_ring) &&
-                   (atomic_read(&ic->i_signaled_sends) == 0));
+        while (!wait_event_timeout(rds_ib_ring_empty_wait,
+                        rds_ib_ring_empty(&ic->i_recv_ring) &&
+                        (atomic_read(&ic->i_signaled_sends) == 0),
+                        msecs_to_jiffies(5000))) {
+
+                /* Try to reap pending RX completions every 5 secs */
+                if (!rds_ib_ring_empty(&ic->i_recv_ring)) {
+                        spin_lock_bh(&ic->i_rx_lock);
+                        rds_ib_rx(ic);
+                        spin_unlock_bh(&ic->i_rx_lock);
+                }
+        }
 
         tasklet_kill(&ic->i_stasklet);
         tasklet_kill(&ic->i_rtasklet);
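rds_ib_conn_shutdown() previously blocked in wait_event() until the receive ring drained and all signaled sends completed. The new loop re-checks every 5 seconds and, when the ring is still not empty, reaps pending RX completions directly under i_rx_lock. The sketch below shows just the wait_event_timeout() retry idiom with placeholder my_* helpers (wait_event_timeout() returns 0 when the timeout expires with the condition still false); it is not the RDS code itself.

#include <linux/jiffies.h>
#include <linux/spinlock.h>
#include <linux/wait.h>

/* my_ring_empty()/my_reap() stand in for rds_ib_ring_empty()/rds_ib_rx(). */
static DECLARE_WAIT_QUEUE_HEAD(my_empty_wait);
static DEFINE_SPINLOCK(my_rx_lock);

static bool my_ring_empty(void);        /* provided by the ring code */
static void my_reap(void);              /* provided by the CQ reaping code */

static void my_quiesce(void)
{
        /*
         * wait_event_timeout() returns 0 if 5 s elapse with the condition
         * still false; in that case reap by hand and wait again.
         */
        while (!wait_event_timeout(my_empty_wait, my_ring_empty(),
                                   msecs_to_jiffies(5000))) {
                if (!my_ring_empty()) {
                        spin_lock_bh(&my_rx_lock);
                        my_reap();
                        spin_unlock_bh(&my_rx_lock);
                }
        }
}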
@@ -1222,6 +1276,7 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp)
         spin_lock_init(&ic->i_ack_lock);
 #endif
         atomic_set(&ic->i_signaled_sends, 0);
+        spin_lock_init(&ic->i_rx_lock);
 
         /*
          * rds_ib_conn_shutdown() waits for these to be emptied so they
@@ -1236,6 +1291,7 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp)
         init_completion(&ic->i_last_wqe_complete);
 
         INIT_DELAYED_WORK(&ic->i_migrate_w.work, rds_ib_migrate);
+        INIT_DELAYED_WORK(&ic->i_rx_w.work, rds_ib_rx_handler);
 
         spin_lock_irqsave(&ib_nodev_conns_lock, flags);
         list_add_tail(&ic->ib_node, &ib_nodev_conns);
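The two rds_ib_conn_alloc() hunks wire up the pieces used above: i_rx_lock protects the reaping path, and i_rx_w is the delayed work bound to rds_ib_rx_handler(). In the terms of the earlier my_conn sketch, the corresponding setup would look roughly as follows; my_conn_init() is illustrative only, and the tasklet_init() call is shown just to complete the picture (the RDS receive tasklet already exists and is not touched by this patch).

#include <linux/interrupt.h>
#include <linux/spinlock.h>
#include <linux/types.h>
#include <linux/workqueue.h>

struct my_conn {
        spinlock_t              rx_lock;
        bool                    wait_for_worker;
        struct delayed_work     rx_work;
        struct tasklet_struct   rx_tasklet;
};

static void my_rx_tasklet(unsigned long data);
static void my_rx_worker(struct work_struct *w);

static void my_conn_init(struct my_conn *c)
{
        spin_lock_init(&c->rx_lock);                    /* cf. i_rx_lock */
        c->wait_for_worker = false;                     /* cf. i_rx_wait_for_handler */
        INIT_DELAYED_WORK(&c->rx_work, my_rx_worker);   /* cf. i_rx_w */
        tasklet_init(&c->rx_tasklet, my_rx_tasklet,
                     (unsigned long)c);                 /* pre-existing in RDS */
}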