@@ -344,7 +344,53 @@ static int do_setup_tx(int domain, int type, int protocol)
344
344
return fd ;
345
345
}
346
346
347
- static bool do_recv_completion (int fd )
347
+ static uint32_t do_process_zerocopy_cookies (struct rds_zcopy_cookies * ck )
348
+ {
349
+ int i ;
350
+
351
+ if (ck -> num > RDS_MAX_ZCOOKIES )
352
+ error (1 , 0 , "Returned %d cookies, max expected %d\n" ,
353
+ ck -> num , RDS_MAX_ZCOOKIES );
354
+ for (i = 0 ; i < ck -> num ; i ++ )
355
+ if (cfg_verbose >= 2 )
356
+ fprintf (stderr , "%d\n" , ck -> cookies [i ]);
357
+ return ck -> num ;
358
+ }
359
+
360
+ static bool do_recvmsg_completion (int fd )
361
+ {
362
+ char cmsgbuf [CMSG_SPACE (sizeof (struct rds_zcopy_cookies ))];
363
+ struct rds_zcopy_cookies * ck ;
364
+ struct cmsghdr * cmsg ;
365
+ struct msghdr msg ;
366
+ bool ret = false;
367
+
368
+ memset (& msg , 0 , sizeof (msg ));
369
+ msg .msg_control = cmsgbuf ;
370
+ msg .msg_controllen = sizeof (cmsgbuf );
371
+
372
+ if (recvmsg (fd , & msg , MSG_DONTWAIT ))
373
+ return ret ;
374
+
375
+ if (msg .msg_flags & MSG_CTRUNC )
376
+ error (1 , errno , "recvmsg notification: truncated" );
377
+
378
+ for (cmsg = CMSG_FIRSTHDR (& msg ); cmsg ; cmsg = CMSG_NXTHDR (& msg , cmsg )) {
379
+ if (cmsg -> cmsg_level == SOL_RDS &&
380
+ cmsg -> cmsg_type == RDS_CMSG_ZCOPY_COMPLETION ) {
381
+
382
+ ck = (struct rds_zcopy_cookies * )CMSG_DATA (cmsg );
383
+ completions += do_process_zerocopy_cookies (ck );
384
+ ret = true;
385
+ break ;
386
+ }
387
+ error (0 , 0 , "ignoring cmsg at level %d type %d\n" ,
388
+ cmsg -> cmsg_level , cmsg -> cmsg_type );
389
+ }
390
+ return ret ;
391
+ }
392
+
393
+ static bool do_recv_completion (int fd , int domain )
348
394
{
349
395
struct sock_extended_err * serr ;
350
396
struct msghdr msg = {};
@@ -353,6 +399,9 @@ static bool do_recv_completion(int fd)
353
399
int ret , zerocopy ;
354
400
char control [100 ];
355
401
402
+ if (domain == PF_RDS )
403
+ return do_recvmsg_completion (fd );
404
+
356
405
msg .msg_control = control ;
357
406
msg .msg_controllen = sizeof (control );
358
407
@@ -409,20 +458,20 @@ static bool do_recv_completion(int fd)
409
458
}
410
459
411
460
/* Read all outstanding messages on the errqueue */
412
- static void do_recv_completions (int fd )
461
+ static void do_recv_completions (int fd , int domain )
413
462
{
414
- while (do_recv_completion (fd )) {}
463
+ while (do_recv_completion (fd , domain )) {}
415
464
}
416
465
417
466
/* Wait for all remaining completions on the errqueue */
418
- static void do_recv_remaining_completions (int fd )
467
+ static void do_recv_remaining_completions (int fd , int domain )
419
468
{
420
469
int64_t tstop = gettimeofday_ms () + cfg_waittime_ms ;
421
470
422
471
while (completions < expected_completions &&
423
472
gettimeofday_ms () < tstop ) {
424
- if (do_poll (fd , POLLERR ))
425
- do_recv_completions (fd );
473
+ if (do_poll (fd , domain == PF_RDS ? POLLIN : POLLERR ))
474
+ do_recv_completions (fd , domain );
426
475
}
427
476
428
477
if (completions < expected_completions )
@@ -503,13 +552,13 @@ static void do_tx(int domain, int type, int protocol)
503
552
504
553
while (!do_poll (fd , POLLOUT )) {
505
554
if (cfg_zerocopy )
506
- do_recv_completions (fd );
555
+ do_recv_completions (fd , domain );
507
556
}
508
557
509
558
} while (gettimeofday_ms () < tstop );
510
559
511
560
if (cfg_zerocopy )
512
- do_recv_remaining_completions (fd );
561
+ do_recv_remaining_completions (fd , domain );
513
562
514
563
if (close (fd ))
515
564
error (1 , errno , "close" );
0 commit comments