Skip to content

Commit 7a8e166

Browse files
committed
Use rabbit_db_queue for qq leader health check lookups
and introduce rabbit_db_queue:get_all_by_type_and_vhost/2. Update leader health check timeout to 5s and process limit threshold to 20% of node's process_limit.
1 parent b2acbae commit 7a8e166

File tree

3 files changed

+36
-6
lines changed

3 files changed

+36
-6
lines changed

deps/rabbit/src/amqqueue.erl

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
pattern_match_on_type/1,
6767
pattern_match_on_durable/1,
6868
pattern_match_on_type_and_durable/2,
69+
pattern_match_on_type_and_vhost/2,
6970
reset_decorators/1,
7071
set_immutable/1,
7172
qnode/1,
@@ -532,6 +533,12 @@ pattern_match_on_durable(IsDurable) ->
532533
pattern_match_on_type_and_durable(Type, IsDurable) ->
533534
#amqqueue{type = Type, durable = IsDurable, _ = '_'}.
534535

536+
-spec pattern_match_on_type_and_vhost(atom(), binary()) ->
537+
amqqueue_pattern().
538+
539+
pattern_match_on_type_and_vhost(Type, VHost) ->
540+
#amqqueue{type = Type, vhost = VHost, _ = '_'}.
541+
535542
-spec reset_decorators(amqqueue()) -> amqqueue().
536543

537544
reset_decorators(#amqqueue{} = Queue) ->

deps/rabbit/src/rabbit_db_queue.erl

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
get_all/0,
2222
get_all/1,
2323
get_all_by_type/1,
24+
get_all_by_type_and_vhost/2,
2425
get_all_by_type_and_node/3,
2526
list/0,
2627
count/0,
@@ -829,6 +830,28 @@ get_all_by_type(Type) ->
829830
khepri => fun() -> get_all_by_pattern_in_khepri(Pattern) end
830831
}).
831832

833+
%% -------------------------------------------------------------------
834+
%% get_all_by_type_and_vhost().
835+
%% -------------------------------------------------------------------
836+
837+
-spec get_all_by_type_and_vhost(Type, VHost) -> [Queue] when
838+
Type :: atom(),
839+
VHost :: binary(),
840+
Queue :: amqqueue:amqqueue().
841+
842+
%% @doc Gets all queues belonging to the given type and vhost
843+
%%
844+
%% @returns a list of queue records.
845+
%%
846+
%% @private
847+
848+
get_all_by_type_and_vhost(Type, VHost) ->
849+
Pattern = amqqueue:pattern_match_on_type_and_vhost(Type, VHost),
850+
rabbit_khepri:handle_fallback(
851+
#{mnesia => fun() -> get_all_by_pattern_in_mnesia(Pattern) end,
852+
khepri => fun() -> get_all_by_pattern_in_khepri(Pattern) end
853+
}).
854+
832855
get_all_by_pattern_in_mnesia(Pattern) ->
833856
rabbit_db:list_in_mnesia(?MNESIA_TABLE, Pattern).
834857

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@
147147
-define(SNAPSHOT_INTERVAL, 8192). %% the ra default is 4096
148148
% -define(UNLIMITED_PREFETCH_COUNT, 2000). %% something large for ra
149149
-define(MIN_CHECKPOINT_INTERVAL, 8192). %% the ra default is 16384
150-
-define(LEADER_HEALTH_CHECK_TIMEOUT, 1_000).
150+
-define(LEADER_HEALTH_CHECK_TIMEOUT, 5_000).
151151
-define(GLOBAL_LEADER_HEALTH_CHECK_TIMEOUT, 60_000).
152152

153153
%%----------- QQ policies ---------------------------------------------------
@@ -2151,19 +2151,19 @@ file_handle_release_reservation() ->
21512151
ok.
21522152

21532153
leader_health_check(QueueNameOrRegEx, VHost) ->
2154-
%% Set a process limit threshold to 40% of ErlangVM process limit, beyond which
2154+
%% Set a process limit threshold to 20% of ErlangVM process limit, beyond which
21552155
%% we cannot spawn any new processes for executing QQ leader health checks.
2156-
ProcessLimitThreshold = round(0.4 * erlang:system_info(process_limit)),
2156+
ProcessLimitThreshold = round(0.2 * erlang:system_info(process_limit)),
21572157

21582158
leader_health_check(QueueNameOrRegEx, VHost, ProcessLimitThreshold).
21592159

21602160
leader_health_check(QueueNameOrRegEx, VHost, ProcessLimitThreshold) ->
21612161
Qs =
21622162
case VHost of
2163-
global ->
2164-
rabbit_amqqueue:list();
2163+
across_all_vhosts ->
2164+
rabbit_db_queue:get_all_by_type(?MODULE);
21652165
VHost when is_binary(VHost) ->
2166-
rabbit_amqqueue:list(VHost)
2166+
rabbit_db_queue:get_all_by_type_and_vhost(?MODULE, VHost)
21672167
end,
21682168
check_process_limit_safety(length(Qs), ProcessLimitThreshold),
21692169
ParentPID = self(),

0 commit comments

Comments
 (0)