Skip to content

Commit 76a60db

Browse files
committed
Fix crash in rabbit_writer
091 payload fragements rev must be a list of binaries. Fix following crash: ``` crasher: initial call: rabbit_writer:enter_mainloop/2 pid: <0.700.0> registered_name: [] exception error: bad argument in function size/1 called as size([<<0,83,119,160,3,0,1,2>>]) *** argument 1: not tuple or binary in call from rabbit_binary_generator:build_content_frames/7 (rabbit_binary_generator.erl, line 89) in call from rabbit_binary_generator:build_simple_content_frames/4 (rabbit_binary_generator.erl, line 61) in call from rabbit_writer:assemble_frames/5 (rabbit_writer.erl, line 334) in call from rabbit_writer:internal_send_command_async/3 (rabbit_writer.erl, line 365) in call from rabbit_writer:handle_message/3 (rabbit_writer.erl, line 232) in call from rabbit_writer:mainloop1/2 (rabbit_writer.erl, line 216) in call from rabbit_writer:mainloop/2 (rabbit_writer.erl, line 207) ```
1 parent f077cdf commit 76a60db

File tree

2 files changed

+59
-3
lines changed

2 files changed

+59
-3
lines changed

deps/rabbit/src/mc_amqpl.erl

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,11 @@ convert_from(mc_amqp, Sections) ->
9191
%% TODO: This is potentially inefficient, but #content.payload_fragments_rev expects
9292
%% currently a flat list of binaries. Can we make rabbit_writer work
9393
%% with an iolist instead?
94-
{[erlang:iolist_to_iovec(amqp10_framing:encode_bin(X))
95-
|| X <- BodyRev], ?AMQP10_TYPE}
94+
BinsRev = [begin
95+
IoList = amqp10_framing:encode_bin(X),
96+
erlang:iolist_to_binary(IoList)
97+
end || X <- BodyRev],
98+
{BinsRev, ?AMQP10_TYPE}
9699
end,
97100
#'v1_0.properties'{message_id = MsgId,
98101
user_id = UserId0,

deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ groups() ->
2929
roundtrip_classic_queue_with_drain,
3030
roundtrip_quorum_queue_with_drain,
3131
roundtrip_stream_queue_with_drain,
32+
amqp_stream_amqpl,
3233
message_headers_conversion
3334
]},
3435
{metrics, [], [
@@ -303,6 +304,58 @@ roundtrip_queue_with_drain(Config, QueueType, QName) when is_binary(QueueType) -
303304
ok = amqp10_client:close_connection(Connection),
304305
ok.
305306

307+
%% Send a message with a body containing a single AMQP 1.0 value section
308+
%% to a stream and consume via AMQP 0.9.1.
309+
amqp_stream_amqpl(Config) ->
310+
Host = ?config(rmq_hostname, Config),
311+
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
312+
Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
313+
ContainerId = QName = atom_to_binary(?FUNCTION_NAME),
314+
315+
amqp_channel:call(Ch, #'queue.declare'{
316+
queue = QName,
317+
durable = true,
318+
arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}]}),
319+
320+
Address = <<"/amq/queue/", QName/binary>>,
321+
OpnConf = #{address => Host,
322+
port => Port,
323+
container_id => ContainerId,
324+
sasl => {plain, <<"guest">>, <<"guest">>}},
325+
{ok, Connection} = amqp10_client:open_connection(OpnConf),
326+
{ok, Session} = amqp10_client:begin_session(Connection),
327+
SenderLinkName = <<"test-sender">>,
328+
{ok, Sender} = amqp10_client:attach_sender_link(Session,
329+
SenderLinkName,
330+
Address),
331+
wait_for_credit(Sender),
332+
OutMsg = amqp10_msg:new(<<"my-tag">>, {'v1_0.amqp_value', {binary, <<0, 255>>}}, true),
333+
ok = amqp10_client:send_msg(Sender, OutMsg),
334+
flush("final"),
335+
ok = amqp10_client:detach_link(Sender),
336+
ok = amqp10_client:close_connection(Connection),
337+
338+
#'basic.qos_ok'{} = amqp_channel:call(Ch, #'basic.qos'{global = false,
339+
prefetch_count = 1}),
340+
CTag = <<"my-tag">>,
341+
#'basic.consume_ok'{} = amqp_channel:subscribe(
342+
Ch,
343+
#'basic.consume'{
344+
queue = QName,
345+
consumer_tag = CTag,
346+
arguments = [{<<"x-stream-offset">>, longstr, <<"first">>}]},
347+
self()),
348+
receive
349+
{#'basic.deliver'{consumer_tag = CTag,
350+
redelivered = false},
351+
#amqp_msg{props = #'P_basic'{type = <<"amqp-1.0">>}}} ->
352+
ok
353+
after 5000 ->
354+
exit(basic_deliver_timeout)
355+
end,
356+
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
357+
ok = rabbit_ct_client_helpers:close_channel(Ch).
358+
306359
message_headers_conversion(Config) ->
307360
Host = ?config(rmq_hostname, Config),
308361
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
@@ -468,7 +521,7 @@ wait_for_accepts(N) ->
468521

469522
delete_queue(Config, QName) ->
470523
Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
471-
_ = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
524+
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
472525
rabbit_ct_client_helpers:close_channel(Ch).
473526

474527

0 commit comments

Comments
 (0)