
Commit c0bceb9

Jon Maloy authored and davem330 committed

tipc: add smart nagle feature
We introduce a feature that works like a combination of TCP's Nagle algorithm and TCP_CORK, but without some of the weaknesses of those. In particular, we will not observe long delivery delays because of delayed acks, since the algorithm itself decides if and when acks are to be sent from the receiving peer.

- The nagle property as such is determined by manipulating a new 'maxnagle' field in struct tipc_sock. If certain conditions are met, 'maxnagle' will define the max size of the messages which can be bundled. If it is set to zero, no messages are ever bundled, implying that the nagle property is disabled.
- A socket with the nagle property enabled enters nagle mode when more than 4 messages have been sent out without receiving any data message from the peer.
- A socket leaves nagle mode whenever it receives a data message from the peer.

In nagle mode, messages smaller than 'maxnagle' are accumulated in the socket write queue. The last buffer in the queue is marked with a new 'ack_required' bit, which forces the receiving peer to send a CONN_ACK message back to the sender upon reception.

The accumulated contents of the write queue are transmitted when one of the following events or conditions occurs:

- A CONN_ACK message is received from the peer.
- A data message is received from the peer.
- A SOCK_WAKEUP pseudo message is received from the link level.
- The write queue contains more than 64 1k blocks of data.
- The connection is being shut down.
- There is no CONN_ACK message to expect, i.e., there is currently no outstanding message where the 'ack_required' bit was set. As a consequence, the first message added after we enter nagle mode is always sent directly with this bit set.

This new feature gives a 50-100% improvement in throughput for small (i.e., less than MTU size) messages, while it might add up to one RTT of latency when the socket is in nagle mode.

Acked-by: Ying Xue <[email protected]>
Signed-off-by: Jon Maloy <[email protected]>
Signed-off-by: David S. Miller <[email protected]>
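For applications that prefer lowest latency over bundling, the behaviour is controlled through the new TIPC_NODELAY socket option (see the include/uapi/linux/tipc.h hunk below). The following is a minimal userspace sketch, not part of the patch; it assumes a kernel carrying this patch so that <linux/tipc.h> defines TIPC_NODELAY, and falls back to defining SOL_TIPC itself in case the libc headers lack it.

/* Minimal sketch: disable smart nagle on a TIPC stream socket,
 * analogous to TCP_NODELAY on TCP sockets.
 */
#include <stdio.h>
#include <sys/socket.h>
#include <linux/tipc.h>

#ifndef SOL_TIPC
#define SOL_TIPC 271	/* TIPC socket option level, if libc lacks it */
#endif

int main(void)
{
	__u32 on = 1;	/* non-zero => nodelay, i.e. nagle disabled */
	int fd = socket(AF_TIPC, SOCK_STREAM, 0);

	if (fd < 0) {
		perror("socket(AF_TIPC)");
		return 1;
	}
	/* With the option left at its default (false), the socket may enter
	 * nagle mode after ~4 one-way sends and start bundling messages
	 * smaller than 'maxnagle'. */
	if (setsockopt(fd, SOL_TIPC, TIPC_NODELAY, &on, sizeof(on)) < 0)
		perror("setsockopt(TIPC_NODELAY)");
	return 0;
}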
1 parent: 6c814e8

5 files changed: +170 −20


include/uapi/linux/tipc.h

Lines changed: 1 addition & 0 deletions
@@ -191,6 +191,7 @@ struct sockaddr_tipc {
 #define TIPC_GROUP_JOIN         135     /* Takes struct tipc_group_req* */
 #define TIPC_GROUP_LEAVE        136     /* No argument */
 #define TIPC_SOCK_RECVQ_USED    137     /* Default: none (read only) */
+#define TIPC_NODELAY            138     /* Default: false */

 /*
  * Flag values

net/tipc/msg.c

Lines changed: 53 additions & 0 deletions
@@ -190,6 +190,59 @@ int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf)
 	return 0;
 }

+/**
+ * tipc_msg_append(): Append data to tail of an existing buffer queue
+ * @hdr: header to be used
+ * @m: the data to be appended
+ * @mss: max allowable size of buffer
+ * @dlen: size of data to be appended
+ * @txq: queue to append to
+ * Returns the number of 1k blocks appended or errno value
+ */
+int tipc_msg_append(struct tipc_msg *_hdr, struct msghdr *m, int dlen,
+		    int mss, struct sk_buff_head *txq)
+{
+	struct sk_buff *skb, *prev;
+	int accounted, total, curr;
+	int mlen, cpy, rem = dlen;
+	struct tipc_msg *hdr;
+
+	skb = skb_peek_tail(txq);
+	accounted = skb ? msg_blocks(buf_msg(skb)) : 0;
+	total = accounted;
+
+	while (rem) {
+		if (!skb || skb->len >= mss) {
+			prev = skb;
+			skb = tipc_buf_acquire(mss, GFP_KERNEL);
+			if (unlikely(!skb))
+				return -ENOMEM;
+			skb_orphan(skb);
+			skb_trim(skb, MIN_H_SIZE);
+			hdr = buf_msg(skb);
+			skb_copy_to_linear_data(skb, _hdr, MIN_H_SIZE);
+			msg_set_hdr_sz(hdr, MIN_H_SIZE);
+			msg_set_size(hdr, MIN_H_SIZE);
+			__skb_queue_tail(txq, skb);
+			total += 1;
+			if (prev)
+				msg_set_ack_required(buf_msg(prev), 0);
+			msg_set_ack_required(hdr, 1);
+		}
+		hdr = buf_msg(skb);
+		curr = msg_blocks(hdr);
+		mlen = msg_size(hdr);
+		cpy = min_t(int, rem, mss - mlen);
+		if (cpy != copy_from_iter(skb->data + mlen, cpy, &m->msg_iter))
+			return -EFAULT;
+		msg_set_size(hdr, mlen + cpy);
+		skb_put(skb, cpy);
+		rem -= cpy;
+		total += msg_blocks(hdr) - curr;
+	}
+	return total - accounted;
+}
+
 /* tipc_msg_validate - validate basic format of received message
  *
  * This routine ensures a TIPC message has an acceptable header, and at least

net/tipc/msg.h

Lines changed: 12 additions & 0 deletions
@@ -290,6 +290,16 @@ static inline void msg_set_src_droppable(struct tipc_msg *m, u32 d)
 	msg_set_bits(m, 0, 18, 1, d);
 }

+static inline int msg_ack_required(struct tipc_msg *m)
+{
+	return msg_bits(m, 0, 18, 1);
+}
+
+static inline void msg_set_ack_required(struct tipc_msg *m, u32 d)
+{
+	msg_set_bits(m, 0, 18, 1, d);
+}
+
 static inline bool msg_is_rcast(struct tipc_msg *m)
 {
 	return msg_bits(m, 0, 18, 0x1);
@@ -1079,6 +1089,8 @@ int tipc_msg_fragment(struct sk_buff *skb, const struct tipc_msg *hdr,
 		      int pktmax, struct sk_buff_head *frags);
 int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
 		   int offset, int dsz, int mtu, struct sk_buff_head *list);
+int tipc_msg_append(struct tipc_msg *hdr, struct msghdr *m, int dlen,
+		    int mss, struct sk_buff_head *txq);
 bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err);
 bool tipc_msg_assemble(struct sk_buff_head *list);
 bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head *rcvq);
net/tipc/node.h

Lines changed: 5 additions & 2 deletions
@@ -54,7 +54,8 @@ enum {
 	TIPC_LINK_PROTO_SEQNO = (1 << 6),
 	TIPC_MCAST_RBCTL      = (1 << 7),
 	TIPC_GAP_ACK_BLOCK    = (1 << 8),
-	TIPC_TUNNEL_ENHANCED  = (1 << 9)
+	TIPC_TUNNEL_ENHANCED  = (1 << 9),
+	TIPC_NAGLE            = (1 << 10)
 };

 #define TIPC_NODE_CAPABILITIES (TIPC_SYN_BIT | \
@@ -66,7 +67,9 @@ enum {
 				TIPC_LINK_PROTO_SEQNO | \
 				TIPC_MCAST_RBCTL      | \
 				TIPC_GAP_ACK_BLOCK    | \
-				TIPC_TUNNEL_ENHANCED)
+				TIPC_TUNNEL_ENHANCED  | \
+				TIPC_NAGLE)
+
 #define INVALID_BEARER_ID -1

 void tipc_node_stop(struct net *net);

net/tipc/socket.c

Lines changed: 99 additions & 18 deletions
@@ -75,6 +75,7 @@ struct sockaddr_pair {
  * @conn_instance: TIPC instance used when connection was established
  * @published: non-zero if port has one or more associated names
  * @max_pkt: maximum packet size "hint" used when building messages sent by port
+ * @maxnagle: maximum size of msg which can be subject to nagle
  * @portid: unique port identity in TIPC socket hash table
  * @phdr: preformatted message header used when sending messages
  * #cong_links: list of congested links
@@ -97,6 +98,7 @@ struct tipc_sock {
 	u32 conn_instance;
 	int published;
 	u32 max_pkt;
+	u32 maxnagle;
 	u32 portid;
 	struct tipc_msg phdr;
 	struct list_head cong_links;
@@ -116,6 +118,10 @@ struct tipc_sock {
 	struct tipc_mc_method mc_method;
 	struct rcu_head rcu;
 	struct tipc_group *group;
+	u32 oneway;
+	u16 snd_backlog;
+	bool expect_ack;
+	bool nodelay;
 	bool group_is_open;
 };

@@ -137,6 +143,7 @@ static int tipc_sk_insert(struct tipc_sock *tsk);
 static void tipc_sk_remove(struct tipc_sock *tsk);
 static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz);
 static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz);
+static void tipc_sk_push_backlog(struct tipc_sock *tsk);

 static const struct proto_ops packet_ops;
 static const struct proto_ops stream_ops;
@@ -227,6 +234,26 @@ static u16 tsk_inc(struct tipc_sock *tsk, int msglen)
 	return 1;
 }

+/* tsk_set_nagle - enable/disable nagle property by manipulating maxnagle
+ */
+static void tsk_set_nagle(struct tipc_sock *tsk)
+{
+	struct sock *sk = &tsk->sk;
+
+	tsk->maxnagle = 0;
+	if (sk->sk_type != SOCK_STREAM)
+		return;
+	if (tsk->nodelay)
+		return;
+	if (!(tsk->peer_caps & TIPC_NAGLE))
+		return;
+	/* Limit node local buffer size to avoid receive queue overflow */
+	if (tsk->max_pkt == MAX_MSG_SIZE)
+		tsk->maxnagle = 1500;
+	else
+		tsk->maxnagle = tsk->max_pkt;
+}
+
 /**
  * tsk_advance_rx_queue - discard first buffer in socket receive queue
  *
@@ -446,6 +473,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock,

 	tsk = tipc_sk(sk);
 	tsk->max_pkt = MAX_PKT_DEFAULT;
+	tsk->maxnagle = 0;
 	INIT_LIST_HEAD(&tsk->publications);
 	INIT_LIST_HEAD(&tsk->cong_links);
 	msg = &tsk->phdr;
@@ -512,8 +540,12 @@ static void __tipc_shutdown(struct socket *sock, int error)
 	tipc_wait_for_cond(sock, &timeout, (!tsk->cong_link_cnt &&
 					    !tsk_conn_cong(tsk)));

-	/* Remove any pending SYN message */
-	__skb_queue_purge(&sk->sk_write_queue);
+	/* Push out unsent messages or remove if pending SYN */
+	skb = skb_peek(&sk->sk_write_queue);
+	if (skb && !msg_is_syn(buf_msg(skb)))
+		tipc_sk_push_backlog(tsk);
+	else
+		__skb_queue_purge(&sk->sk_write_queue);

 	/* Reject all unreceived messages, except on an active connection
 	 * (which disconnects locally & sends a 'FIN+' to peer).
@@ -1208,6 +1240,27 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
 	tipc_sk_rcv(net, inputq);
 }

+/* tipc_sk_push_backlog(): send accumulated buffers in socket write queue
+ * when socket is in Nagle mode
+ */
+static void tipc_sk_push_backlog(struct tipc_sock *tsk)
+{
+	struct sk_buff_head *txq = &tsk->sk.sk_write_queue;
+	struct net *net = sock_net(&tsk->sk);
+	u32 dnode = tsk_peer_node(tsk);
+	int rc;
+
+	if (skb_queue_empty(txq) || tsk->cong_link_cnt)
+		return;
+
+	tsk->snt_unacked += tsk->snd_backlog;
+	tsk->snd_backlog = 0;
+	tsk->expect_ack = true;
+	rc = tipc_node_xmit(net, txq, dnode, tsk->portid);
+	if (rc == -ELINKCONG)
+		tsk->cong_link_cnt = 1;
+}
+
 /**
  * tipc_sk_conn_proto_rcv - receive a connection mng protocol message
  * @tsk: receiving socket
@@ -1221,7 +1274,7 @@ static void tipc_sk_conn_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
 	u32 onode = tsk_own_node(tsk);
 	struct sock *sk = &tsk->sk;
 	int mtyp = msg_type(hdr);
-	bool conn_cong;
+	bool was_cong;

 	/* Ignore if connection cannot be validated: */
 	if (!tsk_peer_msg(tsk, hdr)) {
@@ -1254,11 +1307,13 @@ static void tipc_sk_conn_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
 		__skb_queue_tail(xmitq, skb);
 		return;
 	} else if (mtyp == CONN_ACK) {
-		conn_cong = tsk_conn_cong(tsk);
+		was_cong = tsk_conn_cong(tsk);
+		tsk->expect_ack = false;
+		tipc_sk_push_backlog(tsk);
 		tsk->snt_unacked -= msg_conn_ack(hdr);
 		if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
 			tsk->snd_win = msg_adv_win(hdr);
-		if (conn_cong)
+		if (was_cong && !tsk_conn_cong(tsk))
 			sk->sk_write_space(sk);
 	} else if (mtyp != CONN_PROBE_REPLY) {
 		pr_warn("Received unknown CONN_PROTO msg\n");
@@ -1437,15 +1492,15 @@ static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)
 	struct sock *sk = sock->sk;
 	DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
 	long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
+	struct sk_buff_head *txq = &sk->sk_write_queue;
 	struct tipc_sock *tsk = tipc_sk(sk);
 	struct tipc_msg *hdr = &tsk->phdr;
 	struct net *net = sock_net(sk);
-	struct sk_buff_head pkts;
 	u32 dnode = tsk_peer_node(tsk);
+	int maxnagle = tsk->maxnagle;
+	int maxpkt = tsk->max_pkt;
 	int send, sent = 0;
-	int rc = 0;
-
-	__skb_queue_head_init(&pkts);
+	int blocks, rc = 0;

 	if (unlikely(dlen > INT_MAX))
 		return -EMSGSIZE;
@@ -1467,21 +1522,35 @@ static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)
 					 tipc_sk_connected(sk)));
 		if (unlikely(rc))
 			break;
-
 		send = min_t(size_t, dlen - sent, TIPC_MAX_USER_MSG_SIZE);
-		rc = tipc_msg_build(hdr, m, sent, send, tsk->max_pkt, &pkts);
-		if (unlikely(rc != send))
-			break;
-
-		trace_tipc_sk_sendstream(sk, skb_peek(&pkts),
+		blocks = tsk->snd_backlog;
+		if (tsk->oneway++ >= 4 && send <= maxnagle) {
+			rc = tipc_msg_append(hdr, m, send, maxnagle, txq);
+			if (unlikely(rc < 0))
+				break;
+			blocks += rc;
+			if (blocks <= 64 && tsk->expect_ack) {
+				tsk->snd_backlog = blocks;
+				sent += send;
+				break;
+			}
+			tsk->expect_ack = true;
+		} else {
+			rc = tipc_msg_build(hdr, m, sent, send, maxpkt, txq);
+			if (unlikely(rc != send))
+				break;
+			blocks += tsk_inc(tsk, send + MIN_H_SIZE);
+		}
+		trace_tipc_sk_sendstream(sk, skb_peek(txq),
 					 TIPC_DUMP_SK_SNDQ, " ");
-		rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
+		rc = tipc_node_xmit(net, txq, dnode, tsk->portid);
 		if (unlikely(rc == -ELINKCONG)) {
 			tsk->cong_link_cnt = 1;
 			rc = 0;
 		}
 		if (likely(!rc)) {
-			tsk->snt_unacked += tsk_inc(tsk, send + MIN_H_SIZE);
+			tsk->snt_unacked += blocks;
+			tsk->snd_backlog = 0;
 			sent += send;
 		}
 	} while (sent < dlen && !rc);
@@ -1528,6 +1597,7 @@ static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
 	tipc_node_add_conn(net, peer_node, tsk->portid, peer_port);
 	tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid, true);
 	tsk->peer_caps = tipc_node_get_capabilities(net, peer_node);
+	tsk_set_nagle(tsk);
 	__skb_queue_purge(&sk->sk_write_queue);
 	if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
 		return;
@@ -1848,6 +1918,7 @@ static int tipc_recvstream(struct socket *sock, struct msghdr *m,
 	bool peek = flags & MSG_PEEK;
 	int offset, required, copy, copied = 0;
 	int hlen, dlen, err, rc;
+	bool ack = false;
 	long timeout;

 	/* Catch invalid receive attempts */
@@ -1892,6 +1963,7 @@ static int tipc_recvstream(struct socket *sock, struct msghdr *m,

 		/* Copy data if msg ok, otherwise return error/partial data */
 		if (likely(!err)) {
+			ack = msg_ack_required(hdr);
 			offset = skb_cb->bytes_read;
 			copy = min_t(int, dlen - offset, buflen - copied);
 			rc = skb_copy_datagram_msg(skb, hlen + offset, m, copy);
@@ -1919,7 +1991,7 @@ static int tipc_recvstream(struct socket *sock, struct msghdr *m,

 		/* Send connection flow control advertisement when applicable */
 		tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen);
-		if (unlikely(tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE))
+		if (ack || tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE)
 			tipc_sk_send_ack(tsk);

 		/* Exit if all requested data or FIN/error received */
@@ -1990,6 +2062,7 @@ static void tipc_sk_proto_rcv(struct sock *sk,
 		smp_wmb();
 		tsk->cong_link_cnt--;
 		wakeup = true;
+		tipc_sk_push_backlog(tsk);
 		break;
 	case GROUP_PROTOCOL:
 		tipc_group_proto_rcv(grp, &wakeup, hdr, inputq, xmitq);
@@ -2029,6 +2102,7 @@ static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)

 	if (unlikely(msg_mcast(hdr)))
 		return false;
+	tsk->oneway = 0;

 	switch (sk->sk_state) {
 	case TIPC_CONNECTING:
@@ -2074,6 +2148,8 @@ static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)
 			return true;
 		return false;
 	case TIPC_ESTABLISHED:
+		if (!skb_queue_empty(&sk->sk_write_queue))
+			tipc_sk_push_backlog(tsk);
 		/* Accept only connection-based messages sent by peer */
 		if (likely(con_msg && !err && pport == oport && pnode == onode))
 			return true;
@@ -2959,6 +3035,7 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
 	case TIPC_SRC_DROPPABLE:
 	case TIPC_DEST_DROPPABLE:
 	case TIPC_CONN_TIMEOUT:
+	case TIPC_NODELAY:
 		if (ol < sizeof(value))
 			return -EINVAL;
 		if (get_user(value, (u32 __user *)ov))
@@ -3007,6 +3084,10 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
 	case TIPC_GROUP_LEAVE:
 		res = tipc_sk_leave(tsk);
 		break;
+	case TIPC_NODELAY:
+		tsk->nodelay = !!value;
+		tsk_set_nagle(tsk);
+		break;
 	default:
 		res = -EINVAL;
 	}