@@ -327,7 +327,7 @@ apply(#{index := RaftIdx}, #purge{},
327
327
messages = #{},
328
328
returns = lqueue :new (),
329
329
msg_bytes_enqueue = 0 ,
330
- prefix_msgs = {[] , []},
330
+ prefix_msgs = {0 , [], 0 , []},
331
331
low_msg_num = undefined ,
332
332
msg_bytes_in_memory = 0 ,
333
333
msgs_ready_in_memory = 0 },
@@ -555,7 +555,7 @@ state_enter(leader, #?MODULE{consumers = Cons,
555
555
cfg = # cfg {name = Name ,
556
556
resource = Resource ,
557
557
become_leader_handler = BLH },
558
- prefix_msgs = {[] , []}
558
+ prefix_msgs = {0 , [], 0 , []}
559
559
}) ->
560
560
% return effects to monitor all current consumers and enqueuers
561
561
Pids = lists :usort (maps :keys (Enqs )
@@ -770,16 +770,16 @@ usage(Name) when is_atom(Name) ->
770
770
% %% Internal
771
771
772
772
messages_ready (#? MODULE {messages = M ,
773
- prefix_msgs = {PreR , PreM },
773
+ prefix_msgs = {RCnt , _R , PCnt , _P },
774
774
returns = R }) ->
775
775
776
776
% % prefix messages will rarely have anything in them during normal
777
777
% % operations so length/1 is fine here
778
- maps :size (M ) + lqueue :len (R ) + length ( PreR ) + length ( PreM ) .
778
+ maps :size (M ) + lqueue :len (R ) + RCnt + PCnt .
779
779
780
780
messages_total (#? MODULE {ra_indexes = I ,
781
- prefix_msgs = {PreR , PreM }}) ->
782
- rabbit_fifo_index :size (I ) + length ( PreR ) + length ( PreM ) .
781
+ prefix_msgs = {RCnt , _R , PCnt , _P }}) ->
782
+ rabbit_fifo_index :size (I ) + RCnt + PCnt .
783
783
784
784
update_use ({inactive , _ , _ , _ } = CUInfo , inactive ) ->
785
785
CUInfo ;
@@ -1016,13 +1016,15 @@ maybe_store_dehydrated_state(RaftIdx,
1016
1016
= Cfg ,
1017
1017
ra_indexes = Indexes ,
1018
1018
enqueue_count = 0 ,
1019
- release_cursors = Cursors0 } = State ) ->
1019
+ release_cursors = Cursors0 } = State0 ) ->
1020
1020
case rabbit_fifo_index :exists (RaftIdx , Indexes ) of
1021
1021
false ->
1022
1022
% % the incoming enqueue must already have been dropped
1023
- State ;
1023
+ State0 ;
1024
1024
true ->
1025
- Dehydrated = dehydrate_state (State ),
1025
+ State = convert_prefix_msgs (State0 ),
1026
+ {Time , Dehydrated } = timer :tc (fun () -> dehydrate_state (State ) end ),
1027
+ rabbit_log :info (" dehydrating state took ~b ms" , [Time div 1000 ]),
1026
1028
Cursor = {release_cursor , RaftIdx , Dehydrated },
1027
1029
Cursors = lqueue :in (Cursor , Cursors0 ),
1028
1030
Interval = lqueue :len (Cursors ) * Base ,
@@ -1336,7 +1338,8 @@ evaluate_limit(Result,
1336
1338
max_bytes = undefined }} = State ,
1337
1339
Effects ) ->
1338
1340
{State , Result , Effects };
1339
- evaluate_limit (Result , State0 , Effects0 ) ->
1341
+ evaluate_limit (Result , State00 , Effects0 ) ->
1342
+ State0 = convert_prefix_msgs (State00 ),
1340
1343
case is_over_limit (State0 ) of
1341
1344
true ->
1342
1345
{State , Effects } = drop_head (State0 , Effects0 ),
@@ -1380,17 +1383,21 @@ append_log_effects(Effects0, AccMap) ->
1380
1383
% %
1381
1384
% % When we return it is always done to the current return queue
1382
1385
% % for both prefix messages and current messages
1383
- take_next_msg (#? MODULE {prefix_msgs = {[{'$empty_msg' , _ } = Msg | Rem ], P }} = State ) ->
1386
+ take_next_msg (#? MODULE {prefix_msgs = {R , P }} = State ) ->
1387
+ % % conversion
1388
+ take_next_msg (State #? MODULE {prefix_msgs = {length (R ), R , length (P ), P }});
1389
+ take_next_msg (#? MODULE {prefix_msgs = {NumR , [{'$empty_msg' , _ } = Msg | Rem ],
1390
+ NumP , P }} = State ) ->
1384
1391
% % there are prefix returns, these should be served first
1385
- {Msg , State #? MODULE {prefix_msgs = {Rem , P }}};
1386
- take_next_msg (#? MODULE {prefix_msgs = {[Header | Rem ], P }} = State ) ->
1392
+ {Msg , State #? MODULE {prefix_msgs = {NumR - 1 , Rem , NumP , P }}};
1393
+ take_next_msg (#? MODULE {prefix_msgs = {NumR , [Header | Rem ], NumP , P }} = State ) ->
1387
1394
% % there are prefix returns, these should be served first
1388
1395
{{'$prefix_msg' , Header },
1389
- State #? MODULE {prefix_msgs = {Rem , P }}};
1396
+ State #? MODULE {prefix_msgs = {NumR - 1 , Rem , NumP , P }}};
1390
1397
take_next_msg (#? MODULE {returns = Returns ,
1391
1398
low_msg_num = Low0 ,
1392
1399
messages = Messages0 ,
1393
- prefix_msgs = {R , P }} = State ) ->
1400
+ prefix_msgs = {NumR , R , NumP , P }} = State ) ->
1394
1401
% % use peek rather than out there as the most likely case is an empty
1395
1402
% % queue
1396
1403
case lqueue :peek (Returns ) of
@@ -1420,10 +1427,10 @@ take_next_msg(#?MODULE{returns = Returns,
1420
1427
{Header , 'empty' } ->
1421
1428
% % There are prefix msgs
1422
1429
{{'$empty_msg' , Header },
1423
- State #? MODULE {prefix_msgs = {R , Rem }}};
1430
+ State #? MODULE {prefix_msgs = {NumR , R , NumP - 1 , Rem }}};
1424
1431
Header ->
1425
1432
{{'$prefix_msg' , Header },
1426
- State #? MODULE {prefix_msgs = {R , Rem }}}
1433
+ State #? MODULE {prefix_msgs = {NumR , R , NumP - 1 , Rem }}}
1427
1434
end
1428
1435
end .
1429
1436
@@ -1600,17 +1607,23 @@ maybe_queue_consumer(ConsumerId, #consumer{credit = Credit},
1600
1607
ServiceQueue0
1601
1608
end .
1602
1609
1610
+ convert_prefix_msgs (#? MODULE {prefix_msgs = {R , P }} = State ) ->
1611
+ State #? MODULE {prefix_msgs = {length (R ), R , length (P ), P }};
1612
+ convert_prefix_msgs (State ) ->
1613
+ State .
1614
+
1603
1615
% % creates a dehydrated version of the current state to be cached and
1604
1616
% % potentially used to for a snaphot at a later point
1605
1617
dehydrate_state (#? MODULE {messages = Messages ,
1606
1618
consumers = Consumers ,
1607
1619
returns = Returns ,
1608
1620
low_msg_num = Low ,
1609
1621
next_msg_num = Next ,
1610
- prefix_msgs = {PrefRet0 , PrefMsg0 },
1622
+ prefix_msgs = {PRCnt , PrefRet0 , PPCnt , PrefMsg0 },
1611
1623
waiting_consumers = Waiting0 } = State ) ->
1624
+ RCnt = lqueue :len (Returns ),
1612
1625
% % TODO: optimise this function as far as possible
1613
- PrefRet = lists :foldl (fun ({'$prefix_msg' , Header }, Acc ) ->
1626
+ PrefRet1 = lists :foldr (fun ({'$prefix_msg' , Header }, Acc ) ->
1614
1627
[Header | Acc ];
1615
1628
({'$empty_msg' , _ } = Msg , Acc ) ->
1616
1629
[Msg | Acc ];
@@ -1619,8 +1632,9 @@ dehydrate_state(#?MODULE{messages = Messages,
1619
1632
({_ , {_ , {Header , _ }}}, Acc ) ->
1620
1633
[Header | Acc ]
1621
1634
end ,
1622
- lists : reverse ( PrefRet0 ) ,
1635
+ [] ,
1623
1636
lqueue :to_list (Returns )),
1637
+ PrefRet = PrefRet0 ++ PrefRet1 ,
1624
1638
PrefMsgsSuff = dehydrate_messages (Low , Next - 1 , Messages , []),
1625
1639
% % prefix messages are not populated in normal operation only after
1626
1640
% % recovering from a snapshot
@@ -1634,8 +1648,8 @@ dehydrate_state(#?MODULE{messages = Messages,
1634
1648
dehydrate_consumer (C )
1635
1649
end , Consumers ),
1636
1650
returns = lqueue :new (),
1637
- prefix_msgs = {lists : reverse ( PrefRet ) ,
1638
- PrefMsgs },
1651
+ prefix_msgs = {PRCnt + RCnt , PrefRet ,
1652
+ PPCnt + maps : size ( Messages ), PrefMsgs },
1639
1653
waiting_consumers = Waiting }.
1640
1654
1641
1655
dehydrate_messages (Low , Next , _Msgs , Acc )
0 commit comments