@@ -307,11 +307,15 @@ static inline struct ip_vs_sync_buff *sb_dequeue(struct netns_ipvs *ipvs)
 	spin_lock_bh(&ipvs->sync_lock);
 	if (list_empty(&ipvs->sync_queue)) {
 		sb = NULL;
+		__set_current_state(TASK_INTERRUPTIBLE);
 	} else {
 		sb = list_entry(ipvs->sync_queue.next,
 				struct ip_vs_sync_buff,
 				list);
 		list_del(&sb->list);
+		ipvs->sync_queue_len--;
+		if (!ipvs->sync_queue_len)
+			ipvs->sync_queue_delay = 0;
 	}
 	spin_unlock_bh(&ipvs->sync_lock);

@@ -358,9 +362,16 @@ static inline void sb_queue_tail(struct netns_ipvs *ipvs)
 	struct ip_vs_sync_buff *sb = ipvs->sync_buff;

 	spin_lock(&ipvs->sync_lock);
-	if (ipvs->sync_state & IP_VS_STATE_MASTER)
+	if (ipvs->sync_state & IP_VS_STATE_MASTER &&
+	    ipvs->sync_queue_len < sysctl_sync_qlen_max(ipvs)) {
+		if (!ipvs->sync_queue_len)
+			schedule_delayed_work(&ipvs->master_wakeup_work,
+					      max(IPVS_SYNC_SEND_DELAY, 1));
+		ipvs->sync_queue_len++;
 		list_add_tail(&sb->list, &ipvs->sync_queue);
-	else
+		if ((++ipvs->sync_queue_delay) == IPVS_SYNC_WAKEUP_RATE)
+			wake_up_process(ipvs->master_thread);
+	} else
 		ip_vs_sync_buff_release(sb);
 	spin_unlock(&ipvs->sync_lock);
 }
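
The enqueue path now arms a delayed work the first time the queue becomes non-empty, so a lightly loaded master still flushes within IPVS_SYNC_SEND_DELAY even when fewer than IPVS_SYNC_WAKEUP_RATE buffers pile up; once the rate threshold is hit the master thread is woken immediately. A minimal, hypothetical module sketch of that delayed-work pattern (names and the 10ms delay are illustrative, not from the patch):

#include <linux/jiffies.h>
#include <linux/module.h>
#include <linux/workqueue.h>

static struct delayed_work demo_work;

static void demo_work_handler(struct work_struct *work)
{
	pr_info("demo: delayed flush fired\n");
}

static int __init demo_init(void)
{
	INIT_DELAYED_WORK(&demo_work, demo_work_handler);
	/* arm once, when the "queue" first becomes non-empty;
	 * IPVS uses max(IPVS_SYNC_SEND_DELAY, 1) jiffies here */
	schedule_delayed_work(&demo_work, msecs_to_jiffies(10));
	return 0;
}

static void __exit demo_exit(void)
{
	/* same teardown ordering stop_sync_thread() adopts below */
	cancel_delayed_work_sync(&demo_work);
}

module_init(demo_init);
module_exit(demo_exit);
MODULE_LICENSE("GPL");
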
@@ -379,6 +390,7 @@ get_curr_sync_buff(struct netns_ipvs *ipvs, unsigned long time)
 	    time_after_eq(jiffies - ipvs->sync_buff->firstuse, time)) {
 		sb = ipvs->sync_buff;
 		ipvs->sync_buff = NULL;
+		__set_current_state(TASK_RUNNING);
 	} else
 		sb = NULL;
 	spin_unlock_bh(&ipvs->sync_buff_lock);
@@ -392,26 +404,23 @@ get_curr_sync_buff(struct netns_ipvs *ipvs, unsigned long time)
 void ip_vs_sync_switch_mode(struct net *net, int mode)
 {
 	struct netns_ipvs *ipvs = net_ipvs(net);
+	struct ip_vs_sync_buff *sb;

+	spin_lock_bh(&ipvs->sync_buff_lock);
 	if (!(ipvs->sync_state & IP_VS_STATE_MASTER))
-		return;
-	if (mode == sysctl_sync_ver(ipvs) || !ipvs->sync_buff)
-		return;
+		goto unlock;
+	sb = ipvs->sync_buff;
+	if (mode == sysctl_sync_ver(ipvs) || !sb)
+		goto unlock;

-	spin_lock_bh(&ipvs->sync_buff_lock);
 	/* Buffer empty ? then let buf_create do the job  */
-	if (ipvs->sync_buff->mesg->size <= sizeof(struct ip_vs_sync_mesg)) {
-		kfree(ipvs->sync_buff);
+	if (sb->mesg->size <= sizeof(struct ip_vs_sync_mesg)) {
+		ip_vs_sync_buff_release(sb);
 		ipvs->sync_buff = NULL;
-	} else {
-		spin_lock_bh(&ipvs->sync_lock);
-		if (ipvs->sync_state & IP_VS_STATE_MASTER)
-			list_add_tail(&ipvs->sync_buff->list,
-				      &ipvs->sync_queue);
-		else
-			ip_vs_sync_buff_release(ipvs->sync_buff);
-		spin_unlock_bh(&ipvs->sync_lock);
-	}
+	} else
+		sb_queue_tail(ipvs);
+
+unlock:
 	spin_unlock_bh(&ipvs->sync_buff_lock);
 }

@@ -1129,6 +1138,28 @@ static void ip_vs_process_message(struct net *net, __u8 *buffer,
 }


+/*
+ *      Setup sndbuf (mode=1) or rcvbuf (mode=0)
+ */
+static void set_sock_size(struct sock *sk, int mode, int val)
+{
+	/* setsockopt(sock, SOL_SOCKET, SO_SNDBUF, &val, sizeof(val)); */
+	/* setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &val, sizeof(val)); */
+	lock_sock(sk);
+	if (mode) {
+		val = clamp_t(int, val, (SOCK_MIN_SNDBUF + 1) / 2,
+			      sysctl_wmem_max);
+		sk->sk_sndbuf = val * 2;
+		sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
+	} else {
+		val = clamp_t(int, val, (SOCK_MIN_RCVBUF + 1) / 2,
+			      sysctl_rmem_max);
+		sk->sk_rcvbuf = val * 2;
+		sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
+	}
+	release_sock(sk);
+}
+
 /*
  *      Setup loopback of outgoing multicasts on a sending socket
  */
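
set_sock_size() stores twice the requested value, the same convention sock_setsockopt() uses for SO_SNDBUF/SO_RCVBUF so that sk_buff overhead is accounted for. A small stand-alone userspace demo (hypothetical, not part of the patch) showing the same doubling from the other side of the API:

#include <netinet/in.h>
#include <stdio.h>
#include <sys/socket.h>
#include <unistd.h>

int main(void)
{
	int fd = socket(AF_INET, SOCK_DGRAM, 0);
	int req = 64 * 1024, got = 0;
	socklen_t len = sizeof(got);

	if (fd < 0)
		return 1;
	setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &req, sizeof(req));
	getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &got, &len);
	printf("requested %d, kernel reports %d\n", req, got);	/* typically 2 * req */
	close(fd);
	return 0;
}
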
@@ -1305,6 +1336,9 @@ static struct socket *make_send_sock(struct net *net)

 	set_mcast_loop(sock->sk, 0);
 	set_mcast_ttl(sock->sk, 1);
+	result = sysctl_sync_sock_size(ipvs);
+	if (result > 0)
+		set_sock_size(sock->sk, 1, result);

 	result = bind_mcastif_addr(sock, ipvs->master_mcast_ifn);
 	if (result < 0) {
@@ -1350,6 +1384,9 @@ static struct socket *make_receive_sock(struct net *net)
 	sk_change_net(sock->sk, net);
 	/* it is equivalent to the REUSEADDR option in user-space */
 	sock->sk->sk_reuse = SK_CAN_REUSE;
+	result = sysctl_sync_sock_size(ipvs);
+	if (result > 0)
+		set_sock_size(sock->sk, 0, result);

 	result = sock->ops->bind(sock, (struct sockaddr *) &mcast_addr,
 				 sizeof(struct sockaddr));
@@ -1392,18 +1429,22 @@ ip_vs_send_async(struct socket *sock, const char *buffer, const size_t length)
 	return len;
 }

-static void
+static int
 ip_vs_send_sync_msg(struct socket *sock, struct ip_vs_sync_mesg *msg)
 {
 	int msize;
+	int ret;

 	msize = msg->size;

 	/* Put size in network byte order */
 	msg->size = htons(msg->size);

-	if (ip_vs_send_async(sock, (char *)msg, msize) != msize)
-		pr_err("ip_vs_send_async error\n");
+	ret = ip_vs_send_async(sock, (char *)msg, msize);
+	if (ret >= 0 || ret == -EAGAIN)
+		return ret;
+	pr_err("ip_vs_send_async error %d\n", ret);
+	return 0;
 }

 static int
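
ip_vs_send_sync_msg() now propagates the byte count or -EAGAIN from the non-blocking send instead of only logging, so its caller can block until the socket drains. Roughly the same backpressure logic in a hypothetical userspace form (the kernel thread below waits on sk_sleep() instead of poll()):

#include <errno.h>
#include <poll.h>
#include <sys/socket.h>
#include <sys/types.h>

/* Returns bytes sent, or -1 on a hard error; retries on EAGAIN. */
static ssize_t send_with_backpressure(int fd, const void *buf, size_t len)
{
	for (;;) {
		ssize_t n = send(fd, buf, len, MSG_DONTWAIT);

		if (n >= 0)
			return n;
		if (errno != EAGAIN && errno != EWOULDBLOCK)
			return -1;

		struct pollfd pfd = { .fd = fd, .events = POLLOUT };

		poll(&pfd, 1, -1);	/* wait until the send buffer has room */
	}
}
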
@@ -1428,36 +1469,75 @@ ip_vs_receive(struct socket *sock, char *buffer, const size_t buflen)
 	return len;
 }

+/* Wakeup the master thread for sending */
+static void master_wakeup_work_handler(struct work_struct *work)
+{
+	struct netns_ipvs *ipvs = container_of(work, struct netns_ipvs,
+					       master_wakeup_work.work);
+
+	spin_lock_bh(&ipvs->sync_lock);
+	if (ipvs->sync_queue_len &&
+	    ipvs->sync_queue_delay < IPVS_SYNC_WAKEUP_RATE) {
+		ipvs->sync_queue_delay = IPVS_SYNC_WAKEUP_RATE;
+		wake_up_process(ipvs->master_thread);
+	}
+	spin_unlock_bh(&ipvs->sync_lock);
+}
+
+/* Get next buffer to send */
+static inline struct ip_vs_sync_buff *
+next_sync_buff(struct netns_ipvs *ipvs)
+{
+	struct ip_vs_sync_buff *sb;
+
+	sb = sb_dequeue(ipvs);
+	if (sb)
+		return sb;
+	/* Do not delay entries in buffer for more than 2 seconds */
+	return get_curr_sync_buff(ipvs, 2 * HZ);
+}

 static int sync_thread_master(void *data)
 {
 	struct ip_vs_sync_thread_data *tinfo = data;
 	struct netns_ipvs *ipvs = net_ipvs(tinfo->net);
+	struct sock *sk = tinfo->sock->sk;
 	struct ip_vs_sync_buff *sb;

 	pr_info("sync thread started: state = MASTER, mcast_ifn = %s, "
 		"syncid = %d\n",
 		ipvs->master_mcast_ifn, ipvs->master_syncid);

-	while (!kthread_should_stop()) {
-		while ((sb = sb_dequeue(ipvs))) {
-			ip_vs_send_sync_msg(tinfo->sock, sb->mesg);
-			ip_vs_sync_buff_release(sb);
+	for (;;) {
+		sb = next_sync_buff(ipvs);
+		if (unlikely(kthread_should_stop()))
+			break;
+		if (!sb) {
+			schedule_timeout(IPVS_SYNC_CHECK_PERIOD);
+			continue;
 		}
-
-		/* check if entries stay in ipvs->sync_buff for 2 seconds */
-		sb = get_curr_sync_buff(ipvs, 2 * HZ);
-		if (sb) {
-			ip_vs_send_sync_msg(tinfo->sock, sb->mesg);
-			ip_vs_sync_buff_release(sb);
+		while (ip_vs_send_sync_msg(tinfo->sock, sb->mesg) < 0) {
+			int ret = 0;
+
+			__wait_event_interruptible(*sk_sleep(sk),
+						   sock_writeable(sk) ||
+						   kthread_should_stop(),
+						   ret);
+			if (unlikely(kthread_should_stop()))
+				goto done;
 		}
-
-		schedule_timeout_interruptible(HZ);
+		ip_vs_sync_buff_release(sb);
 	}

+done:
+	__set_current_state(TASK_RUNNING);
+	if (sb)
+		ip_vs_sync_buff_release(sb);
+
 	/* clean up the sync_buff queue */
 	while ((sb = sb_dequeue(ipvs)))
 		ip_vs_sync_buff_release(sb);
+	__set_current_state(TASK_RUNNING);

 	/* clean up the current sync_buff */
 	sb = get_curr_sync_buff(ipvs, 0);
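
The rewritten master loop relies on sb_dequeue() marking the task TASK_INTERRUPTIBLE under sync_lock before it reports an empty queue; a producer's wake_up_process() issued after the lock drops then turns the later schedule_timeout() into an immediate return instead of a lost wakeup. A minimal, hypothetical kthread module showing that set-state-then-check discipline (names are illustrative, not from the patch):

#include <linux/err.h>
#include <linux/kthread.h>
#include <linux/module.h>
#include <linux/sched.h>

static struct task_struct *demo_task;

static int demo_thread(void *data)
{
	while (!kthread_should_stop()) {
		/* publish intent to sleep before the final check; the barrier
		 * in set_current_state() pairs with the one in
		 * wake_up_process() (IPVS can use __set_current_state()
		 * because sb_dequeue() does this under sync_lock) */
		set_current_state(TASK_INTERRUPTIBLE);
		if (kthread_should_stop()) {
			__set_current_state(TASK_RUNNING);
			break;
		}
		schedule_timeout(HZ);	/* sleep until wakeup or 1s timeout */
	}
	return 0;
}

static int __init demo_init(void)
{
	demo_task = kthread_run(demo_thread, NULL, "demo_sleeper");
	return IS_ERR(demo_task) ? PTR_ERR(demo_task) : 0;
}

static void __exit demo_exit(void)
{
	kthread_stop(demo_task);	/* wakes the thread and waits for it */
}

module_init(demo_init);
module_exit(demo_exit);
MODULE_LICENSE("GPL");
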
@@ -1538,6 +1618,10 @@ int start_sync_thread(struct net *net, int state, char *mcast_ifn, __u8 syncid)
 		realtask = &ipvs->master_thread;
 		name = "ipvs_master:%d";
 		threadfn = sync_thread_master;
+		ipvs->sync_queue_len = 0;
+		ipvs->sync_queue_delay = 0;
+		INIT_DELAYED_WORK(&ipvs->master_wakeup_work,
+				  master_wakeup_work_handler);
 		sock = make_send_sock(net);
 	} else if (state == IP_VS_STATE_BACKUP) {
 		if (ipvs->backup_thread)
@@ -1623,6 +1707,7 @@ int stop_sync_thread(struct net *net, int state)
 		spin_lock_bh(&ipvs->sync_lock);
 		ipvs->sync_state &= ~IP_VS_STATE_MASTER;
 		spin_unlock_bh(&ipvs->sync_lock);
+		cancel_delayed_work_sync(&ipvs->master_wakeup_work);
 		retc = kthread_stop(ipvs->master_thread);
 		ipvs->master_thread = NULL;
 	} else if (state == IP_VS_STATE_BACKUP) {