Skip to content

Commit 4641e66

Browse files
Merge pull request #10426 from rabbitmq/amazon-mq-global-quorum-critical
New upgrade time QQ health check: add check_if_new_quorum_queue_replicas_have_finished_initial_sync by @illotum (plus a test)
2 parents 1855f0d + c4ae6f3 commit 4641e66

8 files changed

+206
-37
lines changed

deps/rabbit/src/amqqueue.erl

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@
8484
reset_mirroring_and_decorators/1,
8585
set_immutable/1,
8686
qnode/1,
87+
to_printable/1,
8788
macros/0]).
8889

8990
-define(record_version, amqqueue_v2).
@@ -641,6 +642,14 @@ qnode(none) ->
641642
qnode({_, Node}) ->
642643
Node.
643644

645+
%% @doc Renders a queue record as a map with binary keys.
%% The map carries the queue's resource name passed through
%% rabbit_misc:rs/1 and coerced to a binary, the bare queue name,
%% the virtual host and the queue type.
-spec to_printable(amqqueue()) -> #{binary() => any()}.
to_printable(#amqqueue{name = #resource{name = Name} = QName,
                       vhost = VHost,
                       type = Type}) ->
    ReadableName = rabbit_data_coercion:to_binary(rabbit_misc:rs(QName)),
    #{<<"type">> => Type,
      <<"virtual_host">> => VHost,
      <<"name">> => Name,
      <<"readable_name">> => ReadableName}.
652+
644653
% private
645654

646655
macros() ->

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 35 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@
5555
file_handle_other_reservation/0]).
5656
-export([file_handle_release_reservation/0]).
5757
-export([list_with_minimum_quorum/0,
58-
filter_quorum_critical/1,
58+
list_with_local_promotable/0,
59+
list_with_local_promotable_for_cli/0,
5960
filter_quorum_critical/3,
6061
all_replica_states/0]).
6162
-export([capabilities/0]).
@@ -77,6 +78,10 @@
7778
-export([force_shrink_member_to_current_member/2,
7879
force_all_queues_shrink_member_to_current_member/0]).
7980

81+
-ifdef(TEST).
82+
-export([filter_promotable/2]).
83+
-endif.
84+
8085
-import(rabbit_queue_type_util, [args_policy_lookup/3,
8186
qname_to_internal_name/1,
8287
erpc_call/5]).
@@ -89,6 +94,8 @@
8994
-type qmsg() :: {rabbit_types:r('queue'), pid(), msg_id(), boolean(),
9095
mc:state()}.
9196
-type membership() :: voter | non_voter | promotable. %% see ra_membership() in Ra.
97+
-type replica_states() :: #{atom() => replica_state()}.
98+
-type replica_state() :: leader | follower | non_voter | promotable.
9299

93100
-define(RA_SYSTEM, quorum_queues).
94101
-define(RA_WAL_NAME, ra_log_wal).
@@ -405,35 +412,37 @@ all_replica_states() ->
405412

406413
-spec list_with_minimum_quorum() -> [amqqueue:amqqueue()].
407414
list_with_minimum_quorum() ->
408-
filter_quorum_critical(
409-
rabbit_amqqueue:list_local_quorum_queues()).
410-
411-
-spec filter_quorum_critical([amqqueue:amqqueue()]) -> [amqqueue:amqqueue()].
412-
filter_quorum_critical(Queues) ->
413-
%% Example map of QQ replica states:
414-
%% #{rabbit@warp10 =>
415-
%% #{'%2F_qq.636' => leader,'%2F_qq.243' => leader,
416-
%% '%2F_qq.1939' => leader,'%2F_qq.1150' => leader,
417-
%% '%2F_qq.1109' => leader,'%2F_qq.1654' => leader,
418-
%% '%2F_qq.1679' => leader,'%2F_qq.1003' => leader,
419-
%% '%2F_qq.1593' => leader,'%2F_qq.1765' => leader,
420-
%% '%2F_qq.933' => leader,'%2F_qq.38' => leader,
421-
%% '%2F_qq.1357' => leader,'%2F_qq.1345' => leader,
422-
%% '%2F_qq.1694' => leader,'%2F_qq.994' => leader,
423-
%% '%2F_qq.490' => leader,'%2F_qq.1704' => leader,
424-
%% '%2F_qq.58' => leader,'%2F_qq.564' => leader,
425-
%% '%2F_qq.683' => leader,'%2F_qq.386' => leader,
426-
%% '%2F_qq.753' => leader,'%2F_qq.6' => leader,
427-
%% '%2F_qq.1590' => leader,'%2F_qq.1363' => leader,
428-
%% '%2F_qq.882' => leader,'%2F_qq.1161' => leader,...}}
429-
ReplicaStates = maps:from_list(
430-
rabbit_misc:append_rpc_all_nodes(rabbit_nodes:list_running(),
431-
?MODULE, all_replica_states, [])),
415+
Queues = rabbit_amqqueue:list_local_quorum_queues(),
416+
ReplicaStates = get_replica_states(rabbit_nodes:list_running()),
432417
filter_quorum_critical(Queues, ReplicaStates, node()).
433418

434-
-spec filter_quorum_critical([amqqueue:amqqueue()], #{node() => #{atom() => atom()}}, node()) ->
419+
%% @doc Returns the queues from rabbit_amqqueue:list_local_quorum_queues/0
%% whose replica on this node is reported as `promotable'.
-spec list_with_local_promotable() -> [amqqueue:amqqueue()].
list_with_local_promotable() ->
    LocalQueues = rabbit_amqqueue:list_local_quorum_queues(),
    StatesByNode = get_replica_states([node()]),
    %% Only this node was queried, so its entry is the one to inspect.
    #{node() := LocalStates} = StatesByNode,
    filter_promotable(LocalQueues, LocalStates).
424+
425+
%% @doc CLI-facing variant of list_with_local_promotable/0: every queue
%% in the result is converted with amqqueue:to_printable/1.
-spec list_with_local_promotable_for_cli() -> [#{binary() => any()}].
list_with_local_promotable_for_cli() ->
    [amqqueue:to_printable(Q) || Q <- list_with_local_promotable()].
429+
430+
%% @doc Calls all_replica_states/0 on each of the given nodes through
%% rabbit_misc:append_rpc_all_nodes/4 and turns the combined result
%% list into a map.
-spec get_replica_states([node()]) -> #{node() => replica_states()}.
get_replica_states(Nodes) ->
    PerNodeStates = rabbit_misc:append_rpc_all_nodes(Nodes, ?MODULE,
                                                     all_replica_states, []),
    maps:from_list(PerNodeStates).
434+
435+
%% @doc Keeps only the queues whose replica state in `ReplicaStates' is
%% `promotable'.
%%
%% `ReplicaStates' maps a replica's registered name (the first element
%% of the tuple returned by amqqueue:get_pid/1) to its state. A queue
%% with no entry in the map is treated as not promotable instead of
%% failing the whole call.
-spec filter_promotable([amqqueue:amqqueue()], replica_states()) ->
    [amqqueue:amqqueue()].
filter_promotable(Queues, ReplicaStates) ->
    lists:filter(fun (Q) ->
                     {RaName, _Node} = amqqueue:get_pid(Q),
                     %% maps:get/2 would raise {badkey, RaName} for a
                     %% queue that has no recorded replica state, so
                     %% fall back to a value that never matches.
                     maps:get(RaName, ReplicaStates, undefined) =:= promotable
                 end, Queues).
436443

444+
-spec filter_quorum_critical([amqqueue:amqqueue()], #{node() => replica_states()}, node()) ->
445+
[amqqueue:amqqueue()].
437446
filter_quorum_critical(Queues, ReplicaStates, Self) ->
438447
lists:filter(fun (Q) ->
439448
MemberNodes = get_nodes(Q),

deps/rabbit/src/rabbit_upgrade_preparation.erl

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -85,15 +85,7 @@ list_with_minimum_quorum_for_cli() ->
8585
EndangeredQueues = lists:append(
8686
rabbit_quorum_queue:list_with_minimum_quorum(),
8787
rabbit_stream_queue:list_with_minimum_quorum()),
88-
[begin
89-
#resource{name = Name} = QName = amqqueue:get_name(Q),
90-
#{
91-
<<"readable_name">> => rabbit_data_coercion:to_binary(rabbit_misc:rs(QName)),
92-
<<"name">> => Name,
93-
<<"virtual_host">> => amqqueue:get_vhost(Q),
94-
<<"type">> => amqqueue:get_type(Q)
95-
}
96-
end || Q <- EndangeredQueues] ++
88+
[amqqueue:to_printable(Q) || Q <- EndangeredQueues] ++
9789
[#{
9890
<<"readable_name">> => C,
9991
<<"name">> => C,

deps/rabbit/test/unit_quorum_queue_SUITE.erl

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
all() ->
66
[
77
all_replica_states_includes_nonvoters,
8+
filter_nonvoters,
89
filter_quorum_critical_accounts_nonvoters
910
].
1011

@@ -27,6 +28,24 @@ filter_quorum_critical_accounts_nonvoters(_Config) ->
2728
[Q1] = rabbit_quorum_queue:filter_quorum_critical(Qs, Ss, test@follower2),
2829
ok.
2930

31+
%% Checks that rabbit_quorum_queue:filter_promotable/2 returns only the
%% queue whose replica state is `promotable' when given one queue per
%% replica state (leader, follower, non_voter, promotable).
filter_nonvoters(_Config) ->
    %% Builds a queue record whose name is the binary form of RaName and
    %% whose pid is {RaName, test@leader}.
    NewQueue = fun (RaName) ->
                   QName = rabbit_misc:r(<<"/">>, queue,
                                         atom_to_binary(RaName, utf8)),
                   amqqueue:new(QName,
                                {RaName, test@leader},
                                false, false, none, [], undefined, #{})
               end,
    Qs = [_, _, _, Q4] = [NewQueue(RaName) || RaName <- [q1, q2, q3, q4]],
    Ss = #{q1 => leader, q2 => follower, q3 => non_voter, q4 => promotable},
    [Q4] = rabbit_quorum_queue:filter_promotable(Qs, Ss),
    ok.
48+
3049
all_replica_states_includes_nonvoters(_Config) ->
3150
ets:new(ra_state, [named_table, public, {write_concurrency, true}]),
3251
ets:insert(ra_state, [
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
## This Source Code Form is subject to the terms of the Mozilla Public
2+
## License, v. 2.0. If a copy of the MPL was not distributed with this
3+
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
##
5+
## Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
7+
defmodule RabbitMQ.CLI.Queues.Commands.CheckIfNewQuorumQueueReplicasHaveFinishedInitialSyncCommand do
  @moduledoc """
  Exits with a non-zero code if there are quorum queues
  that run "non-voter" (not yet done with their initial sync, promotable to voters)
  replicas on the current node.

  This command is used to verify if a new cluster node hosts only
  fully synchronized quorum queue replicas.
  """

  @behaviour RabbitMQ.CLI.CommandBehaviour

  import RabbitMQ.CLI.Core.Platform, only: [line_separator: 0]

  def scopes(), do: [:diagnostics, :queues]

  use RabbitMQ.CLI.Core.AcceptsDefaultSwitchesAndTimeout
  use RabbitMQ.CLI.Core.MergesNoDefaults
  use RabbitMQ.CLI.Core.AcceptsNoPositionalArguments
  use RabbitMQ.CLI.Core.RequiresRabbitAppRunning

  # Asks the target node for its quorum queues with local promotable
  # replicas. A list result (empty or not) is wrapped in {:ok, list};
  # any other result (for example {:badrpc, _}) is returned unchanged.
  def run([], %{node: node_name, timeout: timeout}) do
    case :rabbit_misc.rpc_call(
           node_name,
           :rabbit_quorum_queue,
           :list_with_local_promotable_for_cli,
           [],
           timeout
         ) do
      qs when is_list(qs) -> {:ok, qs}
      other -> other
    end
  end

  # An empty list means the check passed.
  def output({:ok, []}, %{formatter: "json"}) do
    {:ok, %{"result" => "ok"}}
  end

  def output({:ok, []}, %{silent: true}) do
    {:ok, :check_passed}
  end

  def output({:ok, []}, %{node: node_name}) do
    {:ok, "Node #{node_name} reported no queues with promotable replicas"}
  end

  # A non-empty list means the check failed; the queues are reported
  # in the format selected by the options.
  def output({:ok, qs}, %{node: node_name, formatter: "json"}) when is_list(qs) do
    {:error, :check_failed,
     %{
       "result" => "error",
       "queues" => qs,
       "message" => "Node #{node_name} reported local queues with promotable replicas"
     }}
  end

  def output({:ok, qs}, %{silent: true}) when is_list(qs) do
    {:error, :check_failed}
  end

  def output({:ok, qs}, %{node: node_name}) when is_list(qs) do
    lines = queue_lines(qs, node_name)

    {:error, :check_failed, Enum.join(lines, line_separator())}
  end

  # Kept after the clauses above so that they take precedence over
  # whatever output/2 clauses this `use` adds.
  use RabbitMQ.CLI.DefaultOutput

  def help_section(), do: :observability_and_health_checks

  def description() do
    "Health check that exits with a non-zero code if there are queues " <>
      "that run promotable replicas on the current node."
  end

  def usage, do: "check_if_new_quorum_queue_replicas_have_finished_initial_sync"

  def banner([], %{node: node_name}) do
    "Checking if node #{node_name} runs promotable replicas of any queues ..."
  end

  #
  # Implementation
  #

  # Produces one human-readable line per queue map, using its
  # "readable_name" entry.
  def queue_lines(qs, node_name) do
    for q <- qs, do: "#{q["readable_name"]} hasn't finished synchronization with #{node_name}."
  end
end

deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/check_if_node_is_mirror_sync_critical_command.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
## License, v. 2.0. If a copy of the MPL was not distributed with this
33
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
44
##
5-
## Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
5+
## Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
66

77
defmodule RabbitMQ.CLI.Queues.Commands.CheckIfNodeIsMirrorSyncCriticalCommand do
88
@moduledoc """

deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/check_if_node_is_quorum_critical_command.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
## License, v. 2.0. If a copy of the MPL was not distributed with this
33
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
44
##
5-
## Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
5+
## Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
66

77
defmodule RabbitMQ.CLI.Queues.Commands.CheckIfNodeIsQuorumCriticalCommand do
88
@moduledoc """
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
## This Source Code Form is subject to the terms of the Mozilla Public
2+
## License, v. 2.0. If a copy of the MPL was not distributed with this
3+
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
##
5+
## Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
7+
defmodule RabbitMQ.CLI.Queues.Commands.CheckIfNewQuorumQueueReplicasHaveFinishedInitialSyncCommandTest do
  use ExUnit.Case, async: false
  import TestHelper

  @command RabbitMQ.CLI.Queues.Commands.CheckIfNewQuorumQueueReplicasHaveFinishedInitialSyncCommand

  setup_all do
    RabbitMQ.CLI.Core.Distribution.start()

    :ok
  end

  # Per-test options: the target node plus a timeout, which a test can
  # override with the :test_timeout tag.
  setup context do
    opts = %{
      node: get_rabbit_hostname(),
      timeout: context[:test_timeout] || 30000
    }

    {:ok, opts: opts}
  end

  test "validate: accepts no positional arguments" do
    assert @command.validate([], %{}) == :ok
  end

  test "validate: any positional arguments fail validation" do
    # One, two and three positional arguments must all be rejected.
    rejected_args = [
      ["quorum-queue-a"],
      ["quorum-queue-a", "two"],
      ["quorum-queue-a", "two", "three"]
    ]

    for args <- rejected_args do
      assert @command.validate(args, %{}) == {:validation_failure, :too_many_args}
    end
  end

  @tag test_timeout: 3000
  test "run: targeting an unreachable node throws a badrpc" do
    unreachable_opts = %{node: :jake@thedog, vhost: "/", timeout: 200}

    assert {:badrpc, _} = @command.run([], unreachable_opts)
  end
end

0 commit comments

Comments
 (0)