Skip to content

Commit 5deacfc

Browse files
Merge pull request #9896 from rabbitmq/opt-mgmt-queue-listings
Fix streams minority calculation
2 parents 4a4285a + 8d2c0a6 commit 5deacfc

File tree

4 files changed

+127
-15
lines changed

4 files changed

+127
-15
lines changed

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1676,7 +1676,7 @@ online(Q) when ?is_amqqueue(Q) ->
16761676

16771677
format(Q, Ctx) when ?is_amqqueue(Q) ->
16781678
%% TODO: this should really just be voters
1679-
Nodes = get_nodes(Q),
1679+
Nodes = lists:sort(get_nodes(Q)),
16801680
Running = case Ctx of
16811681
#{running_nodes := Running0} ->
16821682
Running0;

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ format(Q, Ctx) ->
230230
case amqqueue:get_pid(Q) of
231231
Pid when is_pid(Pid) ->
232232
LeaderNode = node(Pid),
233-
Nodes = get_nodes(Q),
233+
Nodes = lists:sort(get_nodes(Q)),
234234
Running = case Ctx of
235235
#{running_nodes := Running0} ->
236236
Running0;
@@ -1203,4 +1203,4 @@ get_nodes(Q) when ?is_amqqueue(Q) ->
12031203

12041204
is_minority(All, Up) ->
12051205
MinQuorum = length(All) div 2 + 1,
1206-
length(Up) =< MinQuorum.
1206+
length(Up) < MinQuorum.

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 65 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,11 @@
1212
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
1313

1414
-import(queue_utils, [wait_for_messages_ready/3,
15-
wait_for_messages_pending_ack/3,
16-
wait_for_messages_total/3,
17-
wait_for_messages/2,
18-
dirty_query/3,
19-
ra_name/1]).
15+
wait_for_messages_pending_ack/3,
16+
wait_for_messages_total/3,
17+
wait_for_messages/2,
18+
dirty_query/3,
19+
ra_name/1]).
2020

2121
-import(clustering_utils, [
2222
assert_cluster_status/2,
@@ -39,9 +39,10 @@ all() ->
3939

4040
groups() ->
4141
[
42-
{single_node, [], all_tests()
43-
++ memory_tests()
44-
++ [node_removal_is_quorum_critical]},
42+
{single_node, [], all_tests() ++
43+
memory_tests() ++
44+
[node_removal_is_quorum_critical,
45+
format]},
4546
{unclustered, [], [
4647
{uncluster_size_2, [], [add_member]}
4748
]},
@@ -85,7 +86,8 @@ groups() ->
8586
leader_locator_balanced_maintenance,
8687
leader_locator_balanced_random_maintenance,
8788
leader_locator_policy,
88-
status
89+
status,
90+
format
8991
]
9092
++ all_tests()},
9193
{cluster_size_5, [], [start_queue,
@@ -2758,6 +2760,60 @@ status(Config) ->
27582760
wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]),
27592761
ok.
27602762

2763+
format(Config) ->
2764+
%% tests rabbit_quorum_queue:format/2
2765+
Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
2766+
2767+
Server = hd(Nodes),
2768+
2769+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
2770+
Q = ?config(queue_name, Config),
2771+
?assertEqual({'queue.declare_ok', Q, 0, 0},
2772+
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
2773+
2774+
Vhost = ?config(rmq_vhost, Config),
2775+
QName = #resource{virtual_host = Vhost,
2776+
kind = queue,
2777+
name = Q},
2778+
{ok, QRecord} = rabbit_ct_broker_helpers:rpc(Config, Server,
2779+
rabbit_amqqueue,
2780+
lookup, [QName]),
2781+
%% restart the quorum
2782+
Fmt = rabbit_ct_broker_helpers:rpc(Config, Server, rabbit_quorum_queue,
2783+
?FUNCTION_NAME, [QRecord, #{}]),
2784+
2785+
%% test all up case
2786+
?assertEqual(quorum, proplists:get_value(type, Fmt)),
2787+
?assertEqual(running, proplists:get_value(state, Fmt)),
2788+
?assertEqual(Server, proplists:get_value(leader, Fmt)),
2789+
?assertEqual(Server, proplists:get_value(node, Fmt)),
2790+
?assertEqual(Nodes, proplists:get_value(online, Fmt)),
2791+
?assertEqual(Nodes, proplists:get_value(members, Fmt)),
2792+
2793+
case length(Nodes) of
2794+
3 ->
2795+
[_, Server2, Server3] = Nodes,
2796+
ok = rabbit_control_helper:command(stop_app, Server2),
2797+
ok = rabbit_control_helper:command(stop_app, Server3),
2798+
2799+
Fmt2 = rabbit_ct_broker_helpers:rpc(Config, Server, rabbit_quorum_queue,
2800+
?FUNCTION_NAME, [QRecord, #{}]),
2801+
ok = rabbit_control_helper:command(start_app, Server2),
2802+
ok = rabbit_control_helper:command(start_app, Server3),
2803+
?assertEqual(quorum, proplists:get_value(type, Fmt2)),
2804+
?assertEqual(minority, proplists:get_value(state, Fmt2)),
2805+
?assertEqual(Server, proplists:get_value(leader, Fmt2)),
2806+
?assertEqual(Server, proplists:get_value(node, Fmt2)),
2807+
?assertEqual([Server], proplists:get_value(online, Fmt2)),
2808+
?assertEqual(Nodes, proplists:get_value(members, Fmt2)),
2809+
ok;
2810+
1 ->
2811+
ok
2812+
end,
2813+
?assertMatch(#'queue.delete_ok'{},
2814+
amqp_channel:call(Ch, #'queue.delete'{queue = Q})),
2815+
ok.
2816+
27612817
peek_with_wrong_queue_type(Config) ->
27622818
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
27632819

deps/rabbit/test/rabbit_stream_queue_SUITE.erl

Lines changed: 59 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,10 @@ all() ->
4848

4949
groups() ->
5050
[
51-
{single_node, [], [restart_single_node, recover]},
51+
{single_node, [],
52+
[restart_single_node,
53+
recover,
54+
format]},
5255
{single_node_parallel_1, [parallel], all_tests_1()},
5356
{single_node_parallel_2, [parallel], all_tests_2()},
5457
{single_node_parallel_3, [parallel], all_tests_3()},
@@ -74,6 +77,7 @@ groups() ->
7477
select_nodes_with_least_replicas,
7578
recover_after_leader_and_coordinator_kill,
7679
restart_stream,
80+
format,
7781
rebalance
7882
]},
7983
{cluster_size_3_1, [], [shrink_coordinator_cluster]},
@@ -1423,6 +1427,60 @@ restart_stream(Config) ->
14231427
ok
14241428
end.
14251429

1430+
format(Config) ->
1431+
%% tests rabbit_stream_queue:format/2
1432+
Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
1433+
1434+
Server = hd(Nodes),
1435+
1436+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
1437+
Q = ?config(queue_name, Config),
1438+
?assertEqual({'queue.declare_ok', Q, 0, 0},
1439+
declare(Config, Server, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
1440+
1441+
publish_confirm(Ch, Q, [<<"msg">>]),
1442+
Vhost = ?config(rmq_vhost, Config),
1443+
QName = #resource{virtual_host = Vhost,
1444+
kind = queue,
1445+
name = Q},
1446+
{ok, QRecord} = rabbit_ct_broker_helpers:rpc(Config, Server,
1447+
rabbit_amqqueue,
1448+
lookup, [QName]),
1449+
%% restart the stream
1450+
Fmt = rabbit_ct_broker_helpers:rpc(Config, Server, rabbit_stream_queue,
1451+
?FUNCTION_NAME, [QRecord, #{}]),
1452+
1453+
%% test all up case
1454+
?assertEqual(stream, proplists:get_value(type, Fmt)),
1455+
?assertEqual(running, proplists:get_value(state, Fmt)),
1456+
?assertEqual(Server, proplists:get_value(leader, Fmt)),
1457+
?assertEqual(Server, proplists:get_value(node, Fmt)),
1458+
?assertEqual(Nodes, proplists:get_value(online, Fmt)),
1459+
?assertEqual(Nodes, proplists:get_value(members, Fmt)),
1460+
1461+
case length(Nodes) of
1462+
3 ->
1463+
[_, Server2, Server3] = Nodes,
1464+
ok = rabbit_control_helper:command(stop_app, Server2),
1465+
ok = rabbit_control_helper:command(stop_app, Server3),
1466+
1467+
Fmt2 = rabbit_ct_broker_helpers:rpc(Config, Server, rabbit_stream_queue,
1468+
?FUNCTION_NAME, [QRecord, #{}]),
1469+
ok = rabbit_control_helper:command(start_app, Server2),
1470+
ok = rabbit_control_helper:command(start_app, Server3),
1471+
?assertEqual(stream, proplists:get_value(type, Fmt2)),
1472+
?assertEqual(minority, proplists:get_value(state, Fmt2)),
1473+
?assertEqual(Server, proplists:get_value(leader, Fmt2)),
1474+
?assertEqual(Server, proplists:get_value(node, Fmt2)),
1475+
?assertEqual([Server], proplists:get_value(online, Fmt2)),
1476+
?assertEqual(Nodes, proplists:get_value(members, Fmt2)),
1477+
ok;
1478+
1 ->
1479+
ok
1480+
end,
1481+
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]),
1482+
ok.
1483+
14261484

14271485
consume_from_last(Config) ->
14281486
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -1924,11 +1982,9 @@ leader_failover_dedupe(Config) ->
19241982
ok = rabbit_ct_broker_helpers:stop_node(Config, DownNode),
19251983
%% this should cause a new leader to be elected and the channel on node 2
19261984
%% to have to resend any pending messages to ensure none is lost
1927-
ct:pal("preinfo", []),
19281985
rabbit_ct_helpers:await_condition(
19291986
fun() ->
19301987
Info = find_queue_info(Config, PubNode, [leader, members]),
1931-
ct:pal("info ~tp", [Info]),
19321988
NewLeader = proplists:get_value(leader, Info),
19331989
NewLeader =/= DownNode
19341990
end),

0 commit comments

Comments
 (0)