Skip to content

Commit e264ef6

Browse files
ansdmergify[bot]
authored andcommitted
Fix direct reply to crash when tracing is enabled
This commit fixes https://github.com/rabbitmq/rabbitmq-server/discussions/11662 Prior to this commit the following crash occurred when an RPC reply message entered RabbitMQ and tracing was enabled: ``` ** Reason for termination == ** {function_clause, [{rabbit_trace,'-tap_in/6-fun-0-', [{virtual_reply_queue, <<"amq.rabbitmq.reply-to.g1h2AA5yZXBseUAyNzc5NjQyMAAAC1oAAAAAZo4bIw==.+Uvn1EmAp0ZA+oQx2yoQFA==">>}], [{file,"rabbit_trace.erl"},{line,62}]}, {lists,map,2,[{file,"lists.erl"},{line,1559}]}, {rabbit_trace,tap_in,6,[{file,"rabbit_trace.erl"},{line,62}]}, {rabbit_channel,handle_method,3, [{file,"rabbit_channel.erl"},{line,1284}]}, {rabbit_channel,handle_cast,2, [{file,"rabbit_channel.erl"},{line,659}]}, {gen_server2,handle_msg,2,[{file,"gen_server2.erl"},{line,1056}]}, {proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,241}]}]} ``` (Note that no trace message is emitted for messages that are delivered to direct reply to requesting clients (neither in 3.12, nor in 3.13, nor after this commit). This behaviour can be added in future when a direct reply virtual queue becomes its own queue type.) (cherry picked from commit 5deff45)
1 parent cb288ab commit e264ef6

File tree

2 files changed

+120
-2
lines changed

2 files changed

+120
-2
lines changed

deps/rabbit/src/rabbit_trace.erl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ tap_in(Msg, QNames, ConnName, ChannelNum, Username, TraceX) ->
6262
RoutedQs = lists:map(fun(#resource{kind = queue, name = Name}) ->
6363
{longstr, Name};
6464
({#resource{kind = queue, name = Name}, _}) ->
65+
{longstr, Name};
66+
({virtual_reply_queue, Name}) ->
6567
{longstr, Name}
6668
end, QNames),
6769
trace(TraceX, Msg, <<"publish">>, XName,

deps/rabbit/test/amqpl_direct_reply_to_SUITE.erl

Lines changed: 118 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,16 @@
1616

1717
all() ->
1818
[
19+
{group, cluster_size_1},
1920
{group, cluster_size_3}
2021
].
2122

2223
groups() ->
2324
[
25+
{cluster_size_1, [shuffle],
26+
[
27+
trace
28+
]},
2429
{cluster_size_3, [shuffle],
2530
[
2631
rpc_new_to_old_node,
@@ -39,8 +44,11 @@ init_per_suite(Config) ->
3944
end_per_suite(Config) ->
4045
Config.
4146

42-
init_per_group(_Group, Config) ->
43-
Nodes = 3,
47+
init_per_group(Group, Config) ->
48+
Nodes = case Group of
49+
cluster_size_1 -> 1;
50+
cluster_size_3 -> 3
51+
end,
4452
Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
4553
Config1 = rabbit_ct_helpers:set_config(
4654
Config, [{rmq_nodes_count, Nodes},
@@ -62,6 +70,114 @@ init_per_testcase(Testcase, Config) ->
6270
end_per_testcase(Testcase, Config) ->
6371
rabbit_ct_helpers:testcase_finished(Config, Testcase).
6472

73+
%% Test case for
74+
%% https://github.com/rabbitmq/rabbitmq-server/discussions/11662
75+
trace(Config) ->
76+
{ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["trace_on"]),
77+
78+
Node = atom_to_binary(rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename)),
79+
TraceQueue = <<"my trace queue">>,
80+
RequestQueue = <<"my request queue">>,
81+
%% This is the pseudo queue that is specially interpreted by RabbitMQ.
82+
ReplyQueue = <<"amq.rabbitmq.reply-to">>,
83+
RequestPayload = <<"my request">>,
84+
ReplyPayload = <<"my reply">>,
85+
CorrelationId = <<"my correlation ID">>,
86+
Qs = [RequestQueue, TraceQueue],
87+
Ch = rabbit_ct_client_helpers:open_channel(Config),
88+
RequesterCh = rabbit_ct_client_helpers:open_channel(Config, 0),
89+
ResponderCh = rabbit_ct_client_helpers:open_channel(Config, 0),
90+
91+
[#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = Q0}) || Q0 <- Qs],
92+
#'queue.bind_ok'{} = amqp_channel:call(
93+
Ch, #'queue.bind'{
94+
queue = TraceQueue,
95+
exchange = <<"amq.rabbitmq.trace">>,
96+
%% We subscribe only to messages entering RabbitMQ.
97+
routing_key = <<"publish.#">>}),
98+
99+
%% There is no need to declare this pseudo queue first.
100+
amqp_channel:subscribe(RequesterCh,
101+
#'basic.consume'{queue = ReplyQueue,
102+
no_ack = true},
103+
self()),
104+
CTag = receive #'basic.consume_ok'{consumer_tag = CTag0} -> CTag0
105+
end,
106+
#'confirm.select_ok'{} = amqp_channel:call(RequesterCh, #'confirm.select'{}),
107+
amqp_channel:register_confirm_handler(RequesterCh, self()),
108+
109+
%% Send the request.
110+
amqp_channel:cast(
111+
RequesterCh,
112+
#'basic.publish'{routing_key = RequestQueue},
113+
#amqp_msg{props = #'P_basic'{reply_to = ReplyQueue,
114+
correlation_id = CorrelationId},
115+
payload = RequestPayload}),
116+
receive #'basic.ack'{} -> ok
117+
after 5000 -> ct:fail(confirm_timeout)
118+
end,
119+
120+
%% Receive the request.
121+
{#'basic.get_ok'{},
122+
#amqp_msg{props = #'P_basic'{reply_to = ReplyTo,
123+
correlation_id = CorrelationId},
124+
payload = RequestPayload}
125+
} = amqp_channel:call(ResponderCh, #'basic.get'{queue = RequestQueue}),
126+
127+
%% Send the reply.
128+
amqp_channel:cast(
129+
ResponderCh,
130+
#'basic.publish'{routing_key = ReplyTo},
131+
#amqp_msg{props = #'P_basic'{correlation_id = CorrelationId},
132+
payload = ReplyPayload}),
133+
134+
%% Receive the reply.
135+
receive {#'basic.deliver'{consumer_tag = CTag},
136+
#amqp_msg{payload = ReplyPayload,
137+
props = #'P_basic'{correlation_id = CorrelationId}}} ->
138+
ok
139+
after 5000 -> ct:fail(missing_reply)
140+
end,
141+
142+
%% 2 messages should have entered RabbitMQ:
143+
%% 1. the RPC request
144+
%% 2. the RPC reply
145+
146+
{#'basic.get_ok'{routing_key = <<"publish.">>},
147+
#amqp_msg{props = #'P_basic'{headers = RequestHeaders},
148+
payload = RequestPayload}
149+
} = amqp_channel:call(Ch, #'basic.get'{queue = TraceQueue}),
150+
?assertMatch(#{
151+
<<"exchange_name">> := <<>>,
152+
<<"routing_keys">> := [RequestQueue],
153+
<<"connection">> := <<"127.0.0.1:", _/binary>>,
154+
<<"node">> := Node,
155+
<<"vhost">> := <<"/">>,
156+
<<"user">> := <<"guest">>,
157+
<<"properties">> := #{<<"correlation_id">> := CorrelationId},
158+
<<"routed_queues">> := [RequestQueue]
159+
},
160+
rabbit_misc:amqp_table(RequestHeaders)),
161+
162+
{#'basic.get_ok'{routing_key = <<"publish.">>},
163+
#amqp_msg{props = #'P_basic'{headers = ResponseHeaders},
164+
payload = ReplyPayload}
165+
} = amqp_channel:call(Ch, #'basic.get'{queue = TraceQueue}),
166+
?assertMatch(#{
167+
<<"exchange_name">> := <<>>,
168+
<<"routing_keys">> := [<<"amq.rabbitmq.reply-to.", _/binary>>],
169+
<<"connection">> := <<"127.0.0.1:", _/binary>>,
170+
<<"node">> := Node,
171+
<<"vhost">> := <<"/">>,
172+
<<"user">> := <<"guest">>,
173+
<<"properties">> := #{<<"correlation_id">> := CorrelationId},
174+
<<"routed_queues">> := [<<"amq.rabbitmq.reply-to.", _/binary>>]
175+
},
176+
rabbit_misc:amqp_table(ResponseHeaders)),
177+
178+
[#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = Q0}) || Q0 <- Qs],
179+
{ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["trace_off"]).
180+
65181
%% "new" and "old" refers to new and old RabbitMQ versions in mixed version tests.
66182
rpc_new_to_old_node(Config) ->
67183
rpc(0, 1, Config).

0 commit comments

Comments
 (0)