Skip to content

Commit 559a83d

Browse files
committed
See #7209. Evaluate quorum queue membership periodically.
1 parent 7037c88 commit 559a83d

13 files changed

+599
-6
lines changed

deps/rabbit/BUILD.bazel

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ load("//:rabbitmq_home.bzl", "rabbitmq_home")
55
load("//:rabbitmq_run.bzl", "rabbitmq_run")
66
load(
77
"//:rabbitmq.bzl",
8+
"ENABLE_FEATURE_MAYBE_EXPR",
89
"RABBITMQ_DIALYZER_OPTS",
910
"assert_suites",
1011
"rabbitmq_app",
@@ -330,6 +331,14 @@ rabbitmq_integration_suite(
330331
size = "medium",
331332
)
332333

334+
rabbitmq_integration_suite(
335+
name = "quorum_queue_member_reconciliation_SUITE",
336+
size = "medium",
337+
additional_beam = [
338+
":test_quorum_queue_utils_beam",
339+
],
340+
)
341+
333342
rabbitmq_integration_suite(
334343
name = "clustering_management_SUITE",
335344
size = "large",
@@ -665,6 +674,7 @@ rabbitmq_integration_suite(
665674
additional_beam = [
666675
":test_quorum_queue_utils_beam",
667676
],
677+
flaky = True,
668678
shard_count = 6,
669679
)
670680

@@ -1230,5 +1240,6 @@ eunit(
12301240
":test_test_util_beam",
12311241
":test_test_rabbit_event_handler_beam",
12321242
],
1243+
erl_extra_args = [ENABLE_FEATURE_MAYBE_EXPR],
12331244
target = ":test_erlang_app",
12341245
)

deps/rabbit/app.bzl

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,7 @@ def all_beam_files(name = "all_beam_files"):
192192
"src/rabbit_queue_type_util.erl",
193193
"src/rabbit_quorum_memory_manager.erl",
194194
"src/rabbit_quorum_queue.erl",
195+
"src/rabbit_quorum_queue_periodic_membership_reconciliation.erl",
195196
"src/rabbit_ra_registry.erl",
196197
"src/rabbit_ra_systems.erl",
197198
"src/rabbit_reader.erl",
@@ -436,6 +437,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
436437
"src/rabbit_queue_type_util.erl",
437438
"src/rabbit_quorum_memory_manager.erl",
438439
"src/rabbit_quorum_queue.erl",
440+
"src/rabbit_quorum_queue_periodic_membership_reconciliation.erl",
439441
"src/rabbit_ra_registry.erl",
440442
"src/rabbit_ra_systems.erl",
441443
"src/rabbit_reader.erl",
@@ -698,6 +700,7 @@ def all_srcs(name = "all_srcs"):
698700
"src/rabbit_queue_type_util.erl",
699701
"src/rabbit_quorum_memory_manager.erl",
700702
"src/rabbit_quorum_queue.erl",
703+
"src/rabbit_quorum_queue_periodic_membership_reconciliation.erl",
701704
"src/rabbit_ra_registry.erl",
702705
"src/rabbit_ra_systems.erl",
703706
"src/rabbit_reader.erl",
@@ -802,6 +805,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
802805
erlc_opts = "//:test_erlc_opts",
803806
deps = ["//deps/amqp_client:erlang_app"],
804807
)
808+
805809
erlang_bytecode(
806810
name = "cluster_rename_SUITE_beam_files",
807811
testonly = True,
@@ -1972,3 +1976,12 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
19721976
erlc_opts = "//:test_erlc_opts",
19731977
deps = ["//deps/amqp_client:erlang_app"],
19741978
)
1979+
erlang_bytecode(
1980+
name = "quorum_queue_member_reconciliation_SUITE_beam_files",
1981+
testonly = True,
1982+
srcs = ["test/quorum_queue_member_reconciliation_SUITE.erl"],
1983+
outs = ["test/quorum_queue_member_reconciliation_SUITE.beam"],
1984+
app_name = "rabbit",
1985+
erlc_opts = "//:test_erlc_opts",
1986+
deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"],
1987+
)

deps/rabbit/priv/schema/rabbit.schema

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2487,6 +2487,30 @@ end}.
24872487
{mapping, "quorum_queue.property_equivalence.relaxed_checks_on_redeclaration", "rabbit.quorum_relaxed_checks_on_redeclaration", [
24882488
{datatype, {enum, [true, false]}}]}.
24892489

2490+
2491+
%%
2492+
%% Quorum Queue membership reconciliation
2493+
%%
2494+
2495+
{mapping, "quorum_queue.continuous_membership_reconciliation.enabled", "rabbit.quorum_membership_reconciliation_enabled", [
2496+
{datatype, {enum, [true, false]}}]}.
2497+
2498+
{mapping, "quorum_queue.continuous_membership_reconciliation.auto_remove", "rabbit.quorum_membership_reconciliation_auto_remove", [
2499+
{datatype, {enum, [true, false]}}]}.
2500+
2501+
{mapping, "quorum_queue.continuous_membership_reconciliation.interval", "rabbit.quorum_membership_reconciliation_interval", [
2502+
{datatype, integer}, {validators, ["non_negative_integer"]}
2503+
]}.
2504+
2505+
{mapping, "quorum_queue.continuous_membership_reconciliation.trigger_interval", "rabbit.quorum_membership_reconciliation_trigger_interval", [
2506+
{datatype, integer}, {validators, ["non_negative_integer"]}
2507+
]}.
2508+
2509+
{mapping, "quorum_queue.continuous_membership_reconciliation.target_group_size", "rabbit.quorum_membership_reconciliation_target_group_size", [
2510+
{datatype, integer}, {validators, ["non_negative_integer"]}
2511+
]}.
2512+
2513+
24902514
%%
24912515
%% Runtime parameters
24922516
%%

deps/rabbit/src/rabbit.erl

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,12 @@
175175
{requires, [rabbit_alarm, guid_generator]},
176176
{enables, core_initialized}]}).
177177

178+
-rabbit_boot_step({rabbit_quorum_queue_periodic_membership_reconciliation,
179+
[{description, "Quorums Queue membership reconciliation"},
180+
{mfa, {rabbit_sup, start_restartable_child,
181+
[rabbit_quorum_queue_periodic_membership_reconciliation]}},
182+
{requires, [database]}]}).
183+
178184
-rabbit_boot_step({rabbit_epmd_monitor,
179185
[{description, "epmd monitor"},
180186
{mfa, {rabbit_sup, start_restartable_child,

deps/rabbit/src/rabbit_node_monitor.erl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -815,6 +815,7 @@ handle_dead_rabbit(Node, State = #state{partitions = Partitions,
815815
ok = rabbit_amqqueue:on_node_down(Node),
816816
ok = rabbit_alarm:on_node_down(Node),
817817
ok = rabbit_mnesia:on_node_down(Node),
818+
ok = rabbit_quorum_queue_periodic_membership_reconciliation:on_node_down(Node),
818819
%% If we have been partitioned, and we are now in the only remaining
819820
%% partition, we no longer care about partitions - forget them. Note
820821
%% that we do not attempt to deal with individual (other) partitions
@@ -843,7 +844,8 @@ ensure_keepalive_timer(State) ->
843844
handle_live_rabbit(Node) ->
844845
ok = rabbit_amqqueue:on_node_up(Node),
845846
ok = rabbit_alarm:on_node_up(Node),
846-
ok = rabbit_mnesia:on_node_up(Node).
847+
ok = rabbit_mnesia:on_node_up(Node),
848+
ok = rabbit_quorum_queue_periodic_membership_reconciliation:on_node_up(Node).
847849

848850
maybe_autoheal(State = #state{partitions = []}) ->
849851
State;

deps/rabbit/src/rabbit_nodes.erl

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -282,14 +282,17 @@ do_filter_reachable(Members) ->
282282
Members).
283283

284284
-spec is_running(Node) -> IsRunning when
285-
Node :: node(),
285+
Node :: node() | [node()],
286286
IsRunning :: boolean().
287287
%% @doc Indicates if the given node is running.
288288
%%
289289
%% @see filter_running/1.
290290

291291
is_running(Node) when is_atom(Node) ->
292-
[Node] =:= filter_running([Node]).
292+
[Node] =:= filter_running([Node]);
293+
is_running(Nodes) when is_list(Nodes) ->
294+
lists:sort(Nodes) =:= lists:sort(filter_running(Nodes)).
295+
293296

294297
-spec list_running() -> Nodes when
295298
Nodes :: [node()].

deps/rabbit/src/rabbit_policy.erl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,11 +367,13 @@ validate(_VHost, <<"operator_policy">>, Name, Term, _User) ->
367367
notify(VHost, <<"policy">>, Name, Term0, ActingUser) ->
368368
Term = rabbit_data_coercion:atomize_keys(Term0),
369369
update_matched_objects(VHost, Term, ActingUser),
370+
rabbit_quorum_queue_periodic_membership_reconciliation:policy_set(),
370371
rabbit_event:notify(policy_set, [{name, Name}, {vhost, VHost},
371372
{user_who_performed_action, ActingUser} | Term]);
372373
notify(VHost, <<"operator_policy">>, Name, Term0, ActingUser) ->
373374
Term = rabbit_data_coercion:atomize_keys(Term0),
374375
update_matched_objects(VHost, Term, ActingUser),
376+
rabbit_quorum_queue_periodic_membership_reconciliation:policy_set(),
375377
rabbit_event:notify(policy_set, [{name, Name}, {vhost, VHost},
376378
{user_who_performed_action, ActingUser} | Term]).
377379

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
-module(rabbit_quorum_queue).
99

1010
-behaviour(rabbit_queue_type).
11+
-behaviour(rabbit_policy_validator).
12+
-behaviour(rabbit_policy_merge_strategy).
1113

1214
-export([init/1,
1315
close/1,
@@ -35,8 +37,8 @@
3537
-export([format/1]).
3638
-export([open_files/1]).
3739
-export([peek/2, peek/3]).
38-
-export([add_member/4]).
39-
-export([delete_member/3]).
40+
-export([add_member/4, add_member/2]).
41+
-export([delete_member/3, delete_member/2]).
4042
-export([requeue/3]).
4143
-export([policy_changed/1]).
4244
-export([format_ra_event/3]).
@@ -65,6 +67,7 @@
6567
is_compatible/3,
6668
declare/2,
6769
is_stateful/0]).
70+
-export([validate_policy/1, merge_policy_value/3]).
6871

6972
-export([force_shrink_member_to_current_member/2,
7073
force_all_queues_shrink_member_to_current_member/0]).
@@ -114,6 +117,34 @@
114117
-define(ADD_MEMBER_TIMEOUT, 5000).
115118
-define(SNAPSHOT_INTERVAL, 8192). %% the ra default is 4096
116119

120+
%%----------- QQ policies ---------------------------------------------------
121+
122+
-rabbit_boot_step(
123+
{?MODULE,
124+
[{description, "QQ target group size policies. "
125+
"target-group-size controls the targeted number of "
126+
"member nodes for the queue. If set, RabbitMQ will try to "
127+
"grow the queue members to the target size. "
128+
"See module rabbit_queue_member_eval."},
129+
{mfa, {rabbit_registry, register,
130+
[policy_validator, <<"target-group-size">>, ?MODULE]}},
131+
{mfa, {rabbit_registry, register,
132+
[operator_policy_validator, <<"target-group-size">>, ?MODULE]}},
133+
{mfa, {rabbit_registry, register,
134+
[policy_merge_strategy, <<"target-group-size">>, ?MODULE]}},
135+
{requires, rabbit_registry},
136+
{enables, recovery}]}).
137+
138+
validate_policy(Args) ->
139+
Count = proplists:get_value(<<"target-group-size">>, Args, none),
140+
case is_integer(Count) andalso Count > 0 of
141+
true -> ok;
142+
false -> {error, "~tp is not a valid qq target count value", [Count]}
143+
end.
144+
145+
merge_policy_value(<<"target-group-size">>, Val, OpVal) ->
146+
max(Val, OpVal).
147+
117148
%%----------- rabbit_queue_type ---------------------------------------------
118149

119150
-spec is_enabled() -> boolean().
@@ -215,6 +246,7 @@ start_cluster(Q) ->
215246
ok = rabbit_fifo_client:update_machine_state(LeaderId,
216247
ra_machine_config(NewQ)),
217248
notify_decorators(QName, startup),
249+
rabbit_quorum_queue_periodic_membership_reconciliation:queue_created(NewQ),
218250
rabbit_event:notify(queue_created,
219251
[{name, QName},
220252
{durable, Durable},
@@ -1093,6 +1125,8 @@ add_member(VHost, Name, Node, Timeout) ->
10931125
E
10941126
end.
10951127

1128+
add_member(Q, Node) ->
1129+
add_member(Q, Node, ?ADD_MEMBER_TIMEOUT).
10961130
add_member(Q, Node, Timeout) when ?amqqueue_is_quorum(Q) ->
10971131
{RaName, _} = amqqueue:get_pid(Q),
10981132
QName = amqqueue:get_name(Q),

0 commit comments

Comments
 (0)