Skip to content

Commit 7ad32bc

Browse files
Jon Maloydavem330
authored andcommitted
tipc: create group member event messages when they are needed
In the current implementation, a group socket receiving topology events about other members just converts the topology event message into a group event message and stores it until it reaches the right state to issue it to the user. This complicates the code unnecessarily, and becomes impractical when we in the coming commits will need to create and issue membership events independently. In this commit, we change this so that we just notice the type and origin of the incoming topology event, and then drop the buffer. Only when it is time to actually send a group event to the user do we explicitly create a new message and send it upwards. Acked-by: Ying Xue <[email protected]> Signed-off-by: Jon Maloy <[email protected]> Signed-off-by: David S. Miller <[email protected]>
1 parent 0233493 commit 7ad32bc

File tree

3 files changed

+56
-44
lines changed

3 files changed

+56
-44
lines changed

net/tipc/group.c

Lines changed: 54 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ struct tipc_member {
6464
struct rb_node tree_node;
6565
struct list_head list;
6666
struct list_head small_win;
67-
struct sk_buff *event_msg;
6867
struct sk_buff_head deferredq;
6968
struct tipc_group *group;
7069
u32 node;
@@ -632,6 +631,40 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
632631
}
633632
}
634633

634+
static void tipc_group_create_event(struct tipc_group *grp,
635+
struct tipc_member *m,
636+
u32 event, u16 seqno,
637+
struct sk_buff_head *inputq)
638+
{ u32 dnode = tipc_own_addr(grp->net);
639+
struct tipc_event evt;
640+
struct sk_buff *skb;
641+
struct tipc_msg *hdr;
642+
643+
evt.event = event;
644+
evt.found_lower = m->instance;
645+
evt.found_upper = m->instance;
646+
evt.port.ref = m->port;
647+
evt.port.node = m->node;
648+
evt.s.seq.type = grp->type;
649+
evt.s.seq.lower = m->instance;
650+
evt.s.seq.upper = m->instance;
651+
652+
skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_GRP_MEMBER_EVT,
653+
GROUP_H_SIZE, sizeof(evt), dnode, m->node,
654+
grp->portid, m->port, 0);
655+
if (!skb)
656+
return;
657+
658+
hdr = buf_msg(skb);
659+
msg_set_nametype(hdr, grp->type);
660+
msg_set_grp_evt(hdr, event);
661+
msg_set_dest_droppable(hdr, true);
662+
msg_set_grp_bc_seqno(hdr, seqno);
663+
memcpy(msg_data(hdr), &evt, sizeof(evt));
664+
TIPC_SKB_CB(skb)->orig_member = m->instance;
665+
__skb_queue_tail(inputq, skb);
666+
}
667+
635668
static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
636669
int mtyp, struct sk_buff_head *xmitq)
637670
{
@@ -677,7 +710,6 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
677710
u32 node = msg_orignode(hdr);
678711
u32 port = msg_origport(hdr);
679712
struct tipc_member *m, *pm;
680-
struct tipc_msg *ehdr;
681713
u16 remitted, in_flight;
682714

683715
if (!grp)
@@ -704,9 +736,8 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
704736
*usr_wakeup = true;
705737
m->usr_pending = false;
706738
tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
707-
ehdr = buf_msg(m->event_msg);
708-
msg_set_grp_bc_seqno(ehdr, m->bc_syncpt);
709-
__skb_queue_tail(inputq, m->event_msg);
739+
tipc_group_create_event(grp, m, TIPC_PUBLISHED,
740+
m->bc_syncpt, inputq);
710741
}
711742
list_del_init(&m->small_win);
712743
tipc_group_update_member(m, 0);
@@ -725,10 +756,9 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
725756
m->state = MBR_LEAVING;
726757
return;
727758
}
728-
/* Otherwise deliver already received WITHDRAW event */
729-
ehdr = buf_msg(m->event_msg);
730-
msg_set_grp_bc_seqno(ehdr, m->bc_syncpt);
731-
__skb_queue_tail(inputq, m->event_msg);
759+
/* Otherwise deliver member WITHDRAW event */
760+
tipc_group_create_event(grp, m, TIPC_WITHDRAWN,
761+
m->bc_syncpt, inputq);
732762
return;
733763
case GRP_ADV_MSG:
734764
if (!m)
@@ -797,11 +827,10 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
797827
void tipc_group_member_evt(struct tipc_group *grp,
798828
bool *usr_wakeup,
799829
int *sk_rcvbuf,
800-
struct sk_buff *skb,
830+
struct tipc_msg *hdr,
801831
struct sk_buff_head *inputq,
802832
struct sk_buff_head *xmitq)
803833
{
804-
struct tipc_msg *hdr = buf_msg(skb);
805834
struct tipc_event *evt = (void *)msg_data(hdr);
806835
u32 instance = evt->found_lower;
807836
u32 node = evt->port.node;
@@ -813,21 +842,12 @@ void tipc_group_member_evt(struct tipc_group *grp,
813842
u32 self;
814843

815844
if (!grp)
816-
goto drop;
845+
return;
817846

818847
net = grp->net;
819848
self = tipc_own_addr(net);
820849
if (!grp->loopback && node == self && port == grp->portid)
821-
goto drop;
822-
823-
/* Convert message before delivery to user */
824-
msg_set_hdr_sz(hdr, GROUP_H_SIZE);
825-
msg_set_user(hdr, TIPC_CRITICAL_IMPORTANCE);
826-
msg_set_type(hdr, TIPC_GRP_MEMBER_EVT);
827-
msg_set_origport(hdr, port);
828-
msg_set_orignode(hdr, node);
829-
msg_set_nametype(hdr, grp->type);
830-
msg_set_grp_evt(hdr, event);
850+
return;
831851

832852
m = tipc_group_find_member(grp, node, port);
833853

@@ -836,59 +856,52 @@ void tipc_group_member_evt(struct tipc_group *grp,
836856
m = tipc_group_create_member(grp, node, port,
837857
MBR_DISCOVERED);
838858
if (!m)
839-
goto drop;
859+
return;
860+
861+
m->instance = instance;
840862

841863
/* Hold back event if JOIN message not yet received */
842864
if (m->state == MBR_DISCOVERED) {
843-
m->event_msg = skb;
844865
m->state = MBR_PUBLISHED;
845866
} else {
846-
msg_set_grp_bc_seqno(hdr, m->bc_syncpt);
847-
__skb_queue_tail(inputq, skb);
867+
tipc_group_create_event(grp, m, TIPC_PUBLISHED,
868+
m->bc_syncpt, inputq);
848869
m->state = MBR_JOINED;
849870
*usr_wakeup = true;
850871
m->usr_pending = false;
851872
}
852-
m->instance = instance;
853-
TIPC_SKB_CB(skb)->orig_member = m->instance;
854873
tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq);
855874
tipc_group_update_member(m, 0);
856875
} else if (event == TIPC_WITHDRAWN) {
857876
if (!m)
858-
goto drop;
859-
860-
TIPC_SKB_CB(skb)->orig_member = m->instance;
877+
return;
861878

862879
*usr_wakeup = true;
863880
m->usr_pending = false;
864881
node_up = tipc_node_is_up(net, node);
865-
m->event_msg = NULL;
866882

867883
if (node_up) {
868884
/* Hold back event if a LEAVE msg should be expected */
869885
if (m->state != MBR_LEAVING) {
870-
m->event_msg = skb;
871886
tipc_group_decr_active(grp, m);
872887
m->state = MBR_LEAVING;
873888
} else {
874-
msg_set_grp_bc_seqno(hdr, m->bc_syncpt);
875-
__skb_queue_tail(inputq, skb);
889+
tipc_group_create_event(grp, m, TIPC_WITHDRAWN,
890+
m->bc_syncpt, inputq);
876891
}
877892
} else {
878893
if (m->state != MBR_LEAVING) {
879894
tipc_group_decr_active(grp, m);
880895
m->state = MBR_LEAVING;
881-
msg_set_grp_bc_seqno(hdr, m->bc_rcv_nxt);
896+
tipc_group_create_event(grp, m, TIPC_WITHDRAWN,
897+
m->bc_rcv_nxt, inputq);
882898
} else {
883-
msg_set_grp_bc_seqno(hdr, m->bc_syncpt);
899+
tipc_group_create_event(grp, m, TIPC_WITHDRAWN,
900+
m->bc_syncpt, inputq);
884901
}
885-
__skb_queue_tail(inputq, skb);
886902
}
887903
list_del_init(&m->list);
888904
list_del_init(&m->small_win);
889905
}
890906
*sk_rcvbuf = tipc_group_rcvbuf_limit(grp);
891-
return;
892-
drop:
893-
kfree_skb(skb);
894907
}

net/tipc/group.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ void tipc_group_filter_msg(struct tipc_group *grp,
5454
struct sk_buff_head *inputq,
5555
struct sk_buff_head *xmitq);
5656
void tipc_group_member_evt(struct tipc_group *grp, bool *wakeup,
57-
int *sk_rcvbuf, struct sk_buff *skb,
57+
int *sk_rcvbuf, struct tipc_msg *hdr,
5858
struct sk_buff_head *inputq,
5959
struct sk_buff_head *xmitq);
6060
void tipc_group_proto_rcv(struct tipc_group *grp, bool *wakeup,

net/tipc/socket.c

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1933,8 +1933,7 @@ static void tipc_sk_proto_rcv(struct sock *sk,
19331933
break;
19341934
case TOP_SRV:
19351935
tipc_group_member_evt(tsk->group, &wakeup, &sk->sk_rcvbuf,
1936-
skb, inputq, xmitq);
1937-
skb = NULL;
1936+
hdr, inputq, xmitq);
19381937
break;
19391938
default:
19401939
break;

0 commit comments

Comments
 (0)