Commit 8aea4b8
Merge pull request #1578 from rabbitmq/queue-not-promote-on-crash
Policy key to not promote unsynchronised queues.
2 parents 6429f80 + c3af3ff commit 8aea4b8
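
Note: this commit introduces a new "ha-promote-on-failure" policy key. Leaving it unset, or setting it to "always", keeps the existing behaviour (promote a mirror whenever the master fails uncleanly); "when-synced" refuses to promote when no mirror is synchronised, per the remove_from_queue change below. A minimal sketch of enabling it with the CT broker helpers used by the test suite in this commit; the queue-name pattern is illustrative, not part of the commit:

    %% Illustrative sketch: on unclean master failure, only promote a
    %% mirror of matching queues if at least one mirror is synchronised.
    rabbit_ct_broker_helpers:set_ha_policy(
      Config, A, <<"^important\\.">>, <<"all">>,
      [{<<"ha-promote-on-failure">>, <<"when-synced">>}]).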

File tree

5 files changed: +160 / -76 lines

src/rabbit_mirror_queue_coordinator.erl

Lines changed: 6 additions & 1 deletion

@@ -365,7 +365,12 @@ handle_cast({gm_deaths, DeadGMPids},
                                    DeadPids),
             {stop, shutdown, State};
         {error, not_found} ->
-            {stop, normal, State}
+            {stop, normal, State};
+        {error, {not_synced, _}} ->
+            rabbit_log:error("Mirror queue ~p in unexpected state."
+                             " Promoted to master but already a master.",
+                             [QueueName]),
+            error(unexpected_mirrored_state)
     end;
 
 handle_cast(request_depth, State = #state { depth_fun = DepthFun,
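
The coordinator only runs alongside a master, which is synchronised by definition, so {error, {not_synced, _}} should be unreachable here; the new clause logs and crashes rather than carrying on. For reference, a sketch of what the rabbit_log:error/2 call above would emit, assuming QueueName is the usual #resource{} record (which ~p prints as a tuple):

    %% e.g. for queue "qq" in vhost "/":
    %% Mirror queue {resource,<<"/">>,queue,<<"qq">>} in unexpected
    %% state. Promoted to master but already a master.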

src/rabbit_mirror_queue_master.erl

Lines changed: 1 addition & 39 deletions

@@ -212,45 +212,7 @@ delete_and_terminate(Reason, State = #state { backing_queue = BQ,
 
 stop_all_slaves(Reason, #state{name = QName, gm = GM, wait_timeout = WT}) ->
     {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName),
-    PidsMRefs = [{Pid, erlang:monitor(process, Pid)} || Pid <- [GM | SPids]],
-    ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
-    %% It's possible that we could be partitioned from some slaves
-    %% between the lookup and the broadcast, in which case we could
-    %% monitor them but they would not have received the GM
-    %% message. So only wait for slaves which are still
-    %% not-partitioned.
-    PendingSlavePids =
-        lists:foldl(
-          fun({Pid, MRef}, Acc) ->
-                  case rabbit_mnesia:on_running_node(Pid) of
-                      true ->
-                          receive
-                              {'DOWN', MRef, process, _Pid, _Info} ->
-                                  Acc
-                          after WT ->
-                                  rabbit_mirror_queue_misc:log_warning(
-                                    QName, "Missing 'DOWN' message from ~p in"
-                                    " node ~p~n", [Pid, node(Pid)]),
-                                  [Pid | Acc]
-                          end;
-                      false ->
-                          Acc
-                  end
-          end, [], PidsMRefs),
-    %% Normally when we remove a slave another slave or master will
-    %% notice and update Mnesia. But we just removed them all, and
-    %% have stopped listening ourselves. So manually clean up.
-    rabbit_misc:execute_mnesia_transaction(
-      fun () ->
-              [Q] = mnesia:read({rabbit_queue, QName}),
-              rabbit_mirror_queue_misc:store_updated_slaves(
-                Q #amqqueue { gm_pids = [], slave_pids = [],
-                              %% Restarted slaves on running nodes can
-                              %% ensure old incarnations are stopped using
-                              %% the pending slave pids.
-                              slave_pids_pending_shutdown = PendingSlavePids})
-      end),
-    ok = gm:forget_group(QName).
+    rabbit_mirror_queue_misc:stop_all_slaves(Reason, SPids, QName, GM, WT).
 
 purge(State = #state { gm                  = GM,
                        backing_queue       = BQ,
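
The body is moved verbatim into rabbit_mirror_queue_misc so the slave can reuse it (see below). An inferred signature for the extracted helper, as a sketch only, since the commit adds no -spec:

    -spec stop_all_slaves(Reason :: term(), SPids :: [pid()],
                          QName :: rabbit_types:r('queue'), GM :: pid(),
                          WaitTimeout :: non_neg_integer()) -> ok.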

src/rabbit_mirror_queue_misc.erl

Lines changed: 93 additions & 33 deletions

@@ -24,6 +24,7 @@
          update_mirrors/2, update_mirrors/1, validate_policy/1,
          maybe_auto_sync/1, maybe_drop_master_after_sync/1,
          sync_batch_size/1, log_info/3, log_warning/3]).
+-export([stop_all_slaves/5]).
 
 -export([sync_queue/1, cancel_sync_queue/1]).
 
@@ -47,6 +48,8 @@
                     [policy_validator, <<"ha-sync-batch-size">>, ?MODULE]}},
                    {mfa, {rabbit_registry, register,
                     [policy_validator, <<"ha-promote-on-shutdown">>, ?MODULE]}},
+                   {mfa, {rabbit_registry, register,
+                    [policy_validator, <<"ha-promote-on-failure">>, ?MODULE]}},
                    {requires, rabbit_registry},
                    {enables, recovery}]}).
 
@@ -85,6 +88,7 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
             [] -> {error, not_found};
             [Q = #amqqueue { pid             = QPid,
                              slave_pids      = SPids,
+                             sync_slave_pids = SyncSPids,
                              gm_pids         = GMPids }] ->
                 {DeadGM, AliveGM} = lists:partition(
                                       fun ({GM, _}) ->
@@ -104,35 +108,41 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
                         {QPid, SPids};
                     _ -> promote_slave(Alive)
                 end,
-                Extra =
-                    case {{QPid, SPids}, {QPid1, SPids1}} of
-                        {Same, Same} ->
-                            [];
-                        _ when QPid =:= QPid1 orelse QPid1 =:= Self ->
-                            %% Either master hasn't changed, so
-                            %% we're ok to update mnesia; or we have
-                            %% become the master. If gm altered,
-                            %% we have no choice but to proceed.
-                            Q1 = Q#amqqueue{pid        = QPid1,
-                                            slave_pids = SPids1,
-                                            gm_pids    = AliveGM},
-                            store_updated_slaves(Q1),
-                            %% If we add and remove nodes at the
-                            %% same time we might tell the old
-                            %% master we need to sync and then
-                            %% shut it down. So let's check if
-                            %% the new master needs to sync.
-                            maybe_auto_sync(Q1),
-                            slaves_to_start_on_failure(Q1, DeadGMPids);
-                        _ ->
-                            %% Master has changed, and we're not it.
-                            %% [1].
-                            Q1 = Q#amqqueue{slave_pids = Alive,
-                                            gm_pids    = AliveGM},
-                            store_updated_slaves(Q1),
-                            []
-                    end,
-                {ok, QPid1, DeadPids, Extra}
+                DoNotPromote = SyncSPids =:= [] andalso
+                               rabbit_policy:get(<<"ha-promote-on-failure">>, Q) =:= <<"when-synced">>,
+                case {{QPid, SPids}, {QPid1, SPids1}} of
+                    {Same, Same} ->
+                        {ok, QPid1, DeadPids, []};
+                    _ when QPid1 =/= QPid andalso QPid1 =:= Self andalso DoNotPromote =:= true ->
+                        %% We have been promoted to master
+                        %% but there are no synchronised mirrors
+                        %% hence this node is not synchronised either.
+                        %% Bailing out.
+                        {error, {not_synced, SPids1}};
+                    _ when QPid =:= QPid1 orelse QPid1 =:= Self ->
+                        %% Either master hasn't changed, so
+                        %% we're ok to update mnesia; or we have
+                        %% become the master. If gm altered,
+                        %% we have no choice but to proceed.
+                        Q1 = Q#amqqueue{pid        = QPid1,
+                                        slave_pids = SPids1,
+                                        gm_pids    = AliveGM},
+                        store_updated_slaves(Q1),
+                        %% If we add and remove nodes at the
+                        %% same time we might tell the old
+                        %% master we need to sync and then
+                        %% shut it down. So let's check if
+                        %% the new master needs to sync.
+                        maybe_auto_sync(Q1),
+                        {ok, QPid1, DeadPids, slaves_to_start_on_failure(Q1, DeadGMPids)};
+                    _ ->
+                        %% Master has changed, and we're not it.
+                        %% [1].
+                        Q1 = Q#amqqueue{slave_pids = Alive,
+                                        gm_pids    = AliveGM},
+                        store_updated_slaves(Q1),
+                        {ok, QPid1, DeadPids, []}
+                end
         end
     end).
 %% [1] We still update mnesia here in case the slave that is supposed
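
With this change remove_from_queue/3 gains a third outcome alongside the existing two. A sketch of the resulting return contract, inferred from the call sites in this diff (the commit itself carries no -spec, and the types are approximate):

    -spec remove_from_queue(rabbit_types:r('queue'), pid(), [pid()]) ->
              {ok, NewMasterPid :: pid(), DeadPids :: [pid()],
               %% nodes on which to start replacement mirrors,
               %% per slaves_to_start_on_failure/2
               ExtraNodes :: [node()]} |
              {error, not_found} |
              {error, {not_synced, SlavePids :: [pid()]}}.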
@@ -305,6 +315,44 @@ update_recoverable(SPids, RS) ->
     DelNodes = RunningNodes -- SNodes, %% i.e. running with no slave
     (RS -- DelNodes) ++ AddNodes.
 
+stop_all_slaves(Reason, SPids, QName, GM, WaitTimeout) ->
+    PidsMRefs = [{Pid, erlang:monitor(process, Pid)} || Pid <- [GM | SPids]],
+    ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
+    %% It's possible that we could be partitioned from some slaves
+    %% between the lookup and the broadcast, in which case we could
+    %% monitor them but they would not have received the GM
+    %% message. So only wait for slaves which are still
+    %% not-partitioned.
+    PendingSlavePids = lists:foldl(fun({Pid, MRef}, Acc) ->
+        case rabbit_mnesia:on_running_node(Pid) of
+            true ->
+                receive
+                    {'DOWN', MRef, process, _Pid, _Info} ->
+                        Acc
+                after WaitTimeout ->
+                        rabbit_mirror_queue_misc:log_warning(
+                          QName, "Missing 'DOWN' message from ~p in"
+                          " node ~p~n", [Pid, node(Pid)]),
+                        [Pid | Acc]
+                end;
+            false ->
+                Acc
+        end
+    end, [], PidsMRefs),
+    %% Normally when we remove a slave another slave or master will
+    %% notice and update Mnesia. But we just removed them all, and
+    %% have stopped listening ourselves. So manually clean up.
+    rabbit_misc:execute_mnesia_transaction(fun () ->
+        [Q] = mnesia:read({rabbit_queue, QName}),
+        rabbit_mirror_queue_misc:store_updated_slaves(
+          Q #amqqueue { gm_pids = [], slave_pids = [],
+                        %% Restarted slaves on running nodes can
+                        %% ensure old incarnations are stopped using
+                        %% the pending slave pids.
+                        slave_pids_pending_shutdown = PendingSlavePids})
+    end),
+    ok = gm:forget_group(QName).
+
 %%----------------------------------------------------------------------------
 
 promote_slave([SPid | SPids]) ->
@@ -478,10 +526,12 @@ validate_policy(KeyList) ->
                         <<"ha-sync-batch-size">>, KeyList, none),
     PromoteOnShutdown = proplists:get_value(
                           <<"ha-promote-on-shutdown">>, KeyList, none),
-    case {Mode, Params, SyncMode, SyncBatchSize, PromoteOnShutdown} of
-        {none, none, none, none, none} ->
+    PromoteOnFailure = proplists:get_value(
+                         <<"ha-promote-on-failure">>, KeyList, none),
+    case {Mode, Params, SyncMode, SyncBatchSize, PromoteOnShutdown, PromoteOnFailure} of
+        {none, none, none, none, none, none} ->
             ok;
-        {none, _, _, _, _} ->
+        {none, _, _, _, _, _} ->
             {error, "ha-mode must be specified to specify ha-params, "
              "ha-sync-mode or ha-promote-on-shutdown", []};
         _ ->
@@ -490,7 +540,8 @@ validate_policy(KeyList) ->
                  {Params, ha_params_validator(Mode)},
                  {SyncMode, fun validate_sync_mode/1},
                  {SyncBatchSize, fun validate_sync_batch_size/1},
-                 {PromoteOnShutdown, fun validate_pos/1}])
+                 {PromoteOnShutdown, fun validate_pos/1},
+                 {PromoteOnFailure, fun validate_pof/1}])
     end.
 
 ha_params_validator(Mode) ->
@@ -532,3 +583,12 @@ validate_pos(PromoteOnShutdown) ->
         Mode -> {error, "ha-promote-on-shutdown must be "
                  "\"always\" or \"when-synced\", got ~p", [Mode]}
     end.
+
+validate_pof(PromoteOnShutdown) ->
+    case PromoteOnShutdown of
+        <<"always">>     -> ok;
+        <<"when-synced">> -> ok;
+        none             -> ok;
+        Mode -> {error, "ha-promote-on-failure must be "
+                 "\"always\" or \"when-synced\", got ~p", [Mode]}
+    end.
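
A quick sketch of the extended validator in action. The error wording follows validate_pof/1 above; the calls assume only that validate_policy/1 takes the policy definition as a proplist, as the diff shows:

    %% Accepted (returns ok):
    rabbit_mirror_queue_misc:validate_policy(
      [{<<"ha-mode">>, <<"all">>},
       {<<"ha-promote-on-failure">>, <<"when-synced">>}]),

    %% Rejected by validate_pof/1 with
    %% {error, "ha-promote-on-failure must be ...", [<<"sometimes">>]}:
    rabbit_mirror_queue_misc:validate_policy(
      [{<<"ha-mode">>, <<"all">>},
       {<<"ha-promote-on-failure">>, <<"sometimes">>}]).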

src/rabbit_mirror_queue_slave.erl

Lines changed: 10 additions & 2 deletions

@@ -223,13 +223,21 @@ handle_call(go, _From, {not_started, Q} = NotStarted) ->
     end;
 
 handle_call({gm_deaths, DeadGMPids}, From,
-            State = #state { gm = GM, q = Q = #amqqueue {
-                               name = QName, pid = MPid }}) ->
+            State = #state{ gm = GM,
+                            q = Q = #amqqueue{ name = QName, pid = MPid },
+                            backing_queue = BQ,
+                            backing_queue_state = BQS}) ->
     Self = self(),
     case rabbit_mirror_queue_misc:remove_from_queue(QName, Self, DeadGMPids) of
         {error, not_found} ->
            gen_server2:reply(From, ok),
            {stop, normal, State};
+        {error, {not_synced, SPids}} ->
+            WaitTimeout = rabbit_misc:get_env(rabbit, slave_wait_timeout, 15000),
+            rabbit_mirror_queue_misc:stop_all_slaves(
+              {error, not_synced}, SPids, QName, GM, WaitTimeout),
+            BQ:delete_and_terminate({error, not_synced}, BQS),
+            {stop, normal, State#state{backing_queue_state = undefined}};
         {ok, Pid, DeadPids, ExtraNodes} ->
             rabbit_mirror_queue_misc:report_deaths(Self, false, QName,
                                                    DeadPids),
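
The 15000 ms fallback above reads the slave_wait_timeout application env, evidently the same timeout the master path carries in its wait_timeout state field. A sketch of tuning it in the Erlang-term config file; the value shown is illustrative:

    %% rabbitmq.config / advanced.config: allow mirrors more time to
    %% deliver their 'DOWN' messages before being listed as pending.
    [{rabbit, [{slave_wait_timeout, 30000}]}].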

test/dynamic_ha_SUITE.erl

Lines changed: 50 additions & 1 deletion

@@ -59,6 +59,7 @@ groups() ->
           vhost_deletion,
           force_delete_if_no_master,
           promote_on_shutdown,
+          promote_on_failure,
           slave_recovers_after_vhost_failure,
           slave_recovers_after_vhost_down_an_up,
           master_migrates_on_vhost_down,
@@ -287,22 +288,61 @@ force_delete_if_no_master(Config) ->
     amqp_channel:call(BCh3, #'queue.delete'{queue = <<"ha.nopromote.test2">>}),
     ok.
 
+promote_on_failure(Config) ->
+    [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+    rabbit_ct_broker_helpers:set_ha_policy(Config, A, <<"^ha.promote">>,
+      <<"all">>, [{<<"ha-promote-on-failure">>, <<"always">>}]),
+    rabbit_ct_broker_helpers:set_ha_policy(Config, A, <<"^ha.nopromote">>,
+      <<"all">>, [{<<"ha-promote-on-failure">>, <<"when-synced">>}]),
+
+    ACh = rabbit_ct_client_helpers:open_channel(Config, A),
+    [begin
+         amqp_channel:call(ACh, #'queue.declare'{queue   = Q,
+                                                 durable = true}),
+         rabbit_ct_client_helpers:publish(ACh, Q, 10)
+     end || Q <- [<<"ha.promote.test">>, <<"ha.nopromote.test">>]],
+    ok = rabbit_ct_broker_helpers:restart_node(Config, B),
+    ok = rabbit_ct_broker_helpers:kill_node(Config, A),
+    BCh = rabbit_ct_client_helpers:open_channel(Config, B),
+    #'queue.declare_ok'{message_count = 0} =
+        amqp_channel:call(
+          BCh, #'queue.declare'{queue   = <<"ha.promote.test">>,
+                                durable = true}),
+    ?assertExit(
+       {{shutdown, {server_initiated_close, 404, _}}, _},
+       amqp_channel:call(
+         BCh, #'queue.declare'{queue   = <<"ha.nopromote.test">>,
+                               durable = true})),
+    ok = rabbit_ct_broker_helpers:start_node(Config, A),
+    ACh2 = rabbit_ct_client_helpers:open_channel(Config, A),
+    #'queue.declare_ok'{message_count = 10} =
+        amqp_channel:call(
+          ACh2, #'queue.declare'{queue   = <<"ha.nopromote.test">>,
+                                 durable = true}),
+    ok.
+
 promote_on_shutdown(Config) ->
     [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
     rabbit_ct_broker_helpers:set_ha_policy(Config, A, <<"^ha.promote">>,
       <<"all">>, [{<<"ha-promote-on-shutdown">>, <<"always">>}]),
     rabbit_ct_broker_helpers:set_ha_policy(Config, A, <<"^ha.nopromote">>,
       <<"all">>),
+    rabbit_ct_broker_helpers:set_ha_policy(Config, A, <<"^ha.nopromoteonfailure">>,
+      <<"all">>, [{<<"ha-promote-on-failure">>, <<"when-synced">>},
+                  {<<"ha-promote-on-shutdown">>, <<"always">>}]),
 
     ACh = rabbit_ct_client_helpers:open_channel(Config, A),
     [begin
         amqp_channel:call(ACh, #'queue.declare'{queue   = Q,
                                                 durable = true}),
         rabbit_ct_client_helpers:publish(ACh, Q, 10)
-     end || Q <- [<<"ha.promote.test">>, <<"ha.nopromote.test">>]],
+     end || Q <- [<<"ha.promote.test">>,
+                  <<"ha.nopromote.test">>,
+                  <<"ha.nopromoteonfailure.test">>]],
     ok = rabbit_ct_broker_helpers:restart_node(Config, B),
     ok = rabbit_ct_broker_helpers:stop_node(Config, A),
     BCh = rabbit_ct_client_helpers:open_channel(Config, B),
+    BCh1 = rabbit_ct_client_helpers:open_channel(Config, B),
     #'queue.declare_ok'{message_count = 0} =
         amqp_channel:call(
           BCh, #'queue.declare'{queue   = <<"ha.promote.test">>,
@@ -312,12 +352,21 @@ promote_on_shutdown(Config) ->
        amqp_channel:call(
          BCh, #'queue.declare'{queue   = <<"ha.nopromote.test">>,
                                durable = true})),
+    ?assertExit(
+       {{shutdown, {server_initiated_close, 404, _}}, _},
+       amqp_channel:call(
+         BCh1, #'queue.declare'{queue   = <<"ha.nopromoteonfailure.test">>,
+                                durable = true})),
     ok = rabbit_ct_broker_helpers:start_node(Config, A),
     ACh2 = rabbit_ct_client_helpers:open_channel(Config, A),
     #'queue.declare_ok'{message_count = 10} =
        amqp_channel:call(
          ACh2, #'queue.declare'{queue   = <<"ha.nopromote.test">>,
                                 durable = true}),
+    #'queue.declare_ok'{message_count = 10} =
+        amqp_channel:call(
+          ACh2, #'queue.declare'{queue   = <<"ha.nopromoteonfailure.test">>,
+                                 durable = true}),
     ok.
 
 nodes_policy_should_pick_master_from_its_params(Config) ->
