Policy key to not promote unsynchronised queues. #1578

Merged · 4 commits · Apr 25, 2018
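
This pull request adds an optional ha-promote-on-failure policy key for mirrored queues. It accepts "always" (promote an unsynchronised mirror when the master fails uncontrolled, the existing behaviour and the default when the key is absent) and "when-synced" (refuse promotion unless a synchronised mirror exists, trading availability for data safety: the queue becomes unavailable until the failed master returns). A minimal sketch of applying the key, modelled on the test-helper call used in dynamic_ha_SUITE.erl below; the Config, node A, queue-name pattern and "all" mode are test-specific:

    %% Illustrative only: set an HA policy that refuses to promote
    %% unsynchronised mirrors on uncontrolled master failure.
    rabbit_ct_broker_helpers:set_ha_policy(
        Config, A, <<"^ha.nopromote">>, <<"all">>,
        [{<<"ha-promote-on-failure">>, <<"when-synced">>}]).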
7 changes: 6 additions & 1 deletion src/rabbit_mirror_queue_coordinator.erl
@@ -365,7 +365,12 @@ handle_cast({gm_deaths, DeadGMPids},
DeadPids),
{stop, shutdown, State};
{error, not_found} ->
{stop, normal, State}
{stop, normal, State};
{error, {not_synced, _}} ->
rabbit_log:error("Mirror queue ~p in unexpected state."
" Promoted to master but already a master.",
[QueueName]),
error(unexpected_mirrored_state)
end;

handle_cast(request_depth, State = #state { depth_fun = DepthFun,
40 changes: 1 addition & 39 deletions src/rabbit_mirror_queue_master.erl
@@ -212,45 +212,7 @@ delete_and_terminate(Reason, State = #state { backing_queue = BQ,

stop_all_slaves(Reason, #state{name = QName, gm = GM, wait_timeout = WT}) ->
{ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName),
PidsMRefs = [{Pid, erlang:monitor(process, Pid)} || Pid <- [GM | SPids]],
ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
%% It's possible that we could be partitioned from some slaves
%% between the lookup and the broadcast, in which case we could
%% monitor them but they would not have received the GM
%% message. So only wait for slaves which are still
%% not-partitioned.
PendingSlavePids =
lists:foldl(
fun({Pid, MRef}, Acc) ->
case rabbit_mnesia:on_running_node(Pid) of
true ->
receive
{'DOWN', MRef, process, _Pid, _Info} ->
Acc
after WT ->
rabbit_mirror_queue_misc:log_warning(
QName, "Missing 'DOWN' message from ~p in"
" node ~p~n", [Pid, node(Pid)]),
[Pid | Acc]
end;
false ->
Acc
end
end, [], PidsMRefs),
%% Normally when we remove a slave another slave or master will
%% notice and update Mnesia. But we just removed them all, and
%% have stopped listening ourselves. So manually clean up.
rabbit_misc:execute_mnesia_transaction(
fun () ->
[Q] = mnesia:read({rabbit_queue, QName}),
rabbit_mirror_queue_misc:store_updated_slaves(
Q #amqqueue { gm_pids = [], slave_pids = [],
%% Restarted slaves on running nodes can
%% ensure old incarnations are stopped using
%% the pending slave pids.
slave_pids_pending_shutdown = PendingSlavePids})
end),
ok = gm:forget_group(QName).
rabbit_mirror_queue_misc:stop_all_slaves(Reason, SPids, QName, GM, WT).

purge(State = #state { gm = GM,
backing_queue = BQ,
126 changes: 93 additions & 33 deletions src/rabbit_mirror_queue_misc.erl
@@ -24,6 +24,7 @@
update_mirrors/2, update_mirrors/1, validate_policy/1,
maybe_auto_sync/1, maybe_drop_master_after_sync/1,
sync_batch_size/1, log_info/3, log_warning/3]).
-export([stop_all_slaves/5]).

-export([sync_queue/1, cancel_sync_queue/1]).

@@ -47,6 +48,8 @@
[policy_validator, <<"ha-sync-batch-size">>, ?MODULE]}},
{mfa, {rabbit_registry, register,
[policy_validator, <<"ha-promote-on-shutdown">>, ?MODULE]}},
{mfa, {rabbit_registry, register,
[policy_validator, <<"ha-promote-on-failure">>, ?MODULE]}},
{requires, rabbit_registry},
{enables, recovery}]}).

@@ -85,6 +88,7 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
[] -> {error, not_found};
[Q = #amqqueue { pid = QPid,
slave_pids = SPids,
sync_slave_pids = SyncSPids,
gm_pids = GMPids }] ->
{DeadGM, AliveGM} = lists:partition(
fun ({GM, _}) ->
@@ -104,35 +108,41 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
{QPid, SPids};
_ -> promote_slave(Alive)
end,
Extra =
case {{QPid, SPids}, {QPid1, SPids1}} of
{Same, Same} ->
[];
_ when QPid =:= QPid1 orelse QPid1 =:= Self ->
%% Either master hasn't changed, so
%% we're ok to update mnesia; or we have
%% become the master. If gm altered,
%% we have no choice but to proceed.
Q1 = Q#amqqueue{pid = QPid1,
slave_pids = SPids1,
gm_pids = AliveGM},
store_updated_slaves(Q1),
%% If we add and remove nodes at the
%% same time we might tell the old
%% master we need to sync and then
%% shut it down. So let's check if
%% the new master needs to sync.
maybe_auto_sync(Q1),
slaves_to_start_on_failure(Q1, DeadGMPids);
_ ->
%% Master has changed, and we're not it.
%% [1].
Q1 = Q#amqqueue{slave_pids = Alive,
gm_pids = AliveGM},
store_updated_slaves(Q1),
[]
end,
{ok, QPid1, DeadPids, Extra}
DoNotPromote = SyncSPids =:= [] andalso
rabbit_policy:get(<<"ha-promote-on-failure">>, Q) =:= <<"when-synced">>,
case {{QPid, SPids}, {QPid1, SPids1}} of
{Same, Same} ->
{ok, QPid1, DeadPids, []};
_ when QPid1 =/= QPid andalso QPid1 =:= Self andalso DoNotPromote =:= true ->
%% We have been promoted to master
%% but there are no synchronised mirrors
%% hence this node is not synchronised either
%% Bailing out.
{error, {not_synced, SPids1}};
_ when QPid =:= QPid1 orelse QPid1 =:= Self ->
%% Either master hasn't changed, so
%% we're ok to update mnesia; or we have
%% become the master. If gm altered,
%% we have no choice but to proceed.
Q1 = Q#amqqueue{pid = QPid1,
slave_pids = SPids1,
gm_pids = AliveGM},
store_updated_slaves(Q1),
%% If we add and remove nodes at the
%% same time we might tell the old
%% master we need to sync and then
%% shut it down. So let's check if
%% the new master needs to sync.
maybe_auto_sync(Q1),
{ok, QPid1, DeadPids, slaves_to_start_on_failure(Q1, DeadGMPids)};
_ ->
%% Master has changed, and we're not it.
%% [1].
Q1 = Q#amqqueue{slave_pids = Alive,
gm_pids = AliveGM},
store_updated_slaves(Q1),
{ok, QPid1, DeadPids, []}
end
end
end).
%% [1] We still update mnesia here in case the slave that is supposed
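
To restate the new branch above in isolation: promotion is refused only when the master pid has actually changed, this node is the one chosen as the new master, no mirror is recorded as synchronised, and the policy asks for "when-synced". A standalone sketch of that predicate (the function name and argument list are illustrative, not part of the patch):

    %% Illustrative only: the condition under which remove_from_queue/3
    %% returns {error, {not_synced, SPids1}} instead of promoting this node.
    should_refuse_promotion(QPid, QPid1, Self, SyncSPids, Q) ->
        QPid1 =/= QPid andalso
            QPid1 =:= Self andalso
            SyncSPids =:= [] andalso
            rabbit_policy:get(<<"ha-promote-on-failure">>, Q) =:= <<"when-synced">>.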
@@ -305,6 +315,44 @@ update_recoverable(SPids, RS) ->
DelNodes = RunningNodes -- SNodes, %% i.e. running with no slave
(RS -- DelNodes) ++ AddNodes.

stop_all_slaves(Reason, SPids, QName, GM, WaitTimeout) ->
PidsMRefs = [{Pid, erlang:monitor(process, Pid)} || Pid <- [GM | SPids]],
ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
%% It's possible that we could be partitioned from some slaves
%% between the lookup and the broadcast, in which case we could
%% monitor them but they would not have received the GM
%% message. So only wait for slaves which are still
%% not-partitioned.
PendingSlavePids = lists:foldl(fun({Pid, MRef}, Acc) ->
case rabbit_mnesia:on_running_node(Pid) of
true ->
receive
{'DOWN', MRef, process, _Pid, _Info} ->
Acc
after WaitTimeout ->
rabbit_mirror_queue_misc:log_warning(
QName, "Missing 'DOWN' message from ~p in"
" node ~p~n", [Pid, node(Pid)]),
[Pid | Acc]
end;
false ->
Acc
end
end, [], PidsMRefs),
%% Normally when we remove a slave another slave or master will
%% notice and update Mnesia. But we just removed them all, and
%% have stopped listening ourselves. So manually clean up.
rabbit_misc:execute_mnesia_transaction(fun () ->
[Q] = mnesia:read({rabbit_queue, QName}),
rabbit_mirror_queue_misc:store_updated_slaves(
Q #amqqueue { gm_pids = [], slave_pids = [],
%% Restarted slaves on running nodes can
%% ensure old incarnations are stopped using
%% the pending slave pids.
slave_pids_pending_shutdown = PendingSlavePids})
end),
ok = gm:forget_group(QName).

%%----------------------------------------------------------------------------

promote_slave([SPid | SPids]) ->
@@ -478,10 +526,12 @@ validate_policy(KeyList) ->
<<"ha-sync-batch-size">>, KeyList, none),
PromoteOnShutdown = proplists:get_value(
<<"ha-promote-on-shutdown">>, KeyList, none),
case {Mode, Params, SyncMode, SyncBatchSize, PromoteOnShutdown} of
{none, none, none, none, none} ->
PromoteOnFailure = proplists:get_value(
<<"ha-promote-on-failure">>, KeyList, none),
case {Mode, Params, SyncMode, SyncBatchSize, PromoteOnShutdown, PromoteOnFailure} of
{none, none, none, none, none, none} ->
ok;
{none, _, _, _, _} ->
{none, _, _, _, _, _} ->
{error, "ha-mode must be specified to specify ha-params, "
"ha-sync-mode or ha-promote-on-shutdown", []};
_ ->
@@ -490,7 +540,8 @@
{Params, ha_params_validator(Mode)},
{SyncMode, fun validate_sync_mode/1},
{SyncBatchSize, fun validate_sync_batch_size/1},
{PromoteOnShutdown, fun validate_pos/1}])
{PromoteOnShutdown, fun validate_pos/1},
{PromoteOnFailure, fun validate_pof/1}])
end.

ha_params_validator(Mode) ->
@@ -532,3 +583,12 @@ validate_pos(PromoteOnShutdown) ->
Mode -> {error, "ha-promote-on-shutdown must be "
"\"always\" or \"when-synced\", got ~p", [Mode]}
end.

validate_pof(PromoteOnShutdown) ->
case PromoteOnShutdown of
<<"always">> -> ok;
<<"when-synced">> -> ok;
none -> ok;
Mode -> {error, "ha-promote-on-failure must be "
"\"always\" or \"when-synced\", got ~p", [Mode]}
end.
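
As a usage note, the validator above rejects unsupported values when the policy is defined, not when a failover happens. A hypothetical call (the key list is invented for illustration) would be expected to fail along these lines:

    %% Illustrative only: validating a policy with an unsupported value.
    rabbit_mirror_queue_misc:validate_policy(
        [{<<"ha-mode">>, <<"all">>},
         {<<"ha-promote-on-failure">>, <<"sometimes">>}]).
    %% Expected result (error format string and arguments from validate_pof/1):
    %% {error, "ha-promote-on-failure must be \"always\" or \"when-synced\", got ~p",
    %%  [<<"sometimes">>]}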
12 changes: 10 additions & 2 deletions src/rabbit_mirror_queue_slave.erl
@@ -223,13 +223,21 @@ handle_call(go, _From, {not_started, Q} = NotStarted) ->
end;

handle_call({gm_deaths, DeadGMPids}, From,
State = #state { gm = GM, q = Q = #amqqueue {
name = QName, pid = MPid }}) ->
State = #state{ gm = GM,
q = Q = #amqqueue{ name = QName, pid = MPid },
backing_queue = BQ,
backing_queue_state = BQS}) ->
Self = self(),
case rabbit_mirror_queue_misc:remove_from_queue(QName, Self, DeadGMPids) of
{error, not_found} ->
gen_server2:reply(From, ok),
{stop, normal, State};
{error, {not_synced, SPids}} ->
WaitTimeout = rabbit_misc:get_env(rabbit, slave_wait_timeout, 15000),
rabbit_mirror_queue_misc:stop_all_slaves(
{error, not_synced}, SPids, QName, GM, WaitTimeout),
BQ:delete_and_terminate({error, not_synced}, BQS),
{stop, normal, State#state{backing_queue_state = undefined}};
{ok, Pid, DeadPids, ExtraNodes} ->
rabbit_mirror_queue_misc:report_deaths(Self, false, QName,
DeadPids),
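
The 15000 above is only a fallback: the wait used when stopping mirrors comes from the slave_wait_timeout key of the rabbit application environment. A minimal sketch of overriding it in the classic Erlang-term configuration file (the 30000 value is arbitrary and purely illustrative):

    %% rabbitmq.config sketch, illustrative value only.
    [
      {rabbit, [{slave_wait_timeout, 30000}]}
    ].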
51 changes: 50 additions & 1 deletion test/dynamic_ha_SUITE.erl
@@ -59,6 +59,7 @@ groups() ->
vhost_deletion,
force_delete_if_no_master,
promote_on_shutdown,
promote_on_failure,
slave_recovers_after_vhost_failure,
slave_recovers_after_vhost_down_an_up,
master_migrates_on_vhost_down,
@@ -287,22 +288,61 @@ force_delete_if_no_master(Config) ->
amqp_channel:call(BCh3, #'queue.delete'{queue = <<"ha.nopromote.test2">>}),
ok.

promote_on_failure(Config) ->
[A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
rabbit_ct_broker_helpers:set_ha_policy(Config, A, <<"^ha.promote">>,
<<"all">>, [{<<"ha-promote-on-failure">>, <<"always">>}]),
rabbit_ct_broker_helpers:set_ha_policy(Config, A, <<"^ha.nopromote">>,
<<"all">>, [{<<"ha-promote-on-failure">>, <<"when-synced">>}]),

ACh = rabbit_ct_client_helpers:open_channel(Config, A),
[begin
amqp_channel:call(ACh, #'queue.declare'{queue = Q,
durable = true}),
rabbit_ct_client_helpers:publish(ACh, Q, 10)
end || Q <- [<<"ha.promote.test">>, <<"ha.nopromote.test">>]],
ok = rabbit_ct_broker_helpers:restart_node(Config, B),
ok = rabbit_ct_broker_helpers:kill_node(Config, A),
BCh = rabbit_ct_client_helpers:open_channel(Config, B),
#'queue.declare_ok'{message_count = 0} =
amqp_channel:call(
BCh, #'queue.declare'{queue = <<"ha.promote.test">>,
durable = true}),
?assertExit(
{{shutdown, {server_initiated_close, 404, _}}, _},
amqp_channel:call(
BCh, #'queue.declare'{queue = <<"ha.nopromote.test">>,
durable = true})),
ok = rabbit_ct_broker_helpers:start_node(Config, A),
ACh2 = rabbit_ct_client_helpers:open_channel(Config, A),
#'queue.declare_ok'{message_count = 10} =
amqp_channel:call(
ACh2, #'queue.declare'{queue = <<"ha.nopromote.test">>,
durable = true}),
ok.

promote_on_shutdown(Config) ->
[A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
rabbit_ct_broker_helpers:set_ha_policy(Config, A, <<"^ha.promote">>,
<<"all">>, [{<<"ha-promote-on-shutdown">>, <<"always">>}]),
rabbit_ct_broker_helpers:set_ha_policy(Config, A, <<"^ha.nopromote">>,
<<"all">>),
rabbit_ct_broker_helpers:set_ha_policy(Config, A, <<"^ha.nopromoteonfailure">>,
<<"all">>, [{<<"ha-promote-on-failure">>, <<"when-synced">>},
{<<"ha-promote-on-shutdown">>, <<"always">>}]),

ACh = rabbit_ct_client_helpers:open_channel(Config, A),
[begin
amqp_channel:call(ACh, #'queue.declare'{queue = Q,
durable = true}),
rabbit_ct_client_helpers:publish(ACh, Q, 10)
end || Q <- [<<"ha.promote.test">>, <<"ha.nopromote.test">>]],
end || Q <- [<<"ha.promote.test">>,
<<"ha.nopromote.test">>,
<<"ha.nopromoteonfailure.test">>]],
ok = rabbit_ct_broker_helpers:restart_node(Config, B),
ok = rabbit_ct_broker_helpers:stop_node(Config, A),
BCh = rabbit_ct_client_helpers:open_channel(Config, B),
BCh1 = rabbit_ct_client_helpers:open_channel(Config, B),
#'queue.declare_ok'{message_count = 0} =
amqp_channel:call(
BCh, #'queue.declare'{queue = <<"ha.promote.test">>,
@@ -312,12 +352,21 @@ promote_on_shutdown(Config) ->
amqp_channel:call(
BCh, #'queue.declare'{queue = <<"ha.nopromote.test">>,
durable = true})),
?assertExit(
{{shutdown, {server_initiated_close, 404, _}}, _},
amqp_channel:call(
BCh1, #'queue.declare'{queue = <<"ha.nopromoteonfailure.test">>,
durable = true})),
ok = rabbit_ct_broker_helpers:start_node(Config, A),
ACh2 = rabbit_ct_client_helpers:open_channel(Config, A),
#'queue.declare_ok'{message_count = 10} =
amqp_channel:call(
ACh2, #'queue.declare'{queue = <<"ha.nopromote.test">>,
durable = true}),
#'queue.declare_ok'{message_count = 10} =
amqp_channel:call(
ACh2, #'queue.declare'{queue = <<"ha.nopromoteonfailure.test">>,
durable = true}),
ok.

nodes_policy_should_pick_master_from_its_params(Config) ->