Skip to content

Commit ad31046

Browse files
kcp-gitjfvogel
authored andcommitted
rds: Add per peer RDS socket send buffer
Currently, an RDS socket's send buffer is shared by all the peers this socket communicates with. When one or more peers are slow to receive, their unacknowledged data can consume a large portion of the socket's buffer. This will affect the communication with other peers of the socket. To resolve this issue, an RDS socket's send buffer is now per peer. This works like opening multiple TCP sockets to different peers, each peer has its own send buffer (in each socket). In RDS, each peer has its own buffer space in the socket. With this per peer send buffer, when one or more peers are slow, their data will not interfere with data sent to other peers. A sysctl parameter, sock_max_peers, is added to limit the number of peers a socket can communicate with. The default is 8192. Its range of valid value is 128 to 65536. Orabug: 29492596 Tested-by: RDS CI <[email protected]> Tested-by: Rose Wang <[email protected]> Tested-by: Rosa Lopez <[email protected]> Tested-by: Shih-Yu Huang <[email protected]> Signed-off-by: Ka-Cheong Poon <[email protected]> Reviewed-by: Venkat Venkatsubra <[email protected]> Acked-by: Santosh Shilimkar <[email protected]>
1 parent 000d784 commit ad31046

File tree

5 files changed

+255
-45
lines changed

5 files changed

+255
-45
lines changed

net/rds/af_rds.c

Lines changed: 122 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2006, 2018 Oracle and/or its affiliates. All rights reserved.
2+
* Copyright (c) 2006, 2019 Oracle and/or its affiliates. All rights reserved.
33
*
44
* This software is available to you under a choice of one of two
55
* licenses. You may choose to be licensed under the terms of the GNU
@@ -80,6 +80,17 @@ static unsigned long rds_sock_count;
8080
static LIST_HEAD(rds_sock_list);
8181
DECLARE_WAIT_QUEUE_HEAD(rds_poll_waitq);
8282

83+
/* kmem cache slab for struct rds_buf_info */
84+
static struct kmem_cache *rds_rs_buf_info_slab;
85+
86+
/* Helper function to be passed to rhashtable_free_and_destroy() to free a
87+
* struct rs_buf_info.
88+
*/
89+
static void rds_buf_info_free(void *rsbi, void *arg __attribute__((unused)))
90+
{
91+
kmem_cache_free(rds_rs_buf_info_slab, rsbi);
92+
}
93+
8394
/*
8495
* This is called as the final descriptor referencing this socket is closed.
8596
* We have to unbind the socket so that another socket can be bound to the
@@ -112,6 +123,9 @@ static int rds_release(struct socket *sock)
112123
rds_rdma_drop_keys(rs);
113124
rds_notify_queue_get(rs, NULL);
114125

126+
rhashtable_free_and_destroy(&rs->rs_buf_info_tbl, rds_buf_info_free,
127+
NULL);
128+
115129
spin_lock_bh(&rds_sock_lock);
116130
list_del_init(&rs->rs_item);
117131
rds_sock_count--;
@@ -272,10 +286,18 @@ static unsigned int rds_poll(struct file *file, struct socket *sock,
272286
if (!list_empty(&rs->rs_recv_queue)
273287
|| !list_empty(&rs->rs_notify_queue))
274288
mask |= (POLLIN | POLLRDNORM);
275-
if (rs->rs_snd_bytes < rds_sk_sndbuf(rs))
276-
mask |= (POLLOUT | POLLWRNORM);
277289
read_unlock_irqrestore(&rs->rs_recv_lock, flags);
278290

291+
/* Use the number of destination this socket has to estimate the
292+
* send buffer size. When there is no peer yet, return the default
293+
* send buffer size.
294+
*/
295+
spin_lock_irqsave(&rs->rs_snd_lock, flags);
296+
if (rs->rs_snd_bytes < max_t(u32, rs->rs_buf_info_dest_cnt, 1) *
297+
rds_sk_sndbuf(rs))
298+
mask |= (POLLOUT | POLLWRNORM);
299+
spin_unlock_irqrestore(&rs->rs_snd_lock, flags);
300+
279301
/* clear state any time we wake a seen-congested socket */
280302
if (mask)
281303
rs->rs_seen_congestion = 0;
@@ -712,6 +734,77 @@ static int rds_getsockopt(struct socket *sock, int level, int optname,
712734

713735
}
714736

737+
/* Check if there is a rs_buf_info associated with the given address. If not,
738+
* add one to the rds_sock. The found or added rs_buf_info is returned. If
739+
* there is no rs_buf_info found and a new rs_buf_info cannot be allocated,
740+
* NULL is returned and ret is set to the error. Once an address' rs_buf_info
741+
* is added, it will not be removed until the rs_sock is closed.
742+
*/
743+
struct rs_buf_info *rds_add_buf_info(struct rds_sock *rs, struct in6_addr *addr,
744+
int *ret, gfp_t gfp)
745+
{
746+
struct rs_buf_info *info, *tmp_info;
747+
unsigned long flags;
748+
749+
/* Normal path, peer is expected to be found most of the time. */
750+
info = rhashtable_lookup_fast(&rs->rs_buf_info_tbl, addr,
751+
rs_buf_info_params);
752+
if (info) {
753+
*ret = 0;
754+
return info;
755+
}
756+
757+
/* Allocate the buffer outside of lock first. */
758+
tmp_info = kmem_cache_alloc(rds_rs_buf_info_slab, gfp);
759+
if (!tmp_info) {
760+
*ret = -ENOMEM;
761+
return NULL;
762+
}
763+
764+
spin_lock_irqsave(&rs->rs_snd_lock, flags);
765+
766+
/* Cannot add more peer. */
767+
if (rs->rs_buf_info_dest_cnt + 1 > rds_sock_max_peers) {
768+
spin_unlock_irqrestore(&rs->rs_snd_lock, flags);
769+
kmem_cache_free(rds_rs_buf_info_slab, tmp_info);
770+
*ret = -ENFILE;
771+
return NULL;
772+
}
773+
774+
tmp_info->rsbi_key = *addr;
775+
tmp_info->rsbi_snd_bytes = 0;
776+
*ret = rhashtable_insert_fast(&rs->rs_buf_info_tbl,
777+
&tmp_info->rsbi_link, rs_buf_info_params);
778+
if (!*ret) {
779+
rs->rs_buf_info_dest_cnt++;
780+
spin_unlock_irqrestore(&rs->rs_snd_lock, flags);
781+
return tmp_info;
782+
} else if (*ret != -EEXIST) {
783+
spin_unlock_irqrestore(&rs->rs_snd_lock, flags);
784+
kmem_cache_free(rds_rs_buf_info_slab, tmp_info);
785+
/* Very unlikely to happen... */
786+
pr_err("%s: cannot add rs_buf_info for %pI6c: %d\n", __func__,
787+
addr, *ret);
788+
return NULL;
789+
}
790+
791+
/* Another thread beats us in adding the rs_buf_info.... */
792+
info = rhashtable_lookup_fast(&rs->rs_buf_info_tbl, addr,
793+
rs_buf_info_params);
794+
spin_unlock_irqrestore(&rs->rs_snd_lock, flags);
795+
kmem_cache_free(rds_rs_buf_info_slab, tmp_info);
796+
797+
if (info) {
798+
*ret = 0;
799+
return info;
800+
}
801+
802+
/* Should not happen... */
803+
pr_err("%s: cannot find rs_buf_info for %pI6c\n", __func__, addr);
804+
*ret = -EINVAL;
805+
return NULL;
806+
}
807+
715808
static int rds_connect(struct socket *sock, struct sockaddr *uaddr,
716809
int addr_len, int flags)
717810
{
@@ -800,6 +893,12 @@ static int rds_connect(struct socket *sock, struct sockaddr *uaddr,
800893
break;
801894
}
802895

896+
if (!ret &&
897+
!rds_add_buf_info(rs, &rs->rs_conn_addr, &ret, GFP_KERNEL)) {
898+
/* Need to clear the connected info in case of error. */
899+
rs->rs_conn_addr = in6addr_any;
900+
rs->rs_conn_port = 0;
901+
}
803902
release_sock(sk);
804903
return ret;
805904
}
@@ -842,6 +941,7 @@ static void rds_sock_destruct(struct sock *sk)
842941
static int __rds_create(struct socket *sock, struct sock *sk, int protocol)
843942
{
844943
struct rds_sock *rs;
944+
int ret;
845945

846946
sock_init_data(sock, sk);
847947
sock->ops = &rds_proto_ops;
@@ -863,6 +963,11 @@ static int __rds_create(struct socket *sock, struct sock *sk, int protocol)
863963
rs->rs_netfilter_enabled = 0;
864964
rs->rs_rx_traces = 0;
865965

966+
spin_lock_init(&rs->rs_snd_lock);
967+
ret = rhashtable_init(&rs->rs_buf_info_tbl, &rs_buf_info_params);
968+
if (ret)
969+
return ret;
970+
866971
if (!ipv6_addr_any(&rs->rs_bound_addr)) {
867972
printk(KERN_CRIT "bound addr %pI6c at create\n",
868973
&rs->rs_bound_addr);
@@ -879,6 +984,7 @@ static int __rds_create(struct socket *sock, struct sock *sk, int protocol)
879984
static int rds_create(struct net *net, struct socket *sock, int protocol, int kern)
880985
{
881986
struct sock *sk;
987+
int ret;
882988

883989
if (sock->type != SOCK_SEQPACKET ||
884990
(protocol && IPPROTO_OKA != protocol))
@@ -888,7 +994,10 @@ static int rds_create(struct net *net, struct socket *sock, int protocol, int ke
888994
if (!sk)
889995
return -ENOMEM;
890996

891-
return __rds_create(sock, sk, protocol);
997+
ret = __rds_create(sock, sk, protocol);
998+
if (ret)
999+
sk_free(sk);
1000+
return ret;
8921001
}
8931002

8941003
void debug_sock_hold(struct sock *sk)
@@ -1194,6 +1303,7 @@ static void __exit rds_exit(void)
11941303
rds_info_deregister_func(RDS6_INFO_SOCKETS, rds6_sock_info);
11951304
rds_info_deregister_func(RDS6_INFO_RECV_MESSAGES, rds6_sock_inc_info);
11961305
#endif
1306+
kmem_cache_destroy(rds_rs_buf_info_slab);
11971307
}
11981308

11991309
module_exit(rds_exit);
@@ -1204,6 +1314,14 @@ static int __init rds_init(void)
12041314
{
12051315
int ret;
12061316

1317+
rds_rs_buf_info_slab = kmem_cache_create("rds_rs_buf_info",
1318+
sizeof(struct rs_buf_info),
1319+
0, SLAB_HWCACHE_ALIGN, NULL);
1320+
if (!rds_rs_buf_info_slab) {
1321+
ret = -ENOMEM;
1322+
goto out;
1323+
}
1324+
12071325
net_get_random_once(&rds_gen_num, sizeof(rds_gen_num));
12081326

12091327
rds_bind_lock_init();

net/rds/rds.h

Lines changed: 43 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include <uapi/linux/rds.h>
1313
#include <linux/in6.h>
1414
#include <linux/sizes.h>
15+
#include <linux/rhashtable.h>
1516

1617
#include "info.h"
1718

@@ -730,6 +731,13 @@ struct rds_transport {
730731
struct rdma_cm_event *event);
731732
};
732733

734+
/* Used to store per peer socket buffer info. */
735+
struct rs_buf_info {
736+
struct in6_addr rsbi_key;
737+
struct rhash_head rsbi_link;
738+
u32 rsbi_snd_bytes;
739+
};
740+
733741
struct rds_sock {
734742
struct sock rs_sk;
735743

@@ -762,28 +770,36 @@ struct rds_sock {
762770
/* seen congestion (ENOBUFS) when sending? */
763771
int rs_seen_congestion;
764772

765-
/* rs_lock protects all these adjacent members before the newline */
766-
spinlock_t rs_lock;
767-
struct list_head rs_send_queue;
768-
u32 rs_snd_bytes;
769-
int rs_rcv_bytes;
770-
struct list_head rs_notify_queue; /* currently used for failed RDMAs */
771-
772-
/* Congestion wake_up. If rs_cong_monitor is set, we use cong_mask
773+
/* rs_lock protects all these adjacent members before the newline.
774+
*
775+
* Congestion wake_up. If rs_cong_monitor is set, we use cong_mask
773776
* to decide whether the application should be woken up.
774777
* If not set, we use rs_cong_track to find out whether a cong map
775778
* update arrived.
776779
*/
780+
spinlock_t rs_lock;
777781
uint64_t rs_cong_mask;
778782
uint64_t rs_cong_notify;
779783
struct list_head rs_cong_list;
780784
unsigned long rs_cong_track;
785+
/* currently used for failed RDMAs */
786+
struct list_head rs_notify_queue;
787+
788+
/* rs_snd_lock protects all these adjacent members before the
789+
* newline
790+
*/
791+
spinlock_t rs_snd_lock;
792+
struct list_head rs_send_queue;
793+
u32 rs_snd_bytes; /* Total bytes to all peers */
794+
u32 rs_buf_info_dest_cnt;
795+
struct rhashtable rs_buf_info_tbl;
781796

782797
/*
783798
* rs_recv_lock protects the receive queue, and is
784799
* used to serialize with rds_release.
785800
*/
786801
rwlock_t rs_recv_lock;
802+
int rs_rcv_bytes;
787803
struct list_head rs_recv_queue;
788804

789805
/* just for stats reporting */
@@ -872,6 +888,25 @@ struct rds_statistics {
872888
};
873889

874890
/* af_rds.c */
891+
#define RDS_SOCK_BUF_INFO_HTBL_SIZE 512
892+
static const struct rhashtable_params rs_buf_info_params = {
893+
.nelem_hint = RDS_SOCK_BUF_INFO_HTBL_SIZE,
894+
.key_len = sizeof(struct in6_addr),
895+
.key_offset = offsetof(struct rs_buf_info, rsbi_key),
896+
.head_offset = offsetof(struct rs_buf_info, rsbi_link),
897+
};
898+
899+
/* Maximum number of peers a socket can communicate with */
900+
extern unsigned int rds_sock_max_peers;
901+
902+
struct rs_buf_info *rds_add_buf_info(struct rds_sock *rs, struct in6_addr *addr,
903+
int *ret, gfp_t gfp);
904+
static inline struct rs_buf_info *rds_get_buf_info(struct rds_sock *rs,
905+
struct in6_addr *addr)
906+
{
907+
return rhashtable_lookup_fast(&rs->rs_buf_info_tbl, addr,
908+
rs_buf_info_params);
909+
}
875910
char *rds_str_array(char **array, size_t elements, size_t index);
876911
void rds_sock_addref(struct rds_sock *rs);
877912
void rds_sock_put(struct rds_sock *rs);

net/rds/recv.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2006, 2018 Oracle and/or its affiliates. All rights reserved.
2+
* Copyright (c) 2006, 2019 Oracle and/or its affiliates. All rights reserved.
33
*
44
* This software is available to you under a choice of one of two
55
* licenses. You may choose to be licensed under the terms of the GNU
@@ -913,9 +913,9 @@ static int rds_notify_cong(struct rds_sock *rs, struct msghdr *msghdr)
913913
if (err)
914914
return err;
915915

916-
spin_lock_irqsave(&rs->rs_lock, flags);
916+
spin_lock_irqsave(&rs->rs_snd_lock, flags);
917917
rs->rs_cong_notify &= ~notify;
918-
spin_unlock_irqrestore(&rs->rs_lock, flags);
918+
spin_unlock_irqrestore(&rs->rs_snd_lock, flags);
919919

920920
return 0;
921921
}

0 commit comments

Comments
 (0)