Skip to content

Commit fcb4d95

Browse files
Merge pull request #9870 from rabbitmq/fix-shovel-credit
Fix error insufficient_credit for AMQP 1.0 shovel
2 parents b2d410b + a6996ab commit fcb4d95

File tree

3 files changed

+35
-16
lines changed

3 files changed

+35
-16
lines changed

deps/amqp10_client/src/amqp10_client_session.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -955,7 +955,7 @@ update_link(Link = #link{output_handle = OutHandle},
955955
State#state{links = Links#{OutHandle => Link}}.
956956

957957
incr_link_counters(#link{link_credit = LC, delivery_count = DC} = Link) ->
958-
Link#link{delivery_count = DC+1, link_credit = LC+1}.
958+
Link#link{delivery_count = DC+1, link_credit = LC-1}.
959959

960960
append_partial_transfer(Transfer, Payload,
961961
#link{partial_transfers = undefined} = Link) ->

deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
-import(rabbit_data_coercion, [to_binary/1]).
3939

4040
-define(INFO(Text, Args), rabbit_log_shovel:info(Text, Args)).
41-
-define(LINK_CREDIT_TIMEOUT, 5000).
41+
-define(LINK_CREDIT_TIMEOUT, 20_000).
4242

4343
-type state() :: rabbit_shovel_behaviour:state().
4444
-type uri() :: rabbit_shovel_behaviour:uri().
@@ -173,7 +173,8 @@ dest_endpoint(#{shovel_type := dynamic,
173173
dest := #{target_address := Addr}}) ->
174174
[{dest_address, Addr}].
175175

176-
-spec handle_source(Msg :: any(), state()) -> not_handled | state().
176+
-spec handle_source(Msg :: any(), state()) ->
177+
not_handled | state() | {stop, any()}.
177178
handle_source({amqp10_msg, _LinkRef, Msg}, State) ->
178179
Tag = amqp10_msg:delivery_id(Msg),
179180
Payload = amqp10_msg:body_bin(Msg),
@@ -312,7 +313,8 @@ status(_) ->
312313
ignore.
313314

314315
-spec forward(Tag :: tag(), Props :: #{atom() => any()},
315-
Payload :: binary(), state()) -> state().
316+
Payload :: binary(), state()) ->
317+
state() | {stop, any()}.
316318
forward(_Tag, _Props, _Payload,
317319
#{source := #{remaining_unacked := 0}} = State) ->
318320
State;
@@ -331,17 +333,33 @@ forward(Tag, Props, Payload,
331333
Msg = add_timestamp_header(
332334
State, set_message_properties(
333335
Props, add_forward_headers(State, Msg0))),
334-
ok = amqp10_client:send_msg(Link, Msg),
335-
rabbit_shovel_behaviour:decr_remaining_unacked(
336-
case AckMode of
337-
no_ack ->
338-
rabbit_shovel_behaviour:decr_remaining(1, State);
339-
on_confirm ->
340-
State#{dest => Dst#{unacked => Unacked#{OutTag => Tag}}};
341-
on_publish ->
342-
State1 = rabbit_shovel_behaviour:ack(Tag, false, State),
343-
rabbit_shovel_behaviour:decr_remaining(1, State1)
344-
end).
336+
case send_msg(Link, Msg) of
337+
ok ->
338+
rabbit_shovel_behaviour:decr_remaining_unacked(
339+
case AckMode of
340+
no_ack ->
341+
rabbit_shovel_behaviour:decr_remaining(1, State);
342+
on_confirm ->
343+
State#{dest => Dst#{unacked => Unacked#{OutTag => Tag}}};
344+
on_publish ->
345+
State1 = rabbit_shovel_behaviour:ack(Tag, false, State),
346+
rabbit_shovel_behaviour:decr_remaining(1, State1)
347+
end);
348+
Stop ->
349+
Stop
350+
end.
351+
352+
send_msg(Link, Msg) ->
353+
case amqp10_client:send_msg(Link, Msg) of
354+
ok ->
355+
ok;
356+
{error, insufficient_credit} ->
357+
receive {amqp10_event, {link, Link, credited}} ->
358+
ok = amqp10_client:send_msg(Link, Msg)
359+
after ?LINK_CREDIT_TIMEOUT ->
360+
{stop, credited_timeout}
361+
end
362+
end.
345363

346364
new_message(Tag, Payload, #{ack_mode := AckMode,
347365
dest := #{properties := Props,

deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,8 @@ source_endpoint(#{source := #{module := Mod}} = State) ->
140140
dest_endpoint(#{dest := #{module := Mod}} = State) ->
141141
Mod:dest_endpoint(State).
142142

143-
-spec forward(tag(), #{atom() => any()}, binary(), state()) -> state().
143+
-spec forward(tag(), #{atom() => any()}, binary(), state()) ->
144+
state() | {stop, any()}.
144145
forward(Tag, Props, Payload, #{dest := #{module := Mod}} = State) ->
145146
Mod:forward(Tag, Props, Payload, State).
146147

0 commit comments

Comments
 (0)