Skip to content

Commit ac09b6e

Browse files
Merge pull request #9869 from rabbitmq/cli-commands-for-mirroring-deprecation
CLI commands for mirroring deprecation
2 parents 5deacfc + 4e58ad9 commit ac09b6e

11 files changed

+727
-0
lines changed

deps/rabbit/src/rabbit_mirror_queue_misc.erl

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@
3232
-export([are_cmqs_permitted/0,
3333
are_cmqs_used/1]).
3434

35+
-export([list_policies_with_classic_queue_mirroring_for_cli/0,
36+
list_operator_policies_with_classic_queue_mirroring_for_cli/0]).
37+
-export([remove_classic_queue_mirroring_from_policies_for_cli/0]).
38+
3539
%% for testing only
3640
-export([module/1]).
3741

@@ -998,6 +1002,63 @@ has_ha_policies(Policies) ->
9981002
does_policy_configure_cmq(KeyList) ->
9991003
lists:keymember(<<"ha-mode">>, 1, KeyList).
10001004

1005+
list_policies_with_classic_queue_mirroring_for_cli() ->
1006+
Policies = rabbit_policy:list(),
1007+
lists:filter(
1008+
fun(Policy) ->
1009+
KeyList = proplists:get_value(definition, Policy),
1010+
does_policy_configure_cmq(KeyList)
1011+
end, Policies).
1012+
1013+
list_operator_policies_with_classic_queue_mirroring_for_cli() ->
1014+
Policies = rabbit_policy:list_op(),
1015+
lists:filter(
1016+
fun(Policy) ->
1017+
KeyList = proplists:get_value(definition, Policy),
1018+
does_policy_configure_cmq(KeyList)
1019+
end, Policies).
1020+
1021+
remove_classic_queue_mirroring_from_policies_for_cli() ->
1022+
rabbit_log_mirroring:warning("Removing all classic queue mirroring policies"),
1023+
Policies = rabbit_policy:list(),
1024+
remove_classic_queue_mirroring_policies(Policies, set, delete, "policy"),
1025+
OpPolicies = rabbit_policy:list_op(),
1026+
remove_classic_queue_mirroring_policies(OpPolicies, set_op, delete_op, "operator policy").
1027+
1028+
remove_classic_queue_mirroring_policies(Policies, SetFun, DeleteFun, LogMsg) ->
1029+
lists:foreach(
1030+
fun(Policy) ->
1031+
KeyList = proplists:get_value(definition, Policy),
1032+
case does_policy_configure_cmq(KeyList) of
1033+
true ->
1034+
remove_classic_queue_mirroring_policy(Policy, SetFun, DeleteFun, LogMsg);
1035+
false ->
1036+
ok
1037+
end
1038+
end, Policies).
1039+
1040+
remove_classic_queue_mirroring_policy(Policy, SetFun, DeleteFun, LogMsg) ->
1041+
Definition0 = proplists:get_value(definition, Policy),
1042+
HaPolicies = [<<"ha-mode">>, <<"ha-params">>, <<"ha-sync-mode">>,
1043+
<<"ha-sync-batch-size">>, <<"ha-promote-on-shutdown">>,
1044+
<<"ha-promote-on-failure">>],
1045+
Definition = lists:filter(fun({Key, _}) ->
1046+
not lists:member(Key, HaPolicies)
1047+
end, Definition0),
1048+
VHost = proplists:get_value(vhost, Policy),
1049+
Name = proplists:get_value(name, Policy),
1050+
case Definition of
1051+
[] ->
1052+
ok = rabbit_policy:DeleteFun(VHost, Name, ?INTERNAL_USER),
1053+
rabbit_log_mirroring:warning("Removed classic queue mirroring ~ts: ~ts", [LogMsg, Name]);
1054+
_ ->
1055+
Pattern = proplists:get_value(pattern, Policy),
1056+
Priority = proplists:get_value(priority, Policy),
1057+
ApplyTo = proplists:get_value('apply-to', Policy),
1058+
ok = rabbit_policy:SetFun(VHost, Name, Pattern, Definition, Priority, ApplyTo, ?INTERNAL_USER),
1059+
rabbit_log_mirroring:warning("Updated ~ts \"~ts\" to remove classic queue mirroring", [LogMsg, Name])
1060+
end.
1061+
10011062
validate_policy(KeyList) ->
10021063
Mode = proplists:get_value(<<"ha-mode">>, KeyList, none),
10031064
Params = proplists:get_value(<<"ha-params">>, KeyList, none),
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
## This Source Code Form is subject to the terms of the Mozilla Public
2+
## License, v. 2.0. If a copy of the MPL was not distributed with this
3+
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
##
5+
## Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
6+
7+
defmodule RabbitMQ.CLI.Ctl.Commands.RemoveClassicQueueMirroringFromPoliciesCommand do
8+
alias RabbitMQ.CLI.Core.DocGuide
9+
10+
@behaviour RabbitMQ.CLI.CommandBehaviour
11+
12+
use RabbitMQ.CLI.Core.MergesNoDefaults
13+
use RabbitMQ.CLI.Core.AcceptsDefaultSwitchesAndTimeout
14+
use RabbitMQ.CLI.Core.AcceptsNoPositionalArguments
15+
16+
def run([], %{node: node_name, timeout: timeout}) do
17+
:rabbit_misc.rpc_call(
18+
node_name,
19+
:rabbit_mirror_queue_misc,
20+
:remove_classic_queue_mirroring_from_policies_for_cli,
21+
[],
22+
timeout
23+
)
24+
end
25+
26+
use RabbitMQ.CLI.DefaultOutput
27+
28+
def usage, do: "remove_classic_queue_mirroring_from_policies"
29+
30+
def usage_doc_guides() do
31+
[
32+
DocGuide.mirroring()
33+
]
34+
end
35+
36+
def help_section(), do: :operations
37+
38+
def description,
39+
do: "Removes keys that enable classic queue mirroring from all regular and operator policies"
40+
41+
def banner([], %{}),
42+
do:
43+
"Will remove keys that enable classic queue mirroring from all regular and operator policies"
44+
end
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
## This Source Code Form is subject to the terms of the Mozilla Public
2+
## License, v. 2.0. If a copy of the MPL was not distributed with this
3+
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
##
5+
## Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
6+
7+
defmodule RabbitMQ.CLI.Diagnostics.Commands.CheckIfAnyDeprecatedFeaturesAreUsedCommand do
8+
alias RabbitMQ.CLI.Core.DocGuide
9+
10+
alias RabbitMQ.CLI.Diagnostics.Commands.{
11+
CheckIfClusterHasClassicQueueMirroringPolicyCommand
12+
}
13+
14+
@behaviour RabbitMQ.CLI.CommandBehaviour
15+
16+
def scopes(), do: [:ctl, :diagnostics]
17+
18+
use RabbitMQ.CLI.Core.AcceptsDefaultSwitchesAndTimeout
19+
use RabbitMQ.CLI.Core.MergesNoDefaults
20+
use RabbitMQ.CLI.Core.AcceptsNoPositionalArguments
21+
use RabbitMQ.CLI.Core.RequiresRabbitAppRunning
22+
23+
def run([], opts) do
24+
are_deprecated_features_used = %{
25+
:classic_queue_mirroring => is_used_classic_queue_mirroring(opts)
26+
}
27+
28+
deprecated_features_list =
29+
Enum.reduce(
30+
are_deprecated_features_used,
31+
[],
32+
fn
33+
{_feat, _result}, {:badrpc, _} = acc ->
34+
acc
35+
36+
{feat, result}, acc ->
37+
case result do
38+
{:badrpc, _} = err -> err
39+
{:error, _} = err -> err
40+
true -> [feat | acc]
41+
false -> acc
42+
end
43+
end
44+
)
45+
46+
# health checks return true if they pass
47+
case deprecated_features_list do
48+
{:badrpc, _} = err -> err
49+
{:error, _} = err -> err
50+
[] -> true
51+
xs when is_list(xs) -> {false, deprecated_features_list}
52+
end
53+
end
54+
55+
def is_used_classic_queue_mirroring(%{node: node_name, timeout: timeout}) do
56+
:rabbit_misc.rpc_call(
57+
node_name,
58+
:rabbit_mirror_queue_misc,
59+
:are_cmqs_used,
60+
[:none],
61+
timeout
62+
)
63+
end
64+
65+
def output(true, %{formatter: "json"}) do
66+
{:ok, %{"result" => "ok"}}
67+
end
68+
69+
def output(true, %{silent: true}) do
70+
{:ok, :check_passed}
71+
end
72+
73+
def output(true, %{}) do
74+
{:ok, "Cluster reported no deprecated features in use"}
75+
end
76+
77+
def output({false, deprecated_features_list}, %{formatter: "json"}) do
78+
{:error, :check_failed,
79+
%{
80+
"result" => "error",
81+
"deprecated_features" => deprecated_features_list,
82+
"message" => "Cluster reported deprecated features in use"
83+
}}
84+
end
85+
86+
def output({false, _deprecated_features_list}, %{silent: true}) do
87+
{:error, :check_failed}
88+
end
89+
90+
def output({false, deprecated_features_list}, _) do
91+
{:error, :check_failed, deprecated_features_list}
92+
end
93+
94+
use RabbitMQ.CLI.DefaultOutput
95+
96+
def usage, do: "check_if_any_deprecated_features_are_used"
97+
98+
def help_section(), do: :observability_and_health_checks
99+
100+
def description(),
101+
do: "Generate a report listing all deprecated features in use"
102+
103+
def banner(_, %{node: node_name}), do: "Checking if any deprecated features are used ..."
104+
105+
#
106+
# Implementation
107+
#
108+
109+
defp run_command(command, args, opts) do
110+
{args, opts} = command.merge_defaults(args, opts)
111+
banner = command.banner(args, opts)
112+
command_result = command.run(args, opts) |> command.output(opts)
113+
{command, banner, command_result}
114+
end
115+
end
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
## This Source Code Form is subject to the terms of the Mozilla Public
2+
## License, v. 2.0. If a copy of the MPL was not distributed with this
3+
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
##
5+
## Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
6+
7+
defmodule RabbitMQ.CLI.Diagnostics.Commands.CheckIfClusterHasClassicQueueMirroringPolicyCommand do
8+
@moduledoc """
9+
Exits with a non-zero code if there are policies enabling classic queue mirroring.
10+
11+
This command is meant to be used as a pre-upgrade (pre-shutdown) check before classic queue
12+
mirroring is removed.
13+
"""
14+
15+
@behaviour RabbitMQ.CLI.CommandBehaviour
16+
17+
import RabbitMQ.CLI.Core.Platform, only: [line_separator: 0]
18+
19+
def scopes(), do: [:diagnostics, :queues]
20+
21+
use RabbitMQ.CLI.Core.AcceptsDefaultSwitchesAndTimeout
22+
use RabbitMQ.CLI.Core.MergesNoDefaults
23+
use RabbitMQ.CLI.Core.AcceptsNoPositionalArguments
24+
use RabbitMQ.CLI.Core.RequiresRabbitAppRunning
25+
26+
def run([], %{node: node_name, timeout: timeout}) do
27+
policies =
28+
:rabbit_misc.rpc_call(
29+
node_name,
30+
:rabbit_mirror_queue_misc,
31+
:list_policies_with_classic_queue_mirroring_for_cli,
32+
[],
33+
timeout
34+
)
35+
36+
op_policies =
37+
:rabbit_misc.rpc_call(
38+
node_name,
39+
:rabbit_mirror_queue_misc,
40+
:list_operator_policies_with_classic_queue_mirroring_for_cli,
41+
[],
42+
timeout
43+
)
44+
45+
case {policies, op_policies} do
46+
{[], []} ->
47+
true
48+
49+
{_, _} when is_list(policies) and is_list(op_policies) ->
50+
{false, policies, op_policies}
51+
52+
{{:badrpc, _} = left, _} ->
53+
left
54+
55+
{_, {:badrpc, _} = right} ->
56+
right
57+
58+
other ->
59+
other
60+
end
61+
end
62+
63+
def output(true, %{formatter: "json"}) do
64+
{:ok, %{"result" => "ok"}}
65+
end
66+
67+
def output(true, %{silent: true}) do
68+
{:ok, :check_passed}
69+
end
70+
71+
def output(true, %{}) do
72+
{:ok, "Cluster reported no policies that enable classic queue mirroring"}
73+
end
74+
75+
def output({false, ps, op_ps}, %{formatter: "json"})
76+
when is_list(ps) and is_list(op_ps) do
77+
{:error, :check_failed,
78+
%{
79+
"result" => "error",
80+
"policies" => ps,
81+
"operator_policies" => op_ps,
82+
"message" => "Cluster reported policies enabling classic queue mirroring"
83+
}}
84+
end
85+
86+
def output({false, ps, op_ps}, %{silent: true}) when is_list(ps) and is_list(op_ps) do
87+
{:error, :check_failed}
88+
end
89+
90+
def output({false, ps, op_ps}, _) when is_list(ps) and is_list(op_ps) do
91+
lines = policy_lines(ps)
92+
op_lines = op_policy_lines(op_ps)
93+
94+
{:error, :check_failed, Enum.join(Enum.concat(lines, op_lines), line_separator())}
95+
end
96+
97+
use RabbitMQ.CLI.DefaultOutput
98+
99+
def help_section(), do: :observability_and_health_checks
100+
101+
def description() do
102+
"Health check that exits with a non-zero code if there are policies that enable classic queue mirroring"
103+
end
104+
105+
def usage, do: "check_if_cluster_has_classic_queue_mirroring_policy"
106+
107+
def banner([], _) do
108+
"Checking if cluster has any classic queue mirroring policy ..."
109+
end
110+
111+
#
112+
# Implementation
113+
#
114+
115+
def policy_lines(ps) do
116+
for p <- ps do
117+
"Policy #{p[:name]} enables classic queue mirroring"
118+
end
119+
end
120+
121+
def op_policy_lines(ps) do
122+
for p <- ps do
123+
"Operator policy #{p[:name]} enables classic queue mirroring"
124+
end
125+
end
126+
end

0 commit comments

Comments
 (0)