@@ -453,15 +453,15 @@ static bool mptcp_subflow_cleanup_rbuf(struct sock *ssk)
 
 static void mptcp_cleanup_rbuf(struct mptcp_sock *msk)
 {
+	struct sock *ack_hint = READ_ONCE(msk->ack_hint);
 	struct mptcp_subflow_context *subflow;
 
 	/* if the hinted ssk is still active, try to use it */
-	if (likely(msk->ack_hint)) {
+	if (likely(ack_hint)) {
 		mptcp_for_each_subflow(msk, subflow) {
 			struct sock *ssk = mptcp_subflow_tcp_sock(subflow);
 
-			if (msk->ack_hint == ssk &&
-			    mptcp_subflow_cleanup_rbuf(ssk))
+			if (ack_hint == ssk && mptcp_subflow_cleanup_rbuf(ssk))
 				return;
 		}
 	}
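
Note: the hunk above takes a single READ_ONCE() snapshot of the hint and only ever tests the local copy, while the store (see WRITE_ONCE() in the next hunk) publishes updates. A minimal sketch of that load/store pairing, using the field from this patch but hypothetical helper names:

/* Sketch only, not part of the patch: msk->ack_hint may change concurrently,
 * so the reader snapshots it once with READ_ONCE() and never re-reads the
 * field, while the writer publishes the new value with WRITE_ONCE().
 */
static bool ack_hint_is(struct mptcp_sock *msk, const struct sock *ssk)
{
	struct sock *hint = READ_ONCE(msk->ack_hint);	/* one snapshot */

	return hint == ssk;		/* compare the local copy only */
}

static void ack_hint_set(struct mptcp_sock *msk, struct sock *ssk)
{
	WRITE_ONCE(msk->ack_hint, ssk);	/* pairs with READ_ONCE() above */
}
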
@@ -614,13 +614,13 @@ static bool __mptcp_move_skbs_from_subflow(struct mptcp_sock *msk,
 			break;
 		}
 	} while (more_data_avail);
-	msk->ack_hint = ssk;
+	WRITE_ONCE(msk->ack_hint, ssk);
 
 	*bytes += moved;
 	return done;
 }
 
-static bool mptcp_ofo_queue(struct mptcp_sock *msk)
+static bool __mptcp_ofo_queue(struct mptcp_sock *msk)
 {
 	struct sock *sk = (struct sock *)msk;
 	struct sk_buff *skb, *tail;
@@ -666,34 +666,27 @@ static bool mptcp_ofo_queue(struct mptcp_sock *msk)
 /* In most cases we will be able to lock the mptcp socket. If its already
  * owned, we need to defer to the work queue to avoid ABBA deadlock.
  */
-static bool move_skbs_to_msk(struct mptcp_sock *msk, struct sock *ssk)
+static void move_skbs_to_msk(struct mptcp_sock *msk, struct sock *ssk)
 {
 	struct sock *sk = (struct sock *)msk;
 	unsigned int moved = 0;
 
-	if (READ_ONCE(sk->sk_lock.owned))
-		return false;
-
-	if (unlikely(!spin_trylock_bh(&sk->sk_lock.slock)))
-		return false;
-
-	/* must re-check after taking the lock */
-	if (!READ_ONCE(sk->sk_lock.owned)) {
-		__mptcp_move_skbs_from_subflow(msk, ssk, &moved);
-		mptcp_ofo_queue(msk);
+	if (inet_sk_state_load(sk) == TCP_CLOSE)
+		return;
 
-		/* If the moves have caught up with the DATA_FIN sequence number
-		 * it's time to ack the DATA_FIN and change socket state, but
-		 * this is not a good place to change state. Let the workqueue
-		 * do it.
-		 */
-		if (mptcp_pending_data_fin(sk, NULL))
-			mptcp_schedule_work(sk);
-	}
+	mptcp_data_lock(sk);
 
-	spin_unlock_bh(&sk->sk_lock.slock);
+	__mptcp_move_skbs_from_subflow(msk, ssk, &moved);
+	__mptcp_ofo_queue(msk);
 
-	return moved > 0;
+	/* If the moves have caught up with the DATA_FIN sequence number
+	 * it's time to ack the DATA_FIN and change socket state, but
+	 * this is not a good place to change state. Let the workqueue
+	 * do it.
+	 */
+	if (mptcp_pending_data_fin(sk, NULL))
+		mptcp_schedule_work(sk);
+	mptcp_data_unlock(sk);
 }
 
 void mptcp_data_ready(struct sock *sk, struct sock *ssk)
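
mptcp_data_lock()/mptcp_data_unlock(), used above in place of the open-coded trylock on sk_lock.slock, are helpers in protocol.h and are not visible in this file's diff. The sketch below shows the shape they are assumed to have — the msk socket spinlock taken with bottom halves disabled — which is why move_skbs_to_msk() can now always move data instead of bailing out when a process owns the msk socket lock; the lock owner reconciles later via mptcp_release_cb() (last hunk).

/* Assumed definitions, for illustration only; the real helpers live in
 * net/mptcp/protocol.h and are not part of this hunk. The "data lock" is
 * the msk socket spinlock, taken with BHs disabled so it can be used from
 * both process and softirq context.
 */
#define mptcp_data_lock(sk)	spin_lock_bh(&(sk)->sk_lock.slock)
#define mptcp_data_unlock(sk)	spin_unlock_bh(&(sk)->sk_lock.slock)
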
@@ -937,17 +930,30 @@ static bool mptcp_wmem_alloc(struct sock *sk, int size)
 	if (msk->wmem_reserved >= size)
 		goto account;
 
-	if (!sk_wmem_schedule(sk, size))
+	mptcp_data_lock(sk);
+	if (!sk_wmem_schedule(sk, size)) {
+		mptcp_data_unlock(sk);
 		return false;
+	}
 
 	sk->sk_forward_alloc -= size;
 	msk->wmem_reserved += size;
+	mptcp_data_unlock(sk);
 
 account:
 	msk->wmem_reserved -= size;
 	return true;
 }
 
+static void mptcp_wmem_uncharge(struct sock *sk, int size)
+{
+	struct mptcp_sock *msk = mptcp_sk(sk);
+
+	if (msk->wmem_reserved < 0)
+		msk->wmem_reserved = 0;
+	msk->wmem_reserved += size;
+}
+
 static void dfrag_uncharge(struct sock *sk, int len)
 {
 	sk_mem_uncharge(sk, len);
@@ -976,6 +982,7 @@ static void mptcp_clean_una(struct sock *sk)
 	if (__mptcp_check_fallback(msk))
 		atomic64_set(&msk->snd_una, msk->snd_nxt);
 
+	mptcp_data_lock(sk);
 	snd_una = atomic64_read(&msk->snd_una);
 
 	list_for_each_entry_safe(dfrag, dtmp, &msk->rtx_queue, list) {
@@ -1007,6 +1014,7 @@ static void mptcp_clean_una(struct sock *sk)
 out:
 	if (cleaned && tcp_under_memory_pressure(sk))
 		sk_mem_reclaim_partial(sk);
+	mptcp_data_unlock(sk);
 }
 
 static void mptcp_clean_una_wakeup(struct sock *sk)
@@ -1436,7 +1444,7 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
 
 		if (copy_page_from_iter(dfrag->page, offset, psize,
 					&msg->msg_iter) != psize) {
-			msk->wmem_reserved += psize + frag_truesize;
+			mptcp_wmem_uncharge(sk, psize + frag_truesize);
 			ret = -EFAULT;
 			goto out;
 		}
@@ -1502,11 +1510,10 @@ static int __mptcp_recvmsg_mskq(struct mptcp_sock *msk,
 				struct msghdr *msg,
 				size_t len)
 {
-	struct sock *sk = (struct sock *)msk;
 	struct sk_buff *skb;
 	int copied = 0;
 
-	while ((skb = skb_peek(&sk->sk_receive_queue)) != NULL) {
+	while ((skb = skb_peek(&msk->receive_queue)) != NULL) {
 		u32 offset = MPTCP_SKB_CB(skb)->offset;
 		u32 data_len = skb->len - offset;
 		u32 count = min_t(size_t, len - copied, data_len);
@@ -1526,7 +1533,10 @@ static int __mptcp_recvmsg_mskq(struct mptcp_sock *msk,
 			break;
 		}
 
-		__skb_unlink(skb, &sk->sk_receive_queue);
+		/* we will bulk release the skb memory later */
+		skb->destructor = NULL;
+		msk->rmem_released += skb->truesize;
+		__skb_unlink(skb, &msk->receive_queue);
 		__kfree_skb(skb);
 
 		if (copied >= len)
@@ -1634,25 +1644,47 @@ static void mptcp_rcv_space_adjust(struct mptcp_sock *msk, int copied)
 	msk->rcvq_space.time = mstamp;
 }
 
+static void __mptcp_update_rmem(struct sock *sk)
+{
+	struct mptcp_sock *msk = mptcp_sk(sk);
+
+	if (!msk->rmem_released)
+		return;
+
+	atomic_sub(msk->rmem_released, &sk->sk_rmem_alloc);
+	sk_mem_uncharge(sk, msk->rmem_released);
+	msk->rmem_released = 0;
+}
+
+static void __mptcp_splice_receive_queue(struct sock *sk)
+{
+	struct mptcp_sock *msk = mptcp_sk(sk);
+
+	skb_queue_splice_tail_init(&sk->sk_receive_queue, &msk->receive_queue);
+}
+
 static bool __mptcp_move_skbs(struct mptcp_sock *msk, unsigned int rcv)
 {
+	struct sock *sk = (struct sock *)msk;
 	unsigned int moved = 0;
-	bool done;
-
-	/* avoid looping forever below on racing close */
-	if (((struct sock *)msk)->sk_state == TCP_CLOSE)
-		return false;
+	bool ret, done;
 
 	__mptcp_flush_join_list(msk);
 	do {
 		struct sock *ssk = mptcp_subflow_recv_lookup(msk);
 		bool slowpath;
 
-		if (!ssk)
+		/* we can have data pending in the subflows only if the msk
+		 * receive buffer was full at subflow_data_ready() time,
+		 * that is an unlikely slow path.
+		 */
+		if (likely(!ssk))
 			break;
 
 		slowpath = lock_sock_fast(ssk);
+		mptcp_data_lock(sk);
 		done = __mptcp_move_skbs_from_subflow(msk, ssk, &moved);
+		mptcp_data_unlock(sk);
 		if (moved && rcv) {
 			WRITE_ONCE(msk->rmem_pending, min(rcv, moved));
 			tcp_cleanup_rbuf(ssk, 1);
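
The two helpers added above split receive-queue ownership: subflows (softirq context) keep appending to sk->sk_receive_queue under the data lock, while whoever owns the msk socket lock drains a private msk->receive_queue and only takes the data lock briefly to splice new skbs across and return the memory consumed so far in one batch (the counterpart of the skb->destructor = NULL / rmem_released accounting in the recvmsg hunk earlier). A condensed sketch of that refill step, built only from helpers defined in this patch and intended as illustration rather than literal kernel code:

/* Sketch: refill the msk-private receive queue from the shared one.
 * Caller owns the msk socket lock; the data lock is taken only briefly.
 */
static void rx_refill(struct sock *sk, struct mptcp_sock *msk)
{
	if (skb_queue_empty_lockless(&sk->sk_receive_queue))
		return;				/* nothing new from the subflows */

	mptcp_data_lock(sk);
	__mptcp_update_rmem(sk);		/* bulk-return memory consumed earlier */
	__mptcp_splice_receive_queue(sk);	/* grab what softirq queued meanwhile */
	mptcp_data_unlock(sk);
}
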
@@ -1661,11 +1693,19 @@ static bool __mptcp_move_skbs(struct mptcp_sock *msk, unsigned int rcv)
 		unlock_sock_fast(ssk, slowpath);
 	} while (!done);
 
-	if (mptcp_ofo_queue(msk) || moved > 0) {
-		mptcp_check_data_fin((struct sock *)msk);
-		return true;
+	/* acquire the data lock only if some input data is pending */
+	ret = moved > 0;
+	if (!RB_EMPTY_ROOT(&msk->out_of_order_queue) ||
+	    !skb_queue_empty_lockless(&sk->sk_receive_queue)) {
+		mptcp_data_lock(sk);
+		__mptcp_update_rmem(sk);
+		ret |= __mptcp_ofo_queue(msk);
+		__mptcp_splice_receive_queue(sk);
+		mptcp_data_unlock(sk);
 	}
-	return false;
+	if (ret)
+		mptcp_check_data_fin((struct sock *)msk);
+	return !skb_queue_empty(&msk->receive_queue);
 }
 
 static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
@@ -1679,7 +1719,7 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
 	if (msg->msg_flags & ~(MSG_WAITALL | MSG_DONTWAIT))
 		return -EOPNOTSUPP;
 
-	lock_sock(sk);
+	mptcp_lock_sock(sk, __mptcp_splice_receive_queue(sk));
 	if (unlikely(sk->sk_state == TCP_LISTEN)) {
 		copied = -ENOTCONN;
 		goto out_err;
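
mptcp_lock_sock() is a pre-existing helper (from an earlier patch in this series, not modified here) that behaves like lock_sock() but runs the supplied statement while the underlying sk_lock.slock spinlock is still held. Passing __mptcp_splice_receive_queue(sk) therefore moves anything queued on sk->sk_receive_queue into msk->receive_queue atomically with taking ownership of the socket, so recvmsg starts with an up-to-date private queue. An assumed, simplified expansion for illustration only:

/* Simplified sketch of the assumed mptcp_lock_sock() structure; the real
 * macro in protocol.h also carries lockdep annotations. The key point is
 * that "cb" runs under sk_lock.slock, before ownership is published.
 */
#define mptcp_lock_sock_sketch(sk, cb)				\
do {								\
	spin_lock_bh(&(sk)->sk_lock.slock);			\
	if ((sk)->sk_lock.owned)				\
		__lock_sock(sk);	/* wait for the current owner */	\
	cb;				/* e.g. splice the receive queue */	\
	(sk)->sk_lock.owned = 1;				\
	spin_unlock_bh(&(sk)->sk_lock.slock);			\
} while (0)
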
@@ -1689,7 +1729,6 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
 
 	len = min_t(size_t, len, INT_MAX);
 	target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);
-	__mptcp_flush_join_list(msk);
 
 	for (;;) {
 		int bytes_read, old_space;
@@ -1703,7 +1742,7 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
 
 		copied += bytes_read;
 
-		if (skb_queue_empty(&sk->sk_receive_queue) &&
+		if (skb_queue_empty(&msk->receive_queue) &&
 		    __mptcp_move_skbs(msk, len - copied))
 			continue;
 
@@ -1734,8 +1773,14 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
 		if (test_and_clear_bit(MPTCP_WORK_EOF, &msk->flags))
 			mptcp_check_for_eof(msk);
 
-		if (sk->sk_shutdown & RCV_SHUTDOWN)
+		if (sk->sk_shutdown & RCV_SHUTDOWN) {
+			/* race breaker: the shutdown could be after the
+			 * previous receive queue check
+			 */
+			if (__mptcp_move_skbs(msk, len - copied))
+				continue;
 			break;
+		}
 
 		if (sk->sk_state == TCP_CLOSE) {
 			copied = -ENOTCONN;
@@ -1757,7 +1802,8 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
 		mptcp_wait_data(sk, &timeo);
 	}
 
-	if (skb_queue_empty(&sk->sk_receive_queue)) {
+	if (skb_queue_empty_lockless(&sk->sk_receive_queue) &&
+	    skb_queue_empty(&msk->receive_queue)) {
 		/* entire backlog drained, clear DATA_READY. */
 		clear_bit(MPTCP_DATA_READY, &msk->flags);
 
@@ -1773,7 +1819,7 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
 out_err:
 	pr_debug("msk=%p data_ready=%d rx queue empty=%d copied=%d",
 		 msk, test_bit(MPTCP_DATA_READY, &msk->flags),
-		 skb_queue_empty(&sk->sk_receive_queue), copied);
+		 skb_queue_empty_lockless(&sk->sk_receive_queue), copied);
 	mptcp_rcv_space_adjust(msk, copied);
 
 	release_sock(sk);
@@ -2076,9 +2122,11 @@ static int __mptcp_init_sock(struct sock *sk)
 	INIT_LIST_HEAD(&msk->join_list);
 	INIT_LIST_HEAD(&msk->rtx_queue);
 	INIT_WORK(&msk->work, mptcp_worker);
+	__skb_queue_head_init(&msk->receive_queue);
 	msk->out_of_order_queue = RB_ROOT;
 	msk->first_pending = NULL;
 	msk->wmem_reserved = 0;
+	msk->rmem_released = 0;
 
 	msk->ack_hint = NULL;
 	msk->first = NULL;
@@ -2274,6 +2322,7 @@ static void __mptcp_destroy_sock(struct sock *sk)
 	sk->sk_prot->destroy(sk);
 
 	WARN_ON_ONCE(msk->wmem_reserved);
+	WARN_ON_ONCE(msk->rmem_released);
 	sk_stream_kill_queues(sk);
 	xfrm_sk_free_policy(sk);
 	sk_refcnt_debug_release(sk);
@@ -2491,6 +2540,11 @@ static struct sock *mptcp_accept(struct sock *sk, int flags, int *err,
 
 void mptcp_destroy_common(struct mptcp_sock *msk)
 {
+	struct sock *sk = (struct sock *)msk;
+
+	/* move to sk_receive_queue, sk_stream_kill_queues will purge it */
+	skb_queue_splice_tail_init(&msk->receive_queue, &sk->sk_receive_queue);
+
 	skb_rbtree_purge(&msk->out_of_order_queue);
 	mptcp_token_destroy(msk);
 	mptcp_pm_free_anno_list(msk);
@@ -2626,6 +2680,7 @@ static void mptcp_release_cb(struct sock *sk)
 
 	/* clear any wmem reservation and errors */
 	__mptcp_update_wmem(sk);
+	__mptcp_update_rmem(sk);
 
 	do {
 		flags = sk->sk_tsq_flags;