@@ -533,38 +533,29 @@ terminate(_Reason, State) ->
533
533
% % the only difference between purge and delete is that delete also
534
534
% % needs to delete everything that's been delivered and not ack'd.
535
535
delete_and_terminate (_Reason , State ) ->
536
- % % TODO: there is no need to interact with qi at all - which we do
537
- % % as part of 'purge' and 'purge_pending_ack', other than
538
- % % deleting it.
539
- {_PurgeCount , State1 } = purge (State ),
540
- State2 = # vqstate { index_state = IndexState ,
541
- msg_store_clients = {MSCStateP , MSCStateT } } =
542
- purge_pending_ack (false , State1 ),
543
- IndexState1 = rabbit_queue_index :delete_and_terminate (IndexState ),
536
+ % % Normally when we purge messages we interact with the qi by
537
+ % % issues delivers and acks for every purged message. In this case
538
+ % % we don't need to do that, so we just delete the qi.
539
+ State1 = purge_and_index_reset (State ),
540
+ State2 = # vqstate { msg_store_clients = {MSCStateP , MSCStateT } } =
541
+ purge_pending_ack_delete_and_terminate (State1 ),
544
542
case MSCStateP of
545
543
undefined -> ok ;
546
544
_ -> rabbit_msg_store :client_delete_and_terminate (MSCStateP )
547
545
end ,
548
546
rabbit_msg_store :client_delete_and_terminate (MSCStateT ),
549
- a (State2 # vqstate { index_state = IndexState1 ,
550
- msg_store_clients = undefined }).
547
+ a (State2 # vqstate { msg_store_clients = undefined }).
551
548
552
549
delete_crashed (# amqqueue {name = QName }) ->
553
550
ok = rabbit_queue_index :erase (QName ).
554
551
555
- purge (State = # vqstate { q4 = Q4 ,
556
- len = Len }) ->
557
- % % TODO: when there are no pending acks, which is a common case,
558
- % % we could simply wipe the qi instead of issuing delivers and
559
- % % acks for all the messages.
560
- State1 = remove_queue_entries (Q4 , State ),
561
-
562
- State2 = # vqstate { q1 = Q1 } =
563
- purge_betas_and_deltas (State1 # vqstate { q4 = ? QUEUE :new () }),
564
-
565
- State3 = remove_queue_entries (Q1 , State2 ),
566
-
567
- {Len , a (State3 # vqstate { q1 = ? QUEUE :new () })}.
552
+ purge (State = # vqstate { len = Len }) ->
553
+ case is_pending_ack_empty (State ) of
554
+ true ->
555
+ {Len , purge_and_index_reset (State )};
556
+ false ->
557
+ {Len , purge_when_pending_acks (State )}
558
+ end .
568
559
569
560
purge_acks (State ) -> a (purge_pending_ack (false , State )).
570
561
@@ -754,10 +745,8 @@ len(#vqstate { len = Len }) -> Len.
754
745
755
746
is_empty (State ) -> 0 == len (State ).
756
747
757
- depth (State = # vqstate { ram_pending_ack = RPA ,
758
- disk_pending_ack = DPA ,
759
- qi_pending_ack = QPA }) ->
760
- len (State ) + gb_trees :size (RPA ) + gb_trees :size (DPA ) + gb_trees :size (QPA ).
748
+ depth (State ) ->
749
+ len (State ) + count_pending_acks (State ).
761
750
762
751
set_ram_duration_target (
763
752
DurationTarget , State = # vqstate {
@@ -1072,7 +1061,7 @@ maybe_write_delivered(false, _SeqId, IndexState) ->
1072
1061
maybe_write_delivered (true , SeqId , IndexState ) ->
1073
1062
rabbit_queue_index :deliver ([SeqId ], IndexState ).
1074
1063
1075
- betas_from_index_entries (List , TransientThreshold , RPA , DPA , QPA , IndexState ) ->
1064
+ betas_from_index_entries (List , TransientThreshold , RPA , DPA , QPA , DelsAndAcksFun , State ) ->
1076
1065
{Filtered , Delivers , Acks , RamReadyCount , RamBytes } =
1077
1066
lists :foldr (
1078
1067
fun ({_MsgOrId , SeqId , _MsgProps , IsPersistent , IsDelivered } = M ,
@@ -1095,9 +1084,7 @@ betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, IndexState) ->
1095
1084
end
1096
1085
end
1097
1086
end , {? QUEUE :new (), [], [], 0 , 0 }, List ),
1098
- {Filtered , RamReadyCount , RamBytes ,
1099
- rabbit_queue_index :ack (
1100
- Acks , rabbit_queue_index :deliver (Delivers , IndexState ))}.
1087
+ {Filtered , RamReadyCount , RamBytes , DelsAndAcksFun (Delivers , Acks , State )}.
1101
1088
% % [0] We don't increase RamBytes here, even though it pertains to
1102
1089
% % unacked messages too, since if HaveMsg then the message must have
1103
1090
% % been stored in the QI, thus the message must have been in
@@ -1323,25 +1310,75 @@ remove(AckRequired, MsgStatus = #msg_status {
1323
1310
State2 # vqstate {out_counter = OutCount + 1 ,
1324
1311
index_state = IndexState2 })}.
1325
1312
1326
- purge_betas_and_deltas (State = # vqstate { q3 = Q3 }) ->
1313
+ % %----------------------------------------------------------------------------
1314
+ % % Helpers for Public API purge/1 function
1315
+ % %----------------------------------------------------------------------------
1316
+
1317
+ % % The difference between purge_when_pending_acks/1
1318
+ % % vs. purge_and_index_reset/1 is that the first one issues a deliver
1319
+ % % and an ack to the queue index for every message that's being
1320
+ % % removed, while the later just resets the queue index state.
1321
+ purge_when_pending_acks (State ) ->
1322
+ State1 = purge1 (process_delivers_and_acks_fun (deliver_and_ack ), State ),
1323
+ a (State1 ).
1324
+
1325
+ purge_and_index_reset (State ) ->
1326
+ State1 = purge1 (process_delivers_and_acks_fun (none ), State ),
1327
+ a (reset_qi_state (State1 )).
1328
+
1329
+ % % This function removes messages from each of {q1, q2, q3, q4}.
1330
+ % %
1331
+ % % With remove_queue_entries/3 q1 and q4 are emptied, while q2 and q3
1332
+ % % are specially handled by purge_betas_and_deltas/2.
1333
+ % %
1334
+ % % purge_betas_and_deltas/2 loads messages from the queue index,
1335
+ % % filling up q3 and in some cases moving messages form q2 to q3 while
1336
+ % % reseting q2 to an empty queue (see maybe_deltas_to_betas/2). The
1337
+ % % messages loaded into q3 are removed by calling
1338
+ % % remove_queue_entries/3 until there are no more messages to be read
1339
+ % % from the queue index. Messages are read in batches from the queue
1340
+ % % index.
1341
+ purge1 (AfterFun , State = # vqstate { q4 = Q4 }) ->
1342
+ State1 = remove_queue_entries (Q4 , AfterFun , State ),
1343
+
1344
+ State2 = # vqstate {q1 = Q1 } =
1345
+ purge_betas_and_deltas (AfterFun , State1 # vqstate {q4 = ? QUEUE :new ()}),
1346
+
1347
+ State3 = remove_queue_entries (Q1 , AfterFun , State2 ),
1348
+
1349
+ a (State3 # vqstate {q1 = ? QUEUE :new ()}).
1350
+
1351
+ reset_qi_state (State = # vqstate {index_state = IndexState }) ->
1352
+ State # vqstate {index_state =
1353
+ rabbit_queue_index :reset_state (IndexState )}.
1354
+
1355
+ is_pending_ack_empty (State ) ->
1356
+ count_pending_acks (State ) =:= 0 .
1357
+
1358
+ count_pending_acks (# vqstate { ram_pending_ack = RPA ,
1359
+ disk_pending_ack = DPA ,
1360
+ qi_pending_ack = QPA }) ->
1361
+ gb_trees :size (RPA ) + gb_trees :size (DPA ) + gb_trees :size (QPA ).
1362
+
1363
+ purge_betas_and_deltas (DelsAndAcksFun , State = # vqstate { q3 = Q3 }) ->
1327
1364
case ? QUEUE :is_empty (Q3 ) of
1328
1365
true -> State ;
1329
- false -> State1 = remove_queue_entries (Q3 , State ),
1330
- purge_betas_and_deltas (maybe_deltas_to_betas (
1366
+ false -> State1 = remove_queue_entries (Q3 , DelsAndAcksFun , State ),
1367
+ purge_betas_and_deltas (DelsAndAcksFun ,
1368
+ maybe_deltas_to_betas (
1369
+ DelsAndAcksFun ,
1331
1370
State1 # vqstate {q3 = ? QUEUE :new ()}))
1332
1371
end .
1333
1372
1334
- remove_queue_entries (Q , State = # vqstate { index_state = IndexState ,
1335
- msg_store_clients = MSCState }) ->
1373
+ remove_queue_entries (Q , DelsAndAcksFun ,
1374
+ State = # vqstate { msg_store_clients = MSCState }) ->
1336
1375
{MsgIdsByStore , Delivers , Acks , State1 } =
1337
1376
? QUEUE :foldl (fun remove_queue_entries1 /2 ,
1338
1377
{orddict :new (), [], [], State }, Q ),
1339
1378
ok = orddict :fold (fun (IsPersistent , MsgIds , ok ) ->
1340
1379
msg_store_remove (MSCState , IsPersistent , MsgIds )
1341
1380
end , ok , MsgIdsByStore ),
1342
- IndexState1 = rabbit_queue_index :ack (
1343
- Acks , rabbit_queue_index :deliver (Delivers , IndexState )),
1344
- State1 # vqstate {index_state = IndexState1 }.
1381
+ DelsAndAcksFun (Delivers , Acks , State1 ).
1345
1382
1346
1383
remove_queue_entries1 (
1347
1384
# msg_status { msg_id = MsgId , seq_id = SeqId , is_delivered = IsDelivered ,
@@ -1356,6 +1393,18 @@ remove_queue_entries1(
1356
1393
cons_if (IndexOnDisk , SeqId , Acks ),
1357
1394
stats ({- 1 , 0 }, {MsgStatus , none }, State )}.
1358
1395
1396
+ process_delivers_and_acks_fun (deliver_and_ack ) ->
1397
+ fun (Delivers , Acks , State = # vqstate { index_state = IndexState }) ->
1398
+ IndexState1 =
1399
+ rabbit_queue_index :ack (
1400
+ Acks , rabbit_queue_index :deliver (Delivers , IndexState )),
1401
+ State # vqstate { index_state = IndexState1 }
1402
+ end ;
1403
+ process_delivers_and_acks_fun (_ ) ->
1404
+ fun (_ , _ , State ) ->
1405
+ State
1406
+ end .
1407
+
1359
1408
% %----------------------------------------------------------------------------
1360
1409
% % Internal gubbins for publishing
1361
1410
% %----------------------------------------------------------------------------
@@ -1550,11 +1599,29 @@ remove_pending_ack(false, SeqId, State = #vqstate{ram_pending_ack = RPA,
1550
1599
end .
1551
1600
1552
1601
purge_pending_ack (KeepPersistent ,
1553
- State = # vqstate { ram_pending_ack = RPA ,
1554
- disk_pending_ack = DPA ,
1555
- qi_pending_ack = QPA ,
1556
- index_state = IndexState ,
1602
+ State = # vqstate { index_state = IndexState ,
1557
1603
msg_store_clients = MSCState }) ->
1604
+ {IndexOnDiskSeqIds , MsgIdsByStore , State1 } = purge_pending_ack1 (State ),
1605
+ case KeepPersistent of
1606
+ true -> remove_transient_msgs_by_id (MsgIdsByStore , MSCState ),
1607
+ State1 ;
1608
+ false -> IndexState1 =
1609
+ rabbit_queue_index :ack (IndexOnDiskSeqIds , IndexState ),
1610
+ remove_msgs_by_id (MsgIdsByStore , MSCState ),
1611
+ State1 # vqstate { index_state = IndexState1 }
1612
+ end .
1613
+
1614
+ purge_pending_ack_delete_and_terminate (
1615
+ State = # vqstate { index_state = IndexState ,
1616
+ msg_store_clients = MSCState }) ->
1617
+ {_ , MsgIdsByStore , State1 } = purge_pending_ack1 (State ),
1618
+ IndexState1 = rabbit_queue_index :delete_and_terminate (IndexState ),
1619
+ remove_msgs_by_id (MsgIdsByStore , MSCState ),
1620
+ State1 # vqstate { index_state = IndexState1 }.
1621
+
1622
+ purge_pending_ack1 (State = # vqstate { ram_pending_ack = RPA ,
1623
+ disk_pending_ack = DPA ,
1624
+ qi_pending_ack = QPA }) ->
1558
1625
F = fun (_SeqId , MsgStatus , Acc ) -> accumulate_ack (MsgStatus , Acc ) end ,
1559
1626
{IndexOnDiskSeqIds , MsgIdsByStore , _AllMsgIds } =
1560
1627
rabbit_misc :gb_trees_fold (
@@ -1564,19 +1631,26 @@ purge_pending_ack(KeepPersistent,
1564
1631
State1 = State # vqstate { ram_pending_ack = gb_trees :empty (),
1565
1632
disk_pending_ack = gb_trees :empty (),
1566
1633
qi_pending_ack = gb_trees :empty ()},
1634
+ {IndexOnDiskSeqIds , MsgIdsByStore , State1 }.
1567
1635
1568
- case KeepPersistent of
1569
- true -> case orddict :find (false , MsgIdsByStore ) of
1570
- error -> State1 ;
1571
- {ok , MsgIds } -> ok = msg_store_remove (MSCState , false ,
1572
- MsgIds ),
1573
- State1
1574
- end ;
1575
- false -> IndexState1 =
1576
- rabbit_queue_index :ack (IndexOnDiskSeqIds , IndexState ),
1577
- [ok = msg_store_remove (MSCState , IsPersistent , MsgIds )
1578
- || {IsPersistent , MsgIds } <- orddict :to_list (MsgIdsByStore )],
1579
- State1 # vqstate { index_state = IndexState1 }
1636
+ % % MsgIdsByStore is an orddict with two keys:
1637
+ % %
1638
+ % % true: holds a list of Persistent Message Ids.
1639
+ % % false: holds a list of Transient Message Ids.
1640
+ % %
1641
+ % % When we call orddict:to_list/1 we get two sets of msg ids, where
1642
+ % % IsPersistent is either true for persistent messages or false for
1643
+ % % transient ones. The msg_store_remove/3 function takes this boolean
1644
+ % % flag to determine from which store the messages should be removed
1645
+ % % from.
1646
+ remove_msgs_by_id (MsgIdsByStore , MSCState ) ->
1647
+ [ok = msg_store_remove (MSCState , IsPersistent , MsgIds )
1648
+ || {IsPersistent , MsgIds } <- orddict :to_list (MsgIdsByStore )].
1649
+
1650
+ remove_transient_msgs_by_id (MsgIdsByStore , MSCState ) ->
1651
+ case orddict :find (false , MsgIdsByStore ) of
1652
+ error -> ok ;
1653
+ {ok , MsgIds } -> ok = msg_store_remove (MSCState , false , MsgIds )
1580
1654
end .
1581
1655
1582
1656
accumulate_ack_init () -> {[], orddict :new (), []}.
@@ -1910,9 +1984,15 @@ fetch_from_q3(State = #vqstate { q1 = Q1,
1910
1984
{loaded , {MsgStatus , State2 }}
1911
1985
end .
1912
1986
1913
- maybe_deltas_to_betas (State = # vqstate { delta = ? BLANK_DELTA_PATTERN (X ) }) ->
1987
+ maybe_deltas_to_betas (State ) ->
1988
+ AfterFun = process_delivers_and_acks_fun (deliver_and_ack ),
1989
+ maybe_deltas_to_betas (AfterFun , State ).
1990
+
1991
+ maybe_deltas_to_betas (_DelsAndAcksFun ,
1992
+ State = # vqstate {delta = ? BLANK_DELTA_PATTERN (X ) }) ->
1914
1993
State ;
1915
- maybe_deltas_to_betas (State = # vqstate {
1994
+ maybe_deltas_to_betas (DelsAndAcksFun ,
1995
+ State = # vqstate {
1916
1996
q2 = Q2 ,
1917
1997
delta = Delta ,
1918
1998
q3 = Q3 ,
@@ -1932,34 +2012,35 @@ maybe_deltas_to_betas(State = #vqstate {
1932
2012
DeltaSeqIdEnd ]),
1933
2013
{List , IndexState1 } = rabbit_queue_index :read (DeltaSeqId , DeltaSeqId1 ,
1934
2014
IndexState ),
1935
- {Q3a , RamCountsInc , RamBytesInc , IndexState2 } =
2015
+ {Q3a , RamCountsInc , RamBytesInc , State1 } =
1936
2016
betas_from_index_entries (List , TransientThreshold ,
1937
- RPA , DPA , QPA , IndexState1 ) ,
1938
- State1 = State # vqstate { index_state = IndexState2 ,
1939
- ram_msg_count = RamMsgCount + RamCountsInc ,
1940
- ram_bytes = RamBytes + RamBytesInc ,
1941
- disk_read_count = DiskReadCount + RamCountsInc },
2017
+ RPA , DPA , QPA , DelsAndAcksFun ,
2018
+ State # vqstate { index_state = IndexState1 }) ,
2019
+ State2 = State1 # vqstate { ram_msg_count = RamMsgCount + RamCountsInc ,
2020
+ ram_bytes = RamBytes + RamBytesInc ,
2021
+ disk_read_count = DiskReadCount + RamCountsInc },
1942
2022
case ? QUEUE :len (Q3a ) of
1943
2023
0 ->
1944
2024
% % we ignored every message in the segment due to it being
1945
2025
% % transient and below the threshold
1946
2026
maybe_deltas_to_betas (
1947
- State1 # vqstate {
2027
+ DelsAndAcksFun ,
2028
+ State2 # vqstate {
1948
2029
delta = d (Delta # delta { start_seq_id = DeltaSeqId1 })});
1949
2030
Q3aLen ->
1950
2031
Q3b = ? QUEUE :join (Q3 , Q3a ),
1951
2032
case DeltaCount - Q3aLen of
1952
2033
0 ->
1953
2034
% % delta is now empty, but it wasn't before, so
1954
2035
% % can now join q2 onto q3
1955
- State1 # vqstate { q2 = ? QUEUE :new (),
2036
+ State2 # vqstate { q2 = ? QUEUE :new (),
1956
2037
delta = ? BLANK_DELTA ,
1957
2038
q3 = ? QUEUE :join (Q3b , Q2 ) };
1958
2039
N when N > 0 ->
1959
2040
Delta1 = d (# delta { start_seq_id = DeltaSeqId1 ,
1960
2041
count = N ,
1961
2042
end_seq_id = DeltaSeqIdEnd }),
1962
- State1 # vqstate { delta = Delta1 ,
2043
+ State2 # vqstate { delta = Delta1 ,
1963
2044
q3 = Q3b }
1964
2045
end
1965
2046
end .
0 commit comments