@@ -62,6 +62,7 @@ struct tipc_member {
62
62
struct list_head list ;
63
63
struct list_head congested ;
64
64
struct sk_buff * event_msg ;
65
+ struct sk_buff_head deferredq ;
65
66
struct tipc_group * group ;
66
67
u32 node ;
67
68
u32 port ;
@@ -253,6 +254,7 @@ static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,
253
254
return NULL ;
254
255
INIT_LIST_HEAD (& m -> list );
255
256
INIT_LIST_HEAD (& m -> congested );
257
+ __skb_queue_head_init (& m -> deferredq );
256
258
m -> group = grp ;
257
259
m -> node = node ;
258
260
m -> port = port ;
@@ -380,29 +382,54 @@ bool tipc_group_bc_cong(struct tipc_group *grp, int len)
380
382
return tipc_group_cong (grp , m -> node , m -> port , len , & m );
381
383
}
382
384
385
+ /* tipc_group_sort_msg() - sort msg into queue by bcast sequence number
386
+ */
387
+ static void tipc_group_sort_msg (struct sk_buff * skb , struct sk_buff_head * defq )
388
+ {
389
+ struct tipc_msg * _hdr , * hdr = buf_msg (skb );
390
+ u16 bc_seqno = msg_grp_bc_seqno (hdr );
391
+ struct sk_buff * _skb , * tmp ;
392
+ int mtyp = msg_type (hdr );
393
+
394
+ /* Bcast may be bypassed by unicast, - sort it in */
395
+ if (mtyp == TIPC_GRP_BCAST_MSG || mtyp == TIPC_GRP_MCAST_MSG ) {
396
+ skb_queue_walk_safe (defq , _skb , tmp ) {
397
+ _hdr = buf_msg (_skb );
398
+ if (!less (bc_seqno , msg_grp_bc_seqno (_hdr )))
399
+ continue ;
400
+ __skb_queue_before (defq , _skb , skb );
401
+ return ;
402
+ }
403
+ /* Bcast was not bypassed, - add to tail */
404
+ }
405
+ /* Unicasts are never bypassed, - always add to tail */
406
+ __skb_queue_tail (defq , skb );
407
+ }
408
+
383
409
/* tipc_group_filter_msg() - determine if we should accept arriving message
384
410
*/
385
411
void tipc_group_filter_msg (struct tipc_group * grp , struct sk_buff_head * inputq ,
386
412
struct sk_buff_head * xmitq )
387
413
{
388
414
struct sk_buff * skb = __skb_dequeue (inputq );
415
+ struct sk_buff_head * defq ;
389
416
struct tipc_member * m ;
390
417
struct tipc_msg * hdr ;
418
+ bool deliver , update ;
391
419
u32 node , port ;
392
- int mtyp ;
420
+ int mtyp , blks ;
393
421
394
422
if (!skb )
395
423
return ;
396
424
397
425
hdr = buf_msg (skb );
398
- mtyp = msg_type (hdr );
399
426
node = msg_orignode (hdr );
400
427
port = msg_origport (hdr );
401
428
402
429
if (!msg_in_group (hdr ))
403
430
goto drop ;
404
431
405
- if (mtyp == TIPC_GRP_MEMBER_EVT ) {
432
+ if (msg_is_grp_evt ( hdr ) ) {
406
433
if (!grp -> events )
407
434
goto drop ;
408
435
__skb_queue_tail (inputq , skb );
@@ -413,22 +440,52 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
413
440
if (!tipc_group_is_receiver (m ))
414
441
goto drop ;
415
442
416
- m -> bc_rcv_nxt = msg_grp_bc_seqno (hdr ) + 1 ;
443
+ if (less (msg_grp_bc_seqno (hdr ), m -> bc_rcv_nxt ))
444
+ goto drop ;
417
445
418
- /* Drop multicast here if not for this member */
419
- if (mtyp == TIPC_GRP_MCAST_MSG ) {
420
- if (msg_nameinst (hdr ) != grp -> instance ) {
421
- m -> bc_rcv_nxt = msg_grp_bc_seqno (hdr ) + 1 ;
422
- tipc_group_update_rcv_win (grp , msg_blocks (hdr ),
423
- node , port , xmitq );
424
- kfree_skb (skb );
425
- return ;
446
+ TIPC_SKB_CB (skb )-> orig_member = m -> instance ;
447
+ defq = & m -> deferredq ;
448
+ tipc_group_sort_msg (skb , defq );
449
+
450
+ while ((skb = skb_peek (defq ))) {
451
+ hdr = buf_msg (skb );
452
+ mtyp = msg_type (hdr );
453
+ deliver = true;
454
+ update = false;
455
+
456
+ if (more (msg_grp_bc_seqno (hdr ), m -> bc_rcv_nxt ))
457
+ break ;
458
+
459
+ /* Decide what to do with message */
460
+ switch (mtyp ) {
461
+ case TIPC_GRP_MCAST_MSG :
462
+ if (msg_nameinst (hdr ) != grp -> instance ) {
463
+ update = true;
464
+ deliver = false;
465
+ }
466
+ /* Fall thru */
467
+ case TIPC_GRP_BCAST_MSG :
468
+ m -> bc_rcv_nxt ++ ;
469
+ break ;
470
+ case TIPC_GRP_UCAST_MSG :
471
+ break ;
472
+ default :
473
+ break ;
426
474
}
427
- }
428
475
429
- TIPC_SKB_CB (skb )-> orig_member = m -> instance ;
430
- __skb_queue_tail (inputq , skb );
476
+ /* Execute decisions */
477
+ __skb_dequeue (defq );
478
+ if (deliver )
479
+ __skb_queue_tail (inputq , skb );
480
+ else
481
+ kfree_skb (skb );
482
+
483
+ if (!update )
484
+ continue ;
431
485
486
+ blks = msg_blocks (hdr );
487
+ tipc_group_update_rcv_win (grp , blks , node , port , xmitq );
488
+ }
432
489
return ;
433
490
drop :
434
491
kfree_skb (skb );
0 commit comments