@@ -1828,39 +1828,30 @@ reduce_memory_use(State = #vqstate {
1828
1828
State1
1829
1829
end .
1830
1830
1831
- limit_ram_acks (Quota , State = # vqstate {ram_bytes = CurrRamBytes }) ->
1832
- limit_ram_acks (Quota , CurrRamBytes , State ).
1833
-
1834
- limit_ram_acks (0 , CurrRamBytes ,
1835
- State = # vqstate {index_state = IndexState ,
1836
- target_ram_count = TargetRamCount }) ->
1831
+ limit_ram_acks (0 , State = # vqstate { index_state = IndexState ,
1832
+ target_ram_count = TargetRamCount }) ->
1837
1833
IndexState1 = rabbit_queue_index :flush_pre_publish_cache (
1838
1834
TargetRamCount , IndexState ),
1839
- {0 , State # vqstate {index_state = IndexState1 ,
1840
- ram_bytes = CurrRamBytes }};
1841
- limit_ram_acks (Quota , CurrRamBytes ,
1842
- State = # vqstate {
1843
- index_state = IndexState ,
1844
- target_ram_count = TargetRamCount ,
1845
- ram_pending_ack = RPA ,
1846
- disk_pending_ack = DPA }) ->
1835
+ {0 , State # vqstate { index_state = IndexState1 }};
1836
+ limit_ram_acks (Quota , State = # vqstate { index_state = IndexState ,
1837
+ target_ram_count = TargetRamCount ,
1838
+ ram_pending_ack = RPA ,
1839
+ disk_pending_ack = DPA }) ->
1847
1840
case gb_trees :is_empty (RPA ) of
1848
1841
true ->
1849
1842
IndexState1 = rabbit_queue_index :flush_pre_publish_cache (
1850
1843
TargetRamCount , IndexState ),
1851
- {Quota , State # vqstate {index_state = IndexState1 ,
1852
- ram_bytes = CurrRamBytes }};
1844
+ {Quota , State # vqstate {index_state = IndexState1 }};
1853
1845
false ->
1854
1846
{SeqId , MsgStatus , RPA1 } = gb_trees :take_largest (RPA ),
1855
1847
{MsgStatus1 , State1 } =
1856
1848
maybe_prepare_write_to_disk (true , false , MsgStatus , State ),
1857
1849
MsgStatus2 = m (trim_msg_status (MsgStatus1 )),
1858
1850
DPA1 = gb_trees :insert (SeqId , MsgStatus2 , DPA ),
1859
- DeltaRam = delta_ram (msg_in_ram (MsgStatus ), msg_in_ram (MsgStatus2 )),
1860
1851
limit_ram_acks (Quota - 1 ,
1861
- CurrRamBytes + DeltaRam * msg_size ( MsgStatus ) ,
1862
- State1 # vqstate { ram_pending_ack = RPA1 ,
1863
- disk_pending_ack = DPA1 })
1852
+ stats ({ 0 , 0 }, { MsgStatus , MsgStatus2 } ,
1853
+ State1 # vqstate { ram_pending_ack = RPA1 ,
1854
+ disk_pending_ack = DPA1 }) )
1864
1855
end .
1865
1856
1866
1857
permitted_beta_count (# vqstate { len = 0 }) ->
@@ -1986,59 +1977,40 @@ push_alphas_to_betas(Quota, State) ->
1986
1977
end , Quota1 , State1 # vqstate .q4 , State1 ),
1987
1978
{Quota2 , State2 }.
1988
1979
1989
- push_alphas_to_betas (Generator , Consumer , Quota , Q ,
1990
- State = # vqstate {ram_msg_count = CurrRamReadyCount ,
1991
- ram_bytes = CurrRamBytes }) ->
1992
- push_alphas_to_betas1 (Generator , Consumer , Quota , Q ,
1993
- CurrRamReadyCount , CurrRamBytes ,
1994
- State ).
1995
-
1996
- push_alphas_to_betas1 (_Generator , _Consumer , Quota , _Q ,
1997
- CurrRamReadyCount , CurrRamBytes ,
1998
- State = # vqstate { index_state = IndexState ,
1980
+ push_alphas_to_betas (_Generator , _Consumer , Quota , _Q ,
1981
+ State = # vqstate { ram_msg_count = RamMsgCount ,
1982
+ index_state = IndexState ,
1999
1983
target_ram_count = TargetRamCount })
2000
1984
when Quota =:= 0 orelse
2001
1985
TargetRamCount =:= infinity orelse
2002
- TargetRamCount >= CurrRamReadyCount ->
1986
+ TargetRamCount >= RamMsgCount ->
2003
1987
IndexState1 = rabbit_queue_index :flush_pre_publish_cache (
2004
1988
TargetRamCount , IndexState ),
2005
- {Quota , State # vqstate {index_state = IndexState1 ,
2006
- ram_msg_count = CurrRamReadyCount ,
2007
- ram_bytes = CurrRamBytes }};
2008
- push_alphas_to_betas1 (Generator , Consumer , Quota , Q ,
2009
- CurrRamReadyCount , CurrRamBytes ,
1989
+ {Quota , State # vqstate {index_state = IndexState1 }};
1990
+ push_alphas_to_betas (Generator , Consumer , Quota , Q ,
2010
1991
State = # vqstate {
2011
1992
index_state = IndexState ,
2012
1993
target_ram_count = TargetRamCount }) ->
2013
1994
case credit_flow :blocked () of
2014
- true ->
2015
- IndexState1 = rabbit_queue_index :flush_pre_publish_cache (
2016
- TargetRamCount , IndexState ),
2017
- {Quota , State # vqstate {index_state = IndexState1 ,
2018
- ram_msg_count = CurrRamReadyCount ,
2019
- ram_bytes = CurrRamBytes }};
2020
- false ->
2021
- case Generator (Q ) of
2022
- {empty , _Q } ->
2023
- IndexState1 = rabbit_queue_index :flush_pre_publish_cache (
2024
- TargetRamCount , IndexState ),
2025
- {Quota , State # vqstate {index_state = IndexState1 ,
2026
- ram_msg_count = CurrRamReadyCount ,
2027
- ram_bytes = CurrRamBytes }};
2028
- {{value , MsgStatus }, Qa } ->
2029
- {MsgStatus1 , State1 } =
2030
- maybe_prepare_write_to_disk (true , false , MsgStatus ,
2031
- State ),
2032
- MsgStatus2 = m (trim_msg_status (MsgStatus1 )),
2033
- DeltaRam = delta_ram (msg_in_ram (MsgStatus ),
2034
- msg_in_ram (MsgStatus2 )),
2035
- DeltaRamReady = DeltaRam ,
2036
- State2 = Consumer (MsgStatus2 , Qa , State1 ),
2037
- push_alphas_to_betas1 (Generator , Consumer , Quota - 1 , Qa ,
2038
- CurrRamReadyCount + DeltaRamReady ,
2039
- CurrRamBytes + DeltaRam * msg_size (MsgStatus ),
2040
- State2 )
2041
- end
1995
+ true -> IndexState1 = rabbit_queue_index :flush_pre_publish_cache (
1996
+ TargetRamCount , IndexState ),
1997
+ {Quota , State # vqstate {index_state = IndexState1 }};
1998
+ false -> case Generator (Q ) of
1999
+ {empty , _Q } ->
2000
+ IndexState1 = rabbit_queue_index :flush_pre_publish_cache (
2001
+ TargetRamCount , IndexState ),
2002
+ {Quota , State # vqstate {index_state = IndexState1 }};
2003
+ {{value , MsgStatus }, Qa } ->
2004
+ {MsgStatus1 , State1 } =
2005
+ maybe_prepare_write_to_disk (true , false , MsgStatus ,
2006
+ State ),
2007
+ MsgStatus2 = m (trim_msg_status (MsgStatus1 )),
2008
+ State2 = stats (
2009
+ ready0 , {MsgStatus , MsgStatus2 }, State1 ),
2010
+ State3 = Consumer (MsgStatus2 , Qa , State2 ),
2011
+ push_alphas_to_betas (Generator , Consumer , Quota - 1 ,
2012
+ Qa , State3 )
2013
+ end
2042
2014
end .
2043
2015
2044
2016
push_betas_to_deltas (Quota , State = # vqstate { q2 = Q2 ,
@@ -2058,11 +2030,7 @@ push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2,
2058
2030
delta = Delta1 ,
2059
2031
q3 = Q3a }.
2060
2032
2061
- push_betas_to_deltas (Generator , LimitFun , Q ,
2062
- {_Quota , _Delta ,
2063
- # vqstate {
2064
- ram_msg_count = CurrRamReadyCount ,
2065
- ram_bytes = CurrRamBytes }} = PushState ) ->
2033
+ push_betas_to_deltas (Generator , LimitFun , Q , PushState ) ->
2066
2034
case ? QUEUE :is_empty (Q ) of
2067
2035
true ->
2068
2036
{Q , PushState };
@@ -2072,60 +2040,40 @@ push_betas_to_deltas(Generator, LimitFun, Q,
2072
2040
Limit = LimitFun (MinSeqId ),
2073
2041
case MaxSeqId < Limit of
2074
2042
true -> {Q , PushState };
2075
- false -> push_betas_to_deltas1 (Generator , Limit , Q ,
2076
- CurrRamReadyCount , CurrRamBytes ,
2077
- PushState )
2043
+ false -> push_betas_to_deltas1 (Generator , Limit , Q , PushState )
2078
2044
end
2079
2045
end .
2080
2046
2081
2047
push_betas_to_deltas1 (_Generator , _Limit , Q ,
2082
- CurrRamReadyCount , CurrRamBytes ,
2083
2048
{0 , Delta , State =
2084
2049
# vqstate {index_state = IndexState ,
2085
2050
target_ram_count = TargetRamCount }}) ->
2086
2051
IndexState1 = rabbit_queue_index :flush_pre_publish_cache (
2087
2052
TargetRamCount , IndexState ),
2088
- {Q , {0 , Delta , State # vqstate {index_state = IndexState1 ,
2089
- ram_msg_count = CurrRamReadyCount ,
2090
- ram_bytes = CurrRamBytes }}};
2053
+ {Q , {0 , Delta , State # vqstate {index_state = IndexState1 }}};
2091
2054
push_betas_to_deltas1 (Generator , Limit , Q ,
2092
- CurrRamReadyCount , CurrRamBytes ,
2093
2055
{Quota , Delta , State =
2094
2056
# vqstate {index_state = IndexState ,
2095
2057
target_ram_count = TargetRamCount }}) ->
2096
2058
case Generator (Q ) of
2097
2059
{empty , _Q } ->
2098
2060
IndexState1 = rabbit_queue_index :flush_pre_publish_cache (
2099
2061
TargetRamCount , IndexState ),
2100
- {Q , {Quota , Delta , State # vqstate {index_state = IndexState1 ,
2101
- ram_msg_count = CurrRamReadyCount ,
2102
- ram_bytes = CurrRamBytes }}};
2062
+ {Q , {Quota , Delta , State # vqstate {index_state = IndexState1 }}};
2103
2063
{{value , # msg_status { seq_id = SeqId }}, _Qa }
2104
2064
when SeqId < Limit ->
2105
2065
IndexState1 = rabbit_queue_index :flush_pre_publish_cache (
2106
2066
TargetRamCount , IndexState ),
2107
- {Q , {Quota , Delta , State # vqstate {index_state = IndexState1 ,
2108
- ram_msg_count = CurrRamReadyCount ,
2109
- ram_bytes = CurrRamBytes }}};
2067
+ {Q , {Quota , Delta , State # vqstate {index_state = IndexState1 }}};
2110
2068
{{value , MsgStatus = # msg_status { seq_id = SeqId }}, Qa } ->
2111
2069
{# msg_status { index_on_disk = true }, State1 } =
2112
2070
maybe_batch_write_index_to_disk (true , MsgStatus , State ),
2113
- { Size , DeltaRam } = size_and_delta_ram ( MsgStatus ),
2071
+ State2 = stats ( ready0 , { MsgStatus , none }, State1 ),
2114
2072
Delta1 = expand_delta (SeqId , Delta ),
2115
2073
push_betas_to_deltas1 (Generator , Limit , Qa ,
2116
- CurrRamReadyCount + DeltaRam ,
2117
- CurrRamBytes + DeltaRam * Size ,
2118
- {Quota - 1 , Delta1 , State1 })
2074
+ {Quota - 1 , Delta1 , State2 })
2119
2075
end .
2120
2076
2121
- % % Optimised version for paging only, based on stats/3 being called
2122
- % % like this: stats(ready0, {MsgStatus, none}, State1).
2123
- size_and_delta_ram (# msg_status {msg_props = # message_properties {size = Size },
2124
- msg = undefined }) ->
2125
- {Size , 0 };
2126
- size_and_delta_ram (# msg_status {msg_props = # message_properties {size = Size }}) ->
2127
- {Size , - 1 }.
2128
-
2129
2077
% %----------------------------------------------------------------------------
2130
2078
% % Upgrading
2131
2079
% %----------------------------------------------------------------------------
0 commit comments