273
273
msg_store_clients ,
274
274
durable ,
275
275
transient_threshold ,
276
+ qi_embed_msgs_below ,
276
277
277
278
len , % % w/o unacked
278
279
bytes , % % w/o unacked
371
372
{any (), binary ()}},
372
373
durable :: boolean (),
373
374
transient_threshold :: non_neg_integer (),
375
+ qi_embed_msgs_below :: non_neg_integer (),
374
376
375
377
len :: non_neg_integer (),
376
378
bytes :: non_neg_integer (),
@@ -570,12 +572,13 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
570
572
MsgProps = # message_properties { needs_confirming = NeedsConfirming },
571
573
IsDelivered , _ChPid , _Flow ,
572
574
State = # vqstate { q1 = Q1 , q3 = Q3 , q4 = Q4 ,
573
- next_seq_id = SeqId ,
574
- in_counter = InCount ,
575
- durable = IsDurable ,
576
- unconfirmed = UC }) ->
575
+ qi_embed_msgs_below = IndexMaxSize ,
576
+ next_seq_id = SeqId ,
577
+ in_counter = InCount ,
578
+ durable = IsDurable ,
579
+ unconfirmed = UC }) ->
577
580
IsPersistent1 = IsDurable andalso IsPersistent ,
578
- MsgStatus = msg_status (IsPersistent1 , IsDelivered , SeqId , Msg , MsgProps ),
581
+ MsgStatus = msg_status (IsPersistent1 , IsDelivered , SeqId , Msg , MsgProps , IndexMaxSize ),
579
582
{MsgStatus1 , State1 } = maybe_write_to_disk (false , false , MsgStatus , State ),
580
583
State2 = case ? QUEUE :is_empty (Q3 ) of
581
584
false -> State1 # vqstate { q1 = ? QUEUE :in (m (MsgStatus1 ), Q1 ) };
@@ -594,13 +597,14 @@ publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
594
597
MsgProps = # message_properties {
595
598
needs_confirming = NeedsConfirming },
596
599
_ChPid , _Flow ,
597
- State = # vqstate { next_seq_id = SeqId ,
598
- out_counter = OutCount ,
599
- in_counter = InCount ,
600
- durable = IsDurable ,
601
- unconfirmed = UC }) ->
600
+ State = # vqstate { qi_embed_msgs_below = IndexMaxSize ,
601
+ next_seq_id = SeqId ,
602
+ out_counter = OutCount ,
603
+ in_counter = InCount ,
604
+ durable = IsDurable ,
605
+ unconfirmed = UC }) ->
602
606
IsPersistent1 = IsDurable andalso IsPersistent ,
603
- MsgStatus = msg_status (IsPersistent1 , true , SeqId , Msg , MsgProps ),
607
+ MsgStatus = msg_status (IsPersistent1 , true , SeqId , Msg , MsgProps , IndexMaxSize ),
604
608
{MsgStatus1 , State1 } = maybe_write_to_disk (false , false , MsgStatus , State ),
605
609
State2 = record_pending_ack (m (MsgStatus1 ), State1 ),
606
610
UC1 = gb_sets_maybe_insert (NeedsConfirming , MsgId , UC ),
@@ -968,15 +972,15 @@ gb_sets_maybe_insert(false, _Val, Set) -> Set;
968
972
gb_sets_maybe_insert (true , Val , Set ) -> gb_sets :add (Val , Set ).
969
973
970
974
msg_status (IsPersistent , IsDelivered , SeqId ,
971
- Msg = # basic_message {id = MsgId }, MsgProps ) ->
975
+ Msg = # basic_message {id = MsgId }, MsgProps , IndexMaxSize ) ->
972
976
# msg_status {seq_id = SeqId ,
973
977
msg_id = MsgId ,
974
978
msg = Msg ,
975
979
is_persistent = IsPersistent ,
976
980
is_delivered = IsDelivered ,
977
981
msg_in_store = false ,
978
982
index_on_disk = false ,
979
- persist_to = determine_persist_to (Msg , MsgProps ),
983
+ persist_to = determine_persist_to (Msg , MsgProps , IndexMaxSize ),
980
984
msg_props = MsgProps }.
981
985
982
986
beta_msg_status ({Msg = # basic_message {id = MsgId },
@@ -1137,6 +1141,9 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
1137
1141
Now = now (),
1138
1142
IoBatchSize = rabbit_misc :get_env (rabbit , msg_store_io_batch_size ,
1139
1143
? IO_BATCH_SIZE ),
1144
+
1145
+ {ok , IndexMaxSize } = application :get_env (
1146
+ rabbit , queue_index_embed_msgs_below ),
1140
1147
State = # vqstate {
1141
1148
q1 = ? QUEUE :new (),
1142
1149
q2 = ? QUEUE :new (),
@@ -1151,6 +1158,7 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
1151
1158
msg_store_clients = {PersistentClient , TransientClient },
1152
1159
durable = IsDurable ,
1153
1160
transient_threshold = NextSeqId ,
1161
+ qi_embed_msgs_below = IndexMaxSize ,
1154
1162
1155
1163
len = DeltaCount1 ,
1156
1164
persistent_count = DeltaCount1 ,
@@ -1408,9 +1416,8 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) ->
1408
1416
determine_persist_to (# basic_message {
1409
1417
content = # content {properties = Props ,
1410
1418
properties_bin = PropsBin }},
1411
- # message_properties {size = BodySize }) ->
1412
- {ok , IndexMaxSize } = application :get_env (
1413
- rabbit , queue_index_embed_msgs_below ),
1419
+ # message_properties {size = BodySize },
1420
+ IndexMaxSize ) ->
1414
1421
% % The >= is so that you can set the env to 0 and never persist
1415
1422
% % to the index.
1416
1423
% %
0 commit comments