Skip to content

Commit e99c8fc

Browse files
committed
Name changes and handling quorum queue leader election not being complete
1 parent 45108ea commit e99c8fc

11 files changed

+91
-83
lines changed

deps/rabbit/BUILD.bazel

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ load("//:rabbitmq_home.bzl", "rabbitmq_home")
55
load("//:rabbitmq_run.bzl", "rabbitmq_run")
66
load(
77
"//:rabbitmq.bzl",
8-
"RABBITMQ_DIALYZER_OPTS",
98
"ENABLE_FEATURE_MAYBE_EXPR",
9+
"RABBITMQ_DIALYZER_OPTS",
1010
"assert_suites",
1111
"rabbitmq_app",
1212
"rabbitmq_integration_suite",
@@ -332,7 +332,7 @@ rabbitmq_integration_suite(
332332
)
333333

334334
rabbitmq_integration_suite(
335-
name = "member_evaluation_SUITE",
335+
name = "quorum_queue_member_reconciliation_SUITE",
336336
size = "medium",
337337
additional_beam = [
338338
":test_quorum_queue_utils_beam",

deps/rabbit/app.bzl

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -190,9 +190,9 @@ def all_beam_files(name = "all_beam_files"):
190190
"src/rabbit_queue_location_validator.erl",
191191
"src/rabbit_queue_master_location_misc.erl",
192192
"src/rabbit_queue_type_util.erl",
193-
"src/rabbit_queue_member_eval.erl",
194193
"src/rabbit_quorum_memory_manager.erl",
195194
"src/rabbit_quorum_queue.erl",
195+
"src/rabbit_quorum_queue_periodic_membership_reconciliation.erl",
196196
"src/rabbit_ra_registry.erl",
197197
"src/rabbit_ra_systems.erl",
198198
"src/rabbit_reader.erl",
@@ -435,9 +435,9 @@ def all_test_beam_files(name = "all_test_beam_files"):
435435
"src/rabbit_queue_location_validator.erl",
436436
"src/rabbit_queue_master_location_misc.erl",
437437
"src/rabbit_queue_type_util.erl",
438-
"src/rabbit_queue_member_eval.erl",
439438
"src/rabbit_quorum_memory_manager.erl",
440439
"src/rabbit_quorum_queue.erl",
440+
"src/rabbit_quorum_queue_periodic_membership_reconciliation.erl",
441441
"src/rabbit_ra_registry.erl",
442442
"src/rabbit_ra_systems.erl",
443443
"src/rabbit_reader.erl",
@@ -696,11 +696,11 @@ def all_srcs(name = "all_srcs"):
696696
"src/rabbit_queue_location_validator.erl",
697697
"src/rabbit_queue_master_location_misc.erl",
698698
"src/rabbit_queue_master_locator.erl",
699-
"src/rabbit_queue_member_eval.erl",
700699
"src/rabbit_queue_type.erl",
701700
"src/rabbit_queue_type_util.erl",
702701
"src/rabbit_quorum_memory_manager.erl",
703702
"src/rabbit_quorum_queue.erl",
703+
"src/rabbit_quorum_queue_periodic_membership_reconciliation.erl",
704704
"src/rabbit_ra_registry.erl",
705705
"src/rabbit_ra_systems.erl",
706706
"src/rabbit_reader.erl",
@@ -805,16 +805,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
805805
erlc_opts = "//:test_erlc_opts",
806806
deps = ["//deps/amqp_client:erlang_app"],
807807
)
808-
erlang_bytecode(
809-
name = "member_evaluation_SUITE_beam_files",
810-
testonly = True,
811-
srcs = ["test/member_evaluation_SUITE.erl"],
812-
outs = ["test/member_evaluation_SUITE.beam"],
813-
hdrs = ["include/amqqueue.hrl", "include/amqqueue_v2.hrl"],
814-
app_name = "rabbit",
815-
erlc_opts = "//:test_erlc_opts",
816-
deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"],
817-
)
808+
818809
erlang_bytecode(
819810
name = "cluster_rename_SUITE_beam_files",
820811
testonly = True,
@@ -1985,3 +1976,12 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
19851976
erlc_opts = "//:test_erlc_opts",
19861977
deps = ["//deps/amqp_client:erlang_app"],
19871978
)
1979+
erlang_bytecode(
1980+
name = "quorum_queue_member_reconciliation_SUITE_beam_files",
1981+
testonly = True,
1982+
srcs = ["test/quorum_queue_member_reconciliation_SUITE.erl"],
1983+
outs = ["test/quorum_queue_member_reconciliation_SUITE.beam"],
1984+
app_name = "rabbit",
1985+
erlc_opts = "//:test_erlc_opts",
1986+
deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"],
1987+
)

deps/rabbit/priv/schema/rabbit.schema

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2489,22 +2489,22 @@ end}.
24892489

24902490

24912491
%%
2492-
%% Queue membership evaluation
2492+
%% Quorum Queue membership reconciliation
24932493
%%
24942494

2495-
{mapping, "quorum_queue_membership_evaluation.enabled", "rabbit.quorum_membership_changes_enabled", [
2495+
{mapping, "quorum_queue.continuous_membership_reconciliation.enabled", "rabbit.quorum_membership_reconciliation_enabled", [
24962496
{datatype, {enum, [true, false]}}]}.
24972497

24982498

2499-
{mapping, "quorum_queue_membership_evaluation.default_timeout", "rabbit.quorum_membership_changes_default_timeout", [
2499+
{mapping, "quorum_queue.continuous_membership_reconciliation.default_timeout", "rabbit.quorum_membership_reconciliation_default_timeout", [
25002500
{datatype, integer}, {validators, ["non_negative_integer"]}
25012501
]}.
25022502

2503-
{mapping, "quorum_queue_membership_evaluation.short_timeout", "rabbit.quorum_membership_changes_short_timeout", [
2503+
{mapping, "quorum_queue.continuous_membership_reconciliation.short_timeout", "rabbit.quorum_membership_reconciliation_short_timeout", [
25042504
{datatype, integer}, {validators, ["non_negative_integer"]}
25052505
]}.
25062506

2507-
{mapping, "quorum_queue_membership_evaluation.target_group_size", "rabbit.quorum_membership_changes_target_group_size", [
2507+
{mapping, "quorum_queue.continuous_membership_reconciliation.target_group_size", "rabbit.quorum_membership_reconciliation_target_group_size", [
25082508
{datatype, integer}, {validators, ["non_negative_integer"]}
25092509
]}.
25102510

deps/rabbit/src/rabbit.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -175,10 +175,10 @@
175175
{requires, [rabbit_alarm, guid_generator]},
176176
{enables, core_initialized}]}).
177177

178-
-rabbit_boot_step({rabbit_queue_member_eval,
179-
[{description, "Queue membership evaluation"},
178+
-rabbit_boot_step({rabbit_quorum_queue_periodic_membership_reconciliation,
179+
[{description, "Quorums Queue membership reconciliation"},
180180
{mfa, {rabbit_sup, start_restartable_child,
181-
[rabbit_queue_member_eval]}},
181+
[rabbit_quorum_queue_periodic_membership_reconciliation]}},
182182
{requires, [database]}]}).
183183

184184
-rabbit_boot_step({rabbit_epmd_monitor,

deps/rabbit/src/rabbit_node_monitor.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -815,7 +815,7 @@ handle_dead_rabbit(Node, State = #state{partitions = Partitions,
815815
ok = rabbit_amqqueue:on_node_down(Node),
816816
ok = rabbit_alarm:on_node_down(Node),
817817
ok = rabbit_mnesia:on_node_down(Node),
818-
ok = rabbit_queue_member_eval:on_node_down(Node),
818+
ok = rabbit_quorum_queue_periodic_membership_reconciliation:on_node_down(Node),
819819
%% If we have been partitioned, and we are now in the only remaining
820820
%% partition, we no longer care about partitions - forget them. Note
821821
%% that we do not attempt to deal with individual (other) partitions
@@ -845,7 +845,7 @@ handle_live_rabbit(Node) ->
845845
ok = rabbit_amqqueue:on_node_up(Node),
846846
ok = rabbit_alarm:on_node_up(Node),
847847
ok = rabbit_mnesia:on_node_up(Node),
848-
ok = rabbit_queue_member_eval:on_node_up(Node).
848+
ok = rabbit_quorum_queue_periodic_membership_reconciliation:on_node_up(Node).
849849

850850
maybe_autoheal(State = #state{partitions = []}) ->
851851
State;

deps/rabbit/src/rabbit_policy.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -367,13 +367,13 @@ validate(_VHost, <<"operator_policy">>, Name, Term, _User) ->
367367
notify(VHost, <<"policy">>, Name, Term0, ActingUser) ->
368368
Term = rabbit_data_coercion:atomize_keys(Term0),
369369
update_matched_objects(VHost, Term, ActingUser),
370-
rabbit_queue_member_eval:policy_set(),
370+
rabbit_quorum_queue_periodic_membership_reconciliation:policy_set(),
371371
rabbit_event:notify(policy_set, [{name, Name}, {vhost, VHost},
372372
{user_who_performed_action, ActingUser} | Term]);
373373
notify(VHost, <<"operator_policy">>, Name, Term0, ActingUser) ->
374374
Term = rabbit_data_coercion:atomize_keys(Term0),
375375
update_matched_objects(VHost, Term, ActingUser),
376-
rabbit_queue_member_eval:policy_set(),
376+
rabbit_quorum_queue_periodic_membership_reconciliation:policy_set(),
377377
rabbit_event:notify(policy_set, [{name, Name}, {vhost, VHost},
378378
{user_who_performed_action, ActingUser} | Term]).
379379

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ start_cluster(Q) ->
246246
ok = rabbit_fifo_client:update_machine_state(LeaderId,
247247
ra_machine_config(NewQ)),
248248
notify_decorators(QName, startup),
249-
rabbit_queue_member_eval:queue_created(NewQ),
249+
rabbit_quorum_queue_periodic_membership_reconciliation:queue_created(NewQ),
250250
rabbit_event:notify(queue_created,
251251
[{name, QName},
252252
{durable, Durable},

deps/rabbit/src/rabbit_queue_member_eval.erl renamed to deps/rabbit/src/rabbit_quorum_queue_periodic_membership_reconciliation.erl

Lines changed: 47 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
66
%%
77

8-
-module(rabbit_queue_member_eval).
8+
-module(rabbit_quorum_queue_periodic_membership_reconciliation).
99

1010
-feature(maybe_expr, enable).
1111

@@ -24,7 +24,7 @@
2424
-define(SHORT_INTERVAL, 10_000).
2525
-define(QUEUE_COUNT_START_RANDOM_SELECTION, 1_000).
2626

27-
-define(EVAL_MSG, member_eval).
27+
-define(EVAL_MSG, membership_reconciliation).
2828

2929
-record(state, {timer_ref :: reference() | undefined,
3030
default_interval :: non_neg_integer(),
@@ -43,35 +43,31 @@ start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
4343
%% API
4444
%%----------------------------------------------------------------------------
4545

46-
on_node_up(_Node) ->
47-
rabbit_log:debug("SIMON: NODEUP",[]),
48-
gen_server:cast(?SERVER, eval_trigger).
46+
on_node_up(Node) ->
47+
gen_server:cast(?SERVER, {membership_reconciliation_trigger, {node_up, Node}}).
4948

50-
on_node_down(_Node) ->
51-
rabbit_log:debug("SIMON: NODEDOWN",[]),
52-
gen_server:cast(?SERVER, eval_trigger).
49+
on_node_down(Node) ->
50+
gen_server:cast(?SERVER, {membership_reconciliation_trigger, {node_down, Node}}).
5351

54-
queue_created(_Node) ->
55-
rabbit_log:debug("SIMON: QUEUE CREATED",[]),
56-
gen_server:cast(?SERVER, eval_trigger).
52+
queue_created(Q) ->
53+
gen_server:cast(?SERVER, {membership_reconciliation_trigger, {queue_created, Q}}).
5754

5855
policy_set() ->
59-
rabbit_log:debug("SIMON: POLICY SET",[]),
60-
gen_server:cast(?SERVER, eval_trigger).
56+
gen_server:cast(?SERVER, {membership_reconciliation_trigger, policy_set}).
6157

6258
%%----------------------------------------------------------------------------
6359
%% gen_server callbacks
6460
%%----------------------------------------------------------------------------
6561

6662
init([]) ->
67-
DefaultInterval = rabbit_misc:get_env(rabbit, quorum_membership_changes_default_timeout,
63+
Enabled = rabbit_misc:get_env(rabbit, quorum_membership_reconciliation_enabled,
64+
false),
65+
DefaultInterval = rabbit_misc:get_env(rabbit, quorum_membership_reconciliation_default_timeout,
6866
?DEFAULT_INTERVAL),
69-
ShortInterval = rabbit_misc:get_env(rabbit, quorum_membership_changes_short_timeout,
67+
ShortInterval = rabbit_misc:get_env(rabbit, quorum_membership_reconciliation_short_timeout,
7068
?SHORT_INTERVAL),
71-
TargetGroupSize = rabbit_misc:get_env(rabbit, quorum_membership_changes_target_group_size,
69+
TargetGroupSize = rabbit_misc:get_env(rabbit, quorum_membership_reconciliation_target_group_size,
7270
undefined),
73-
Enabled = rabbit_misc:get_env(rabbit, quorum_membership_changes_enabled,
74-
false),
7571
State = #state{default_interval = DefaultInterval,
7672
short_interval = ShortInterval,
7773
target_group_size = TargetGroupSize,
@@ -87,10 +83,12 @@ init([]) ->
8783
handle_call(_Request, _From, State) ->
8884
{reply, ok, State}.
8985

90-
handle_cast(eval_trigger, #state{enabled = false} = State) ->
86+
handle_cast({membership_reconciliation_trigger, _Reason}, #state{enabled = false} = State) ->
9187
{noreply, State, hibernate};
92-
handle_cast(eval_trigger, #state{timer_ref = OldRef,
88+
handle_cast({membership_reconciliation_trigger, Reason}, #state{timer_ref = OldRef,
9389
short_interval = Time} = State) ->
90+
rabbit_log:debug("Quorum Queue membership reconciliation triggered: ~p",
91+
[Reason]),
9492
_ = erlang:cancel_timer(OldRef),
9593
Ref = erlang:send_after(Time, self(), ?EVAL_MSG),
9694
{noreply, State#state{timer_ref = Ref}};
@@ -100,7 +98,7 @@ handle_cast(_Msg, State) ->
10098
handle_info(?EVAL_MSG, #state{default_interval = DefaultInterval,
10199
short_interval = ShortInterval,
102100
target_group_size = TargetSize} = State) ->
103-
Res = eval_quorum_queue_membership(TargetSize),
101+
Res = reconclitiate_quorum_queue_membership(TargetSize),
104102
NewTimeout = case Res of
105103
noop ->
106104
DefaultInterval;
@@ -124,39 +122,41 @@ code_change(_OldVsn, State, _Extra) ->
124122
%% Internal functions
125123
%%----------------------------------------------------------------------------
126124

127-
eval_quorum_queue_membership(TargetSize) ->
125+
reconclitiate_quorum_queue_membership(TargetSize) ->
128126
LocalLeaders = rabbit_amqqueue:list_local_leaders(),
129127
ExpectedNodes = rabbit_nodes:list_members(),
130128
Running = rabbit_nodes:list_running(),
131-
eval_quorum_members(ExpectedNodes, Running, LocalLeaders, TargetSize, noop).
129+
reconclitiate_quorum_members(ExpectedNodes, Running, LocalLeaders, TargetSize, noop).
132130

133-
eval_quorum_members(_ExpectedNodes, _Running, [], _TargetSize, Result) ->
131+
reconclitiate_quorum_members(_ExpectedNodes, _Running, [], _TargetSize, Result) ->
134132
Result;
135-
eval_quorum_members(ExpectedNodes, Running, [Q | LocalLeaders], TargetSize, OldResult) ->
136-
{ok, Members, {_, LeaderNode}} = ra:members(amqqueue:get_pid(Q)),
137-
133+
reconclitiate_quorum_members(ExpectedNodes, Running, [Q | LocalLeaders], TargetSize, OldResult) ->
138134
Result =
139-
maybe
140-
%% Check if Leader is indeed this node
141-
LeaderNode ?= node(),
142-
%% And that this not is not in maintenance mode
143-
true ?= not rabbit_maintenance:is_being_drained_local_read(node()),
144-
MemberNodes = [Node || {_, Node} <- Members],
145-
Remove = MemberNodes -- ExpectedNodes,
146-
case Remove of
147-
[] ->
148-
add_member(Q, Running, MemberNodes, get_target_size(Q, TargetSize));
135+
maybe
136+
{ok, Members, {_, LeaderNode}} = ra:members(amqqueue:get_pid(Q)),
137+
%% Check if Leader is indeed this node
138+
LeaderNode ?= node(),
139+
%% And that this not is not in maintenance mode
140+
true ?= not rabbit_maintenance:is_being_drained_local_read(node()),
141+
MemberNodes = [Node || {_, Node} <- Members],
142+
Remove = MemberNodes -- ExpectedNodes,
143+
case Remove of
144+
[] ->
145+
maybe_add_member(Q, Running, MemberNodes, get_target_size(Q, TargetSize));
146+
_ ->
147+
remove_members(Q, Remove)
148+
end
149+
else
150+
{timeout, Reason} ->
151+
rabbit_log:debug("Find leader timeout: ~p", [Reason]),
152+
ok;
149153
_ ->
150-
remove_members(Q, Remove)
151-
end
152-
else
153-
_ ->
154-
noop
155-
end,
156-
eval_quorum_members(ExpectedNodes, Running, LocalLeaders, TargetSize,
157-
update_result(OldResult, Result)).
154+
noop
155+
end,
156+
reconclitiate_quorum_members(ExpectedNodes, Running, LocalLeaders, TargetSize,
157+
update_result(OldResult, Result)).
158158

159-
add_member(Q, Running, MemberNodes, TargetSize) ->
159+
maybe_add_member(Q, Running, MemberNodes, TargetSize) ->
160160
%% Filter out any new nodes under maintenance
161161
New = rabbit_maintenance:filter_out_drained_nodes_local_read(Running -- MemberNodes),
162162
case should_add_node(MemberNodes, New, TargetSize) of
@@ -191,13 +191,10 @@ should_add_node(MemberNodes, New, TargetSize) ->
191191
maybe
192192
true ?= NumberOfNewNodes > 0, %% There are new nodes to grow to
193193
true ?= CurrentSize < TargetSize, %% Target size not reached
194-
true ?= is_even(CurrentSize) orelse NumberOfNewNodes > 1, %% Enough nodes to grow to odd member size
194+
true ?= rabbit_misc:is_even(CurrentSize) orelse NumberOfNewNodes > 1, %% Enough nodes to grow to odd member size
195195
true ?= rabbit_nodes:is_running(lists:delete(node(), MemberNodes))
196196
end.
197197

198-
is_even(N) ->
199-
(N band 1) =:= 0.
200-
201198
get_target_size(Q, undefined) ->
202199
get_target_size(Q);
203200
get_target_size(Q, N) when N > 0 ->

deps/rabbit/test/member_evaluation_SUITE.erl renamed to deps/rabbit/test/quorum_queue_member_reconciliation_SUITE.erl

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44
%%
55
%% Copyright (c) 2018-2023 VMware, Inc. or its affiliates. All rights reserved.
66

7-
-module(member_evaluation_SUITE).
7+
8+
-module(quorum_queue_member_reconciliation_SUITE).
89

910
-include_lib("common_test/include/ct.hrl").
1011
-include_lib("eunit/include/eunit.hrl").
@@ -35,10 +36,10 @@ init_per_suite(Config0) ->
3536
rabbit_ct_helpers:log_environment(),
3637
Config1 = rabbit_ct_helpers:merge_app_env(
3738
Config0, {rabbit, [{quorum_tick_interval, 1000},
38-
{quorum_membership_changes_enabled, true},
39-
{quorum_membership_changes_default_timeout, 5000},
40-
{quorum_membership_changes_short_timeout, 2000},
41-
{quorum_membership_changes_target_group_size, 3}]}),
39+
{quorum_membership_reconciliation_enabled, true},
40+
{quorum_membership_reconciliation_default_timeout, 5000},
41+
{quorum_membership_reconciliation_short_timeout, 2000},
42+
{quorum_membership_reconciliation_target_group_size, 3}]}),
4243
rabbit_ct_helpers:run_setup_steps(Config1, []).
4344

4445
end_per_suite(Config) ->

deps/rabbit_common/src/rabbit_misc.erl

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,8 @@
8080
-export([find_child/2]).
8181
-export([is_regular_file/1]).
8282
-export([maps_any/2]).
83+
-export([is_even/1, is_odd/1]).
84+
8385

8486
%% Horrible macro to use in guards
8587
-define(IS_BENIGN_EXIT(R),
@@ -1470,3 +1472,10 @@ maps_any_1(Pred, {K, V, I}) ->
14701472
false ->
14711473
maps_any_1(Pred, maps:next(I))
14721474
end.
1475+
1476+
-spec is_even(integer()) -> boolean().
1477+
is_even(N) ->
1478+
(N band 1) =:= 0.
1479+
-spec is_odd(integer()) -> boolean().
1480+
is_odd(N) ->
1481+
(N band 1) =:= 1.

moduleindex.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -736,6 +736,7 @@ rabbit:
736736
- rabbit_queue_type_util
737737
- rabbit_quorum_memory_manager
738738
- rabbit_quorum_queue
739+
- rabbit_quorum_queue_periodic_membership_reconciliation
739740
- rabbit_ra_registry
740741
- rabbit_ra_systems
741742
- rabbit_reader

0 commit comments

Comments
 (0)