@@ -2565,6 +2565,14 @@ fetch_from_q3(State = #vqstate { delta = #delta { count = DeltaCount },
2565
2565
{loaded , {MsgStatus , State1 }}
2566
2566
end .
2567
2567
2568
+ % % Thresholds for doing multi-read against the shared message
2569
+ % % stores. The values have been obtained through numerous
2570
+ % % experiments. Be careful when editing these values: after a
2571
+ % % certain size the performance drops and it becomes no longer
2572
+ % % interesting to keep the extra data in memory.
2573
+ - define (SHARED_READ_MANY_SIZE_THRESHOLD , 12000 ).
2574
+ - define (SHARED_READ_MANY_COUNT_THRESHOLD , 10 ).
2575
+
2568
2576
maybe_deltas_to_betas (State = # vqstate { rates = # rates { out = OutRate }}) ->
2569
2577
AfterFun = process_delivers_and_acks_fun (deliver_and_ack ),
2570
2578
% % We allow from 1 to 2048 messages in memory depending on the consume rate.
@@ -2619,24 +2627,25 @@ maybe_deltas_to_betas(DelsAndAcksFun,
2619
2627
% % back into the #msg_status records.
2620
2628
% %
2621
2629
% % For shared message store messages we do the same but only
2622
- % % for messages < 20000 bytes and when there are at least 10
2623
- % % messages to fetch (otherwise we do the fetch 1 by 1 right
2624
- % % before sending the messages). The values have been
2625
- % % obtained through experiments because after a certain size
2626
- % % the performance drops and it become no longer interesting
2627
- % % to keep the extra data in memory. Since we have
2628
- % % two different shared stores for persistent/transient
2629
- % % they are treated separately when deciding whether to
2630
- % % read_many from either of them.
2630
+ % % for messages < ?SHARED_READ_MANY_SIZE_THRESHOLD bytes and
2631
+ % % when there are at least ?SHARED_READ_MANY_COUNT_THRESHOLD
2632
+ % % messages to fetch from that store. Other messages will be
2633
+ % % fetched one by one right before sending the messages.
2634
+ % %
2635
+ % % Since we have two different shared stores for persistent
2636
+ % % and transient messages they are treated separately when
2637
+ % % deciding whether to read_many from either of them.
2631
2638
% %
2632
2639
% % Because v2 and shared stores function differently we
2633
2640
% % must keep different information for performing the reads.
2634
2641
{V2Reads0 , ShPersistReads , ShTransientReads } = lists :foldl (fun
2635
2642
({_ , SeqId , MsgLocation , _ , _ }, {V2ReadsAcc , ShPReadsAcc , ShTReadsAcc }) when is_tuple (MsgLocation ) ->
2636
2643
{[{SeqId , MsgLocation }|V2ReadsAcc ], ShPReadsAcc , ShTReadsAcc };
2637
- ({MsgId , _ , rabbit_msg_store , # message_properties {size = Size }, true }, {V2ReadsAcc , ShPReadsAcc , ShTReadsAcc }) when Size =< 20000 ->
2644
+ ({MsgId , _ , rabbit_msg_store , # message_properties {size = Size }, true },
2645
+ {V2ReadsAcc , ShPReadsAcc , ShTReadsAcc }) when Size =< ? SHARED_READ_MANY_SIZE_THRESHOLD ->
2638
2646
{V2ReadsAcc , [MsgId |ShPReadsAcc ], ShTReadsAcc };
2639
- ({MsgId , _ , rabbit_msg_store , # message_properties {size = Size }, false }, {V2ReadsAcc , ShPReadsAcc , ShTReadsAcc }) when Size =< 20000 ->
2647
+ ({MsgId , _ , rabbit_msg_store , # message_properties {size = Size }, false },
2648
+ {V2ReadsAcc , ShPReadsAcc , ShTReadsAcc }) when Size =< ? SHARED_READ_MANY_SIZE_THRESHOLD ->
2640
2649
{V2ReadsAcc , ShPReadsAcc , [MsgId |ShTReadsAcc ]};
2641
2650
(_ , Acc ) ->
2642
2651
Acc
@@ -2653,7 +2662,7 @@ maybe_deltas_to_betas(DelsAndAcksFun,
2653
2662
% %
2654
2663
% % Because read_many does not use FHC we do not get an updated MCState
2655
2664
% % like with normal reads.
2656
- List2 = case length (ShPersistReads ) < 10 of
2665
+ List2 = case length (ShPersistReads ) < ? SHARED_READ_MANY_COUNT_THRESHOLD of
2657
2666
true ->
2658
2667
List1 ;
2659
2668
false ->
@@ -2663,7 +2672,7 @@ maybe_deltas_to_betas(DelsAndAcksFun,
2663
2672
_ -> merge_sh_read_msgs (List1 , ShPersistMsgs )
2664
2673
end
2665
2674
end ,
2666
- List = case length (ShTransientReads ) < 10 of
2675
+ List = case length (ShTransientReads ) < ? SHARED_READ_MANY_COUNT_THRESHOLD of
2667
2676
true ->
2668
2677
List2 ;
2669
2678
false ->
0 commit comments