Skip to content

Commit 7c58d23

Browse files
committed
Set a floor of zero for incoming-window
Prior to this commit, when the sending client overshot RabbitMQ's incoming-window (which is allowed in the event of a cluster wide memory or disk alarm), and RabbitMQ sent a FLOW frame to the client, RabbitMQ sent a negative incoming-window field in the FLOW frame causing the following crash in the writer proc: ``` crasher: initial call: rabbit_amqp_writer:init/1 pid: <0.19353.0> registered_name: [] exception error: bad argument in function iolist_size/1 called as iolist_size([<<112,0,0,23,120>>, [82,-15], <<"pÿÿÿü">>,<<"pÿÿÿÿ">>,67, <<112,0,0,23,120>>, "Rª",64,64,64,64]) *** argument 1: not an iodata term in call from amqp10_binary_generator:generate1/1 (amqp10_binary_generator.erl, line 141) in call from amqp10_binary_generator:generate1/1 (amqp10_binary_generator.erl, line 88) in call from amqp10_binary_generator:generate/1 (amqp10_binary_generator.erl, line 79) in call from rabbit_amqp_writer:assemble_frame/3 (rabbit_amqp_writer.erl, line 206) in call from rabbit_amqp_writer:internal_send_command_async/3 (rabbit_amqp_writer.erl, line 189) in call from rabbit_amqp_writer:handle_cast/2 (rabbit_amqp_writer.erl, line 110) in call from gen_server:try_handle_cast/3 (gen_server.erl, line 1121) ``` This commit fixes this crash by maintaning a floor of zero for incoming-window in the FLOW frame. Fixes #12816 (cherry picked from commit 0d34ef6) # Conflicts: # deps/rabbit/test/amqp_client_SUITE.erl
1 parent ab3df7c commit 7c58d23

File tree

3 files changed

+64
-6
lines changed

3 files changed

+64
-6
lines changed

deps/amqp10_client/src/amqp10_client_session.erl

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -470,11 +470,6 @@ mapped({call, From},
470470
#state{remote_incoming_window = Window})
471471
when Window =< 0 ->
472472
{keep_state_and_data, {reply, From, {error, remote_incoming_window_exceeded}}};
473-
mapped({call, From},
474-
{transfer, _Transfer, _Sections},
475-
#state{remote_incoming_window = Window})
476-
when Window =< 0 ->
477-
{keep_state_and_data, {reply, From, {error, remote_incoming_window_exceeded}}};
478473
mapped({call, From = {Pid, _}},
479474
{transfer, #'v1_0.transfer'{handle = {uint, OutHandle},
480475
delivery_tag = {binary, DeliveryTag},

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1952,7 +1952,10 @@ session_flow_fields(Frames, State)
19521952
session_flow_fields(Flow = #'v1_0.flow'{},
19531953
#state{next_outgoing_id = NextOutgoingId,
19541954
next_incoming_id = NextIncomingId,
1955-
incoming_window = IncomingWindow}) ->
1955+
incoming_window = IncomingWindow0}) ->
1956+
%% IncomingWindow0 can be negative when the sending client overshoots our window.
1957+
%% However, we must set a floor of 0 in the FLOW frame because field incoming-window is an uint.
1958+
IncomingWindow = max(0, IncomingWindow0),
19561959
Flow#'v1_0.flow'{
19571960
next_outgoing_id = ?UINT(NextOutgoingId),
19581961
outgoing_window = ?UINT_OUTGOING_WINDOW,

deps/rabbit/test/amqp_client_SUITE.erl

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ groups() ->
9797
detach_requeues_drop_head_classic_queue,
9898
resource_alarm_before_session_begin,
9999
resource_alarm_after_session_begin,
100+
resource_alarm_send_many,
100101
max_message_size_client_to_server,
101102
max_message_size_server_to_client,
102103
global_counters,
@@ -3202,6 +3203,42 @@ resource_alarm_after_session_begin(Config) ->
32023203
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
32033204
ok = rabbit_ct_client_helpers:close_channel(Ch).
32043205

3206+
%% Test case for
3207+
%% https://github.com/rabbitmq/rabbitmq-server/issues/12816
3208+
resource_alarm_send_many(Config) ->
3209+
QName = atom_to_binary(?FUNCTION_NAME),
3210+
Ch = rabbit_ct_client_helpers:open_channel(Config),
3211+
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}),
3212+
Address = rabbitmq_amqp_address:queue(QName),
3213+
OpnConf = connection_config(Config),
3214+
{ok, Connection} = amqp10_client:open_connection(OpnConf),
3215+
{ok, Session} = amqp10_client:begin_session_sync(Connection),
3216+
3217+
%% Send many messages while a memory alarm kicks in.
3218+
%% Our expectations are:
3219+
%% 1. At some point, our client's remote-incoming-window should be exceeded because
3220+
%% RabbitMQ sets its incoming-window to 0 when the alarm kicks in.
3221+
%% 2. No crash.
3222+
{Pid, Ref} = spawn_monitor(?MODULE,
3223+
send_until_remote_incoming_window_exceeded,
3224+
[Session, Address]),
3225+
DefaultWatermark = rpc(Config, vm_memory_monitor, get_vm_memory_high_watermark, []),
3226+
ok = rpc(Config, vm_memory_monitor, set_vm_memory_high_watermark, [0]),
3227+
receive {'DOWN', Ref, process, Pid, Reason} ->
3228+
?assertEqual(normal, Reason)
3229+
after 30_000 ->
3230+
ct:fail(send_timeout)
3231+
end,
3232+
3233+
%% Clear memory alarm.
3234+
ok = rpc(Config, vm_memory_monitor, set_vm_memory_high_watermark, [DefaultWatermark]),
3235+
timer:sleep(100),
3236+
3237+
ok = end_session_sync(Session),
3238+
ok = amqp10_client:close_connection(Connection),
3239+
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
3240+
ok = rabbit_ct_client_helpers:close_channel(Ch).
3241+
32053242
auth_attempt_metrics(Config) ->
32063243
open_and_close_connection(Config),
32073244
[Attempt1] = rpc(Config, rabbit_core_metrics, get_auth_attempts, []),
@@ -6119,6 +6156,7 @@ count_received_messages0(Receiver, Count) ->
61196156
Count
61206157
end.
61216158

6159+
<<<<<<< HEAD
61226160
send_messages(Sender, Left, Settled) ->
61236161
send_messages(Sender, Left, Settled, <<>>).
61246162

@@ -6143,6 +6181,28 @@ send_messages(Sender, Left, Settled, BodySuffix) ->
61436181
%% So, we must be defensive here and assume that the next amqp10_client:send/2 call might return {error, insufficient_credit}
61446182
%% again causing us then to really wait to receive a credited event (instead of just processing an old credited event).
61456183
send_messages(Sender, Left, Settled, BodySuffix)
6184+
=======
6185+
send_until_remote_incoming_window_exceeded(Session, Address) ->
6186+
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address, settled),
6187+
ok = wait_for_credit(Sender),
6188+
ok = send_until_remote_incoming_window_exceeded0(Sender, 100_000),
6189+
ok = amqp10_client:detach_link(Sender).
6190+
6191+
send_until_remote_incoming_window_exceeded0(_Sender, 0) ->
6192+
ct:fail(remote_incoming_window_never_exceeded);
6193+
send_until_remote_incoming_window_exceeded0(Sender, Left) ->
6194+
Bin = integer_to_binary(Left),
6195+
Msg = amqp10_msg:new(Bin, Bin, true),
6196+
case amqp10_client:send_msg(Sender, Msg) of
6197+
ok ->
6198+
send_until_remote_incoming_window_exceeded0(Sender, Left - 1);
6199+
{error, insufficient_credit} ->
6200+
ok = wait_for_credit(Sender),
6201+
send_until_remote_incoming_window_exceeded0(Sender, Left);
6202+
{error, remote_incoming_window_exceeded = Reason} ->
6203+
ct:pal("~s: ~b messages left", [Reason, Left]),
6204+
ok
6205+
>>>>>>> 0d34ef604 (Set a floor of zero for incoming-window)
61466206
end.
61476207

61486208
assert_link_credit_runs_out(_Sender, 0) ->

0 commit comments

Comments
 (0)