Skip to content

Commit 1c81095

Browse files
Merge pull request #1372 from rabbitmq/rabbitmq-server-1371
Take ha-mode into account in choosing queue master
2 parents 879d183 + 6b1b2f2 commit 1c81095

6 files changed

+149
-33
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
.*.sw?
44
*.beam
55
*.coverdata
6+
MnesiaCore.*
67
/.erlang.mk/
78
/cover/
89
/debug/

src/rabbit_mirror_queue_misc.erl

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@
1919

2020
-export([remove_from_queue/3, on_node_up/0, add_mirrors/3,
2121
report_deaths/4, store_updated_slaves/1,
22-
initial_queue_node/2, suggested_queue_nodes/1,
23-
is_mirrored/1, update_mirrors/2, update_mirrors/1, validate_policy/1,
22+
initial_queue_node/2, suggested_queue_nodes/1, actual_queue_nodes/1,
23+
is_mirrored/1, is_mirrored_ha_nodes/1,
24+
update_mirrors/2, update_mirrors/1, validate_policy/1,
2425
maybe_auto_sync/1, maybe_drop_master_after_sync/1,
2526
sync_batch_size/1, log_info/3, log_warning/3]).
2627

@@ -31,6 +32,8 @@
3132

3233
-include("rabbit.hrl").
3334

35+
-define(HA_NODES_MODULE, rabbit_mirror_queue_mode_nodes).
36+
3437
-rabbit_boot_step(
3538
{?MODULE,
3639
[{description, "HA policy validation"},
@@ -355,6 +358,12 @@ is_mirrored(Q) ->
355358
_ -> false
356359
end.
357360

361+
is_mirrored_ha_nodes(Q) ->
362+
case module(Q) of
363+
{ok, ?HA_NODES_MODULE} -> true;
364+
_ -> false
365+
end.
366+
358367
actual_queue_nodes(#amqqueue{pid = MPid,
359368
slave_pids = SPids,
360369
sync_slave_pids = SSPids}) ->

src/rabbit_queue_location_min_masters.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ description() ->
3737
[{description,
3838
<<"Locate queue master node from cluster node with least bound queues">>}].
3939

40-
queue_master_location(#amqqueue{}) ->
41-
Cluster = rabbit_queue_master_location_misc:all_nodes(),
40+
queue_master_location(#amqqueue{} = Q) ->
41+
Cluster = rabbit_queue_master_location_misc:all_nodes(Q),
4242
VHosts = rabbit_vhost:list(),
4343
BoundQueueMasters = get_bound_queue_masters_per_vhost(VHosts, []),
4444
{_Count, MinMaster}= get_min_master(Cluster, BoundQueueMasters),

src/rabbit_queue_location_random.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ description() ->
3737
[{description,
3838
<<"Locate queue master node from cluster in a random manner">>}].
3939

40-
queue_master_location(#amqqueue{}) ->
41-
Cluster = rabbit_queue_master_location_misc:all_nodes(),
40+
queue_master_location(#amqqueue{} = Q) ->
41+
Cluster = rabbit_queue_master_location_misc:all_nodes(Q),
4242
RandomPos = erlang:phash2(time_compat:monotonic_time(), length(Cluster)),
4343
MasterNode = lists:nth(RandomPos + 1, Cluster),
4444
{ok, MasterNode}.

src/rabbit_queue_master_location_misc.erl

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
get_location_mod_by_config/1,
2525
get_location_mod_by_args/1,
2626
get_location_mod_by_policy/1,
27-
all_nodes/0]).
27+
all_nodes/1]).
2828

2929
lookup_master(QueueNameBin, VHostPath) when is_binary(QueueNameBin),
3030
is_binary(VHostPath) ->
@@ -92,4 +92,20 @@ get_location_mod_by_config(#amqqueue{}) ->
9292
_ -> {error, "queue_master_locator undefined"}
9393
end.
9494

95-
all_nodes() -> rabbit_mnesia:cluster_nodes(running).
95+
all_nodes(Queue = #amqqueue{}) ->
96+
handle_is_mirrored_ha_nodes(rabbit_mirror_queue_misc:is_mirrored_ha_nodes(Queue), Queue).
97+
98+
handle_is_mirrored_ha_nodes(false, _Queue) ->
99+
% Note: ha-mode is NOT 'nodes' - it is either exactly or all, which means
100+
% that any node in the cluster is eligible to be the new queue master node
101+
rabbit_nodes:all_running();
102+
handle_is_mirrored_ha_nodes(true, Queue) ->
103+
% Note: ha-mode is 'nodes', which explicitly specifies allowed nodes.
104+
% We must use suggested_queue_nodes to get that list of nodes as the
105+
% starting point for finding the queue master location
106+
handle_suggested_queue_nodes(rabbit_mirror_queue_misc:suggested_queue_nodes(Queue)).
107+
108+
handle_suggested_queue_nodes({_MNode, []}) ->
109+
rabbit_nodes:all_running();
110+
handle_suggested_queue_nodes({MNode, SNodes}) ->
111+
[MNode | SNodes].

test/queue_master_location_SUITE.erl

Lines changed: 115 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434

3535
-include_lib("common_test/include/ct.hrl").
3636
-include_lib("amqp_client/include/amqp_client.hrl").
37+
-include_lib("eunit/include/eunit.hrl").
3738

3839
-compile(export_all).
3940

@@ -50,6 +51,9 @@ groups() ->
5051
{cluster_size_3, [], [
5152
declare_args,
5253
declare_policy,
54+
declare_policy_nodes,
55+
declare_policy_all,
56+
declare_policy_exactly,
5357
declare_config,
5458
calculate_min_master,
5559
calculate_random,
@@ -111,7 +115,7 @@ end_per_testcase(Testcase, Config) ->
111115
declare_args(Config) ->
112116
setup_test_environment(Config),
113117
unset_location_config(Config),
114-
QueueName = rabbit_misc:r(<<"/">>, queue, Q= <<"qm.test">>),
118+
QueueName = rabbit_misc:r(<<"/">>, queue, Q = <<"qm.test">>),
115119
Args = [{<<"x-queue-master-locator">>, longstr, <<"min-masters">>}],
116120
declare(Config, QueueName, false, false, Args, none),
117121
verify_min_master(Config, Q).
@@ -120,14 +124,75 @@ declare_policy(Config) ->
120124
setup_test_environment(Config),
121125
unset_location_config(Config),
122126
set_location_policy(Config, ?POLICY, <<"min-masters">>),
123-
QueueName = rabbit_misc:r(<<"/">>, queue, Q= <<"qm.test">>),
127+
QueueName = rabbit_misc:r(<<"/">>, queue, Q = <<"qm.test">>),
124128
declare(Config, QueueName, false, false, _Args=[], none),
125129
verify_min_master(Config, Q).
126130

131+
declare_policy_nodes(Config) ->
132+
setup_test_environment(Config),
133+
unset_location_config(Config),
134+
% Note:
135+
% Node0 has 15 queues, Node1 has 8 and Node2 has 1
136+
Node0Name = rabbit_data_coercion:to_binary(
137+
rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename)),
138+
Node1 = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename),
139+
Node1Name = rabbit_data_coercion:to_binary(Node1),
140+
Nodes = [Node1Name, Node0Name],
141+
Policy = [{<<"queue-master-locator">>, <<"min-masters">>},
142+
{<<"ha-mode">>, <<"nodes">>},
143+
{<<"ha-params">>, Nodes}],
144+
ok = rabbit_ct_broker_helpers:set_policy(Config, 0, ?POLICY,
145+
<<".*">>, <<"queues">>, Policy),
146+
QueueName = rabbit_misc:r(<<"/">>, queue, Q = <<"qm.test">>),
147+
declare(Config, QueueName, false, false, _Args=[], none),
148+
verify_min_master(Config, Q, Node1).
149+
150+
declare_policy_all(Config) ->
151+
setup_test_environment(Config),
152+
unset_location_config(Config),
153+
% Note:
154+
% Node0 has 15 queues, Node1 has 8 and Node2 has 1
155+
Policy = [{<<"queue-master-locator">>, <<"min-masters">>},
156+
{<<"ha-mode">>, <<"all">>}],
157+
ok = rabbit_ct_broker_helpers:set_policy(Config, 0, ?POLICY,
158+
<<".*">>, <<"queues">>, Policy),
159+
QueueName = rabbit_misc:r(<<"/">>, queue, Q = <<"qm.test">>),
160+
declare(Config, QueueName, false, false, _Args=[], none),
161+
verify_min_master(Config, Q).
162+
163+
declare_policy_exactly(Config) ->
164+
setup_test_environment(Config),
165+
unset_location_config(Config),
166+
Policy = [{<<"queue-master-locator">>, <<"min-masters">>},
167+
{<<"ha-mode">>, <<"exactly">>},
168+
{<<"ha-params">>, 2}],
169+
ok = rabbit_ct_broker_helpers:set_policy(Config, 0, ?POLICY,
170+
<<".*">>, <<"queues">>, Policy),
171+
QueueRes = rabbit_misc:r(<<"/">>, queue, Q = <<"qm.test">>),
172+
declare(Config, QueueRes, false, false, _Args=[], none),
173+
174+
Node0 = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
175+
rabbit_ct_broker_helpers:control_action(sync_queue, Node0,
176+
[binary_to_list(Q)], [{"-p", "/"}]),
177+
wait_for_sync(Config, Node0, QueueRes, 1),
178+
179+
{ok, Queue} = rabbit_ct_broker_helpers:rpc(Config, Node0,
180+
rabbit_amqqueue, lookup, [QueueRes]),
181+
{MNode0, [SNode], [SSNode]} = rabbit_ct_broker_helpers:rpc(Config, Node0,
182+
rabbit_mirror_queue_misc,
183+
actual_queue_nodes, [Queue]),
184+
?assertEqual(SNode, SSNode),
185+
{ok, MNode1} = rabbit_ct_broker_helpers:rpc(Config, 0,
186+
rabbit_queue_master_location_misc,
187+
lookup_master, [Q, ?DEFAULT_VHOST_PATH]),
188+
?assertEqual(MNode0, MNode1),
189+
Node2 = rabbit_ct_broker_helpers:get_node_config(Config, 2, nodename),
190+
?assertEqual(MNode1, Node2).
191+
127192
declare_config(Config) ->
128193
setup_test_environment(Config),
129194
set_location_config(Config, <<"min-masters">>),
130-
QueueName = rabbit_misc:r(<<"/">>, queue, Q= <<"qm.test">>),
195+
QueueName = rabbit_misc:r(<<"/">>, queue, Q = <<"qm.test">>),
131196
declare(Config, QueueName, false, false, _Args=[], none),
132197
verify_min_master(Config, Q),
133198
unset_location_config(Config),
@@ -139,23 +204,23 @@ declare_config(Config) ->
139204

140205
calculate_min_master(Config) ->
141206
setup_test_environment(Config),
142-
QueueName = rabbit_misc:r(<<"/">>, queue, Q= <<"qm.test">>),
207+
QueueName = rabbit_misc:r(<<"/">>, queue, Q = <<"qm.test">>),
143208
Args = [{<<"x-queue-master-locator">>, longstr, <<"min-masters">>}],
144209
declare(Config, QueueName, false, false, Args, none),
145210
verify_min_master(Config, Q),
146211
ok.
147212

148213
calculate_random(Config) ->
149214
setup_test_environment(Config),
150-
QueueName = rabbit_misc:r(<<"/">>, queue, Q= <<"qm.test">>),
215+
QueueName = rabbit_misc:r(<<"/">>, queue, Q = <<"qm.test">>),
151216
Args = [{<<"x-queue-master-locator">>, longstr, <<"random">>}],
152217
declare(Config, QueueName, false, false, Args, none),
153218
verify_random(Config, Q),
154219
ok.
155220

156221
calculate_client_local(Config) ->
157222
setup_test_environment(Config),
158-
QueueName = rabbit_misc:r(<<"/">>, queue, Q= <<"qm.test">>),
223+
QueueName = rabbit_misc:r(<<"/">>, queue, Q = <<"qm.test">>),
159224
Args = [{<<"x-queue-master-locator">>, longstr, <<"client-local">>}],
160225
declare(Config, QueueName, false, false, Args, none),
161226
verify_client_local(Config, Q),
@@ -232,41 +297,66 @@ min_master_node(Config) ->
232297

233298
set_location_config(Config, Strategy) ->
234299
Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
235-
[ok = rpc:call(Node, application, set_env,
236-
[rabbit, queue_master_locator, Strategy]) || Node <- Nodes],
300+
[ok = rabbit_ct_broker_helpers:rpc(Config, Node,
301+
application, set_env,
302+
[rabbit, queue_master_locator, Strategy]) || Node <- Nodes],
237303
ok.
238304

239305
unset_location_config(Config) ->
240306
Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
241-
[ok = rpc:call(Node, application, unset_env,
242-
[rabbit, queue_master_locator]) || Node <- Nodes],
307+
[ok = rabbit_ct_broker_helpers:rpc(Config, Node,
308+
application, unset_env,
309+
[rabbit, queue_master_locator]) || Node <- Nodes],
243310
ok.
244311

245-
declare(Config, QueueName, Durable, AutoDelete, Args, Owner) ->
246-
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
247-
{new, Queue} = rpc:call(Node, rabbit_amqqueue, declare,
248-
[QueueName, Durable, AutoDelete, Args, Owner]),
312+
declare(Config, QueueName, Durable, AutoDelete, Args0, Owner) ->
313+
Args1 = [QueueName, Durable, AutoDelete, Args0, Owner],
314+
{new, Queue} = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, declare, Args1),
249315
Queue.
250316

317+
verify_min_master(Config, Q, MinMasterNode) ->
318+
Rpc = rabbit_ct_broker_helpers:rpc(Config, 0,
319+
rabbit_queue_master_location_misc,
320+
lookup_master, [Q, ?DEFAULT_VHOST_PATH]),
321+
?assertEqual({ok, MinMasterNode}, Rpc).
322+
251323
verify_min_master(Config, Q) ->
252-
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
253324
MinMaster = min_master_node(Config),
254-
ct:pal("Expecting min master ~p~n", [MinMaster]),
255-
{ok, MinMaster} = rpc:call(Node, rabbit_queue_master_location_misc,
256-
lookup_master, [Q, ?DEFAULT_VHOST_PATH]).
325+
verify_min_master(Config, Q, MinMaster).
257326

258327
verify_random(Config, Q) ->
259-
[Node | _] = Nodes = rabbit_ct_broker_helpers:get_node_configs(Config,
260-
nodename),
261-
{ok, Master} = rpc:call(Node, rabbit_queue_master_location_misc,
262-
lookup_master, [Q, ?DEFAULT_VHOST_PATH]),
263-
true = lists:member(Master, Nodes).
328+
[Node | _] = Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
329+
{ok, Master} = rabbit_ct_broker_helpers:rpc(Config, Node,
330+
rabbit_queue_master_location_misc,
331+
lookup_master, [Q, ?DEFAULT_VHOST_PATH]),
332+
?assert(lists:member(Master, Nodes)).
264333

265334
verify_client_local(Config, Q) ->
266335
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
267-
{ok, Node} = rpc:call(Node, rabbit_queue_master_location_misc,
268-
lookup_master, [Q, ?DEFAULT_VHOST_PATH]).
336+
Rpc = rabbit_ct_broker_helpers:rpc(Config, Node,
337+
rabbit_queue_master_location_misc,
338+
lookup_master, [Q, ?DEFAULT_VHOST_PATH]),
339+
?assertEqual({ok, Node}, Rpc).
269340

270341
set_location_policy(Config, Name, Strategy) ->
271342
ok = rabbit_ct_broker_helpers:set_policy(Config, 0,
272343
Name, <<".*">>, <<"queues">>, [{<<"queue-master-locator">>, Strategy}]).
344+
345+
wait_for_sync(Config, Nodename, Q, ExpectedSSPidLen) ->
346+
wait_for_sync(Config, Nodename, Q, ExpectedSSPidLen, 600).
347+
348+
wait_for_sync(_, _, _, _, 0) ->
349+
throw(sync_timeout);
350+
wait_for_sync(Config, Nodename, Q, ExpectedSSPidLen, N) ->
351+
case synced(Config, Nodename, Q, ExpectedSSPidLen) of
352+
true -> ok;
353+
false -> timer:sleep(100),
354+
wait_for_sync(Config, Nodename, Q, ExpectedSSPidLen, N-1)
355+
end.
356+
357+
synced(Config, Nodename, Q, ExpectedSSPidLen) ->
358+
Args = [<<"/">>, [name, synchronised_slave_pids]],
359+
Info = rabbit_ct_broker_helpers:rpc(Config, Nodename,
360+
rabbit_amqqueue, info_all, Args),
361+
[SSPids] = [Pids || [{name, Q1}, {synchronised_slave_pids, Pids}] <- Info, Q =:= Q1],
362+
length(SSPids) =:= ExpectedSSPidLen.

0 commit comments

Comments
 (0)