Skip to content

Commit 07e914a

Browse files
authored
Merge pull request #1817 from rabbitmq/ra_server_supervision
Update with Ra server supervision changes
2 parents 91d01e7 + 65e6979 commit 07e914a

File tree

3 files changed

+17
-14
lines changed

3 files changed

+17
-14
lines changed

src/rabbit_fifo.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -533,7 +533,8 @@ state_enter(leader, #state{consumers = Cons,
533533
Pids = lists:usort(maps:keys(Enqs) ++ [P || {_, P} <- maps:keys(Cons)]),
534534
Mons = [{monitor, process, P} || P <- Pids],
535535
Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids],
536-
Effects = Mons ++ Nots,
536+
NodeMons = lists:usort([{monitor, node, node(P)} || P <- Pids]),
537+
Effects = Mons ++ Nots ++ NodeMons,
537538
case BLH of
538539
undefined ->
539540
Effects;

test/quorum_queue_SUITE.erl

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,7 @@ start_queue(Config) ->
340340
%% Check that the application and one ra node are up
341341
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
342342
rpc:call(Server, application, which_applications, []))),
343-
?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])),
343+
?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
344344

345345
%% Test declare an existing queue
346346
?assertEqual({'queue.declare_ok', LQ, 0, 0},
@@ -356,7 +356,7 @@ start_queue(Config) ->
356356
%% Check that the application and process are still up
357357
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
358358
rpc:call(Server, application, which_applications, []))),
359-
?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])).
359+
?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])).
360360

361361
start_queue_concurrent(Config) ->
362362
Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -417,13 +417,13 @@ stop_queue(Config) ->
417417
%% Check that the application and one ra node are up
418418
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
419419
rpc:call(Server, application, which_applications, []))),
420-
?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])),
420+
?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
421421

422422
%% Delete the quorum queue
423423
?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch, #'queue.delete'{queue = LQ})),
424424
%% Check that the application and process are down
425425
wait_until(fun() ->
426-
[] == rpc:call(Server, supervisor, which_children, [ra_server_sup])
426+
[] == rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])
427427
end),
428428
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
429429
rpc:call(Server, application, which_applications, []))).
@@ -442,7 +442,7 @@ restart_queue(Config) ->
442442
%% Check that the application and one ra node are up
443443
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
444444
rpc:call(Server, application, which_applications, []))),
445-
?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])).
445+
?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])).
446446

447447
idempotent_recover(Config) ->
448448
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
@@ -521,7 +521,7 @@ restart_all_types(Config) ->
521521
%% Check that the application and two ra nodes are up
522522
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
523523
rpc:call(Server, application, which_applications, []))),
524-
?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup])),
524+
?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
525525
%% Check the classic queues restarted correctly
526526
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server),
527527
{#'basic.get_ok'{}, #amqp_msg{}} =
@@ -563,7 +563,7 @@ stop_start_rabbit_app(Config) ->
563563
%% Check that the application and two ra nodes are up
564564
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
565565
rpc:call(Server, application, which_applications, []))),
566-
?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup])),
566+
?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
567567
%% Check the classic queues restarted correctly
568568
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server),
569569
{#'basic.get_ok'{}, #amqp_msg{}} =
@@ -1263,7 +1263,7 @@ cleanup_queue_state_on_channel_after_publish(Config) ->
12631263
amqp_channel:call(Ch1, #'queue.delete'{queue = QQ})),
12641264
wait_until(fun() ->
12651265
[] == rpc:call(Server, supervisor, which_children,
1266-
[ra_server_sup])
1266+
[ra_server_sup_sup])
12671267
end),
12681268
%% Check that all queue states have been cleaned
12691269
wait_for_cleanup(Server, NCh1, 0),
@@ -1300,7 +1300,7 @@ cleanup_queue_state_on_channel_after_subscribe(Config) ->
13001300
wait_for_cleanup(Server, NCh2, 1),
13011301
?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch1, #'queue.delete'{queue = QQ})),
13021302
wait_until(fun() ->
1303-
[] == rpc:call(Server, supervisor, which_children, [ra_server_sup])
1303+
[] == rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])
13041304
end),
13051305
%% Check that all queue states have been cleaned
13061306
wait_for_cleanup(Server, NCh1, 0),
@@ -1964,7 +1964,7 @@ delete_immediately_by_resource(Config) ->
19641964

19651965
%% Check that the application and process are down
19661966
wait_until(fun() ->
1967-
[] == rpc:call(Server, supervisor, which_children, [ra_server_sup])
1967+
[] == rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])
19681968
end),
19691969
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
19701970
rpc:call(Server, application, which_applications, []))).
@@ -2235,7 +2235,8 @@ wait_for_cleanup(Server, Channel, Number) ->
22352235
wait_for_cleanup(Server, Channel, Number, 60).
22362236

22372237
wait_for_cleanup(Server, Channel, Number, 0) ->
2238-
?assertEqual(Number, length(rpc:call(Server, rabbit_channel, list_queue_states, [Channel])));
2238+
?assertEqual(length(rpc:call(Server, rabbit_channel, list_queue_states, [Channel])),
2239+
Number);
22392240
wait_for_cleanup(Server, Channel, Number, N) ->
22402241
case length(rpc:call(Server, rabbit_channel, list_queue_states, [Channel])) of
22412242
Length when Number == Length ->
@@ -2261,7 +2262,8 @@ wait_for_messages(Servers, QName, Number, Fun, 0) ->
22612262
(_) ->
22622263
-1
22632264
end, Msgs),
2264-
?assertEqual(Totals, [Number || _ <- lists:seq(1, length(Servers))]);
2265+
?assertEqual([Number || _ <- lists:seq(1, length(Servers))],
2266+
Totals);
22652267
wait_for_messages(Servers, QName, Number, Fun, N) ->
22662268
Msgs = dirty_query(Servers, QName, Fun),
22672269
case lists:all(fun(M) when is_map(M) ->

test/rabbit_fifo_SUITE.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ init_per_testcase(TestCase, Config) ->
5555
meck:expect(rabbit_quorum_queue, update_metrics, fun (_, _) -> ok end),
5656
meck:expect(rabbit_quorum_queue, cancel_consumer_handler,
5757
fun (_, _) -> ok end),
58-
ra_server_sup:remove_all(),
58+
ra_server_sup_sup:remove_all(),
5959
ServerName2 = list_to_atom(atom_to_list(TestCase) ++ "2"),
6060
ServerName3 = list_to_atom(atom_to_list(TestCase) ++ "3"),
6161
ClusterName = rabbit_misc:r("/", queue, atom_to_binary(TestCase, utf8)),

0 commit comments

Comments
 (0)