@@ -1887,7 +1887,7 @@ count_pending_acks(#vqstate { ram_pending_ack = RPA,
1887
1887
purge_betas_and_deltas (DelsAndAcksFun , State ) ->
1888
1888
% % We use the maximum memory limit when purging to get greater performance.
1889
1889
MemoryLimit = 2048 ,
1890
- State0 = # vqstate { q3 = Q3 } = maybe_deltas_to_betas (DelsAndAcksFun , State , MemoryLimit ),
1890
+ State0 = # vqstate { q3 = Q3 } = maybe_deltas_to_betas (DelsAndAcksFun , State , MemoryLimit , metadata_only ),
1891
1891
1892
1892
case ? QUEUE :is_empty (Q3 ) of
1893
1893
true -> State0 ;
@@ -2580,11 +2580,11 @@ maybe_deltas_to_betas(State = #vqstate { rates = #rates{ out = OutRate }}) ->
2580
2580
AfterFun = process_delivers_and_acks_fun (deliver_and_ack ),
2581
2581
% % We allow from 1 to 2048 messages in memory depending on the consume rate.
2582
2582
MemoryLimit = min (1 + floor (2 * OutRate ), 2048 ),
2583
- maybe_deltas_to_betas (AfterFun , State , MemoryLimit ).
2583
+ maybe_deltas_to_betas (AfterFun , State , MemoryLimit , messages ).
2584
2584
2585
2585
maybe_deltas_to_betas (_DelsAndAcksFun ,
2586
2586
State = # vqstate {delta = ? BLANK_DELTA_PATTERN (X ) },
2587
- _MemoryLimit ) ->
2587
+ _MemoryLimit , _WhatToRead ) ->
2588
2588
State ;
2589
2589
maybe_deltas_to_betas (DelsAndAcksFun ,
2590
2590
State = # vqstate {
@@ -2600,7 +2600,7 @@ maybe_deltas_to_betas(DelsAndAcksFun,
2600
2600
delta_transient_bytes = DeltaTransientBytes ,
2601
2601
transient_threshold = TransientThreshold ,
2602
2602
version = Version },
2603
- MemoryLimit ) ->
2603
+ MemoryLimit , WhatToRead ) ->
2604
2604
# delta { start_seq_id = DeltaSeqId ,
2605
2605
count = DeltaCount ,
2606
2606
transient = Transient ,
@@ -2622,74 +2622,80 @@ maybe_deltas_to_betas(DelsAndAcksFun,
2622
2622
lists :min ([IndexMod :next_segment_boundary (DeltaSeqId ),
2623
2623
DeltaSeqLimit , DeltaSeqIdEnd ]),
2624
2624
{List0 , IndexState1 } = IndexMod :read (DeltaSeqId , DeltaSeqId1 , IndexState ),
2625
- % % We try to read messages from disk all at once instead of
2626
- % % 1 by 1 at fetch time. When v1 is used and messages are
2627
- % % embedded, then the message content is already read from
2628
- % % disk at this point. For v2 embedded we must do a separate
2629
- % % call to obtain the contents and then merge the contents
2630
- % % back into the #msg_status records.
2631
- % %
2632
- % % For shared message store messages we do the same but only
2633
- % % for messages < ?SHARED_READ_MANY_SIZE_THRESHOLD bytes and
2634
- % % when there are at least ?SHARED_READ_MANY_COUNT_THRESHOLD
2635
- % % messages to fetch from that store. Other messages will be
2636
- % % fetched one by one right before sending the messages.
2637
- % %
2638
- % % Since we have two different shared stores for persistent
2639
- % % and transient messages they are treated separately when
2640
- % % deciding whether to read_many from either of them.
2641
- % %
2642
- % % Because v2 and shared stores function differently we
2643
- % % must keep different information for performing the reads.
2644
- {V2Reads0 , ShPersistReads , ShTransientReads } = lists :foldl (fun
2645
- ({_ , SeqId , MsgLocation , _ , _ }, {V2ReadsAcc , ShPReadsAcc , ShTReadsAcc }) when is_tuple (MsgLocation ) ->
2646
- {[{SeqId , MsgLocation }|V2ReadsAcc ], ShPReadsAcc , ShTReadsAcc };
2647
- ({MsgId , _ , rabbit_msg_store , # message_properties {size = Size }, true },
2648
- {V2ReadsAcc , ShPReadsAcc , ShTReadsAcc }) when Size =< ? SHARED_READ_MANY_SIZE_THRESHOLD ->
2649
- {V2ReadsAcc , [MsgId |ShPReadsAcc ], ShTReadsAcc };
2650
- ({MsgId , _ , rabbit_msg_store , # message_properties {size = Size }, false },
2651
- {V2ReadsAcc , ShPReadsAcc , ShTReadsAcc }) when Size =< ? SHARED_READ_MANY_SIZE_THRESHOLD ->
2652
- {V2ReadsAcc , ShPReadsAcc , [MsgId |ShTReadsAcc ]};
2653
- (_ , Acc ) ->
2654
- Acc
2655
- end , {[], [], []}, List0 ),
2656
- % % In order to properly read and merge V2 messages we want them
2657
- % % in the older->younger order.
2658
- V2Reads = lists :reverse (V2Reads0 ),
2659
- % % We do read_many for v2 store unconditionally.
2660
- {V2Msgs , StoreState2 } = rabbit_classic_queue_store_v2 :read_many (V2Reads , StoreState ),
2661
- List1 = merge_read_msgs (List0 , V2Reads , V2Msgs ),
2662
- % % We read from the shared message store only if there are multiple messages
2663
- % % (10+ as we wouldn't get much benefits from smaller number of messages)
2664
- % % otherwise we wait and read later.
2665
- % %
2666
- % % Because read_many does not use FHC we do not get an updated MCState
2667
- % % like with normal reads.
2668
- List2 = case length (ShPersistReads ) < ? SHARED_READ_MANY_COUNT_THRESHOLD of
2669
- true ->
2670
- List1 ;
2671
- false ->
2672
- ShPersistMsgs = rabbit_msg_store :read_many (ShPersistReads , MCStateP ),
2673
- case map_size (ShPersistMsgs ) of
2674
- 0 -> List1 ;
2675
- _ -> merge_sh_read_msgs (List1 , ShPersistMsgs )
2676
- end
2677
- end ,
2678
- List = case length (ShTransientReads ) < ? SHARED_READ_MANY_COUNT_THRESHOLD of
2679
- true ->
2680
- List2 ;
2681
- false ->
2682
- ShTransientMsgs = rabbit_msg_store :read_many (ShTransientReads , MCStateT ),
2683
- case map_size (ShTransientMsgs ) of
2684
- 0 -> List2 ;
2685
- _ -> merge_sh_read_msgs (List2 , ShTransientMsgs )
2686
- end
2625
+ {List , StoreState3 } = case WhatToRead of
2626
+ messages ->
2627
+ % % We try to read messages from disk all at once instead of
2628
+ % % 1 by 1 at fetch time. When v1 is used and messages are
2629
+ % % embedded, then the message content is already read from
2630
+ % % disk at this point. For v2 embedded we must do a separate
2631
+ % % call to obtain the contents and then merge the contents
2632
+ % % back into the #msg_status records.
2633
+ % %
2634
+ % % For shared message store messages we do the same but only
2635
+ % % for messages < ?SHARED_READ_MANY_SIZE_THRESHOLD bytes and
2636
+ % % when there are at least ?SHARED_READ_MANY_COUNT_THRESHOLD
2637
+ % % messages to fetch from that store. Other messages will be
2638
+ % % fetched one by one right before sending the messages.
2639
+ % %
2640
+ % % Since we have two different shared stores for persistent
2641
+ % % and transient messages they are treated separately when
2642
+ % % deciding whether to read_many from either of them.
2643
+ % %
2644
+ % % Because v2 and shared stores function differently we
2645
+ % % must keep different information for performing the reads.
2646
+ {V2Reads0 , ShPersistReads , ShTransientReads } = lists :foldl (fun
2647
+ ({_ , SeqId , MsgLocation , _ , _ }, {V2ReadsAcc , ShPReadsAcc , ShTReadsAcc }) when is_tuple (MsgLocation ) ->
2648
+ {[{SeqId , MsgLocation }|V2ReadsAcc ], ShPReadsAcc , ShTReadsAcc };
2649
+ ({MsgId , _ , rabbit_msg_store , # message_properties {size = Size }, true },
2650
+ {V2ReadsAcc , ShPReadsAcc , ShTReadsAcc }) when Size =< ? SHARED_READ_MANY_SIZE_THRESHOLD ->
2651
+ {V2ReadsAcc , [MsgId |ShPReadsAcc ], ShTReadsAcc };
2652
+ ({MsgId , _ , rabbit_msg_store , # message_properties {size = Size }, false },
2653
+ {V2ReadsAcc , ShPReadsAcc , ShTReadsAcc }) when Size =< ? SHARED_READ_MANY_SIZE_THRESHOLD ->
2654
+ {V2ReadsAcc , ShPReadsAcc , [MsgId |ShTReadsAcc ]};
2655
+ (_ , Acc ) ->
2656
+ Acc
2657
+ end , {[], [], []}, List0 ),
2658
+ % % In order to properly read and merge V2 messages we want them
2659
+ % % in the older->younger order.
2660
+ V2Reads = lists :reverse (V2Reads0 ),
2661
+ % % We do read_many for v2 store unconditionally.
2662
+ {V2Msgs , StoreState2 } = rabbit_classic_queue_store_v2 :read_many (V2Reads , StoreState ),
2663
+ List1 = merge_read_msgs (List0 , V2Reads , V2Msgs ),
2664
+ % % We read from the shared message store only if there are multiple messages
2665
+ % % (10+ as we wouldn't get much benefits from smaller number of messages)
2666
+ % % otherwise we wait and read later.
2667
+ % %
2668
+ % % Because read_many does not use FHC we do not get an updated MCState
2669
+ % % like with normal reads.
2670
+ List2 = case length (ShPersistReads ) < ? SHARED_READ_MANY_COUNT_THRESHOLD of
2671
+ true ->
2672
+ List1 ;
2673
+ false ->
2674
+ ShPersistMsgs = rabbit_msg_store :read_many (ShPersistReads , MCStateP ),
2675
+ case map_size (ShPersistMsgs ) of
2676
+ 0 -> List1 ;
2677
+ _ -> merge_sh_read_msgs (List1 , ShPersistMsgs )
2678
+ end
2679
+ end ,
2680
+ List3 = case length (ShTransientReads ) < ? SHARED_READ_MANY_COUNT_THRESHOLD of
2681
+ true ->
2682
+ List2 ;
2683
+ false ->
2684
+ ShTransientMsgs = rabbit_msg_store :read_many (ShTransientReads , MCStateT ),
2685
+ case map_size (ShTransientMsgs ) of
2686
+ 0 -> List2 ;
2687
+ _ -> merge_sh_read_msgs (List2 , ShTransientMsgs )
2688
+ end
2689
+ end ,
2690
+ {List3 , StoreState2 };
2691
+ metadata_only ->
2692
+ {List0 , StoreState }
2687
2693
end ,
2688
2694
{Q3a , RamCountsInc , RamBytesInc , State1 , TransientCount , TransientBytes } =
2689
2695
betas_from_index_entries (List , TransientThreshold ,
2690
2696
DelsAndAcksFun ,
2691
2697
State # vqstate { index_state = IndexState1 ,
2692
- store_state = StoreState2 }),
2698
+ store_state = StoreState3 }),
2693
2699
State2 = State1 # vqstate { ram_msg_count = RamMsgCount + RamCountsInc ,
2694
2700
ram_bytes = RamBytes + RamBytesInc ,
2695
2701
disk_read_count = DiskReadCount + RamCountsInc },
@@ -2701,7 +2707,7 @@ maybe_deltas_to_betas(DelsAndAcksFun,
2701
2707
DelsAndAcksFun ,
2702
2708
State2 # vqstate {
2703
2709
delta = d (Delta # delta { start_seq_id = DeltaSeqId1 })},
2704
- MemoryLimit );
2710
+ MemoryLimit , WhatToRead );
2705
2711
Q3aLen ->
2706
2712
Q3b = ? QUEUE :join (Q3 , Q3a ),
2707
2713
case DeltaCount - Q3aLen of
0 commit comments