Skip to content

Commit a6996ab

Browse files
committed
Fix error insufficient_credit for AMQP 1.0 shovel
WHY: Shovelling from RabbitMQ to Azure Service Bus and Azure Event Hub fails. Reported in https://discord.com/channels/1092487794984755311/1092487794984755314/1169894510743011430 Reproduction steps: 1. Follow https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-integrate-with-rabbitmq 2. Publish messages to RabbitMQ: ``` java -jar target/perf-test.jar -x 1 -y 0 -u azure -p -C 100000 -s 1 -c 100000 ``` Prior to this commit, after a few seconds and after around 20k messages arrived in Azure, RabbitMQ errored and logged: ``` {function_clause, [{amqp10_client_connection,close_sent, [info, {'EXIT',<0.949.0>, {{badmatch,{error,insufficient_credit}}, [{rabbit_amqp10_shovel,forward,4, [{file,"rabbit_amqp10_shovel.erl"}, {line,334}]}, {rabbit_shovel_worker,handle_info,2, [{file,"rabbit_shovel_worker.erl"}, {line,101}]}, {gen_server2,handle_msg,2, [{file,"gen_server2.erl"},{line,1056}]}, {proc_lib,init_p_do_apply,3, [{file,"proc_lib.erl"},{line,241}]}]}}, ``` After this commit, all 100k messages get shovelled to Azure Service Bus. HOW: 1. Fix link credit accounting in Erlang AMQP 1.0 client library. For each message being published, link credit must be decreased by 1 instead of being increased by 1. 2. If the shovel plugin runs out of credits, it must wait until the receiver (Azure Service Bus) grants more credits to RabbitMQ. Note that the solution in this commit is rather a naive quick fix for one obvious bug. AMQP 1.0 integration between RabbitMQ and Azure Service Bus is not tested and not guaranteed at this point in time. More work will be needed in the future, some work is done as part of #9022
1 parent d1940c9 commit a6996ab

File tree

3 files changed

+35
-16
lines changed

3 files changed

+35
-16
lines changed

deps/amqp10_client/src/amqp10_client_session.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -955,7 +955,7 @@ update_link(Link = #link{output_handle = OutHandle},
955955
State#state{links = Links#{OutHandle => Link}}.
956956

957957
incr_link_counters(#link{link_credit = LC, delivery_count = DC} = Link) ->
958-
Link#link{delivery_count = DC+1, link_credit = LC+1}.
958+
Link#link{delivery_count = DC+1, link_credit = LC-1}.
959959

960960
append_partial_transfer(Transfer, Payload,
961961
#link{partial_transfers = undefined} = Link) ->

deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
-import(rabbit_data_coercion, [to_binary/1]).
3939

4040
-define(INFO(Text, Args), rabbit_log_shovel:info(Text, Args)).
41-
-define(LINK_CREDIT_TIMEOUT, 5000).
41+
-define(LINK_CREDIT_TIMEOUT, 20_000).
4242

4343
-type state() :: rabbit_shovel_behaviour:state().
4444
-type uri() :: rabbit_shovel_behaviour:uri().
@@ -173,7 +173,8 @@ dest_endpoint(#{shovel_type := dynamic,
173173
dest := #{target_address := Addr}}) ->
174174
[{dest_address, Addr}].
175175

176-
-spec handle_source(Msg :: any(), state()) -> not_handled | state().
176+
-spec handle_source(Msg :: any(), state()) ->
177+
not_handled | state() | {stop, any()}.
177178
handle_source({amqp10_msg, _LinkRef, Msg}, State) ->
178179
Tag = amqp10_msg:delivery_id(Msg),
179180
Payload = amqp10_msg:body_bin(Msg),
@@ -312,7 +313,8 @@ status(_) ->
312313
ignore.
313314

314315
-spec forward(Tag :: tag(), Props :: #{atom() => any()},
315-
Payload :: binary(), state()) -> state().
316+
Payload :: binary(), state()) ->
317+
state() | {stop, any()}.
316318
forward(_Tag, _Props, _Payload,
317319
#{source := #{remaining_unacked := 0}} = State) ->
318320
State;
@@ -331,17 +333,33 @@ forward(Tag, Props, Payload,
331333
Msg = add_timestamp_header(
332334
State, set_message_properties(
333335
Props, add_forward_headers(State, Msg0))),
334-
ok = amqp10_client:send_msg(Link, Msg),
335-
rabbit_shovel_behaviour:decr_remaining_unacked(
336-
case AckMode of
337-
no_ack ->
338-
rabbit_shovel_behaviour:decr_remaining(1, State);
339-
on_confirm ->
340-
State#{dest => Dst#{unacked => Unacked#{OutTag => Tag}}};
341-
on_publish ->
342-
State1 = rabbit_shovel_behaviour:ack(Tag, false, State),
343-
rabbit_shovel_behaviour:decr_remaining(1, State1)
344-
end).
336+
case send_msg(Link, Msg) of
337+
ok ->
338+
rabbit_shovel_behaviour:decr_remaining_unacked(
339+
case AckMode of
340+
no_ack ->
341+
rabbit_shovel_behaviour:decr_remaining(1, State);
342+
on_confirm ->
343+
State#{dest => Dst#{unacked => Unacked#{OutTag => Tag}}};
344+
on_publish ->
345+
State1 = rabbit_shovel_behaviour:ack(Tag, false, State),
346+
rabbit_shovel_behaviour:decr_remaining(1, State1)
347+
end);
348+
Stop ->
349+
Stop
350+
end.
351+
352+
send_msg(Link, Msg) ->
353+
case amqp10_client:send_msg(Link, Msg) of
354+
ok ->
355+
ok;
356+
{error, insufficient_credit} ->
357+
receive {amqp10_event, {link, Link, credited}} ->
358+
ok = amqp10_client:send_msg(Link, Msg)
359+
after ?LINK_CREDIT_TIMEOUT ->
360+
{stop, credited_timeout}
361+
end
362+
end.
345363

346364
new_message(Tag, Payload, #{ack_mode := AckMode,
347365
dest := #{properties := Props,

deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,8 @@ source_endpoint(#{source := #{module := Mod}} = State) ->
140140
dest_endpoint(#{dest := #{module := Mod}} = State) ->
141141
Mod:dest_endpoint(State).
142142

143-
-spec forward(tag(), #{atom() => any()}, binary(), state()) -> state().
143+
-spec forward(tag(), #{atom() => any()}, binary(), state()) ->
144+
state() | {stop, any()}.
144145
forward(Tag, Props, Payload, #{dest := #{module := Mod}} = State) ->
145146
Mod:forward(Tag, Props, Payload, State).
146147

0 commit comments

Comments
 (0)