Skip to content

Commit c55c8ed

Browse files
Hoang Le authored and davem330 committed
tipc: smooth change between replicast and broadcast
Currently, a multicast stream may start out using replicast, because there are few destinations, and then it should ideally switch to L2/broadcast IGMP/multicast when the number of destinations grows beyond a certain limit. The opposite should happen when the number decreases below the limit. To eliminate the risk of message reordering caused by a method change, a sending socket must stick to a previously selected method until it enters an idle period of 5 seconds, i.e. until there is a 5-second pause in the traffic from the sender socket. If the sender never makes such a pause, the method will never change, and transmission may become very inefficient as the cluster grows. With this commit, we allow such a switch between replicast and broadcast without any need for a traffic pause. The solution is to send a dummy message consisting of only the header, with the SYN bit set, via broadcast or replicast. The data message also has the SYN bit set and is sent via the other method (the inverse of the one used for the dummy). Then, at the receiving side, any messages that follow the first SYN message (data or dummy) are held in a deferred queue until its twin (dummy or data message) has arrived on the other link. v2: reverse christmas tree declaration Acked-by: Jon Maloy <[email protected]> Signed-off-by: Hoang Le <[email protected]> Signed-off-by: David S. Miller <[email protected]>
1 parent ff2ebbf commit c55c8ed

File tree

4 files changed

+184
-1
lines changed

4 files changed

+184
-1
lines changed

net/tipc/bcast.c

Lines changed: 164 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,9 +220,24 @@ static void tipc_bcast_select_xmit_method(struct net *net, int dests,
220220
}
221221
/* Can current method be changed ? */
222222
method->expires = jiffies + TIPC_METHOD_EXPIRE;
223-
if (method->mandatory || time_before(jiffies, exp))
223+
if (method->mandatory)
224224
return;
225225

226+
if (!(tipc_net(net)->capabilities & TIPC_MCAST_RBCTL) &&
227+
time_before(jiffies, exp))
228+
return;
229+
230+
/* Configuration as force 'broadcast' method */
231+
if (bb->force_bcast) {
232+
method->rcast = false;
233+
return;
234+
}
235+
/* Configuration as force 'replicast' method */
236+
if (bb->force_rcast) {
237+
method->rcast = true;
238+
return;
239+
}
240+
/* Configuration as 'autoselect' or default method */
226241
/* Determine method to use now */
227242
method->rcast = dests <= bb->bc_threshold;
228243
}
@@ -285,6 +300,63 @@ static int tipc_rcast_xmit(struct net *net, struct sk_buff_head *pkts,
285300
return 0;
286301
}
287302

303+
/* tipc_mcast_send_sync - deliver a dummy message with SYN bit
304+
* @net: the applicable net namespace
305+
* @skb: socket buffer to copy
306+
* @method: send method to be used
307+
* @dests: destination nodes for message.
308+
* @cong_link_cnt: returns number of encountered congested destination links
309+
* Returns 0 if success, otherwise errno
310+
*/
311+
static int tipc_mcast_send_sync(struct net *net, struct sk_buff *skb,
312+
struct tipc_mc_method *method,
313+
struct tipc_nlist *dests,
314+
u16 *cong_link_cnt)
315+
{
316+
struct tipc_msg *hdr, *_hdr;
317+
struct sk_buff_head tmpq;
318+
struct sk_buff *_skb;
319+
320+
/* Is a cluster supporting with new capabilities ? */
321+
if (!(tipc_net(net)->capabilities & TIPC_MCAST_RBCTL))
322+
return 0;
323+
324+
hdr = buf_msg(skb);
325+
if (msg_user(hdr) == MSG_FRAGMENTER)
326+
hdr = msg_get_wrapped(hdr);
327+
if (msg_type(hdr) != TIPC_MCAST_MSG)
328+
return 0;
329+
330+
/* Allocate dummy message */
331+
_skb = tipc_buf_acquire(MCAST_H_SIZE, GFP_KERNEL);
332+
if (!skb)
333+
return -ENOMEM;
334+
335+
/* Preparing for 'synching' header */
336+
msg_set_syn(hdr, 1);
337+
338+
/* Copy skb's header into a dummy header */
339+
skb_copy_to_linear_data(_skb, hdr, MCAST_H_SIZE);
340+
skb_orphan(_skb);
341+
342+
/* Reverse method for dummy message */
343+
_hdr = buf_msg(_skb);
344+
msg_set_size(_hdr, MCAST_H_SIZE);
345+
msg_set_is_rcast(_hdr, !msg_is_rcast(hdr));
346+
347+
skb_queue_head_init(&tmpq);
348+
__skb_queue_tail(&tmpq, _skb);
349+
if (method->rcast)
350+
tipc_bcast_xmit(net, &tmpq, cong_link_cnt);
351+
else
352+
tipc_rcast_xmit(net, &tmpq, dests, cong_link_cnt);
353+
354+
/* This queue should normally be empty by now */
355+
__skb_queue_purge(&tmpq);
356+
357+
return 0;
358+
}
359+
288360
/* tipc_mcast_xmit - deliver message to indicated destination nodes
289361
* and to identified node local sockets
290362
* @net: the applicable net namespace
@@ -300,6 +372,9 @@ int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
300372
u16 *cong_link_cnt)
301373
{
302374
struct sk_buff_head inputq, localq;
375+
bool rcast = method->rcast;
376+
struct tipc_msg *hdr;
377+
struct sk_buff *skb;
303378
int rc = 0;
304379

305380
skb_queue_head_init(&inputq);
@@ -313,6 +388,18 @@ int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
313388
/* Send according to determined transmit method */
314389
if (dests->remote) {
315390
tipc_bcast_select_xmit_method(net, dests->remote, method);
391+
392+
skb = skb_peek(pkts);
393+
hdr = buf_msg(skb);
394+
if (msg_user(hdr) == MSG_FRAGMENTER)
395+
hdr = msg_get_wrapped(hdr);
396+
msg_set_is_rcast(hdr, method->rcast);
397+
398+
/* Switch method ? */
399+
if (rcast != method->rcast)
400+
tipc_mcast_send_sync(net, skb, method,
401+
dests, cong_link_cnt);
402+
316403
if (method->rcast)
317404
rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt);
318405
else
@@ -672,3 +759,79 @@ u32 tipc_bcast_get_broadcast_ratio(struct net *net)
672759

673760
return bb->rc_ratio;
674761
}
762+
763+
void tipc_mcast_filter_msg(struct sk_buff_head *defq,
764+
struct sk_buff_head *inputq)
765+
{
766+
struct sk_buff *skb, *_skb, *tmp;
767+
struct tipc_msg *hdr, *_hdr;
768+
bool match = false;
769+
u32 node, port;
770+
771+
skb = skb_peek(inputq);
772+
hdr = buf_msg(skb);
773+
774+
if (likely(!msg_is_syn(hdr) && skb_queue_empty(defq)))
775+
return;
776+
777+
node = msg_orignode(hdr);
778+
port = msg_origport(hdr);
779+
780+
/* Has the twin SYN message already arrived ? */
781+
skb_queue_walk(defq, _skb) {
782+
_hdr = buf_msg(_skb);
783+
if (msg_orignode(_hdr) != node)
784+
continue;
785+
if (msg_origport(_hdr) != port)
786+
continue;
787+
match = true;
788+
break;
789+
}
790+
791+
if (!match) {
792+
if (!msg_is_syn(hdr))
793+
return;
794+
__skb_dequeue(inputq);
795+
__skb_queue_tail(defq, skb);
796+
return;
797+
}
798+
799+
/* Deliver non-SYN message from other link, otherwise queue it */
800+
if (!msg_is_syn(hdr)) {
801+
if (msg_is_rcast(hdr) != msg_is_rcast(_hdr))
802+
return;
803+
__skb_dequeue(inputq);
804+
__skb_queue_tail(defq, skb);
805+
return;
806+
}
807+
808+
/* Queue non-SYN/SYN message from same link */
809+
if (msg_is_rcast(hdr) == msg_is_rcast(_hdr)) {
810+
__skb_dequeue(inputq);
811+
__skb_queue_tail(defq, skb);
812+
return;
813+
}
814+
815+
/* Matching SYN messages => return the one with data, if any */
816+
__skb_unlink(_skb, defq);
817+
if (msg_data_sz(hdr)) {
818+
kfree_skb(_skb);
819+
} else {
820+
__skb_dequeue(inputq);
821+
kfree_skb(skb);
822+
__skb_queue_tail(inputq, _skb);
823+
}
824+
825+
/* Deliver subsequent non-SYN messages from same peer */
826+
skb_queue_walk_safe(defq, _skb, tmp) {
827+
_hdr = buf_msg(_skb);
828+
if (msg_orignode(_hdr) != node)
829+
continue;
830+
if (msg_origport(_hdr) != port)
831+
continue;
832+
if (msg_is_syn(_hdr))
833+
break;
834+
__skb_unlink(_skb, defq);
835+
__skb_queue_tail(inputq, _skb);
836+
}
837+
}

net/tipc/bcast.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,13 @@ void tipc_nlist_del(struct tipc_nlist *nl, u32 node);
6767
/* Cookie to be used between socket and broadcast layer
6868
* @rcast: replicast (instead of broadcast) was used at previous xmit
6969
* @mandatory: broadcast/replicast indication was set by user
70+
* @deferredq: queue of deferred messages, held back so they are delivered in order
7071
* @expires: re-evaluate non-mandatory transmit method if we are past this
7172
*/
7273
struct tipc_mc_method {
7374
bool rcast;
7475
bool mandatory;
76+
struct sk_buff_head deferredq;
7577
unsigned long expires;
7778
};
7779

@@ -99,6 +101,9 @@ int tipc_bclink_reset_stats(struct net *net);
99101
u32 tipc_bcast_get_broadcast_mode(struct net *net);
100102
u32 tipc_bcast_get_broadcast_ratio(struct net *net);
101103

104+
void tipc_mcast_filter_msg(struct sk_buff_head *defq,
105+
struct sk_buff_head *inputq);
106+
102107
static inline void tipc_bcast_lock(struct net *net)
103108
{
104109
spin_lock_bh(&tipc_net(net)->bclock);

net/tipc/msg.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,16 @@ static inline void msg_set_src_droppable(struct tipc_msg *m, u32 d)
257257
msg_set_bits(m, 0, 18, 1, d);
258258
}
259259

260+
/* msg_is_rcast - read the one-bit 'is_rcast' flag at word 0, bit 18
 * NOTE(review): msg_set_src_droppable() writes this same word/bit position;
 * confirm the overlap is intended.
 */
static inline bool msg_is_rcast(struct tipc_msg *m)
{
	bool is_rcast = msg_bits(m, 0, 18, 0x1);

	return is_rcast;
}
264+
265+
/* msg_set_is_rcast - write the one-bit 'is_rcast' flag at word 0, bit 18
 * NOTE(review): msg_set_src_droppable() writes this same word/bit position;
 * confirm the overlap is intended.
 */
static inline void msg_set_is_rcast(struct tipc_msg *m, bool d)
{
	msg_set_bits(m,
		     0,		/* header word */
		     18,	/* bit position */
		     0x1,	/* single-bit mask */
		     d);
}
269+
260270
static inline void msg_set_size(struct tipc_msg *m, u32 sz)
261271
{
262272
m->hdr[0] = htonl((msg_word(m, 0) & ~0x1ffff) | sz);

net/tipc/socket.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -485,6 +485,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
485485
tsk_set_unreturnable(tsk, true);
486486
if (sock->type == SOCK_DGRAM)
487487
tsk_set_unreliable(tsk, true);
488+
__skb_queue_head_init(&tsk->mc_method.deferredq);
488489
}
489490

490491
trace_tipc_sk_create(sk, NULL, TIPC_DUMP_NONE, " ");
@@ -582,6 +583,7 @@ static int tipc_release(struct socket *sock)
582583
sk->sk_shutdown = SHUTDOWN_MASK;
583584
tipc_sk_leave(tsk);
584585
tipc_sk_withdraw(tsk, 0, NULL);
586+
__skb_queue_purge(&tsk->mc_method.deferredq);
585587
sk_stop_timer(sk, &sk->sk_timer);
586588
tipc_sk_remove(tsk);
587589

@@ -2162,6 +2164,9 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
21622164
if (unlikely(grp))
21632165
tipc_group_filter_msg(grp, &inputq, xmitq);
21642166

2167+
if (msg_type(hdr) == TIPC_MCAST_MSG)
2168+
tipc_mcast_filter_msg(&tsk->mc_method.deferredq, &inputq);
2169+
21652170
/* Validate and add to receive buffer if there is space */
21662171
while ((skb = __skb_dequeue(&inputq))) {
21672172
hdr = buf_msg(skb);

0 commit comments

Comments
 (0)