@@ -217,12 +217,13 @@ struct tipc_node *tipc_bclink_retransmit_to(void)
  */
 static void bclink_retransmit_pkt(u32 after, u32 to)
 {
-	struct sk_buff *buf;
+	struct sk_buff *skb;
 
-	buf = bcl->first_out;
-	while (buf && less_eq(buf_seqno(buf), after))
-		buf = buf->next;
-	tipc_link_retransmit(bcl, buf, mod(to - after));
+	skb_queue_walk(&bcl->outqueue, skb) {
+		if (more(buf_seqno(skb), after))
+			break;
+	}
+	tipc_link_retransmit(bcl, skb, mod(to - after));
 }
 
 /**
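
The old code walked a hand-rolled singly linked list through skb->next; the new code iterates a generic struct sk_buff_head with the skb_queue_walk() macro from <linux/skbuff.h>. A minimal sketch of the same lookup as a standalone helper, assuming TIPC's buf_seqno() and more() wraparound comparators from this file; first_after() is a hypothetical name, not part of the patch:

	#include <linux/skbuff.h>

	/* Return the first buffer in 'q' whose sequence number lies beyond
	 * 'after', or NULL if every queued buffer is already covered.
	 * Returning from inside the walk avoids touching the cursor after
	 * the loop ends, where it would alias the queue head, not a real skb. */
	static struct sk_buff *first_after(struct sk_buff_head *q, u32 after)
	{
		struct sk_buff *skb;

		skb_queue_walk(q, skb) {
			if (more(buf_seqno(skb), after))
				return skb;
		}
		return NULL;
	}
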
@@ -245,14 +246,14 @@ void tipc_bclink_wakeup_users(void)
  */
 void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
 {
-	struct sk_buff *crs;
+	struct sk_buff *skb, *tmp;
 	struct sk_buff *next;
 	unsigned int released = 0;
 
 	tipc_bclink_lock();
 	/* Bail out if tx queue is empty (no clean up is required) */
-	crs = bcl->first_out;
-	if (!crs)
+	skb = skb_peek(&bcl->outqueue);
+	if (!skb)
 		goto exit;
 
 	/* Determine which messages need to be acknowledged */
@@ -271,43 +272,43 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
 		 * Bail out if specified sequence number does not correspond
 		 * to a message that has been sent and not yet acknowledged
 		 */
-		if (less(acked, buf_seqno(crs)) ||
+		if (less(acked, buf_seqno(skb)) ||
 		    less(bcl->fsm_msg_cnt, acked) ||
 		    less_eq(acked, n_ptr->bclink.acked))
 			goto exit;
 	}
 
 	/* Skip over packets that node has previously acknowledged */
-	while (crs && less_eq(buf_seqno(crs), n_ptr->bclink.acked))
-		crs = crs->next;
+	skb_queue_walk(&bcl->outqueue, skb) {
+		if (more(buf_seqno(skb), n_ptr->bclink.acked))
+			break;
+	}
 
 	/* Update packets that node is now acknowledging */
+	skb_queue_walk_from_safe(&bcl->outqueue, skb, tmp) {
+		if (more(buf_seqno(skb), acked))
+			break;
 
-	while (crs && less_eq(buf_seqno(crs), acked)) {
-		next = crs->next;
-
-		if (crs != bcl->next_out)
-			bcbuf_decr_acks(crs);
-		else {
-			bcbuf_set_acks(crs, 0);
+		next = tipc_skb_queue_next(&bcl->outqueue, skb);
+		if (skb != bcl->next_out) {
+			bcbuf_decr_acks(skb);
+		} else {
+			bcbuf_set_acks(skb, 0);
 			bcl->next_out = next;
 			bclink_set_last_sent();
 		}
 
-		if (bcbuf_acks(crs) == 0) {
-			bcl->first_out = next;
-			bcl->out_queue_size--;
-			kfree_skb(crs);
+		if (bcbuf_acks(skb) == 0) {
+			__skb_unlink(skb, &bcl->outqueue);
+			kfree_skb(skb);
 			released = 1;
 		}
-		crs = next;
 	}
 	n_ptr->bclink.acked = acked;
 
 	/* Try resolving broadcast link congestion, if necessary */
-
 	if (unlikely(bcl->next_out)) {
-		tipc_link_push_queue(bcl);
+		tipc_link_push_packets(bcl);
 		bclink_set_last_sent();
 	}
 	if (unlikely(released && !skb_queue_empty(&bcl->waiting_sks)))
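
skb_queue_walk_from_safe() is the variant that resumes from the cursor position left by the previous walk and caches the next pointer in 'tmp', so the current buffer may be unlinked and freed mid-iteration. A stripped-down sketch of just the release step, assuming the caller holds the lock protecting the queue and omitting the next_out bookkeeping; bcbuf_acks(), buf_seqno() and more() are TIPC helpers from this file, release_fully_acked() is a hypothetical name:

	/* Free every buffer at the head of 'txq' that is covered by 'acked'
	 * and no longer awaited by any peer. */
	static void release_fully_acked(struct sk_buff_head *txq, u32 acked)
	{
		struct sk_buff *skb, *tmp;

		skb_queue_walk_safe(txq, skb, tmp) {
			if (more(buf_seqno(skb), acked))
				break;			/* not yet acknowledged */
			if (bcbuf_acks(skb) == 0) {	/* acked by every peer */
				__skb_unlink(skb, txq);	/* lockless unlink */
				kfree_skb(skb);
			}
		}
	}

Using plain skb_queue_walk() here would be a use-after-free: the iterator would read skb->next after kfree_skb(skb) has released the buffer.
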
@@ -327,45 +328,40 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent)
 	struct sk_buff *buf;
 
 	/* Ignore "stale" link state info */
-
 	if (less_eq(last_sent, n_ptr->bclink.last_in))
 		return;
 
 	/* Update link synchronization state; quit if in sync */
-
 	bclink_update_last_sent(n_ptr, last_sent);
 
 	if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in)
 		return;
 
 	/* Update out-of-sync state; quit if loss is still unconfirmed */
-
 	if ((++n_ptr->bclink.oos_state) == 1) {
 		if (n_ptr->bclink.deferred_size < (TIPC_MIN_LINK_WIN / 2))
 			return;
 		n_ptr->bclink.oos_state++;
 	}
 
 	/* Don't NACK if one has been recently sent (or seen) */
-
 	if (n_ptr->bclink.oos_state & 0x1)
 		return;
 
 	/* Send NACK */
-
 	buf = tipc_buf_acquire(INT_H_SIZE);
 	if (buf) {
 		struct tipc_msg *msg = buf_msg(buf);
+		struct sk_buff *skb = skb_peek(&n_ptr->bclink.deferred_queue);
+		u32 to = skb ? buf_seqno(skb) - 1 : n_ptr->bclink.last_sent;
 
 		tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG,
 			      INT_H_SIZE, n_ptr->addr);
 		msg_set_non_seq(msg, 1);
 		msg_set_mc_netid(msg, tipc_net_id);
 		msg_set_bcast_ack(msg, n_ptr->bclink.last_in);
 		msg_set_bcgap_after(msg, n_ptr->bclink.last_in);
-		msg_set_bcgap_to(msg, n_ptr->bclink.deferred_head
-				 ? buf_seqno(n_ptr->bclink.deferred_head) - 1
-				 : n_ptr->bclink.last_sent);
+		msg_set_bcgap_to(msg, to);
 
 		tipc_bclink_lock();
 		tipc_bearer_send(MAX_BEARERS, buf, NULL);
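
With the deferred queue as an sk_buff_head, the NACK range comes straight from its head: everything from last_in + 1 up to one before the first out-of-order packet we hold is missing. The same computation pulled out as a sketch (nack_gap_to() is a hypothetical helper, not part of the patch):

	/* Upper end of the gap reported in a NACK: one below the first
	 * deferred packet, or last_sent if nothing is queued. */
	static u32 nack_gap_to(struct tipc_node *n_ptr)
	{
		struct sk_buff *skb = skb_peek(&n_ptr->bclink.deferred_queue);

		return skb ? buf_seqno(skb) - 1 : n_ptr->bclink.last_sent;
	}

skb_peek() returns NULL on an empty queue without removing anything, which is what makes the ternary safe.
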
@@ -402,46 +398,48 @@ static void bclink_peek_nack(struct tipc_msg *msg)
 
 /* tipc_bclink_xmit - broadcast buffer chain to all nodes in cluster
  *                    and to identified node local sockets
- * @buf: chain of buffers containing message
+ * @list: chain of buffers containing message
  * Consumes the buffer chain, except when returning -ELINKCONG
  * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
  */
-int tipc_bclink_xmit(struct sk_buff *buf)
+int tipc_bclink_xmit(struct sk_buff_head *list)
 {
 	int rc = 0;
 	int bc = 0;
-	struct sk_buff *clbuf;
+	struct sk_buff *skb;
 
 	/* Prepare clone of message for local node */
-	clbuf = tipc_msg_reassemble(buf);
-	if (unlikely(!clbuf)) {
-		kfree_skb_list(buf);
+	skb = tipc_msg_reassemble(list);
+	if (unlikely(!skb)) {
+		__skb_queue_purge(list);
 		return -EHOSTUNREACH;
 	}
 
 	/* Broadcast to all other nodes */
 	if (likely(bclink)) {
 		tipc_bclink_lock();
 		if (likely(bclink->bcast_nodes.count)) {
-			rc = __tipc_link_xmit(bcl, buf);
+			rc = __tipc_link_xmit(bcl, list);
 			if (likely(!rc)) {
+				u32 len = skb_queue_len(&bcl->outqueue);
+
 				bclink_set_last_sent();
 				bcl->stats.queue_sz_counts++;
-				bcl->stats.accu_queue_sz += bcl->out_queue_size;
+				bcl->stats.accu_queue_sz += len;
 			}
 			bc = 1;
 		}
 		tipc_bclink_unlock();
 	}
 
 	if (unlikely(!bc))
-		kfree_skb_list(buf);
+		__skb_queue_purge(list);
 
 	/* Deliver message clone */
 	if (likely(!rc))
-		tipc_sk_mcast_rcv(clbuf);
+		tipc_sk_mcast_rcv(skb);
 	else
-		kfree_skb(clbuf);
+		kfree_skb(skb);
 
 	return rc;
 }
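
tipc_bclink_xmit() now takes a whole sk_buff_head rather than a chained skb, so failure paths use __skb_queue_purge() (which frees every buffer still on the list) instead of kfree_skb_list(). A hypothetical caller illustrating the ownership rule stated in the function comment, consumed except on -ELINKCONG; send_fragments() and its retry policy are illustrative, not part of the patch:

	static int send_fragments(struct sk_buff *frag1, struct sk_buff *frag2)
	{
		struct sk_buff_head head;
		int rc;

		__skb_queue_head_init(&head);	/* stack-local queue, no lock */
		__skb_queue_tail(&head, frag1);
		__skb_queue_tail(&head, frag2);

		rc = tipc_bclink_xmit(&head);	/* consumes 'head'... */
		if (rc == -ELINKCONG)		/* ...except on congestion */
			__skb_queue_purge(&head);
		return rc;
	}

A real caller would more likely block or retry on congestion than drop the chain; the purge is only there to show who owns the buffers on that path.
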
@@ -462,7 +460,6 @@ static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)
 	 * Unicast an ACK periodically, ensuring that
 	 * all nodes in the cluster don't ACK at the same time
 	 */
-
 	if (((seqno - tipc_own_addr) % TIPC_MIN_LINK_WIN) == 0) {
 		tipc_link_proto_xmit(node->active_links[node->addr & 1],
 				     STATE_MSG, 0, 0, 0, 0, 0);
@@ -484,7 +481,6 @@ void tipc_bclink_rcv(struct sk_buff *buf)
 	int deferred = 0;
 
 	/* Screen out unwanted broadcast messages */
-
 	if (msg_mc_netid(msg) != tipc_net_id)
 		goto exit;
 
@@ -497,7 +493,6 @@ void tipc_bclink_rcv(struct sk_buff *buf)
 		goto unlock;
 
 	/* Handle broadcast protocol message */
-
 	if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) {
 		if (msg_type(msg) != STATE_MSG)
 			goto unlock;
@@ -518,14 +513,12 @@ void tipc_bclink_rcv(struct sk_buff *buf)
 	}
 
 	/* Handle in-sequence broadcast message */
-
 	seqno = msg_seqno(msg);
 	next_in = mod(node->bclink.last_in + 1);
 
 	if (likely(seqno == next_in)) {
 receive:
 		/* Deliver message to destination */
-
 		if (likely(msg_isdata(msg))) {
 			tipc_bclink_lock();
 			bclink_accept_pkt(node, seqno);
@@ -574,41 +567,33 @@ void tipc_bclink_rcv(struct sk_buff *buf)
 		buf = NULL;
 
 		/* Determine new synchronization state */
-
 		tipc_node_lock(node);
 		if (unlikely(!tipc_node_is_up(node)))
 			goto unlock;
 
 		if (node->bclink.last_in == node->bclink.last_sent)
 			goto unlock;
 
-		if (!node->bclink.deferred_head) {
+		if (skb_queue_empty(&node->bclink.deferred_queue)) {
 			node->bclink.oos_state = 1;
 			goto unlock;
 		}
 
-		msg = buf_msg(node->bclink.deferred_head);
+		msg = buf_msg(skb_peek(&node->bclink.deferred_queue));
 		seqno = msg_seqno(msg);
 		next_in = mod(next_in + 1);
 		if (seqno != next_in)
 			goto unlock;
 
 		/* Take in-sequence message from deferred queue & deliver it */
-
-		buf = node->bclink.deferred_head;
-		node->bclink.deferred_head = buf->next;
-		buf->next = NULL;
-		node->bclink.deferred_size--;
+		buf = __skb_dequeue(&node->bclink.deferred_queue);
 		goto receive;
 	}
 
 	/* Handle out-of-sequence broadcast message */
-
 	if (less(next_in, seqno)) {
-		deferred = tipc_link_defer_pkt(&node->bclink.deferred_head,
-					       &node->bclink.deferred_tail,
+		deferred = tipc_link_defer_pkt(&node->bclink.deferred_queue,
 					       buf);
-		node->bclink.deferred_size += deferred;
 		bclink_update_last_sent(node, seqno);
 		buf = NULL;
 	}
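
__skb_dequeue() replaces four lines of manual head, next and size bookkeeping: it unlinks the head of the queue, adjusts the queue's qlen, and returns the buffer, or NULL when the queue is empty. A sketch of the peek-then-dequeue pattern used above, with buf_seqno() again assumed from TIPC; take_if_next() is a hypothetical helper, not part of the patch:

	/* Hand back the head of the deferred queue iff it is the next
	 * expected packet; otherwise leave the queue untouched. */
	static struct sk_buff *take_if_next(struct sk_buff_head *dq, u32 next_in)
	{
		struct sk_buff *skb = skb_peek(dq);

		if (!skb || buf_seqno(skb) != next_in)
			return NULL;
		return __skb_dequeue(dq);
	}
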
@@ -963,6 +948,8 @@ int tipc_bclink_init(void)
 	sprintf(bcbearer->media.name, "tipc-broadcast");
 
 	spin_lock_init(&bclink->lock);
+	__skb_queue_head_init(&bcl->outqueue);
+	__skb_queue_head_init(&bcl->deferred_queue);
 	__skb_queue_head_init(&bcl->waiting_sks);
 	bcl->next_out_no = 1;
 	spin_lock_init(&bclink->node.lock);
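
Note that __skb_queue_head_init() sets up the list pointers and qlen but, unlike skb_queue_head_init(), does not initialize the queue's built-in spinlock. That fits here: these queues are serialized by bclink->lock, and only the double-underscore (lockless) queue operations are used on them throughout the patch.
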