273
273
msg_store_clients ,
274
274
durable ,
275
275
transient_threshold ,
276
+ qi_embed_msgs_below ,
276
277
277
278
len , % % w/o unacked
278
279
bytes , % % w/o unacked
370
371
{any (), binary ()}},
371
372
durable :: boolean (),
372
373
transient_threshold :: non_neg_integer (),
374
+ qi_embed_msgs_below :: non_neg_integer (),
373
375
374
376
len :: non_neg_integer (),
375
377
bytes :: non_neg_integer (),
@@ -569,12 +571,13 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
569
571
MsgProps = # message_properties { needs_confirming = NeedsConfirming },
570
572
IsDelivered , _ChPid , _Flow ,
571
573
State = # vqstate { q1 = Q1 , q3 = Q3 , q4 = Q4 ,
572
- next_seq_id = SeqId ,
573
- in_counter = InCount ,
574
- durable = IsDurable ,
575
- unconfirmed = UC }) ->
574
+ qi_embed_msgs_below = IndexMaxSize ,
575
+ next_seq_id = SeqId ,
576
+ in_counter = InCount ,
577
+ durable = IsDurable ,
578
+ unconfirmed = UC }) ->
576
579
IsPersistent1 = IsDurable andalso IsPersistent ,
577
- MsgStatus = msg_status (IsPersistent1 , IsDelivered , SeqId , Msg , MsgProps ),
580
+ MsgStatus = msg_status (IsPersistent1 , IsDelivered , SeqId , Msg , MsgProps , IndexMaxSize ),
578
581
{MsgStatus1 , State1 } = maybe_write_to_disk (false , false , MsgStatus , State ),
579
582
State2 = case ? QUEUE :is_empty (Q3 ) of
580
583
false -> State1 # vqstate { q1 = ? QUEUE :in (m (MsgStatus1 ), Q1 ) };
@@ -593,13 +596,14 @@ publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
593
596
MsgProps = # message_properties {
594
597
needs_confirming = NeedsConfirming },
595
598
_ChPid , _Flow ,
596
- State = # vqstate { next_seq_id = SeqId ,
597
- out_counter = OutCount ,
598
- in_counter = InCount ,
599
- durable = IsDurable ,
600
- unconfirmed = UC }) ->
599
+ State = # vqstate { qi_embed_msgs_below = IndexMaxSize ,
600
+ next_seq_id = SeqId ,
601
+ out_counter = OutCount ,
602
+ in_counter = InCount ,
603
+ durable = IsDurable ,
604
+ unconfirmed = UC }) ->
601
605
IsPersistent1 = IsDurable andalso IsPersistent ,
602
- MsgStatus = msg_status (IsPersistent1 , true , SeqId , Msg , MsgProps ),
606
+ MsgStatus = msg_status (IsPersistent1 , true , SeqId , Msg , MsgProps , IndexMaxSize ),
603
607
{MsgStatus1 , State1 } = maybe_write_to_disk (false , false , MsgStatus , State ),
604
608
State2 = record_pending_ack (m (MsgStatus1 ), State1 ),
605
609
UC1 = gb_sets_maybe_insert (NeedsConfirming , MsgId , UC ),
@@ -1029,15 +1033,15 @@ gb_sets_maybe_insert(false, _Val, Set) -> Set;
1029
1033
gb_sets_maybe_insert (true , Val , Set ) -> gb_sets :add (Val , Set ).
1030
1034
1031
1035
msg_status (IsPersistent , IsDelivered , SeqId ,
1032
- Msg = # basic_message {id = MsgId }, MsgProps ) ->
1036
+ Msg = # basic_message {id = MsgId }, MsgProps , IndexMaxSize ) ->
1033
1037
# msg_status {seq_id = SeqId ,
1034
1038
msg_id = MsgId ,
1035
1039
msg = Msg ,
1036
1040
is_persistent = IsPersistent ,
1037
1041
is_delivered = IsDelivered ,
1038
1042
msg_in_store = false ,
1039
1043
index_on_disk = false ,
1040
- persist_to = determine_persist_to (Msg , MsgProps ),
1044
+ persist_to = determine_persist_to (Msg , MsgProps , IndexMaxSize ),
1041
1045
msg_props = MsgProps }.
1042
1046
1043
1047
beta_msg_status ({Msg = # basic_message {id = MsgId },
@@ -1198,6 +1202,9 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
1198
1202
Now = time_compat :monotonic_time (),
1199
1203
IoBatchSize = rabbit_misc :get_env (rabbit , msg_store_io_batch_size ,
1200
1204
? IO_BATCH_SIZE ),
1205
+
1206
+ {ok , IndexMaxSize } = application :get_env (
1207
+ rabbit , queue_index_embed_msgs_below ),
1201
1208
State = # vqstate {
1202
1209
q1 = ? QUEUE :new (),
1203
1210
q2 = ? QUEUE :new (),
@@ -1212,6 +1219,7 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
1212
1219
msg_store_clients = {PersistentClient , TransientClient },
1213
1220
durable = IsDurable ,
1214
1221
transient_threshold = NextSeqId ,
1222
+ qi_embed_msgs_below = IndexMaxSize ,
1215
1223
1216
1224
len = DeltaCount1 ,
1217
1225
persistent_count = DeltaCount1 ,
@@ -1469,9 +1477,8 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) ->
1469
1477
determine_persist_to (# basic_message {
1470
1478
content = # content {properties = Props ,
1471
1479
properties_bin = PropsBin }},
1472
- # message_properties {size = BodySize }) ->
1473
- {ok , IndexMaxSize } = application :get_env (
1474
- rabbit , queue_index_embed_msgs_below ),
1480
+ # message_properties {size = BodySize },
1481
+ IndexMaxSize ) ->
1475
1482
% % The >= is so that you can set the env to 0 and never persist
1476
1483
% % to the index.
1477
1484
% %
0 commit comments