Skip to content

Commit 4f91a4b

Browse files
Merge pull request #11666 from rabbitmq/mergify/bp/v3.13.x/pr-11665
Fix direct reply to crash when tracing is enabled (backport #11665)
2 parents a0201b8 + 6c18754 commit 4f91a4b

File tree

2 files changed

+121
-3
lines changed

2 files changed

+121
-3
lines changed

deps/rabbit/src/rabbit_trace.erl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ tap_in(Msg, QNames, ConnName, ChannelNum, Username, TraceX) ->
6262
RoutedQs = lists:map(fun(#resource{kind = queue, name = Name}) ->
6363
{longstr, Name};
6464
({#resource{kind = queue, name = Name}, _}) ->
65+
{longstr, Name};
66+
({virtual_reply_queue, Name}) ->
6567
{longstr, Name}
6668
end, QNames),
6769
trace(TraceX, Msg, <<"publish">>, XName,

deps/rabbit/test/amqpl_direct_reply_to_SUITE.erl

Lines changed: 119 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,16 @@
1616

1717
all() ->
1818
[
19+
{group, cluster_size_1},
1920
{group, cluster_size_3}
2021
].
2122

2223
groups() ->
2324
[
25+
{cluster_size_1, [shuffle],
26+
[
27+
trace
28+
]},
2429
{cluster_size_3, [shuffle],
2530
[
2631
rpc_new_to_old_node,
@@ -39,8 +44,11 @@ init_per_suite(Config) ->
3944
end_per_suite(Config) ->
4045
Config.
4146

42-
init_per_group(_Group, Config) ->
43-
Nodes = 3,
47+
init_per_group(Group, Config) ->
48+
Nodes = case Group of
49+
cluster_size_1 -> 1;
50+
cluster_size_3 -> 3
51+
end,
4452
Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
4553
Config1 = rabbit_ct_helpers:set_config(
4654
Config, [{rmq_nodes_count, Nodes},
@@ -62,6 +70,114 @@ init_per_testcase(Testcase, Config) ->
6270
end_per_testcase(Testcase, Config) ->
6371
rabbit_ct_helpers:testcase_finished(Config, Testcase).
6472

73+
%% Test case for
74+
%% https://github.com/rabbitmq/rabbitmq-server/discussions/11662
75+
trace(Config) ->
76+
{ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["trace_on"]),
77+
78+
Node = atom_to_binary(rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename)),
79+
TraceQueue = <<"tests.amqpl_direct_reply_to.trace.tracing">>,
80+
RequestQueue = <<"tests.amqpl_direct_reply_to.trace.requests">>,
81+
%% This is the pseudo queue that is specially interpreted by RabbitMQ.
82+
ReplyQueue = <<"amq.rabbitmq.reply-to">>,
83+
RequestPayload = <<"my request">>,
84+
ReplyPayload = <<"my reply">>,
85+
CorrelationId = <<"my correlation ID">>,
86+
Qs = [RequestQueue, TraceQueue],
87+
Ch = rabbit_ct_client_helpers:open_channel(Config),
88+
RequesterCh = rabbit_ct_client_helpers:open_channel(Config, 0),
89+
ResponderCh = rabbit_ct_client_helpers:open_channel(Config, 0),
90+
91+
[#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = Q0}) || Q0 <- Qs],
92+
#'queue.bind_ok'{} = amqp_channel:call(
93+
Ch, #'queue.bind'{
94+
queue = TraceQueue,
95+
exchange = <<"amq.rabbitmq.trace">>,
96+
%% We subscribe only to messages entering RabbitMQ.
97+
routing_key = <<"publish.#">>}),
98+
99+
%% There is no need to declare this pseudo queue first.
100+
amqp_channel:subscribe(RequesterCh,
101+
#'basic.consume'{queue = ReplyQueue,
102+
no_ack = true},
103+
self()),
104+
CTag = receive #'basic.consume_ok'{consumer_tag = CTag0} -> CTag0
105+
end,
106+
#'confirm.select_ok'{} = amqp_channel:call(RequesterCh, #'confirm.select'{}),
107+
amqp_channel:register_confirm_handler(RequesterCh, self()),
108+
109+
%% Send the request.
110+
amqp_channel:cast(
111+
RequesterCh,
112+
#'basic.publish'{routing_key = RequestQueue},
113+
#amqp_msg{props = #'P_basic'{reply_to = ReplyQueue,
114+
correlation_id = CorrelationId},
115+
payload = RequestPayload}),
116+
receive #'basic.ack'{} -> ok
117+
after 5000 -> ct:fail(confirm_timeout)
118+
end,
119+
120+
%% Receive the request.
121+
{#'basic.get_ok'{},
122+
#amqp_msg{props = #'P_basic'{reply_to = ReplyTo,
123+
correlation_id = CorrelationId},
124+
payload = RequestPayload}
125+
} = amqp_channel:call(ResponderCh, #'basic.get'{queue = RequestQueue}),
126+
127+
%% Send the reply.
128+
amqp_channel:cast(
129+
ResponderCh,
130+
#'basic.publish'{routing_key = ReplyTo},
131+
#amqp_msg{props = #'P_basic'{correlation_id = CorrelationId},
132+
payload = ReplyPayload}),
133+
134+
%% Receive the reply.
135+
receive {#'basic.deliver'{consumer_tag = CTag},
136+
#amqp_msg{payload = ReplyPayload,
137+
props = #'P_basic'{correlation_id = CorrelationId}}} ->
138+
ok
139+
after 5000 -> ct:fail(missing_reply)
140+
end,
141+
142+
%% 2 messages should have entered RabbitMQ:
143+
%% 1. the RPC request
144+
%% 2. the RPC reply
145+
146+
{#'basic.get_ok'{routing_key = <<"publish.">>},
147+
#amqp_msg{props = #'P_basic'{headers = RequestHeaders},
148+
payload = RequestPayload}
149+
} = amqp_channel:call(Ch, #'basic.get'{queue = TraceQueue}),
150+
?assertMatch(#{
151+
<<"exchange_name">> := <<>>,
152+
<<"routing_keys">> := [RequestQueue],
153+
<<"connection">> := <<"127.0.0.1:", _/binary>>,
154+
<<"node">> := Node,
155+
<<"vhost">> := <<"/">>,
156+
<<"user">> := <<"guest">>,
157+
<<"properties">> := #{<<"correlation_id">> := CorrelationId},
158+
<<"routed_queues">> := [RequestQueue]
159+
},
160+
rabbit_misc:amqp_table(RequestHeaders)),
161+
162+
{#'basic.get_ok'{routing_key = <<"publish.">>},
163+
#amqp_msg{props = #'P_basic'{headers = ResponseHeaders},
164+
payload = ReplyPayload}
165+
} = amqp_channel:call(Ch, #'basic.get'{queue = TraceQueue}),
166+
?assertMatch(#{
167+
<<"exchange_name">> := <<>>,
168+
<<"routing_keys">> := [<<"amq.rabbitmq.reply-to.", _/binary>>],
169+
<<"connection">> := <<"127.0.0.1:", _/binary>>,
170+
<<"node">> := Node,
171+
<<"vhost">> := <<"/">>,
172+
<<"user">> := <<"guest">>,
173+
<<"properties">> := #{<<"correlation_id">> := CorrelationId},
174+
<<"routed_queues">> := [<<"amq.rabbitmq.reply-to.", _/binary>>]
175+
},
176+
rabbit_misc:amqp_table(ResponseHeaders)),
177+
178+
[#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = Q0}) || Q0 <- Qs],
179+
{ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["trace_off"]).
180+
65181
%% "new" and "old" refers to new and old RabbitMQ versions in mixed version tests.
66182
rpc_new_to_old_node(Config) ->
67183
rpc(0, 1, Config).
@@ -70,7 +186,7 @@ rpc_old_to_new_node(Config) ->
70186
rpc(1, 0, Config).
71187

72188
rpc(RequesterNode, ResponderNode, Config) ->
73-
RequestQueue = <<"my request queue">>,
189+
RequestQueue = <<"tests.amqpl_direct_reply_to.rpc.requests">>,
74190
%% This is the pseudo queue that is specially interpreted by RabbitMQ.
75191
ReplyQueue = <<"amq.rabbitmq.reply-to">>,
76192
RequestPayload = <<"my request">>,

0 commit comments

Comments
 (0)