@@ -1060,7 +1060,7 @@ maybe_write_delivered(false, _SeqId, IndexState) ->
1060
1060
maybe_write_delivered (true , SeqId , IndexState ) ->
1061
1061
rabbit_queue_index :deliver ([SeqId ], IndexState ).
1062
1062
1063
- betas_from_index_entries (List , TransientThreshold , RPA , DPA , QPA , DelsAndAcksFun , State ) ->
1063
+ betas_from_index_entries (List , TransientThreshold , DelsAndAcksFun , State ) ->
1064
1064
{Filtered , Delivers , Acks , RamReadyCount , RamBytes } =
1065
1065
lists :foldr (
1066
1066
fun ({_MsgOrId , SeqId , _MsgProps , IsPersistent , IsDelivered } = M ,
@@ -1072,9 +1072,7 @@ betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, DelsAndAcksFun
1072
1072
false -> MsgStatus = m (beta_msg_status (M )),
1073
1073
HaveMsg = msg_in_ram (MsgStatus ),
1074
1074
Size = msg_size (MsgStatus ),
1075
- case (gb_trees :is_defined (SeqId , RPA ) orelse
1076
- gb_trees :is_defined (SeqId , DPA ) orelse
1077
- gb_trees :is_defined (SeqId , QPA )) of
1075
+ case is_msg_in_pending_acks (SeqId , State ) of
1078
1076
false -> {? QUEUE :in_r (MsgStatus , Filtered1 ),
1079
1077
Delivers1 , Acks1 ,
1080
1078
RRC + one_if (HaveMsg ),
@@ -1089,6 +1087,13 @@ betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, DelsAndAcksFun
1089
1087
% % been stored in the QI, thus the message must have been in
1090
1088
% % qi_pending_ack, thus it must already have been in RAM.
1091
1089
1090
+ is_msg_in_pending_acks (SeqId , # vqstate { ram_pending_ack = RPA ,
1091
+ disk_pending_ack = DPA ,
1092
+ qi_pending_ack = QPA }) ->
1093
+ (gb_trees :is_defined (SeqId , RPA ) orelse
1094
+ gb_trees :is_defined (SeqId , DPA ) orelse
1095
+ gb_trees :is_defined (SeqId , QPA )).
1096
+
1092
1097
expand_delta (SeqId , ? BLANK_DELTA_PATTERN (X )) ->
1093
1098
d (# delta { start_seq_id = SeqId , count = 1 , end_seq_id = SeqId + 1 });
1094
1099
expand_delta (SeqId , # delta { start_seq_id = StartSeqId ,
@@ -1822,9 +1827,7 @@ next({delta, #delta{start_seq_id = SeqId,
1822
1827
next ({delta , Delta , [], State }, IndexState ) ->
1823
1828
next ({delta , Delta , State }, IndexState );
1824
1829
next ({delta , Delta , [{_ , SeqId , _ , _ , _ } = M | Rest ], State }, IndexState ) ->
1825
- case (gb_trees :is_defined (SeqId , State # vqstate .ram_pending_ack ) orelse
1826
- gb_trees :is_defined (SeqId , State # vqstate .disk_pending_ack ) orelse
1827
- gb_trees :is_defined (SeqId , State # vqstate .qi_pending_ack )) of
1830
+ case is_msg_in_pending_acks (SeqId , State ) of
1828
1831
false -> Next = {delta , Delta , Rest , State },
1829
1832
{value , beta_msg_status (M ), false , Next , IndexState };
1830
1833
true -> next ({delta , Delta , Rest , State }, IndexState )
@@ -1996,9 +1999,6 @@ maybe_deltas_to_betas(DelsAndAcksFun,
1996
1999
index_state = IndexState ,
1997
2000
ram_msg_count = RamMsgCount ,
1998
2001
ram_bytes = RamBytes ,
1999
- ram_pending_ack = RPA ,
2000
- disk_pending_ack = DPA ,
2001
- qi_pending_ack = QPA ,
2002
2002
disk_read_count = DiskReadCount ,
2003
2003
transient_threshold = TransientThreshold }) ->
2004
2004
# delta { start_seq_id = DeltaSeqId ,
@@ -2011,7 +2011,7 @@ maybe_deltas_to_betas(DelsAndAcksFun,
2011
2011
IndexState ),
2012
2012
{Q3a , RamCountsInc , RamBytesInc , State1 } =
2013
2013
betas_from_index_entries (List , TransientThreshold ,
2014
- RPA , DPA , QPA , DelsAndAcksFun ,
2014
+ DelsAndAcksFun ,
2015
2015
State # vqstate { index_state = IndexState1 }),
2016
2016
State2 = State1 # vqstate { ram_msg_count = RamMsgCount + RamCountsInc ,
2017
2017
ram_bytes = RamBytes + RamBytesInc ,
0 commit comments