Skip to content

Commit 3f1e913

Browse files
Merge branch 'main' into rin/add-looking_glass
2 parents 114df37 + e5d9071 commit 3f1e913

File tree

10 files changed

+412
-7
lines changed

10 files changed

+412
-7
lines changed

deps/rabbit/BUILD.bazel

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ _APP_ENV = """[
8585
]},
8686
{halt_on_upgrade_failure, true},
8787
{ssl_apps, [asn1, crypto, public_key, ssl]},
88+
%% classic queue storage implementation version
89+
{classic_queue_default_version, 2},
8890
%% see rabbitmq-server#114
8991
{mirroring_flow_control, true},
9092
{mirroring_sync_batch_size, 4096},
@@ -336,6 +338,13 @@ rabbitmq_integration_suite(
336338
sharding_method = "case",
337339
)
338340

341+
rabbitmq_integration_suite(
342+
name = "clustering_recovery_SUITE",
343+
size = "medium",
344+
shard_count = 2,
345+
sharding_method = "case",
346+
)
347+
339348
rabbitmq_integration_suite(
340349
name = "config_schema_SUITE",
341350
size = "medium",

deps/rabbit/Makefile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ define PROJECT_ENV
6565
]},
6666
{halt_on_upgrade_failure, true},
6767
{ssl_apps, [asn1, crypto, public_key, ssl]},
68+
%% classic queue storage implementation version
69+
{classic_queue_default_version, 2},
6870
%% see rabbitmq-server#114
6971
{mirroring_flow_control, true},
7072
{mirroring_sync_batch_size, 4096},

deps/rabbit/app.bzl

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -813,6 +813,15 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
813813
erlc_opts = "//:test_erlc_opts",
814814
deps = ["//deps/amqp_client:erlang_app"],
815815
)
816+
erlang_bytecode(
817+
name = "clustering_recovery_SUITE_beam_files",
818+
testonly = True,
819+
srcs = ["test/clustering_recovery_SUITE.erl"],
820+
outs = ["test/clustering_recovery_SUITE.beam"],
821+
app_name = "rabbit",
822+
erlc_opts = "//:test_erlc_opts",
823+
deps = ["//deps/amqp_client:erlang_app"],
824+
)
816825
erlang_bytecode(
817826
name = "config_schema_SUITE_beam_files",
818827
testonly = True,

deps/rabbit/priv/schema/rabbit.schema

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2472,7 +2472,7 @@ end}.
24722472

24732473
{translation, "rabbit.classic_queue_default_version",
24742474
fun(Conf) ->
2475-
case cuttlefish:conf_get("classic_queue.default_version", Conf, 1) of
2475+
case cuttlefish:conf_get("classic_queue.default_version", Conf, 2) of
24762476
1 -> 1;
24772477
2 -> 2;
24782478
_ -> cuttlefish:unset()

deps/rabbit/src/rabbit_amqqueue_process.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -470,12 +470,12 @@ init_queue_mode(Mode, State = #q {backing_queue = BQ,
470470

471471
init_queue_version(Version0, State = #q {backing_queue = BQ,
472472
backing_queue_state = BQS}) ->
473-
%% When the version is undefined we use the default version 1.
473+
%% When the version is undefined we use the default version 2.
474474
%% We want to BQ:set_queue_version in all cases because a v2
475475
%% policy might have been deleted, for example, and we want
476476
%% the queue to go back to v1.
477477
Version = case Version0 of
478-
undefined -> rabbit_misc:get_env(rabbit, classic_queue_default_version, 1);
478+
undefined -> rabbit_misc:get_env(rabbit, classic_queue_default_version, 2);
479479
_ -> Version0
480480
end,
481481
BQS1 = BQ:set_queue_version(Version, BQS),

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,9 @@
6666
declare/2,
6767
is_stateful/0]).
6868

69+
-export([force_shrink_member_to_current_member/2,
70+
force_all_queues_shrink_member_to_current_member/0]).
71+
6972
-import(rabbit_queue_type_util, [args_policy_lookup/3,
7073
qname_to_internal_name/1]).
7174

@@ -1717,3 +1720,41 @@ erpc_call(Node, M, F, A, Timeout) ->
17171720
end.
17181721

17191722
is_stateful() -> true.
1723+
1724+
force_shrink_member_to_current_member(VHost, Name) ->
1725+
rabbit_log:warning("Disaster recovery procedure: shrinking ~p queue at vhost ~p to a single node cluster", [Name, VHost]),
1726+
Node = node(),
1727+
QName = rabbit_misc:r(VHost, queue, Name),
1728+
case rabbit_amqqueue:lookup(QName) of
1729+
{ok, Q} when ?is_amqqueue(Q) ->
1730+
{RaName, _} = amqqueue:get_pid(Q),
1731+
ok = ra_server_proc:force_shrink_members_to_current_member({RaName, Node}),
1732+
Fun = fun (Q0) ->
1733+
TS0 = amqqueue:get_type_state(Q0),
1734+
TS = TS0#{nodes => [Node]},
1735+
amqqueue:set_type_state(Q, TS)
1736+
end,
1737+
_ = rabbit_amqqueue:update(QName, Fun),
1738+
rabbit_log:warning("Disaster recovery procedure: shrinking finished");
1739+
_ ->
1740+
rabbit_log:warning("Disaster recovery procedure: shrinking failed, queue ~p not found at vhost ~p", [Name, VHost]),
1741+
{error, not_found}
1742+
end.
1743+
1744+
force_all_queues_shrink_member_to_current_member() ->
1745+
rabbit_log:warning("Disaster recovery procedure: shrinking all quorum queues to a single node cluster"),
1746+
Node = node(),
1747+
[begin
1748+
QName = amqqueue:get_name(Q),
1749+
{RaName, _} = amqqueue:get_pid(Q),
1750+
rabbit_log:warning("Disaster recovery procedure: shrinking queue ~p", [QName]),
1751+
ok = ra_server_proc:force_shrink_members_to_current_member({RaName, Node}),
1752+
Fun = fun (QQ) ->
1753+
TS0 = amqqueue:get_type_state(QQ),
1754+
TS = TS0#{nodes => [Node]},
1755+
amqqueue:set_type_state(QQ, TS)
1756+
end,
1757+
_ = rabbit_amqqueue:update(QName, Fun)
1758+
end || Q <- rabbit_amqqueue:list(), amqqueue:get_type(Q) == ?MODULE],
1759+
rabbit_log:warning("Disaster recovery procedure: shrinking finished"),
1760+
ok.

deps/rabbit/src/rabbit_variable_queue.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -490,10 +490,11 @@ process_recovery_terms(Terms) ->
490490
PRef -> {PRef, Terms}
491491
end.
492492

493+
%% If queue-version is undefined, we assume v2 starting with RabbitMQ 3.13.0.
493494
queue_version(Q) ->
494495
Resolve = fun(_, ArgVal) -> ArgVal end,
495496
case rabbit_queue_type_util:args_policy_lookup(<<"queue-version">>, Resolve, Q) of
496-
undefined -> rabbit_misc:get_env(rabbit, classic_queue_default_version, 1);
497+
undefined -> rabbit_misc:get_env(rabbit, classic_queue_default_version, 2);
497498
Vsn when is_integer(Vsn) -> Vsn;
498499
Vsn -> binary_to_integer(Vsn)
499500
end.
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
6+
%%
7+
8+
-module(clustering_recovery_SUITE).
9+
10+
-include_lib("common_test/include/ct.hrl").
11+
-include_lib("amqp_client/include/amqp_client.hrl").
12+
-include_lib("eunit/include/eunit.hrl").
13+
14+
-compile(export_all).
15+
16+
all() ->
17+
[
18+
{group, mnesia_store}
19+
].
20+
21+
groups() ->
22+
[{mnesia_store, [], [
23+
{clustered_3_nodes, [],
24+
[{cluster_size_3, [], [
25+
force_shrink_quorum_queue,
26+
force_shrink_all_quorum_queues
27+
]}
28+
]}
29+
]}
30+
].
31+
32+
suite() ->
33+
[
34+
%% If a testcase hangs, no need to wait for 30 minutes.
35+
{timetrap, {minutes, 5}}
36+
].
37+
38+
%% -------------------------------------------------------------------
39+
%% Testsuite setup/teardown.
40+
%% -------------------------------------------------------------------
41+
42+
init_per_suite(Config) ->
43+
rabbit_ct_helpers:log_environment(),
44+
rabbit_ct_helpers:run_setup_steps(Config).
45+
46+
end_per_suite(Config) ->
47+
rabbit_ct_helpers:run_teardown_steps(Config).
48+
49+
init_per_group(mnesia_store, Config) ->
50+
Config;
51+
init_per_group(clustered_3_nodes, Config) ->
52+
rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, true}]);
53+
init_per_group(cluster_size_3, Config) ->
54+
rabbit_ct_helpers:set_config(Config, [{rmq_nodes_count, 3}]).
55+
56+
end_per_group(_, Config) ->
57+
Config.
58+
59+
init_per_testcase(Testcase, Config) ->
60+
rabbit_ct_helpers:testcase_started(Config, Testcase),
61+
ClusterSize = ?config(rmq_nodes_count, Config),
62+
TestNumber = rabbit_ct_helpers:testcase_number(Config, ?MODULE, Testcase),
63+
Config1 = rabbit_ct_helpers:set_config(Config, [
64+
{rmq_nodename_suffix, Testcase},
65+
{tcp_ports_base, {skip_n_nodes, TestNumber * ClusterSize}},
66+
{keep_pid_file_on_exit, true}
67+
]),
68+
rabbit_ct_helpers:run_steps(Config1,
69+
rabbit_ct_broker_helpers:setup_steps() ++
70+
rabbit_ct_client_helpers:setup_steps()).
71+
72+
end_per_testcase(Testcase, Config) ->
73+
Config1 = rabbit_ct_helpers:run_steps(Config,
74+
rabbit_ct_client_helpers:teardown_steps() ++
75+
rabbit_ct_broker_helpers:teardown_steps()),
76+
rabbit_ct_helpers:testcase_finished(Config1, Testcase).
77+
78+
%% -------------------------------------------------------------------
79+
%% Testcases
80+
%% -------------------------------------------------------------------
81+
82+
force_shrink_all_quorum_queues(Config) ->
83+
[Rabbit, Hare, Bunny] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
84+
85+
QName1 = quorum_test_queue(1),
86+
QName2 = quorum_test_queue(2),
87+
QName3 = quorum_test_queue(3),
88+
Args = [{<<"x-queue-type">>, longstr, <<"quorum">>}],
89+
declare_and_publish_to_queue(Config, Rabbit, QName1, Args),
90+
declare_and_publish_to_queue(Config, Rabbit, QName2, Args),
91+
declare_and_publish_to_queue(Config, Rabbit, QName3, Args),
92+
93+
ok = rabbit_ct_broker_helpers:stop_node(Config, Hare),
94+
ok = rabbit_ct_broker_helpers:stop_node(Config, Bunny),
95+
96+
Ch = rabbit_ct_client_helpers:open_channel(Config, Rabbit),
97+
?assertExit(
98+
{{shutdown, {connection_closing, {server_initiated_close, 541, _}}}, _},
99+
amqp_channel:subscribe(Ch, #'basic.consume'{queue = QName1,
100+
consumer_tag = <<"ctag">>},
101+
self())),
102+
103+
ok = rabbit_ct_broker_helpers:rpc(Config, Rabbit, rabbit_quorum_queue, force_all_queues_shrink_member_to_current_member, []),
104+
105+
ok = consume_from_queue(Config, Rabbit, QName1),
106+
ok = consume_from_queue(Config, Rabbit, QName2),
107+
ok = consume_from_queue(Config, Rabbit, QName3),
108+
109+
ok.
110+
111+
force_shrink_quorum_queue(Config) ->
112+
[Rabbit, Hare, Bunny] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
113+
114+
QName1 = quorum_test_queue(1),
115+
Args = [{<<"x-queue-type">>, longstr, <<"quorum">>}],
116+
declare_and_publish_to_queue(Config, Rabbit, QName1, Args),
117+
118+
ok = rabbit_ct_broker_helpers:stop_node(Config, Hare),
119+
ok = rabbit_ct_broker_helpers:stop_node(Config, Bunny),
120+
121+
Ch = rabbit_ct_client_helpers:open_channel(Config, Rabbit),
122+
?assertExit(
123+
{{shutdown, {connection_closing, {server_initiated_close, 541, _}}}, _},
124+
amqp_channel:subscribe(Ch, #'basic.consume'{queue = QName1,
125+
consumer_tag = <<"ctag">>},
126+
self())),
127+
128+
ok = rabbit_ct_broker_helpers:rpc(Config, Rabbit, rabbit_quorum_queue, force_shrink_member_to_current_member, [<<"/">>, QName1]),
129+
130+
ok = consume_from_queue(Config, Rabbit, QName1),
131+
132+
ok.
133+
134+
%% -------------------------------------------------------------------
135+
%% Internal utils
136+
%% -------------------------------------------------------------------
137+
declare_and_publish_to_queue(Config, Node, QName, Args) ->
138+
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, Node),
139+
declare(Ch, QName, Args),
140+
publish_many(Ch, QName, 10),
141+
rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
142+
143+
quorum_test_queue(Number) ->
144+
list_to_binary(io_lib:format("quorum_queue_~p", [Number])).
145+
146+
declare(Ch, Name, Args) ->
147+
Res = amqp_channel:call(Ch, #'queue.declare'{durable = true,
148+
queue = Name,
149+
arguments = Args}),
150+
amqp_channel:call(Ch, #'queue.bind'{queue = Name,
151+
exchange = <<"amq.fanout">>}),
152+
Res.
153+
154+
consume_from_queue(Config, Node, QName) ->
155+
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, Node),
156+
subscribe(Ch, QName),
157+
consume(10),
158+
rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
159+
160+
publish_many(Ch, QName, N) ->
161+
amqp_channel:call(Ch, #'confirm.select'{}),
162+
[amqp_channel:cast(Ch, #'basic.publish'{routing_key = QName},
163+
#amqp_msg{props = #'P_basic'{delivery_mode = 2}})
164+
|| _ <- lists:seq(1, N)],
165+
amqp_channel:wait_for_confirms(Ch).
166+
167+
subscribe(Ch, QName) ->
168+
CTag = <<"ctag">>,
169+
amqp_channel:subscribe(Ch, #'basic.consume'{queue = QName,
170+
consumer_tag = CTag},
171+
self()),
172+
receive
173+
#'basic.consume_ok'{consumer_tag = CTag} ->
174+
ok
175+
after 10000 ->
176+
exit(consume_ok_timeout)
177+
end.
178+
179+
consume(0) ->
180+
ok;
181+
consume(N) ->
182+
receive
183+
{#'basic.deliver'{consumer_tag = <<"ctag">>}, _} ->
184+
consume(N - 1)
185+
after 10000 ->
186+
exit(deliver_timeout)
187+
end.

deps/rabbit/test/dynamic_ha_SUITE.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1018,16 +1018,16 @@ apply_policy(Config, N, undefined) ->
10181018
apply_policy(Config, N, all) ->
10191019
rabbit_ct_broker_helpers:set_ha_policy(
10201020
Config, N, ?POLICY, <<"all">>,
1021-
[{<<"ha-sync-mode">>, <<"automatic">>}]);
1021+
[{<<"ha-sync-mode">>, <<"automatic">>}, {<<"queue-mode">>, <<"lazy">>}]);
10221022
apply_policy(Config, N, {nodes, Nodes}) ->
10231023
NNodes = [atom_to_binary(Node) || Node <- Nodes],
10241024
rabbit_ct_broker_helpers:set_ha_policy(
10251025
Config, N, ?POLICY, {<<"nodes">>, NNodes},
1026-
[{<<"ha-sync-mode">>, <<"automatic">>}]);
1026+
[{<<"ha-sync-mode">>, <<"automatic">>}, {<<"queue-mode">>, <<"lazy">>}]);
10271027
apply_policy(Config, N, {exactly, Exactly}) ->
10281028
rabbit_ct_broker_helpers:set_ha_policy(
10291029
Config, N, ?POLICY, {<<"exactly">>, Exactly},
1030-
[{<<"ha-sync-mode">>, <<"automatic">>}]).
1030+
[{<<"ha-sync-mode">>, <<"automatic">>}, {<<"queue-mode">>, <<"lazy">>}]).
10311031

10321032
forget_cluster_node(Config, Node, NodeToRemove) ->
10331033
rabbit_ct_broker_helpers:rabbitmqctl(

0 commit comments

Comments
 (0)