Skip to content

Commit 12253d2

Browse files
Merge pull request #2954 from rabbitmq/new-segment-entry-count-default
Set segment_entry_count per vhost and use a better default
2 parents 1ff28f1 + d9344b2 commit 12253d2

File tree

5 files changed

+65
-17
lines changed

5 files changed

+65
-17
lines changed

deps/rabbit/BUILD.bazel

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -132,12 +132,7 @@ _APP_ENV = """[
132132
%% interval at which connection/channel tracking executes post operations
133133
{tracking_execution_timeout, 15000},
134134
{stream_messages_soft_limit, 256},
135-
{track_auth_attempt_source, false},
136-
%% Number of entries per index segment.
137-
%% This value can only be changed safely
138-
%% on an empty node. Default calculated
139-
%% as trunc(math:pow(2,?REL_SEQ_BITS))).
140-
{queue_index_segment_entry_count, 16384}
135+
{track_auth_attempt_source, false}
141136
]
142137
"""
143138

deps/rabbit/Makefile

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -122,12 +122,7 @@ define PROJECT_ENV
122122
%% interval at which connection/channel tracking executes post operations
123123
{tracking_execution_timeout, 15000},
124124
{stream_messages_soft_limit, 256},
125-
{track_auth_attempt_source, false},
126-
%% Number of entries per index segment.
127-
%% This value can only be changed safely
128-
%% on an empty node. Default calculated
129-
%% as trunc(math:pow(2,?REL_SEQ_BITS))).
130-
{queue_index_segment_entry_count, 16384}
125+
{track_auth_attempt_source, false}
131126
]
132127
endef
133128

deps/rabbit/src/rabbit_queue_index.erl

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
read_global_recovery_terms/1,
2525
cleanup_global_recovery_terms/0]).
2626

27+
%% Used by rabbit_vhost to set the segment_entry_count.
28+
-export([all_queue_directory_names/1]).
29+
2730
-define(CLEAN_FILENAME, "clean.dot").
2831

2932
%%----------------------------------------------------------------------------
@@ -280,6 +283,8 @@ reset_state(#qistate{ queue_name = Name,
280283
on_sync_fun(), on_sync_fun()) -> qistate().
281284

282285
init(#resource{ virtual_host = VHost } = Name, OnSyncFun, OnSyncMsgFun) ->
286+
#{segment_entry_count := SegmentEntryCount} = rabbit_vhost:read_config(VHost),
287+
put(segment_entry_count, SegmentEntryCount),
283288
VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
284289
State = #qistate { dir = Dir } = blank_state(VHostDir, Name),
285290
false = rabbit_file:is_file(Dir), %% is_file == is file or dir
@@ -294,6 +299,8 @@ init(#resource{ virtual_host = VHost } = Name, OnSyncFun, OnSyncMsgFun) ->
294299

295300
recover(#resource{ virtual_host = VHost } = Name, Terms, MsgStoreRecovered,
296301
ContainsCheckFun, OnSyncFun, OnSyncMsgFun) ->
302+
#{segment_entry_count := SegmentEntryCount} = rabbit_vhost:read_config(VHost),
303+
put(segment_entry_count, SegmentEntryCount),
297304
VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
298305
State = blank_state(VHostDir, Name),
299306
State1 = State #qistate{on_sync = OnSyncFun,
@@ -728,6 +735,9 @@ queue_index_walker_reader(QueueName, Gatherer) ->
728735
ok = gatherer:finish(Gatherer).
729736

730737
scan_queue_segments(Fun, Acc, #resource{ virtual_host = VHost } = QueueName) ->
738+
%% Set the segment_entry_count for this worker process.
739+
#{segment_entry_count := SegmentEntryCount} = rabbit_vhost:read_config(VHost),
740+
put(segment_entry_count, SegmentEntryCount),
731741
VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
732742
scan_queue_segments(Fun, Acc, VHostDir, QueueName).
733743

@@ -1167,9 +1177,7 @@ array_new(Default) ->
11671177
array:new([{default, Default}, fixed, {size, segment_entry_count()}]).
11681178

11691179
segment_entry_count() ->
1170-
{ok, SegmentEntryCount} =
1171-
application:get_env(rabbit, queue_index_segment_entry_count),
1172-
SegmentEntryCount.
1180+
get(segment_entry_count).
11731181

11741182
bool_to_int(true ) -> 1;
11751183
bool_to_int(false) -> 0.

deps/rabbit/src/rabbit_vhost.erl

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,14 @@
1010
-include_lib("rabbit_common/include/rabbit.hrl").
1111
-include("vhost.hrl").
1212

13-
-export([recover/0, recover/1]).
13+
-export([recover/0, recover/1, read_config/1]).
1414
-export([add/2, add/4, delete/2, exists/1, with/2, with_user_and_vhost/3, assert/1, update/2,
1515
set_limits/2, vhost_cluster_state/1, is_running_on_all_nodes/1, await_running_on_all_nodes/2,
1616
list/0, count/0, list_names/0, all/0]).
1717
-export([parse_tags/1, update_metadata/2, tag_with/2, untag_from/2, update_tags/2, update_tags/3]).
1818
-export([lookup/1]).
1919
-export([info/1, info/2, info_all/0, info_all/1, info_all/2, info_all/3]).
20-
-export([dir/1, msg_store_dir_path/1, msg_store_dir_wildcard/0]).
20+
-export([dir/1, msg_store_dir_path/1, msg_store_dir_wildcard/0, config_file_path/1]).
2121
-export([delete_storage/1]).
2222
-export([vhost_down/1]).
2323
-export([put_vhost/5]).
@@ -53,6 +53,7 @@ recover(VHost) ->
5353
VHostStubFile = filename:join(VHostDir, ".vhost"),
5454
ok = rabbit_file:ensure_dir(VHostStubFile),
5555
ok = file:write_file(VHostStubFile, VHost),
56+
ok = ensure_config_file(VHost),
5657
{Recovered, Failed} = rabbit_amqqueue:recover(VHost),
5758
AllQs = Recovered ++ Failed,
5859
QNames = [amqqueue:get_name(Q) || Q <- AllQs],
@@ -62,6 +63,42 @@ recover(VHost) ->
6263
ok = rabbit_mirror_queue_misc:on_vhost_up(VHost),
6364
ok.
6465

66+
ensure_config_file(VHost) ->
67+
Path = config_file_path(VHost),
68+
case filelib:is_regular(Path) of
69+
%% The config file exists. Do nothing.
70+
true ->
71+
ok;
72+
%% The config file does not exist.
73+
%% Check if there are queues in this vhost.
74+
false ->
75+
QueueDirs = rabbit_queue_index:all_queue_directory_names(VHost),
76+
SegmentEntryCount = case QueueDirs of
77+
%% There are no queues. Write the configured value for
78+
%% the segment entry count, or the new RabbitMQ default
79+
%% introduced in v3.8.17. The new default provides much
80+
%% better memory footprint when many queues are used.
81+
[] ->
82+
application:get_env(rabbit, queue_index_segment_entry_count,
83+
2048);
84+
%% There are queues already. Write the historic RabbitMQ
85+
%% default of 16384 for forward compatibility. Historic
86+
%% default calculated as trunc(math:pow(2,?REL_SEQ_BITS)).
87+
_ ->
88+
16384
89+
end,
90+
rabbit_log:info("Setting segment_entry_count for vhost '~s' with ~b queues to '~b'",
91+
[VHost, length(QueueDirs), SegmentEntryCount]),
92+
file:write_file(Path, io_lib:format(
93+
"%% This file is auto-generated! Edit at your own risk!~n"
94+
"{segment_entry_count, ~b}.",
95+
[SegmentEntryCount]))
96+
end.
97+
98+
read_config(VHost) ->
99+
{ok, Config} = file:consult(config_file_path(VHost)),
100+
maps:from_list(Config).
101+
65102
-define(INFO_KEYS, vhost:info_keys()).
66103

67104
-spec parse_tags(binary() | string() | atom()) -> [atom()].
@@ -446,6 +483,10 @@ msg_store_dir_base() ->
446483
Dir = rabbit_mnesia:dir(),
447484
filename:join([Dir, "msg_stores", "vhosts"]).
448485

486+
config_file_path(VHost) ->
487+
VHostDir = msg_store_dir_path(VHost),
488+
filename:join(VHostDir, ".config").
489+
449490
-spec trim_tag(list() | binary() | atom()) -> atom().
450491
trim_tag(Val) ->
451492
rabbit_data_coercion:to_atom(string:trim(rabbit_data_coercion:to_list(Val))).

deps/rabbit/test/backing_queue_SUITE.erl

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -527,6 +527,7 @@ bq_queue_index(Config) ->
527527
?MODULE, bq_queue_index1, [Config]).
528528

529529
bq_queue_index1(_Config) ->
530+
init_queue_index(),
530531
SegmentSize = rabbit_queue_index:next_segment_boundary(0),
531532
TwoSegs = SegmentSize + SegmentSize,
532533
MostOfASegment = trunc(SegmentSize*0.75),
@@ -708,6 +709,7 @@ bq_queue_recover(Config) ->
708709
?MODULE, bq_queue_recover1, [Config]).
709710

710711
bq_queue_recover1(Config) ->
712+
init_queue_index(),
711713
Count = 2 * rabbit_queue_index:next_segment_boundary(0),
712714
QName0 = queue_name(Config, <<"bq_queue_recover-q">>),
713715
{new, Q} = rabbit_amqqueue:declare(QName0, true, false, [], none, <<"acting-user">>),
@@ -1315,6 +1317,13 @@ with_empty_test_queue(Fun) ->
13151317
{0, 0, Qi} = init_test_queue(QName),
13161318
rabbit_queue_index:delete_and_terminate(Fun(Qi, QName)).
13171319

1320+
init_queue_index() ->
1321+
%% We must set the segment entry count in the process dictionary
1322+
%% for tests that call the queue index directly to have a correct
1323+
%% value.
1324+
put(segment_entry_count, 2048),
1325+
ok.
1326+
13181327
restart_app() ->
13191328
rabbit:stop(),
13201329
rabbit:start().

0 commit comments

Comments
 (0)