Skip to content

Commit d806d69

Browse files
committed
Add tests for AMQP 0.9.1 direct reply to feature
Add tests for https://www.rabbitmq.com/docs/direct-reply-to Relates #11380
1 parent a6874e3 commit d806d69

File tree

4 files changed

+140
-4
lines changed

4 files changed

+140
-4
lines changed

deps/rabbit/BUILD.bazel

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1223,6 +1223,10 @@ rabbitmq_integration_suite(
12231223
],
12241224
)
12251225

1226+
rabbitmq_integration_suite(
1227+
name = "amqpl_direct_reply_to_SUITE",
1228+
)
1229+
12261230
assert_suites()
12271231

12281232
filegroup(

deps/rabbit/app.bzl

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2117,3 +2117,12 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
21172117
erlc_opts = "//:test_erlc_opts",
21182118
deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"],
21192119
)
2120+
erlang_bytecode(
2121+
name = "amqpl_direct_reply_to_SUITE_beam_files",
2122+
testonly = True,
2123+
srcs = ["test/amqpl_direct_reply_to_SUITE.erl"],
2124+
outs = ["test/amqpl_direct_reply_to_SUITE.beam"],
2125+
app_name = "rabbit",
2126+
erlc_opts = "//:test_erlc_opts",
2127+
deps = ["//deps/amqp_client:erlang_app"],
2128+
)

deps/rabbit/src/rabbit_channel.erl

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
-behaviour(gen_server2).
4646

4747
-export([start_link/11, start_link/12, do/2, do/3, do_flow/3, flush/1, shutdown/1]).
48-
-export([send_command/2, deliver_reply/2]).
48+
-export([send_command/2]).
4949
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1,
5050
emit_info_all/4, info_local/1]).
5151
-export([refresh_config_local/0, ready_for_close/1]).
@@ -158,7 +158,7 @@
158158
%% rejected but are yet to be sent to the client
159159
rejected,
160160
%% used by "one shot RPC" (amq.
161-
reply_consumer,
161+
reply_consumer :: none | {rabbit_types:ctag(), binary(), binary()},
162162
delivery_flow, %% Deprecated since removal of CMQ in 4.0
163163
interceptor_state,
164164
queue_states,
@@ -1210,8 +1210,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
12101210
Message = rabbit_message_interceptor:intercept(Message0),
12111211
check_user_id_header(Message, User),
12121212
QNames = rabbit_exchange:route(Exchange, Message, #{return_binding_keys => true}),
1213-
[rabbit_channel:deliver_reply(RK, Message) ||
1214-
{virtual_reply_queue, RK} <- QNames],
1213+
[deliver_reply(RK, Message) || {virtual_reply_queue, RK} <- QNames],
12151214
Queues = rabbit_amqqueue:lookup_many(QNames),
12161215
rabbit_trace:tap_in(Message, QNames, ConnName, ChannelNum,
12171216
Username, TraceState),
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
%%
7+
8+
-module(amqpl_direct_reply_to_SUITE).
9+
10+
-include_lib("common_test/include/ct.hrl").
11+
-include_lib("eunit/include/eunit.hrl").
12+
-include_lib("amqp_client/include/amqp_client.hrl").
13+
14+
-compile([nowarn_export_all,
15+
export_all]).
16+
17+
all() ->
18+
[
19+
{group, cluster_size_3}
20+
].
21+
22+
groups() ->
23+
[
24+
{cluster_size_3, [shuffle],
25+
[
26+
rpc_new_to_old_node,
27+
rpc_old_to_new_node
28+
]}
29+
].
30+
31+
%% -------------------------------------------------------------------
32+
%% Testsuite setup/teardown.
33+
%% -------------------------------------------------------------------
34+
35+
init_per_suite(Config) ->
36+
rabbit_ct_helpers:log_environment(),
37+
Config.
38+
39+
end_per_suite(Config) ->
40+
Config.
41+
42+
init_per_group(_Group, Config) ->
43+
Nodes = 3,
44+
Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
45+
Config1 = rabbit_ct_helpers:set_config(
46+
Config, [{rmq_nodes_count, Nodes},
47+
{rmq_nodename_suffix, Suffix}]),
48+
rabbit_ct_helpers:run_setup_steps(
49+
Config1,
50+
rabbit_ct_broker_helpers:setup_steps() ++
51+
rabbit_ct_client_helpers:setup_steps()).
52+
53+
end_per_group(_Group, Config) ->
54+
rabbit_ct_helpers:run_teardown_steps(
55+
Config,
56+
rabbit_ct_client_helpers:teardown_steps() ++
57+
rabbit_ct_broker_helpers:teardown_steps()).
58+
59+
init_per_testcase(Testcase, Config) ->
60+
rabbit_ct_helpers:testcase_started(Config, Testcase).
61+
62+
end_per_testcase(Testcase, Config) ->
63+
rabbit_ct_helpers:testcase_finished(Config, Testcase).
64+
65+
%% "new" and "old" refers to new and old RabbitMQ versions in mixed version tests.
66+
rpc_new_to_old_node(Config) ->
67+
rpc(0, 1, Config).
68+
69+
rpc_old_to_new_node(Config) ->
70+
rpc(1, 0, Config).
71+
72+
rpc(RequesterNode, ResponderNode, Config) ->
73+
RequestQueue = <<"my request queue">>,
74+
%% This is the pseudo queue that is specially interpreted by RabbitMQ.
75+
ReplyQueue = <<"amq.rabbitmq.reply-to">>,
76+
RequestPayload = <<"my request">>,
77+
ReplyPayload = <<"my reply">>,
78+
CorrelationId = <<"my correlation ID">>,
79+
RequesterCh = rabbit_ct_client_helpers:open_channel(Config, RequesterNode),
80+
ResponderCh = rabbit_ct_client_helpers:open_channel(Config, ResponderNode),
81+
82+
%% There is no need to declare this pseudo queue first.
83+
amqp_channel:subscribe(RequesterCh,
84+
#'basic.consume'{queue = ReplyQueue,
85+
no_ack = true},
86+
self()),
87+
CTag = receive #'basic.consume_ok'{consumer_tag = CTag0} -> CTag0
88+
end,
89+
#'queue.declare_ok'{} = amqp_channel:call(
90+
RequesterCh,
91+
#'queue.declare'{queue = RequestQueue}),
92+
#'confirm.select_ok'{} = amqp_channel:call(RequesterCh, #'confirm.select'{}),
93+
amqp_channel:register_confirm_handler(RequesterCh, self()),
94+
%% Send the request.
95+
amqp_channel:cast(
96+
RequesterCh,
97+
#'basic.publish'{routing_key = RequestQueue},
98+
#amqp_msg{props = #'P_basic'{reply_to = ReplyQueue,
99+
correlation_id = CorrelationId},
100+
payload = RequestPayload}),
101+
receive #'basic.ack'{} -> ok
102+
after 5000 -> ct:fail(confirm_timeout)
103+
end,
104+
105+
%% Receive the request.
106+
{#'basic.get_ok'{},
107+
#amqp_msg{props = #'P_basic'{reply_to = ReplyTo,
108+
correlation_id = CorrelationId},
109+
payload = RequestPayload}
110+
} = amqp_channel:call(ResponderCh, #'basic.get'{queue = RequestQueue}),
111+
%% Send the reply.
112+
amqp_channel:cast(
113+
ResponderCh,
114+
#'basic.publish'{routing_key = ReplyTo},
115+
#amqp_msg{props = #'P_basic'{correlation_id = CorrelationId},
116+
payload = ReplyPayload}),
117+
118+
%% Receive the reply.
119+
receive {#'basic.deliver'{consumer_tag = CTag},
120+
#amqp_msg{payload = ReplyPayload,
121+
props = #'P_basic'{correlation_id = CorrelationId}}} ->
122+
ok
123+
after 5000 -> ct:fail(missing_reply)
124+
end.

0 commit comments

Comments
 (0)