Skip to content

Set segment_entry_count per vhost and use a better default #2954

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,7 @@ _APP_ENV = """[
%% interval at which connection/channel tracking executes post operations
{tracking_execution_timeout, 15000},
{stream_messages_soft_limit, 256},
{track_auth_attempt_source, false},
%% Number of entries per index segment.
%% This value can only be changed safely
%% on an empty node. Default calculated
%% as trunc(math:pow(2,?REL_SEQ_BITS))).
{queue_index_segment_entry_count, 16384}
{track_auth_attempt_source, false}
]
"""

Expand Down
7 changes: 1 addition & 6 deletions deps/rabbit/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,7 @@ define PROJECT_ENV
%% interval at which connection/channel tracking executes post operations
{tracking_execution_timeout, 15000},
{stream_messages_soft_limit, 256},
{track_auth_attempt_source, false},
%% Number of entries per index segment.
%% This value can only be changed safely
%% on an empty node. Default calculated
%% as trunc(math:pow(2,?REL_SEQ_BITS))).
{queue_index_segment_entry_count, 16384}
{track_auth_attempt_source, false}
]
endef

Expand Down
14 changes: 11 additions & 3 deletions deps/rabbit/src/rabbit_queue_index.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
read_global_recovery_terms/1,
cleanup_global_recovery_terms/0]).

%% Used by rabbit_vhost to set the segment_entry_count.
-export([all_queue_directory_names/1]).

-define(CLEAN_FILENAME, "clean.dot").

%%----------------------------------------------------------------------------
Expand Down Expand Up @@ -280,6 +283,8 @@ reset_state(#qistate{ queue_name = Name,
on_sync_fun(), on_sync_fun()) -> qistate().

init(#resource{ virtual_host = VHost } = Name, OnSyncFun, OnSyncMsgFun) ->
#{segment_entry_count := SegmentEntryCount} = rabbit_vhost:read_config(VHost),
put(segment_entry_count, SegmentEntryCount),
VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
State = #qistate { dir = Dir } = blank_state(VHostDir, Name),
false = rabbit_file:is_file(Dir), %% is_file == is file or dir
Expand All @@ -294,6 +299,8 @@ init(#resource{ virtual_host = VHost } = Name, OnSyncFun, OnSyncMsgFun) ->

recover(#resource{ virtual_host = VHost } = Name, Terms, MsgStoreRecovered,
ContainsCheckFun, OnSyncFun, OnSyncMsgFun) ->
#{segment_entry_count := SegmentEntryCount} = rabbit_vhost:read_config(VHost),
put(segment_entry_count, SegmentEntryCount),
VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
State = blank_state(VHostDir, Name),
State1 = State #qistate{on_sync = OnSyncFun,
Expand Down Expand Up @@ -728,6 +735,9 @@ queue_index_walker_reader(QueueName, Gatherer) ->
ok = gatherer:finish(Gatherer).

scan_queue_segments(Fun, Acc, #resource{ virtual_host = VHost } = QueueName) ->
%% Set the segment_entry_count for this worker process.
#{segment_entry_count := SegmentEntryCount} = rabbit_vhost:read_config(VHost),
put(segment_entry_count, SegmentEntryCount),
VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
scan_queue_segments(Fun, Acc, VHostDir, QueueName).

Expand Down Expand Up @@ -1167,9 +1177,7 @@ array_new(Default) ->
array:new([{default, Default}, fixed, {size, segment_entry_count()}]).

segment_entry_count() ->
{ok, SegmentEntryCount} =
application:get_env(rabbit, queue_index_segment_entry_count),
SegmentEntryCount.
get(segment_entry_count).

bool_to_int(true ) -> 1;
bool_to_int(false) -> 0.
Expand Down
45 changes: 43 additions & 2 deletions deps/rabbit/src/rabbit_vhost.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@
-include_lib("rabbit_common/include/rabbit.hrl").
-include("vhost.hrl").

-export([recover/0, recover/1]).
-export([recover/0, recover/1, read_config/1]).
-export([add/2, add/4, delete/2, exists/1, with/2, with_user_and_vhost/3, assert/1, update/2,
set_limits/2, vhost_cluster_state/1, is_running_on_all_nodes/1, await_running_on_all_nodes/2,
list/0, count/0, list_names/0, all/0]).
-export([parse_tags/1, update_metadata/2, tag_with/2, untag_from/2, update_tags/2, update_tags/3]).
-export([lookup/1]).
-export([info/1, info/2, info_all/0, info_all/1, info_all/2, info_all/3]).
-export([dir/1, msg_store_dir_path/1, msg_store_dir_wildcard/0]).
-export([dir/1, msg_store_dir_path/1, msg_store_dir_wildcard/0, config_file_path/1]).
-export([delete_storage/1]).
-export([vhost_down/1]).
-export([put_vhost/5]).
Expand Down Expand Up @@ -53,6 +53,7 @@ recover(VHost) ->
VHostStubFile = filename:join(VHostDir, ".vhost"),
ok = rabbit_file:ensure_dir(VHostStubFile),
ok = file:write_file(VHostStubFile, VHost),
ok = ensure_config_file(VHost),
{Recovered, Failed} = rabbit_amqqueue:recover(VHost),
AllQs = Recovered ++ Failed,
QNames = [amqqueue:get_name(Q) || Q <- AllQs],
Expand All @@ -62,6 +63,42 @@ recover(VHost) ->
ok = rabbit_mirror_queue_misc:on_vhost_up(VHost),
ok.

ensure_config_file(VHost) ->
Path = config_file_path(VHost),
case filelib:is_regular(Path) of
%% The config file exists. Do nothing.
true ->
ok;
%% The config file does not exist.
%% Check if there are queues in this vhost.
false ->
QueueDirs = rabbit_queue_index:all_queue_directory_names(VHost),
SegmentEntryCount = case QueueDirs of
%% There are no queues. Write the configured value for
%% the segment entry count, or the new RabbitMQ default
%% introduced in v3.8.17. The new default provides much
%% better memory footprint when many queues are used.
[] ->
application:get_env(rabbit, queue_index_segment_entry_count,
2048);
%% There are queues already. Write the historic RabbitMQ
%% default of 16384 for forward compatibility. Historic
%% default calculated as trunc(math:pow(2,?REL_SEQ_BITS)).
_ ->
16384
end,
rabbit_log:info("Setting segment_entry_count for vhost '~s' with ~b queues to '~b'",
[VHost, length(QueueDirs), SegmentEntryCount]),
file:write_file(Path, io_lib:format(
"%% This file is auto-generated! Edit at your own risk!~n"
"{segment_entry_count, ~b}.",
[SegmentEntryCount]))
end.

read_config(VHost) ->
{ok, Config} = file:consult(config_file_path(VHost)),
maps:from_list(Config).

-define(INFO_KEYS, vhost:info_keys()).

-spec parse_tags(binary() | string() | atom()) -> [atom()].
Expand Down Expand Up @@ -446,6 +483,10 @@ msg_store_dir_base() ->
Dir = rabbit_mnesia:dir(),
filename:join([Dir, "msg_stores", "vhosts"]).

config_file_path(VHost) ->
VHostDir = msg_store_dir_path(VHost),
filename:join(VHostDir, ".config").
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should avoid .config as it might generate confusion with advanced.config and such. How about .meta since all we store here is metadata?

Or did you intentionally want to emphasize that this is a file:consult/1-compatible file?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do want to show it's a consult file, but I agree that .config is not good, it's not something we want the user to ever modify. On that note I think I will add a comment saying it's auto generated inside the file.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have left .config and added a comment at the top of the file instead. I have noticed that we already have some .config for example https://github.com/rabbitmq/rabbitmq-server/blob/master/deps/rabbit/src/rabbit_node_monitor.erl#L73-L74

The .config for the queue is deep within the mnesia directory so it is unlikely that users stumble upon it. But if that's not good enough we can still change it, I'm not sure into what though.


-spec trim_tag(list() | binary() | atom()) -> atom().
trim_tag(Val) ->
rabbit_data_coercion:to_atom(string:trim(rabbit_data_coercion:to_list(Val))).
Expand Down
9 changes: 9 additions & 0 deletions deps/rabbit/test/backing_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,7 @@ bq_queue_index(Config) ->
?MODULE, bq_queue_index1, [Config]).

bq_queue_index1(_Config) ->
init_queue_index(),
SegmentSize = rabbit_queue_index:next_segment_boundary(0),
TwoSegs = SegmentSize + SegmentSize,
MostOfASegment = trunc(SegmentSize*0.75),
Expand Down Expand Up @@ -708,6 +709,7 @@ bq_queue_recover(Config) ->
?MODULE, bq_queue_recover1, [Config]).

bq_queue_recover1(Config) ->
init_queue_index(),
Count = 2 * rabbit_queue_index:next_segment_boundary(0),
QName0 = queue_name(Config, <<"bq_queue_recover-q">>),
{new, Q} = rabbit_amqqueue:declare(QName0, true, false, [], none, <<"acting-user">>),
Expand Down Expand Up @@ -1315,6 +1317,13 @@ with_empty_test_queue(Fun) ->
{0, 0, Qi} = init_test_queue(QName),
rabbit_queue_index:delete_and_terminate(Fun(Qi, QName)).

init_queue_index() ->
%% We must set the segment entry count in the process dictionary
%% for tests that call the queue index directly to have a correct
%% value.
put(segment_entry_count, 2048),
ok.

restart_app() ->
rabbit:stop(),
rabbit:start().
Expand Down