Skip to content

Commit 1c97113

Browse files
committed
See #7209. Evaluate quorum queue membership periodically.
1 parent a59c2b5 commit 1c97113

File tree

9 files changed

+357
-7
lines changed

9 files changed

+357
-7
lines changed

deps/rabbit/BUILD.bazel

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ load("//:rabbitmq_run.bzl", "rabbitmq_run")
66
load(
77
"//:rabbitmq.bzl",
88
"RABBITMQ_DIALYZER_OPTS",
9+
"ENABLE_FEATURE_MAYBE_EXPR",
910
"assert_suites",
1011
"rabbitmq_app",
1112
"rabbitmq_integration_suite",
@@ -662,7 +663,7 @@ rabbitmq_integration_suite(
662663
":test_quorum_queue_utils_beam",
663664
],
664665
flaky = True,
665-
shard_count = 6,
666+
shard_count = 7,
666667
)
667668

668669
rabbitmq_suite(
@@ -1224,5 +1225,6 @@ eunit(
12241225
":test_test_util_beam",
12251226
":test_test_rabbit_event_handler_beam",
12261227
],
1228+
erl_extra_args = [ENABLE_FEATURE_MAYBE_EXPR],
12271229
target = ":test_erlang_app",
12281230
)

deps/rabbit/app.bzl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ def all_beam_files(name = "all_beam_files"):
189189
"src/rabbit_queue_location_validator.erl",
190190
"src/rabbit_queue_master_location_misc.erl",
191191
"src/rabbit_queue_type_util.erl",
192+
"src/rabbit_queue_member_eval.erl",
192193
"src/rabbit_quorum_memory_manager.erl",
193194
"src/rabbit_quorum_queue.erl",
194195
"src/rabbit_ra_registry.erl",
@@ -432,6 +433,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
432433
"src/rabbit_queue_location_validator.erl",
433434
"src/rabbit_queue_master_location_misc.erl",
434435
"src/rabbit_queue_type_util.erl",
436+
"src/rabbit_queue_member_eval.erl",
435437
"src/rabbit_quorum_memory_manager.erl",
436438
"src/rabbit_quorum_queue.erl",
437439
"src/rabbit_ra_registry.erl",
@@ -690,6 +692,7 @@ def all_srcs(name = "all_srcs"):
690692
"src/rabbit_queue_location_validator.erl",
691693
"src/rabbit_queue_master_location_misc.erl",
692694
"src/rabbit_queue_master_locator.erl",
695+
"src/rabbit_queue_member_eval.erl",
693696
"src/rabbit_queue_type.erl",
694697
"src/rabbit_queue_type_util.erl",
695698
"src/rabbit_quorum_memory_manager.erl",

deps/rabbit/priv/schema/rabbit.schema

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2486,6 +2486,24 @@ end}.
24862486
{mapping, "quorum_queue.property_equivalence.relaxed_checks_on_redeclaration", "rabbit.quorum_relaxed_checks_on_redeclaration", [
24872487
{datatype, {enum, [true, false]}}]}.
24882488

2489+
2490+
%%
2491+
%% Queue membership evaluation
2492+
%%
2493+
2494+
{mapping, "queue_membership_evaluation.default_timeout", "rabbit.member_eval_default_timeout", [
2495+
{datatype, integer}, {validators, ["non_negative_integer"]}
2496+
]}.
2497+
2498+
{mapping, "queue_membership_evaluation.short_timeout", "rabbit.member_eval_short_timeout", [
2499+
{datatype, integer}, {validators, ["non_negative_integer"]}
2500+
]}.
2501+
2502+
{mapping, "queue_membership_evaluation.target_group_size", "rabbit.member_eval_target_group_size", [
2503+
{datatype, integer}, {validators, ["non_negative_integer"]}
2504+
]}.
2505+
2506+
24892507
%%
24902508
%% Runtime parameters
24912509
%%

deps/rabbit/src/rabbit.erl

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,12 @@
175175
{requires, [rabbit_alarm, guid_generator]},
176176
{enables, core_initialized}]}).
177177

178+
-rabbit_boot_step({rabbit_queue_member_eval,
179+
[{description, "TODO"},
180+
{mfa, {rabbit_sup, start_restartable_child,
181+
[rabbit_queue_member_eval]}},
182+
{requires, [database]}]}).
183+
178184
-rabbit_boot_step({rabbit_epmd_monitor,
179185
[{description, "epmd monitor"},
180186
{mfa, {rabbit_sup, start_restartable_child,

deps/rabbit/src/rabbit_node_monitor.erl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -815,6 +815,7 @@ handle_dead_rabbit(Node, State = #state{partitions = Partitions,
815815
ok = rabbit_amqqueue:on_node_down(Node),
816816
ok = rabbit_alarm:on_node_down(Node),
817817
ok = rabbit_mnesia:on_node_down(Node),
818+
ok = rabbit_queue_member_eval:on_node_down(Node),
818819
%% If we have been partitioned, and we are now in the only remaining
819820
%% partition, we no longer care about partitions - forget them. Note
820821
%% that we do not attempt to deal with individual (other) partitions
@@ -843,7 +844,8 @@ ensure_keepalive_timer(State) ->
843844
handle_live_rabbit(Node) ->
844845
ok = rabbit_amqqueue:on_node_up(Node),
845846
ok = rabbit_alarm:on_node_up(Node),
846-
ok = rabbit_mnesia:on_node_up(Node).
847+
ok = rabbit_mnesia:on_node_up(Node),
848+
ok = rabbit_queue_member_eval:on_node_up(Node).
847849

848850
maybe_autoheal(State = #state{partitions = []}) ->
849851
State;

deps/rabbit/src/rabbit_nodes.erl

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -281,14 +281,17 @@ do_filter_reachable(Members) ->
281281
Members).
282282

283283
-spec is_running(Node) -> IsRunning when
284-
Node :: node(),
284+
Node :: node() | [node()],
285285
IsRunning :: boolean().
286286
%% @doc Indicates if the given node is running.
287287
%%
288288
%% @see filter_running/1.
289289

290290
is_running(Node) when is_atom(Node) ->
291-
[Node] =:= filter_running([Node]).
291+
[Node] =:= filter_running([Node]);
292+
is_running(Nodes) when is_list(Nodes) ->
293+
lists:sort(Nodes) =:= lists:sort(filter_running(Nodes)).
294+
292295

293296
-spec list_running() -> Nodes when
294297
Nodes :: [node()].
Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
6+
%%
7+
8+
-module(rabbit_queue_member_eval).
9+
10+
-feature(maybe_expr, enable).
11+
12+
-behaviour(gen_server).
13+
14+
-export([on_node_up/1, on_node_down/1]).
15+
16+
-export([start_link/0]).
17+
18+
%% gen_server callbacks
19+
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
20+
code_change/3]).
21+
22+
-define(SERVER, ?MODULE).
23+
-define(DEFAULT_INTERVAL, 60000*60).
24+
-define(SHORT_INTERVAL, 10000).
25+
26+
-define(EVAL_MSG, member_eval).
27+
28+
-record(state, {timer_ref :: reference(),
29+
default_interval :: non_neg_integer(),
30+
short_interval :: non_neg_integer(),
31+
target_group_size :: non_neg_integer() | undefined}).
32+
33+
%%----------------------------------------------------------------------------
34+
%% Start
35+
%%----------------------------------------------------------------------------
36+
37+
-spec start_link() -> rabbit_types:ok_pid_or_error().
38+
start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
39+
40+
%%----------------------------------------------------------------------------
41+
%% API
42+
%%----------------------------------------------------------------------------
43+
44+
on_node_up(_Node) ->
45+
gen_server:cast(?SERVER, nodechange).
46+
47+
on_node_down(_Node) ->
48+
gen_server:cast(?SERVER, nodechange).
49+
50+
%%----------------------------------------------------------------------------
51+
%% gen_server callbacks
52+
%%----------------------------------------------------------------------------
53+
54+
init([]) ->
55+
DefaultInterval = rabbit_misc:get_env(rabbit, member_eval_default_timeout,
56+
?DEFAULT_INTERVAL),
57+
ShortInterval = rabbit_misc:get_env(rabbit, member_eval_short_timeout,
58+
?SHORT_INTERVAL),
59+
TargetGroupSize = rabbit_misc:get_env(rabbit, member_eval_target_group_size,
60+
undefined),
61+
Ref = erlang:send_after(DefaultInterval, self(), ?EVAL_MSG),
62+
{ok, #state{timer_ref = Ref,
63+
default_interval = DefaultInterval,
64+
short_interval = ShortInterval,
65+
target_group_size = TargetGroupSize}}.
66+
67+
handle_call(_Request, _From, State) ->
68+
{reply, ok, State}.
69+
70+
handle_cast(nodechange, #state{timer_ref = OldRef,
71+
short_interval = Time} = State) ->
72+
_ = erlang:cancel_timer(OldRef),
73+
Ref = erlang:send_after(Time, self(), ?EVAL_MSG),
74+
{noreply, State#state{timer_ref = Ref}};
75+
handle_cast(_Msg, State) ->
76+
{noreply, State}.
77+
78+
handle_info(?EVAL_MSG, #state{default_interval = DefaultInterval,
79+
short_interval = ShortInterval,
80+
target_group_size = TargetSize} = State) ->
81+
Res = eval_quroum_queue_membership(TargetSize),
82+
NewTimeout = case Res of
83+
noop ->
84+
DefaultInterval;
85+
_ ->
86+
ShortInterval
87+
end,
88+
Ref = erlang:send_after(NewTimeout, self(), ?EVAL_MSG),
89+
{noreply, State#state{timer_ref = Ref}};
90+
handle_info(_Info, State) ->
91+
{noreply, State}.
92+
93+
terminate(_Reason, _State) ->
94+
ok.
95+
96+
code_change(_OldVsn, State, _Extra) ->
97+
{ok, State}.
98+
99+
%%----------------------------------------------------------------------------
100+
%% Internal functions
101+
%%----------------------------------------------------------------------------
102+
103+
eval_quroum_queue_membership(TargetSize) ->
104+
LocalLeaders = rabbit_amqqueue:list_local_leaders(),
105+
ExpectedNodes = rabbit_nodes:list_members(),
106+
Running = rabbit_nodes:list_running(),
107+
eval_quorum_members(ExpectedNodes, Running, LocalLeaders, TargetSize, noop).
108+
109+
eval_quorum_members(_ExpectedNodes, _Running, [], _TargetSize, Result) ->
110+
Result;
111+
eval_quorum_members(ExpectedNodes, Running, [Q | LocalLeaders], TargetSize, OldResult) ->
112+
{ok, Members, {_, LeaderNode}} = ra:members(amqqueue:get_pid(Q)),
113+
114+
%% Check if Leader is indeed this node
115+
Result = case LeaderNode =:= node() of
116+
true ->
117+
MemberNodes = [Node || {_, Node} <- Members],
118+
Remove = MemberNodes -- ExpectedNodes,
119+
case Remove of
120+
[] ->
121+
add_member(Q, Running, MemberNodes, TargetSize);
122+
_ ->
123+
remove_members(Q, Remove)
124+
end;
125+
false ->
126+
noop
127+
end,
128+
eval_quorum_members(ExpectedNodes, Running, LocalLeaders, TargetSize,
129+
update_result(OldResult, Result)).
130+
131+
add_member(Q, Running, MemberNodes, TargetSize) ->
132+
New = Running -- MemberNodes,
133+
case should_add_node(MemberNodes, New, Q, TargetSize) of
134+
true ->
135+
%% In the future, sort the list of new nodes based on load,
136+
%% availability zones etc
137+
Node = hd(New),
138+
QName = amqqueue:get_name(Q),
139+
case rabbit_quorum_queue:add_member(Q, Node) of
140+
ok ->
141+
rabbit_log:debug(
142+
"Added node ~ts as a member to queue ~ts as "
143+
"the queues target group size is not met and "
144+
"there are enough nodes in the cluster", [Node, QName]);
145+
{error, Err} ->
146+
rabbit_log:warning(
147+
"~ts: failed to add member (replica) on node ~w, error: ~w",
148+
[rabbit_misc:rs(QName), Node, Err])
149+
end,
150+
ok;
151+
false ->
152+
noop
153+
end.
154+
155+
should_add_node(MemberNodes, New, Q, TargetSize) ->
156+
CurrentSize = length(MemberNodes),
157+
TargetSize = get_target_size(Q, TargetSize),
158+
maybe
159+
true ?= CurrentSize < TargetSize, %% Target size not reached
160+
true ?= (TargetSize - CurrentSize) =< length(New), %% Enough nodes,
161+
true ?= rabbit_nodes:is_running(lists:delete(node(), MemberNodes)),
162+
true ?= are_members_up_to_date(MemberNodes, Q)
163+
end.
164+
165+
are_members_up_to_date(_MemberNodes, _Q) ->
166+
%% _RaMetrics = erpc:multicall(lists:delete(node(), MemberNodes),
167+
%% ets,
168+
%% lookup,
169+
%% [ra_metrics, amqqueue:get_name(Q)], 10000),
170+
%% _LocalMetric = ets:lookup(ra_metrics, amqqueue:get_name(Q)),
171+
%% Do something with the result
172+
true.
173+
174+
get_target_size(Q, undefined) ->
175+
get_target_size(Q);
176+
get_target_size(_Q, N) when N > 0 ->
177+
N.
178+
179+
get_target_size(Q) ->
180+
case rabbit_policy:get(<<"target-group-size">>, Q) of
181+
undefined ->
182+
0;
183+
N ->
184+
N
185+
end.
186+
187+
remove_members(_Q, []) ->
188+
ok;
189+
remove_members(Q, [Node | Nodes]) ->
190+
case rabbit_quorum_queue:delete_member(Q, Node) of
191+
ok ->
192+
ok;
193+
{error, Err} ->
194+
QName = amqqueue:get_name(Q),
195+
rabbit_log:warning("~ts: failed to remove member (replica) on node "
196+
"~w, error: ~w",
197+
[rabbit_misc:rs(QName), Node, Err])
198+
end,
199+
remove_members(Q, Nodes).
200+
201+
202+
%% Make sure any non-noop result is stored.
203+
update_result(noop, Result) ->
204+
Result;
205+
update_result(Result, noop) ->
206+
Result.

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
-module(rabbit_quorum_queue).
99

1010
-behaviour(rabbit_queue_type).
11+
-behaviour(rabbit_policy_validator).
12+
-behaviour(rabbit_policy_merge_strategy).
1113

1214
-export([init/1,
1315
close/1,
@@ -35,8 +37,8 @@
3537
-export([format/1]).
3638
-export([open_files/1]).
3739
-export([peek/2, peek/3]).
38-
-export([add_member/4]).
39-
-export([delete_member/3]).
40+
-export([add_member/4, add_member/2]).
41+
-export([delete_member/3, delete_member/2]).
4042
-export([requeue/3]).
4143
-export([policy_changed/1]).
4244
-export([format_ra_event/3]).
@@ -65,6 +67,7 @@
6567
is_compatible/3,
6668
declare/2,
6769
is_stateful/0]).
70+
-export([validate_policy/1, merge_policy_value/3]).
6871

6972
-export([force_shrink_member_to_current_member/2,
7073
force_all_queues_shrink_member_to_current_member/0]).
@@ -114,6 +117,30 @@
114117
-define(ADD_MEMBER_TIMEOUT, 5000).
115118
-define(SNAPSHOT_INTERVAL, 8192). %% the ra default is 4096
116119

120+
%%----------- QQ policies ---------------------------------------------------
121+
122+
-rabbit_boot_step(
123+
{?MODULE,
124+
[{description, "QQ policy validation"},
125+
{mfa, {rabbit_registry, register,
126+
[policy_validator, <<"target-group-size">>, ?MODULE]}},
127+
{mfa, {rabbit_registry, register,
128+
[operator_policy_validator, <<"target-group-size">>, ?MODULE]}},
129+
{mfa, {rabbit_registry, register,
130+
[policy_merge_strategy, <<"target-group-size">>, ?MODULE]}},
131+
{requires, rabbit_registry},
132+
{enables, recovery}]}).
133+
134+
validate_policy(Args) ->
135+
Count = proplists:get_value(<<"target-group-size">>, Args, none),
136+
case is_integer(Count) andalso Count > 0 of
137+
true -> ok;
138+
false -> {error, "~tp is not a valid qq target count value", [Count]}
139+
end.
140+
141+
merge_policy_value(<<"target-group-size">>, _Val, OpVal) ->
142+
OpVal.
143+
117144
%%----------- rabbit_queue_type ---------------------------------------------
118145

119146
-spec is_enabled() -> boolean().
@@ -1087,6 +1114,8 @@ add_member(VHost, Name, Node, Timeout) ->
10871114
E
10881115
end.
10891116

1117+
add_member(Q, Node) ->
1118+
add_member(Q, Node, ?ADD_MEMBER_TIMEOUT).
10901119
add_member(Q, Node, Timeout) when ?amqqueue_is_quorum(Q) ->
10911120
{RaName, _} = amqqueue:get_pid(Q),
10921121
QName = amqqueue:get_name(Q),

0 commit comments

Comments
 (0)