Skip to content

Commit 6a989d3

Browse files
committed
Add inter cluster AMQP 1.0 shovel test
Add tests which shovel messages via AMQP from an 3.13 cluster to a 4.0 cluster and vice versa. This test ensures that a 3.13 AMQP 1.0 client can communicate with a 4.0 node, which isn't tested anywhere else since all other mixed version tests use the new 4.0 AMQP 1.0 client.
1 parent 66c3e26 commit 6a989d3

File tree

5 files changed

+195
-2
lines changed

5 files changed

+195
-2
lines changed

deps/rabbitmq_shovel/BUILD.bazel

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ rabbitmq_home(
101101
name = "broker-for-tests-home",
102102
plugins = [
103103
"//deps/rabbit:erlang_app",
104+
"//deps/rabbitmq_amqp1_0:erlang_app",
104105
":erlang_app",
105106
],
106107
)
@@ -118,6 +119,13 @@ rabbitmq_integration_suite(
118119
flaky = True,
119120
)
120121

122+
rabbitmq_integration_suite(
123+
name = "amqp10_inter_cluster_SUITE",
124+
additional_beam = [
125+
"test/shovel_test_utils.beam",
126+
],
127+
)
128+
121129
rabbitmq_suite(
122130
name = "amqp10_shovel_SUITE",
123131
size = "small",

deps/rabbitmq_shovel/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ dep_amqp10_client = git https://github.com/rabbitmq/rabbitmq-amqp1.0-client.git
2323

2424
LOCAL_DEPS = crypto
2525

26-
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers meck
26+
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers rabbitmq_amqp1_0 meck
2727

2828
PLT_APPS += rabbitmqctl
2929

deps/rabbitmq_shovel/app.bzl

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,3 +250,11 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
250250
app_name = "rabbitmq_shovel",
251251
erlc_opts = "//:test_erlc_opts",
252252
)
253+
erlang_bytecode(
254+
name = "amqp10_inter_cluster_SUITE_beam_files",
255+
testonly = True,
256+
srcs = ["test/amqp10_inter_cluster_SUITE.erl"],
257+
outs = ["test/amqp10_inter_cluster_SUITE.beam"],
258+
app_name = "rabbitmq_shovel",
259+
erlc_opts = "//:test_erlc_opts",
260+
)
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
%%
7+
8+
-module(amqp10_inter_cluster_SUITE).
9+
10+
-include_lib("common_test/include/ct.hrl").
11+
-include_lib("eunit/include/eunit.hrl").
12+
-compile([export_all, nowarn_export_all]).
13+
14+
-import(rabbit_ct_broker_helpers, [rpc/5]).
15+
16+
all() ->
17+
[
18+
{group, tests}
19+
].
20+
21+
groups() ->
22+
[
23+
{tests, [shuffle],
24+
[
25+
old_to_new_on_old,
26+
old_to_new_on_new,
27+
new_to_old_on_old,
28+
new_to_old_on_new
29+
]}
30+
].
31+
32+
%% In mixed version tests:
33+
%% * node 0 is the new version single node cluster
34+
%% * node 1 is the old version single node cluster
35+
-define(NEW, 0).
36+
-define(OLD, 1).
37+
38+
init_per_suite(Config0) ->
39+
{ok, _} = application:ensure_all_started(amqp10_client),
40+
rabbit_ct_helpers:log_environment(),
41+
Config1 = rabbit_ct_helpers:set_config(
42+
Config0,
43+
[{rmq_nodename_suffix, ?MODULE},
44+
{rmq_nodes_count, 2},
45+
{rmq_nodes_clustered, false}]),
46+
Config = rabbit_ct_helpers:run_setup_steps(
47+
Config1,
48+
rabbit_ct_broker_helpers:setup_steps() ++
49+
rabbit_ct_client_helpers:setup_steps()),
50+
%% If node 1 runs 4.x, this is the new no-op plugin.
51+
%% If node 1 runs 3.x, this is the old real plugin.
52+
ok = rabbit_ct_broker_helpers:enable_plugin(Config, ?OLD, rabbitmq_amqp1_0),
53+
Config.
54+
55+
end_per_suite(Config) ->
56+
application:stop(amqp10_client),
57+
rabbit_ct_helpers:run_teardown_steps(
58+
Config,
59+
rabbit_ct_client_helpers:teardown_steps() ++
60+
rabbit_ct_broker_helpers:teardown_steps()).
61+
62+
init_per_group(_, Config) ->
63+
Config.
64+
65+
end_per_group(_, Config) ->
66+
Config.
67+
68+
init_per_testcase(Testcase, Config) ->
69+
rabbit_ct_helpers:testcase_started(Config, Testcase).
70+
71+
end_per_testcase(Testcase, Config) ->
72+
rabbit_ct_helpers:testcase_finished(Config, Testcase).
73+
74+
old_to_new_on_old(Config) ->
75+
ok = shovel(?OLD, ?NEW, ?OLD, Config).
76+
77+
old_to_new_on_new(Config) ->
78+
ok = shovel(?OLD, ?NEW, ?NEW, Config).
79+
80+
new_to_old_on_old(Config) ->
81+
ok = shovel(?NEW, ?OLD, ?OLD, Config).
82+
83+
new_to_old_on_new(Config) ->
84+
ok = shovel(?NEW, ?OLD, ?NEW, Config).
85+
86+
shovel(SrcNode, DestNode, ShovelNode, Config) ->
87+
SrcUri = shovel_test_utils:make_uri(Config, SrcNode),
88+
DestUri = shovel_test_utils:make_uri(Config, DestNode),
89+
SrcQ = <<"my source queue">>,
90+
DestQ = <<"my destination queue">>,
91+
Definition = [
92+
{<<"src-uri">>, SrcUri},
93+
{<<"src-protocol">>, <<"amqp10">>},
94+
{<<"src-address">>, SrcQ},
95+
{<<"dest-uri">>, [DestUri]},
96+
{<<"dest-protocol">>, <<"amqp10">>},
97+
{<<"dest-address">>, DestQ}
98+
],
99+
ShovelName = <<"my shovel">>,
100+
ok = rpc(Config, ShovelNode, rabbit_runtime_parameters, set,
101+
[<<"/">>, <<"shovel">>, ShovelName, Definition, none]),
102+
ok = shovel_test_utils:await_shovel(Config, ShovelNode, ShovelName),
103+
104+
Hostname = ?config(rmq_hostname, Config),
105+
SrcPort = rabbit_ct_broker_helpers:get_node_config(Config, SrcNode, tcp_port_amqp),
106+
DestPort = rabbit_ct_broker_helpers:get_node_config(Config, DestNode, tcp_port_amqp),
107+
{ok, SrcConn} = amqp10_client:open_connection(Hostname, SrcPort),
108+
{ok, DestConn} = amqp10_client:open_connection(Hostname, DestPort),
109+
{ok, SrcSess} = amqp10_client:begin_session_sync(SrcConn),
110+
{ok, DestSess} = amqp10_client:begin_session_sync(DestConn),
111+
{ok, Sender} = amqp10_client:attach_sender_link(
112+
SrcSess, <<"my sender">>, <<"/amq/queue/", SrcQ/binary>>, settled),
113+
{ok, Receiver} = amqp10_client:attach_receiver_link(
114+
DestSess, <<"my receiver">>, <<"/amq/queue/", DestQ/binary>>, settled),
115+
116+
ok = wait_for_credit(Sender),
117+
NumMsgs = 20,
118+
lists:map(
119+
fun(N) ->
120+
Bin = integer_to_binary(N),
121+
Msg = amqp10_msg:new(Bin, Bin, true),
122+
ok = amqp10_client:send_msg(Sender, Msg)
123+
end, lists:seq(1, NumMsgs)),
124+
ok = amqp10_client:close_connection(SrcConn),
125+
126+
ok = amqp10_client:flow_link_credit(Receiver, NumMsgs, never),
127+
Msgs = receive_messages(Receiver, NumMsgs),
128+
lists:map(
129+
fun(N) ->
130+
Msg = lists:nth(N, Msgs),
131+
?assertEqual(integer_to_binary(N),
132+
amqp10_msg:body_bin(Msg))
133+
end, lists:seq(1, NumMsgs)),
134+
ok = amqp10_client:close_connection(DestConn),
135+
136+
ok = rpc(Config, ShovelNode, rabbit_runtime_parameters, clear,
137+
[<<"/">>, <<"shovel">>, ShovelName, none]),
138+
ExpectedQueueLen = 0,
139+
?assertEqual([ExpectedQueueLen], rpc(Config, ?OLD, ?MODULE, delete_queues, [])),
140+
?assertEqual([ExpectedQueueLen], rpc(Config, ?NEW, ?MODULE, delete_queues, [])).
141+
142+
wait_for_credit(Sender) ->
143+
receive
144+
{amqp10_event, {link, Sender, credited}} ->
145+
ok
146+
after 5000 ->
147+
flush(?FUNCTION_NAME),
148+
ct:fail(credited_timeout)
149+
end.
150+
151+
receive_messages(Receiver, N) ->
152+
receive_messages0(Receiver, N, []).
153+
154+
receive_messages0(_Receiver, 0, Acc) ->
155+
lists:reverse(Acc);
156+
receive_messages0(Receiver, N, Acc) ->
157+
receive
158+
{amqp10_msg, Receiver, Msg} ->
159+
receive_messages0(Receiver, N - 1, [Msg | Acc])
160+
after 5000 ->
161+
ct:fail({timeout, {num_received, length(Acc)}, {num_missing, N}})
162+
end.
163+
164+
flush(Prefix) ->
165+
receive
166+
Msg ->
167+
ct:pal("~p flushed: ~p~n", [Prefix, Msg]),
168+
flush(Prefix)
169+
after 1 ->
170+
ok
171+
end.
172+
173+
delete_queues() ->
174+
[begin
175+
{ok, N} = rabbit_amqqueue:delete(Q, false, false, <<"tests">>),
176+
N
177+
end || Q <- rabbit_amqqueue:list()].

deps/rabbitmq_shovel/test/shovel_test_utils.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
-export([set_param/3, set_param/4, set_param/5, set_param_nowait/3,
1212
await_shovel/2, await_shovel/3, await_shovel1/2,
1313
shovels_from_status/0, get_shovel_status/2, get_shovel_status/3,
14-
await/1, await/2, clear_param/2, clear_param/3]).
14+
await/1, await/2, clear_param/2, clear_param/3, make_uri/2]).
1515

1616
make_uri(Config, Node) ->
1717
Hostname = ?config(rmq_hostname, Config),

0 commit comments

Comments
 (0)