@@ -1828,20 +1828,13 @@ reduce_memory_use(State = #vqstate {
1828
1828
State1
1829
1829
end .
1830
1830
1831
- limit_ram_acks (0 , State = # vqstate { index_state = IndexState ,
1832
- target_ram_count = TargetRamCount }) ->
1833
- IndexState1 = rabbit_queue_index :flush_pre_publish_cache (
1834
- TargetRamCount , IndexState ),
1835
- {0 , State # vqstate { index_state = IndexState1 }};
1836
- limit_ram_acks (Quota , State = # vqstate { index_state = IndexState ,
1837
- target_ram_count = TargetRamCount ,
1838
- ram_pending_ack = RPA ,
1831
+ limit_ram_acks (0 , State ) ->
1832
+ {0 , ui (State )};
1833
+ limit_ram_acks (Quota , State = # vqstate { ram_pending_ack = RPA ,
1839
1834
disk_pending_ack = DPA }) ->
1840
1835
case gb_trees :is_empty (RPA ) of
1841
1836
true ->
1842
- IndexState1 = rabbit_queue_index :flush_pre_publish_cache (
1843
- TargetRamCount , IndexState ),
1844
- {Quota , State # vqstate {index_state = IndexState1 }};
1837
+ {Quota , ui (State )};
1845
1838
false ->
1846
1839
{SeqId , MsgStatus , RPA1 } = gb_trees :take_largest (RPA ),
1847
1840
{MsgStatus1 , State1 } =
@@ -1979,27 +1972,17 @@ push_alphas_to_betas(Quota, State) ->
1979
1972
1980
1973
push_alphas_to_betas (_Generator , _Consumer , Quota , _Q ,
1981
1974
State = # vqstate { ram_msg_count = RamMsgCount ,
1982
- index_state = IndexState ,
1983
1975
target_ram_count = TargetRamCount })
1984
1976
when Quota =:= 0 orelse
1985
1977
TargetRamCount =:= infinity orelse
1986
1978
TargetRamCount >= RamMsgCount ->
1987
- IndexState1 = rabbit_queue_index :flush_pre_publish_cache (
1988
- TargetRamCount , IndexState ),
1989
- {Quota , State # vqstate {index_state = IndexState1 }};
1990
- push_alphas_to_betas (Generator , Consumer , Quota , Q ,
1991
- State = # vqstate {
1992
- index_state = IndexState ,
1993
- target_ram_count = TargetRamCount }) ->
1979
+ {Quota , ui (State )};
1980
+ push_alphas_to_betas (Generator , Consumer , Quota , Q , State ) ->
1994
1981
case credit_flow :blocked () of
1995
- true -> IndexState1 = rabbit_queue_index :flush_pre_publish_cache (
1996
- TargetRamCount , IndexState ),
1997
- {Quota , State # vqstate {index_state = IndexState1 }};
1982
+ true -> {Quota , ui (State )};
1998
1983
false -> case Generator (Q ) of
1999
1984
{empty , _Q } ->
2000
- IndexState1 = rabbit_queue_index :flush_pre_publish_cache (
2001
- TargetRamCount , IndexState ),
2002
- {Quota , State # vqstate {index_state = IndexState1 }};
1985
+ {Quota , ui (State )};
2003
1986
{{value , MsgStatus }, Qa } ->
2004
1987
{MsgStatus1 , State1 } =
2005
1988
maybe_prepare_write_to_disk (true , false , MsgStatus ,
@@ -2045,26 +2028,16 @@ push_betas_to_deltas(Generator, LimitFun, Q, PushState) ->
2045
2028
end .
2046
2029
2047
2030
push_betas_to_deltas1 (_Generator , _Limit , Q ,
2048
- {0 , Delta , State =
2049
- # vqstate {index_state = IndexState ,
2050
- target_ram_count = TargetRamCount }}) ->
2051
- IndexState1 = rabbit_queue_index :flush_pre_publish_cache (
2052
- TargetRamCount , IndexState ),
2053
- {Q , {0 , Delta , State # vqstate {index_state = IndexState1 }}};
2031
+ {0 , Delta , State }) ->
2032
+ {Q , {0 , Delta , ui (State )}};
2054
2033
push_betas_to_deltas1 (Generator , Limit , Q ,
2055
- {Quota , Delta , State =
2056
- # vqstate {index_state = IndexState ,
2057
- target_ram_count = TargetRamCount }}) ->
2034
+ {Quota , Delta , State }) ->
2058
2035
case Generator (Q ) of
2059
2036
{empty , _Q } ->
2060
- IndexState1 = rabbit_queue_index :flush_pre_publish_cache (
2061
- TargetRamCount , IndexState ),
2062
- {Q , {Quota , Delta , State # vqstate {index_state = IndexState1 }}};
2037
+ {Q , {Quota , Delta , ui (State )}};
2063
2038
{{value , # msg_status { seq_id = SeqId }}, _Qa }
2064
2039
when SeqId < Limit ->
2065
- IndexState1 = rabbit_queue_index :flush_pre_publish_cache (
2066
- TargetRamCount , IndexState ),
2067
- {Q , {Quota , Delta , State # vqstate {index_state = IndexState1 }}};
2040
+ {Q , {Quota , Delta , ui (State )}};
2068
2041
{{value , MsgStatus = # msg_status { seq_id = SeqId }}, Qa } ->
2069
2042
{# msg_status { index_on_disk = true }, State1 } =
2070
2043
maybe_batch_write_index_to_disk (true , MsgStatus , State ),
@@ -2074,6 +2047,13 @@ push_betas_to_deltas1(Generator, Limit, Q,
2074
2047
{Quota - 1 , Delta1 , State2 })
2075
2048
end .
2076
2049
2050
+ % % Flushes queue index batch caches and updates queue index state.
2051
+ ui (# vqstate {index_state = IndexState ,
2052
+ target_ram_count = TargetRamCount } = State ) ->
2053
+ IndexState1 = rabbit_queue_index :flush_pre_publish_cache (
2054
+ TargetRamCount , IndexState ),
2055
+ State # vqstate {index_state = IndexState1 }.
2056
+
2077
2057
% %----------------------------------------------------------------------------
2078
2058
% % Upgrading
2079
2059
% %----------------------------------------------------------------------------
0 commit comments