Skip to content

Commit 0d34ef6

Browse files
committed
Set a floor of zero for incoming-window
Prior to this commit, when the sending client overshot RabbitMQ's incoming-window (which is allowed in the event of a cluster wide memory or disk alarm), and RabbitMQ sent a FLOW frame to the client, RabbitMQ sent a negative incoming-window field in the FLOW frame causing the following crash in the writer proc: ``` crasher: initial call: rabbit_amqp_writer:init/1 pid: <0.19353.0> registered_name: [] exception error: bad argument in function iolist_size/1 called as iolist_size([<<112,0,0,23,120>>, [82,-15], <<"pÿÿÿü">>,<<"pÿÿÿÿ">>,67, <<112,0,0,23,120>>, "Rª",64,64,64,64]) *** argument 1: not an iodata term in call from amqp10_binary_generator:generate1/1 (amqp10_binary_generator.erl, line 141) in call from amqp10_binary_generator:generate1/1 (amqp10_binary_generator.erl, line 88) in call from amqp10_binary_generator:generate/1 (amqp10_binary_generator.erl, line 79) in call from rabbit_amqp_writer:assemble_frame/3 (rabbit_amqp_writer.erl, line 206) in call from rabbit_amqp_writer:internal_send_command_async/3 (rabbit_amqp_writer.erl, line 189) in call from rabbit_amqp_writer:handle_cast/2 (rabbit_amqp_writer.erl, line 110) in call from gen_server:try_handle_cast/3 (gen_server.erl, line 1121) ``` This commit fixes this crash by maintaning a floor of zero for incoming-window in the FLOW frame. Fixes #12816
1 parent c15ba8e commit 0d34ef6

File tree

3 files changed

+63
-6
lines changed

3 files changed

+63
-6
lines changed

deps/amqp10_client/src/amqp10_client_session.erl

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -467,11 +467,6 @@ mapped({call, From},
467467
#state{remote_incoming_window = Window})
468468
when Window =< 0 ->
469469
{keep_state_and_data, {reply, From, {error, remote_incoming_window_exceeded}}};
470-
mapped({call, From},
471-
{transfer, _Transfer, _Sections},
472-
#state{remote_incoming_window = Window})
473-
when Window =< 0 ->
474-
{keep_state_and_data, {reply, From, {error, remote_incoming_window_exceeded}}};
475470
mapped({call, From = {Pid, _}},
476471
{transfer, #'v1_0.transfer'{handle = {uint, OutHandle},
477472
delivery_tag = {binary, DeliveryTag},

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2003,7 +2003,10 @@ session_flow_fields(Frames, State)
20032003
session_flow_fields(Flow = #'v1_0.flow'{},
20042004
#state{next_outgoing_id = NextOutgoingId,
20052005
next_incoming_id = NextIncomingId,
2006-
incoming_window = IncomingWindow}) ->
2006+
incoming_window = IncomingWindow0}) ->
2007+
%% IncomingWindow0 can be negative when the sending client overshoots our window.
2008+
%% However, we must set a floor of 0 in the FLOW frame because field incoming-window is an uint.
2009+
IncomingWindow = max(0, IncomingWindow0),
20072010
Flow#'v1_0.flow'{
20082011
next_outgoing_id = ?UINT(NextOutgoingId),
20092012
outgoing_window = ?UINT_OUTGOING_WINDOW,

deps/rabbit/test/amqp_client_SUITE.erl

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ groups() ->
108108
detach_requeues_drop_head_classic_queue,
109109
resource_alarm_before_session_begin,
110110
resource_alarm_after_session_begin,
111+
resource_alarm_send_many,
111112
max_message_size_client_to_server,
112113
max_message_size_server_to_client,
113114
global_counters,
@@ -3207,6 +3208,42 @@ resource_alarm_after_session_begin(Config) ->
32073208
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
32083209
ok = rabbit_ct_client_helpers:close_channel(Ch).
32093210

3211+
%% Test case for
3212+
%% https://github.com/rabbitmq/rabbitmq-server/issues/12816
3213+
resource_alarm_send_many(Config) ->
3214+
QName = atom_to_binary(?FUNCTION_NAME),
3215+
Ch = rabbit_ct_client_helpers:open_channel(Config),
3216+
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}),
3217+
Address = rabbitmq_amqp_address:queue(QName),
3218+
OpnConf = connection_config(Config),
3219+
{ok, Connection} = amqp10_client:open_connection(OpnConf),
3220+
{ok, Session} = amqp10_client:begin_session_sync(Connection),
3221+
3222+
%% Send many messages while a memory alarm kicks in.
3223+
%% Our expectations are:
3224+
%% 1. At some point, our client's remote-incoming-window should be exceeded because
3225+
%% RabbitMQ sets its incoming-window to 0 when the alarm kicks in.
3226+
%% 2. No crash.
3227+
{Pid, Ref} = spawn_monitor(?MODULE,
3228+
send_until_remote_incoming_window_exceeded,
3229+
[Session, Address]),
3230+
DefaultWatermark = rpc(Config, vm_memory_monitor, get_vm_memory_high_watermark, []),
3231+
ok = rpc(Config, vm_memory_monitor, set_vm_memory_high_watermark, [0]),
3232+
receive {'DOWN', Ref, process, Pid, Reason} ->
3233+
?assertEqual(normal, Reason)
3234+
after 30_000 ->
3235+
ct:fail(send_timeout)
3236+
end,
3237+
3238+
%% Clear memory alarm.
3239+
ok = rpc(Config, vm_memory_monitor, set_vm_memory_high_watermark, [DefaultWatermark]),
3240+
timer:sleep(100),
3241+
3242+
ok = end_session_sync(Session),
3243+
ok = amqp10_client:close_connection(Connection),
3244+
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
3245+
ok = rabbit_ct_client_helpers:close_channel(Ch).
3246+
32103247
auth_attempt_metrics(Config) ->
32113248
open_and_close_connection(Config),
32123249
[Attempt1] = rpc(Config, rabbit_core_metrics, get_auth_attempts, []),
@@ -6286,6 +6323,28 @@ count_received_messages0(Receiver, Count) ->
62866323
Count
62876324
end.
62886325

6326+
send_until_remote_incoming_window_exceeded(Session, Address) ->
6327+
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address, settled),
6328+
ok = wait_for_credit(Sender),
6329+
ok = send_until_remote_incoming_window_exceeded0(Sender, 100_000),
6330+
ok = amqp10_client:detach_link(Sender).
6331+
6332+
send_until_remote_incoming_window_exceeded0(_Sender, 0) ->
6333+
ct:fail(remote_incoming_window_never_exceeded);
6334+
send_until_remote_incoming_window_exceeded0(Sender, Left) ->
6335+
Bin = integer_to_binary(Left),
6336+
Msg = amqp10_msg:new(Bin, Bin, true),
6337+
case amqp10_client:send_msg(Sender, Msg) of
6338+
ok ->
6339+
send_until_remote_incoming_window_exceeded0(Sender, Left - 1);
6340+
{error, insufficient_credit} ->
6341+
ok = wait_for_credit(Sender),
6342+
send_until_remote_incoming_window_exceeded0(Sender, Left);
6343+
{error, remote_incoming_window_exceeded = Reason} ->
6344+
ct:pal("~s: ~b messages left", [Reason, Left]),
6345+
ok
6346+
end.
6347+
62896348
assert_link_credit_runs_out(_Sender, 0) ->
62906349
ct:fail(sufficient_link_credit);
62916350
assert_link_credit_runs_out(Sender, Left) ->

0 commit comments

Comments
 (0)