@@ -695,8 +695,7 @@ ack(AckTags, State) ->
695
695
{accumulate_ack (MsgStatus , Acc ), State3 }
696
696
end , {accumulate_ack_init (), State }, AckTags ),
697
697
IndexState1 = rabbit_queue_index :ack (IndexOnDiskSeqIds , IndexState ),
698
- [ok = msg_store_remove (MSCState , IsPersistent , MsgIds )
699
- || {IsPersistent , MsgIds } <- orddict :to_list (MsgIdsByStore )],
698
+ remove_msgs_by_id (MsgIdsByStore , MSCState ),
700
699
{lists :reverse (AllMsgIds ),
701
700
a (State1 # vqstate { index_state = IndexState1 ,
702
701
ack_out_counter = AckOutCount + length (AckTags ) })}.
@@ -1122,7 +1121,7 @@ maybe_write_delivered(false, _SeqId, IndexState) ->
1122
1121
maybe_write_delivered (true , SeqId , IndexState ) ->
1123
1122
rabbit_queue_index :deliver ([SeqId ], IndexState ).
1124
1123
1125
- betas_from_index_entries (List , TransientThreshold , RPA , DPA , QPA , DelsAndAcksFun , State ) ->
1124
+ betas_from_index_entries (List , TransientThreshold , DelsAndAcksFun , State ) ->
1126
1125
{Filtered , Delivers , Acks , RamReadyCount , RamBytes } =
1127
1126
lists :foldr (
1128
1127
fun ({_MsgOrId , SeqId , _MsgProps , IsPersistent , IsDelivered } = M ,
@@ -1134,9 +1133,7 @@ betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, DelsAndAcksFun
1134
1133
false -> MsgStatus = m (beta_msg_status (M )),
1135
1134
HaveMsg = msg_in_ram (MsgStatus ),
1136
1135
Size = msg_size (MsgStatus ),
1137
- case (gb_trees :is_defined (SeqId , RPA ) orelse
1138
- gb_trees :is_defined (SeqId , DPA ) orelse
1139
- gb_trees :is_defined (SeqId , QPA )) of
1136
+ case is_msg_in_pending_acks (SeqId , State ) of
1140
1137
false -> {? QUEUE :in_r (MsgStatus , Filtered1 ),
1141
1138
Delivers1 , Acks1 ,
1142
1139
RRC + one_if (HaveMsg ),
@@ -1151,6 +1148,13 @@ betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, DelsAndAcksFun
1151
1148
% % been stored in the QI, thus the message must have been in
1152
1149
% % qi_pending_ack, thus it must already have been in RAM.
1153
1150
1151
+ is_msg_in_pending_acks (SeqId , # vqstate { ram_pending_ack = RPA ,
1152
+ disk_pending_ack = DPA ,
1153
+ qi_pending_ack = QPA }) ->
1154
+ (gb_trees :is_defined (SeqId , RPA ) orelse
1155
+ gb_trees :is_defined (SeqId , DPA ) orelse
1156
+ gb_trees :is_defined (SeqId , QPA )).
1157
+
1154
1158
expand_delta (SeqId , ? BLANK_DELTA_PATTERN (X )) ->
1155
1159
d (# delta { start_seq_id = SeqId , count = 1 , end_seq_id = SeqId + 1 });
1156
1160
expand_delta (SeqId , # delta { start_seq_id = StartSeqId ,
@@ -1436,9 +1440,7 @@ remove_queue_entries(Q, DelsAndAcksFun,
1436
1440
{MsgIdsByStore , Delivers , Acks , State1 } =
1437
1441
? QUEUE :foldl (fun remove_queue_entries1 /2 ,
1438
1442
{orddict :new (), [], [], State }, Q ),
1439
- ok = orddict :fold (fun (IsPersistent , MsgIds , ok ) ->
1440
- msg_store_remove (MSCState , IsPersistent , MsgIds )
1441
- end , ok , MsgIdsByStore ),
1443
+ remove_msgs_by_id (MsgIdsByStore , MSCState ),
1442
1444
DelsAndAcksFun (Delivers , Acks , State1 ).
1443
1445
1444
1446
remove_queue_entries1 (
@@ -1886,9 +1888,7 @@ next({delta, #delta{start_seq_id = SeqId,
1886
1888
next ({delta , Delta , [], State }, IndexState ) ->
1887
1889
next ({delta , Delta , State }, IndexState );
1888
1890
next ({delta , Delta , [{_ , SeqId , _ , _ , _ } = M | Rest ], State }, IndexState ) ->
1889
- case (gb_trees :is_defined (SeqId , State # vqstate .ram_pending_ack ) orelse
1890
- gb_trees :is_defined (SeqId , State # vqstate .disk_pending_ack ) orelse
1891
- gb_trees :is_defined (SeqId , State # vqstate .qi_pending_ack )) of
1891
+ case is_msg_in_pending_acks (SeqId , State ) of
1892
1892
false -> Next = {delta , Delta , Rest , State },
1893
1893
{value , beta_msg_status (M ), false , Next , IndexState };
1894
1894
true -> next ({delta , Delta , Rest , State }, IndexState )
@@ -2060,9 +2060,6 @@ maybe_deltas_to_betas(DelsAndAcksFun,
2060
2060
index_state = IndexState ,
2061
2061
ram_msg_count = RamMsgCount ,
2062
2062
ram_bytes = RamBytes ,
2063
- ram_pending_ack = RPA ,
2064
- disk_pending_ack = DPA ,
2065
- qi_pending_ack = QPA ,
2066
2063
disk_read_count = DiskReadCount ,
2067
2064
transient_threshold = TransientThreshold }) ->
2068
2065
# delta { start_seq_id = DeltaSeqId ,
@@ -2075,7 +2072,7 @@ maybe_deltas_to_betas(DelsAndAcksFun,
2075
2072
IndexState ),
2076
2073
{Q3a , RamCountsInc , RamBytesInc , State1 } =
2077
2074
betas_from_index_entries (List , TransientThreshold ,
2078
- RPA , DPA , QPA , DelsAndAcksFun ,
2075
+ DelsAndAcksFun ,
2079
2076
State # vqstate { index_state = IndexState1 }),
2080
2077
State2 = State1 # vqstate { ram_msg_count = RamMsgCount + RamCountsInc ,
2081
2078
ram_bytes = RamBytes + RamBytesInc ,
0 commit comments