Skip to content

Commit a224862

Browse files
committed
See #7323. Oper policy for ha-mode and ha-params
1 parent 8c8ff7e commit a224862

File tree

2 files changed

+127
-0
lines changed

2 files changed

+127
-0
lines changed

deps/rabbit/src/rabbit_mirror_queue_misc.erl

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
-module(rabbit_mirror_queue_misc).
99
-behaviour(rabbit_policy_validator).
10+
-behaviour(rabbit_policy_merge_strategy).
1011

1112
-include("amqqueue.hrl").
1213

@@ -15,6 +16,7 @@
1516
initial_queue_node/2, suggested_queue_nodes/1, actual_queue_nodes/1,
1617
is_mirrored/1, is_mirrored_ha_nodes/1,
1718
update_mirrors/2, update_mirrors/1, validate_policy/1,
19+
merge_policy_value/3,
1820
maybe_auto_sync/1, maybe_drop_master_after_sync/1,
1921
sync_batch_size/1, default_max_sync_throughput/0,
2022
log_info/3, log_warning/3]).
@@ -46,6 +48,14 @@
4648
[policy_validator, <<"ha-promote-on-shutdown">>, ?MODULE]}},
4749
{mfa, {rabbit_registry, register,
4850
[policy_validator, <<"ha-promote-on-failure">>, ?MODULE]}},
51+
{mfa, {rabbit_registry, register,
52+
[operator_policy_validator, <<"ha-mode">>, ?MODULE]}},
53+
{mfa, {rabbit_registry, register,
54+
[operator_policy_validator, <<"ha-params">>, ?MODULE]}},
55+
{mfa, {rabbit_registry, register,
56+
[policy_merge_strategy, <<"ha-mode">>, ?MODULE]}},
57+
{mfa, {rabbit_registry, register,
58+
[policy_merge_strategy, <<"ha-params">>, ?MODULE]}},
4959
{requires, rabbit_registry},
5060
{enables, recovery}]}).
5161

@@ -788,3 +798,36 @@ validate_pof(PromoteOnShutdown) ->
788798
Mode -> {error, "ha-promote-on-failure must be "
789799
"\"always\" or \"when-synced\", got ~tp", [Mode]}
790800
end.
801+
802+
merge_policy_value(<<"ha-mode">>, Val, Val) ->
803+
Val;
804+
merge_policy_value(<<"ha-mode">>, <<"all">> = Val, _OpVal) ->
805+
Val;
806+
merge_policy_value(<<"ha-mode">>, _Val, <<"all">> = OpVal) ->
807+
OpVal;
808+
merge_policy_value(<<"ha-mode">>, <<"exactly">> = Val, _OpVal) ->
809+
Val;
810+
merge_policy_value(<<"ha-mode">>, _Val, <<"exactly">> = OpVal) ->
811+
OpVal;
812+
%% Both values are integers, both are ha-mode 'exactly'
813+
merge_policy_value(<<"ha-params">>, Val, OpVal) when is_integer(Val)
814+
andalso
815+
is_integer(OpVal)->
816+
if Val > OpVal ->
817+
Val;
818+
true ->
819+
OpVal
820+
end;
821+
%% The integer values is of ha-mode 'exactly', the other is a list and of
822+
%% ha-mode 'nodes'. 'exactly' takes precedence
823+
merge_policy_value(<<"ha-params">>, Val, _OpVal) when is_integer(Val) ->
824+
Val;
825+
merge_policy_value(<<"ha-params">>, _Val, OpVal) when is_integer(OpVal) ->
826+
OpVal;
827+
%% Both values are lists, of ha-mode 'nodes', max length takes precedence.
828+
merge_policy_value(<<"ha-params">>, Val, OpVal) ->
829+
if length(Val) > length(OpVal) ->
830+
Val;
831+
true ->
832+
OpVal
833+
end.

deps/rabbit/test/policy_SUITE.erl

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
-include_lib("common_test/include/ct.hrl").
1111
-include_lib("amqp_client/include/amqp_client.hrl").
1212

13+
1314
-compile(export_all).
1415

1516
all() ->
@@ -20,6 +21,7 @@ all() ->
2021
groups() ->
2122
[
2223
{cluster_size_2, [], [
24+
target_count_policy,
2325
policy_ttl,
2426
operator_policy_ttl,
2527
operator_retroactive_policy_ttl,
@@ -149,13 +151,72 @@ operator_retroactive_policy_publish_ttl(Config) ->
149151
rabbit_ct_client_helpers:close_connection(Conn),
150152
passed.
151153

154+
target_count_policy(Config) ->
155+
[Server | _] = Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
156+
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
157+
QName = <<"policy_ha">>,
158+
declare(Ch, QName),
159+
BNodes = [atom_to_binary(N) || N <- Nodes],
160+
161+
AllPolicy = [{<<"ha-mode">>, <<"all">>}],
162+
ExactlyPolicyOne = [{<<"ha-mode">>, <<"exactly">>},
163+
{<<"ha-params">>, 1}],
164+
ExactlyPolicyTwo = [{<<"ha-mode">>, <<"exactly">>},
165+
{<<"ha-params">>, 2}],
166+
NodesPolicyAll = [{<<"ha-mode">>, <<"nodes">>},
167+
{<<"ha-params">>, BNodes}],
168+
NodesPolicyOne = [{<<"ha-mode">>, <<"nodes">>},
169+
{<<"ha-params">>, [hd(BNodes)]}],
170+
171+
%% ALL has precedence
172+
Opts = #{config => Config,
173+
server => Server,
174+
qname => QName},
175+
176+
verify_policies(AllPolicy, ExactlyPolicyTwo, [{<<"ha-mode">>, <<"all">>}], Opts),
177+
178+
verify_policies(ExactlyPolicyTwo, AllPolicy, [{<<"ha-mode">>, <<"all">>}], Opts),
179+
180+
verify_policies(AllPolicy, NodesPolicyAll, [{<<"ha-mode">>, <<"all">>}], Opts),
181+
182+
verify_policies(NodesPolicyAll, AllPolicy, [{<<"ha-mode">>, <<"all">>}], Opts),
183+
184+
%% exactly has precedence over nodes
185+
verify_policies(ExactlyPolicyTwo, NodesPolicyAll,[{<<"ha-mode">>, <<"exactly">>}, {<<"ha-params">>, 2}], Opts),
186+
187+
verify_policies(NodesPolicyAll, ExactlyPolicyTwo, [{<<"ha-mode">>, <<"exactly">>}, {<<"ha-params">>, 2}], Opts),
188+
189+
%% Highest exactly value has precedence
190+
verify_policies(ExactlyPolicyTwo, ExactlyPolicyOne, [{<<"ha-mode">>, <<"exactly">>}, {<<"ha-params">>, 2}], Opts),
191+
192+
verify_policies(ExactlyPolicyOne, ExactlyPolicyTwo, [{<<"ha-mode">>, <<"exactly">>}, {<<"ha-params">>, 2}], Opts),
193+
194+
%% Longest node count has precedence
195+
SortedNodes = lists:sort(BNodes),
196+
verify_policies(NodesPolicyAll, NodesPolicyOne, [{<<"ha-mode">>, <<"nodes">>}, {<<"ha-params">>, SortedNodes}], Opts),
197+
verify_policies(NodesPolicyOne, NodesPolicyAll, [{<<"ha-mode">>, <<"nodes">>}, {<<"ha-params">>, SortedNodes}], Opts),
198+
199+
delete(Ch, QName),
200+
rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"policy">>),
201+
rabbit_ct_broker_helpers:clear_operator_policy(Config, 0, <<"op_policy">>),
202+
rabbit_ct_client_helpers:close_channel(Ch),
203+
rabbit_ct_client_helpers:close_connection(Conn),
204+
passed.
205+
206+
152207
%%----------------------------------------------------------------------------
153208

154209

155210
declare(Ch, Q) ->
156211
amqp_channel:call(Ch, #'queue.declare'{queue = Q,
157212
durable = true}).
158213

214+
declare(Ch, Q, Args) ->
215+
amqp_channel:call(Ch, #'queue.declare'{queue = Q,
216+
durable = true,
217+
auto_delete = false,
218+
arguments = Args}).
219+
159220
delete(Ch, Q) ->
160221
amqp_channel:call(Ch, #'queue.delete'{queue = Q}).
161222

@@ -201,4 +262,27 @@ get_messages(Number, Ch, Q) ->
201262
exit(failed)
202263
end.
203264

265+
check_policy_value(Server, QName, Value) ->
266+
{ok, Q} = rpc:call(Server, rabbit_amqqueue, lookup, [rabbit_misc:r(<<"/">>, queue, QName)]),
267+
proplists:get_value(Value, rpc:call(Server, rabbit_policy, effective_definition, [Q])).
268+
269+
verify_policies(Policy, OperPolicy, VerifyFuns, #{config := Config,
270+
server := Server,
271+
qname := QName}) ->
272+
rabbit_ct_broker_helpers:set_policy(Config, 0, <<"policy">>,
273+
<<"policy_ha">>, <<"queues">>,
274+
Policy),
275+
rabbit_ct_broker_helpers:set_operator_policy(Config, 0, <<"op_policy">>,
276+
<<"policy_ha">>, <<"queues">>,
277+
OperPolicy),
278+
verify_policy(VerifyFuns, Server, QName).
279+
280+
verify_policy([], _, _) ->
281+
ok;
282+
verify_policy([{HA, Expect} | Tail], Server, QName) ->
283+
ct:print(">>> Expect: ~p >>> actual ~p",[Expect, check_policy_value(Server, QName, HA)]),
284+
Expect = check_policy_value(Server, QName, HA),
285+
verify_policy(Tail, Server, QName).
286+
287+
204288
%%----------------------------------------------------------------------------

0 commit comments

Comments
 (0)