Skip to content

Commit c4a53f1

Browse files
author
Loïc Hoguin
committed
Set segment_entry_count per vhost and use a better default
1 parent b576242 commit c4a53f1

File tree

3 files changed

+48
-11
lines changed

3 files changed

+48
-11
lines changed

deps/rabbit/Makefile

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -119,12 +119,7 @@ define PROJECT_ENV
119119
%% interval at which connection/channel tracking executes post operations
120120
{tracking_execution_timeout, 15000},
121121
{stream_messages_soft_limit, 256},
122-
{track_auth_attempt_source, false},
123-
%% Number of entries per index segment.
124-
%% This value can only be changed safely
125-
%% on an empty node. Default calculated
126-
%% as trunc(math:pow(2,?REL_SEQ_BITS))).
127-
{queue_index_segment_entry_count, 16384}
122+
{track_auth_attempt_source, false}
128123
]
129124
endef
130125

deps/rabbit/src/rabbit_queue_index.erl

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
read_global_recovery_terms/1,
2525
cleanup_global_recovery_terms/0]).
2626

27+
%% Used by rabbit_vhost to set the segment_entry_count.
28+
-export([all_queue_directory_names/1]).
29+
2730
-define(CLEAN_FILENAME, "clean.dot").
2831

2932
%%----------------------------------------------------------------------------
@@ -280,6 +283,8 @@ reset_state(#qistate{ queue_name = Name,
280283
on_sync_fun(), on_sync_fun()) -> qistate().
281284

282285
init(#resource{ virtual_host = VHost } = Name, OnSyncFun, OnSyncMsgFun) ->
286+
#{segment_entry_count := SegmentEntryCount} = rabbit_vhost:read_config(VHost),
287+
put(segment_entry_count, SegmentEntryCount),
283288
VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
284289
State = #qistate { dir = Dir } = blank_state(VHostDir, Name),
285290
false = rabbit_file:is_file(Dir), %% is_file == is file or dir
@@ -294,6 +299,8 @@ init(#resource{ virtual_host = VHost } = Name, OnSyncFun, OnSyncMsgFun) ->
294299

295300
recover(#resource{ virtual_host = VHost } = Name, Terms, MsgStoreRecovered,
296301
ContainsCheckFun, OnSyncFun, OnSyncMsgFun) ->
302+
#{segment_entry_count := SegmentEntryCount} = rabbit_vhost:read_config(VHost),
303+
put(segment_entry_count, SegmentEntryCount),
297304
VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
298305
State = blank_state(VHostDir, Name),
299306
State1 = State #qistate{on_sync = OnSyncFun,
@@ -1167,9 +1174,7 @@ array_new(Default) ->
11671174
array:new([{default, Default}, fixed, {size, segment_entry_count()}]).
11681175

11691176
segment_entry_count() ->
1170-
{ok, SegmentEntryCount} =
1171-
application:get_env(rabbit, queue_index_segment_entry_count),
1172-
SegmentEntryCount.
1177+
get(segment_entry_count).
11731178

11741179
bool_to_int(true ) -> 1;
11751180
bool_to_int(false) -> 0.

deps/rabbit/src/rabbit_vhost.erl

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,12 @@
1010
-include_lib("rabbit_common/include/rabbit.hrl").
1111
-include("vhost.hrl").
1212

13-
-export([recover/0, recover/1]).
13+
-export([recover/0, recover/1, read_config/1]).
1414
-export([add/2, add/4, delete/2, exists/1, with/2, with_user_and_vhost/3, assert/1, update/2,
1515
set_limits/2, vhost_cluster_state/1, is_running_on_all_nodes/1, await_running_on_all_nodes/2,
1616
list/0, count/0, list_names/0, all/0, parse_tags/1]).
1717
-export([info/1, info/2, info_all/0, info_all/1, info_all/2, info_all/3]).
18-
-export([dir/1, msg_store_dir_path/1, msg_store_dir_wildcard/0]).
18+
-export([dir/1, msg_store_dir_path/1, msg_store_dir_wildcard/0, config_file_path/1]).
1919
-export([delete_storage/1]).
2020
-export([vhost_down/1]).
2121
-export([put_vhost/5]).
@@ -48,6 +48,7 @@ recover(VHost) ->
4848
VHostStubFile = filename:join(VHostDir, ".vhost"),
4949
ok = rabbit_file:ensure_dir(VHostStubFile),
5050
ok = file:write_file(VHostStubFile, VHost),
51+
ok = ensure_config_file(VHost),
5152
{Recovered, Failed} = rabbit_amqqueue:recover(VHost),
5253
AllQs = Recovered ++ Failed,
5354
QNames = [amqqueue:get_name(Q) || Q <- AllQs],
@@ -57,6 +58,38 @@ recover(VHost) ->
5758
ok = rabbit_mirror_queue_misc:on_vhost_up(VHost),
5859
ok.
5960

61+
ensure_config_file(VHost) ->
62+
Path = config_file_path(VHost),
63+
case filelib:is_regular(Path) of
64+
%% The config file exists. Do nothing.
65+
true ->
66+
ok;
67+
%% The config file does not exist.
68+
%% Check if there are queues in this vhost.
69+
false ->
70+
QueueDirs = rabbit_queue_index:all_queue_directory_names(VHost),
71+
SegmentEntryCount = case QueueDirs of
72+
%% There are no queues. Write the configured value for
73+
%% the segment entry count, or the new RabbitMQ default
74+
%% introduced in v3.8.16.
75+
[] ->
76+
application:get_env(rabbit, queue_index_segment_entry_count,
77+
1024); %% @todo Figure out what the new default should be.
78+
%% There are queues already. Write the historic RabbitMQ
79+
%% default of 16384 for forward compatibility. Historic
80+
%% default calculated as trunc(math:pow(2,?REL_SEQ_BITS)).
81+
_ ->
82+
16384
83+
end,
84+
rabbit_log:info("Setting segment_entry_count for vhost '~s' with ~b queues to '~b'",
85+
[VHost, length(QueueDirs), SegmentEntryCount]),
86+
file:write_file(Path, io_lib:format("{segment_entry_count, ~b}.", [SegmentEntryCount]))
87+
end.
88+
89+
read_config(VHost) ->
90+
{ok, Config} = file:consult(config_file_path(VHost)),
91+
maps:from_list(Config).
92+
6093
-define(INFO_KEYS, vhost:info_keys()).
6194

6295
-spec parse_tags(binary() | string() | atom()) -> [atom()].
@@ -377,6 +410,10 @@ msg_store_dir_base() ->
377410
Dir = rabbit_mnesia:dir(),
378411
filename:join([Dir, "msg_stores", "vhosts"]).
379412

413+
config_file_path(VHost) ->
414+
VHostDir = msg_store_dir_path(VHost),
415+
filename:join(VHostDir, ".config").
416+
380417
-spec trim_tag(list() | binary() | atom()) -> atom().
381418
trim_tag(Val) ->
382419
rabbit_data_coercion:to_atom(string:trim(rabbit_data_coercion:to_list(Val))).

0 commit comments

Comments
 (0)