Skip to content

Commit d0d901f

Browse files
authored
Merge pull request #2504 from rabbitmq/qq-credit-mode
Use correct credit mode x-credit
2 parents 4ca56d6 + 8ff5273 commit d0d901f

File tree

3 files changed

+45
-47
lines changed

3 files changed

+45
-47
lines changed

deps/rabbit/src/rabbit_fifo_client.erl

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
init/2,
1616
init/3,
1717
init/5,
18-
checkout/4,
1918
checkout/5,
2019
cancel_checkout/2,
2120
enqueue/2,
@@ -349,27 +348,6 @@ discard(ConsumerTag, [_|_] = MsgIds,
349348
end, {[], [], MsgIds}, Unsent0),
350349
{State0#state{unsent_commands = Unsent}, []}.
351350

352-
353-
%% @doc Register with the rabbit_fifo queue to "checkout" messages as they
354-
%% become available.
355-
%%
356-
%% This is a synchronous call. I.e. the call will block until the command
357-
%% has been accepted by the ra process or it times out.
358-
%%
359-
%% @param ConsumerTag a unique tag to identify this particular consumer.
360-
%% @param NumUnsettled the maximum number of in-flight messages. Once this
361-
%% number of messages has been received but not settled no further messages
362-
%% will be delivered to the consumer.
363-
%% @param State The {@module} state.
364-
%%
365-
%% @returns `{ok, State}' or `{error | timeout, term()}'
366-
-spec checkout(rabbit_fifo:consumer_tag(), NumUnsettled :: non_neg_integer(),
367-
rabbit_fifo:consumer_meta(),
368-
state()) -> {ok, state()} | {error | timeout, term()}.
369-
checkout(ConsumerTag, NumUnsettled, ConsumerInfo, State0)
370-
when is_map(ConsumerInfo) ->
371-
checkout(ConsumerTag, NumUnsettled, get_credit_mode(ConsumerInfo), ConsumerInfo, State0).
372-
373351
%% @doc Register with the rabbit_fifo queue to "checkout" messages as they
374352
%% become available.
375353
%%
@@ -908,13 +886,3 @@ find_leader([Server | Servers]) ->
908886

909887
qref({Ref, _}) -> Ref;
910888
qref(Ref) -> Ref.
911-
912-
get_credit_mode(#{args := Args}) ->
913-
case rabbit_misc:table_lookup(Args, <<"x-credit">>) of
914-
{_Key, Value} ->
915-
Value;
916-
_ ->
917-
simple_prefetch
918-
end;
919-
get_credit_mode(_) ->
920-
simple_prefetch.

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 35 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -667,20 +667,33 @@ consume(Q, Spec, QState0) when ?amqqueue_is_quorum(Q) ->
667667
ConsumerTag = quorum_ctag(ConsumerTag0),
668668
%% A prefetch count of 0 means no limitation,
669669
%% let's make it into something large for ra
670-
Prefetch = case ConsumerPrefetchCount of
671-
0 -> 2000;
672-
Other -> Other
673-
end,
670+
Prefetch0 = case ConsumerPrefetchCount of
671+
0 -> 2000;
672+
Other -> Other
673+
end,
674674
%% consumer info is used to describe the consumer properties
675675
AckRequired = not NoAck,
676676
ConsumerMeta = #{ack => AckRequired,
677677
prefetch => ConsumerPrefetchCount,
678678
args => Args,
679679
username => ActingUser},
680-
{ok, QState} = rabbit_fifo_client:checkout(ConsumerTag,
681-
Prefetch,
682-
ConsumerMeta,
683-
QState0),
680+
681+
{CreditMode, Credit, Drain} = parse_credit_args(Prefetch0, Args),
682+
%% if the mode is credited we should send a separate credit command
683+
%% after checkout and give 0 credits initally
684+
Prefetch = case CreditMode of
685+
credited -> 0;
686+
simple_prefetch -> Prefetch0
687+
end,
688+
{ok, QState1} = rabbit_fifo_client:checkout(ConsumerTag, Prefetch,
689+
CreditMode, ConsumerMeta,
690+
QState0),
691+
QState = case CreditMode of
692+
credited when Credit > 0 ->
693+
rabbit_fifo_client:credit(ConsumerTag, Credit, Drain,
694+
QState1);
695+
_ -> QState1
696+
end,
684697
case ra:local_query(QPid,
685698
fun rabbit_fifo:query_single_active_consumer/1) of
686699
{ok, {_, SacResult}, _} ->
@@ -1494,3 +1507,17 @@ overflow(<<"reject-publish-dlx">> = V, Def, QName) ->
14941507
rabbit_log:warning("Invalid overflow strategy ~p for quorum queue: ~p",
14951508
[V, rabbit_misc:rs(QName)]),
14961509
Def.
1510+
1511+
parse_credit_args(Default, Args) ->
1512+
case rabbit_misc:table_lookup(Args, <<"x-credit">>) of
1513+
{table, T} ->
1514+
case {rabbit_misc:table_lookup(T, <<"credit">>),
1515+
rabbit_misc:table_lookup(T, <<"drain">>)} of
1516+
{{long, C}, {bool, D}} ->
1517+
{credited, C, D};
1518+
_ ->
1519+
{simple_prefetch, Default, false}
1520+
end;
1521+
undefined ->
1522+
{simple_prefetch, Default, false}
1523+
end.

deps/rabbit/test/rabbit_fifo_int_SUITE.erl

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ basics(Config) ->
8686
CustomerTag = UId,
8787
ok = start_cluster(ClusterName, [ServerId]),
8888
FState0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
89-
{ok, FState1} = rabbit_fifo_client:checkout(CustomerTag, 1, #{}, FState0),
89+
{ok, FState1} = rabbit_fifo_client:checkout(CustomerTag, 1, simple_prefetch,
90+
#{}, FState0),
9091

9192
ra_log_wal:force_roll_over(ra_log_wal),
9293
% create segment the segment will trigger a snapshot
@@ -179,7 +180,7 @@ duplicate_delivery(Config) ->
179180
ServerId = ?config(node_id, Config),
180181
ok = start_cluster(ClusterName, [ServerId]),
181182
F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
182-
{ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, #{}, F0),
183+
{ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, simple_prefetch, #{}, F0),
183184
{ok, F2} = rabbit_fifo_client:enqueue(corr1, msg1, F1),
184185
Fun = fun Loop(S0) ->
185186
receive
@@ -214,7 +215,7 @@ usage(Config) ->
214215
ServerId = ?config(node_id, Config),
215216
ok = start_cluster(ClusterName, [ServerId]),
216217
F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
217-
{ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, #{}, F0),
218+
{ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, simple_prefetch, #{}, F0),
218219
{ok, F2} = rabbit_fifo_client:enqueue(corr1, msg1, F1),
219220
{ok, F3} = rabbit_fifo_client:enqueue(corr2, msg2, F2),
220221
{_, _, _} = process_ra_events(receive_ra_events(2, 2), F3),
@@ -267,7 +268,7 @@ detects_lost_delivery(Config) ->
267268
F000 = rabbit_fifo_client:init(ClusterName, [ServerId]),
268269
{ok, F00} = rabbit_fifo_client:enqueue(msg1, F000),
269270
{_, _, F0} = process_ra_events(receive_ra_events(1, 0), F00),
270-
{ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, #{}, F0),
271+
{ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, simple_prefetch, #{}, F0),
271272
{ok, F2} = rabbit_fifo_client:enqueue(msg2, F1),
272273
{ok, F3} = rabbit_fifo_client:enqueue(msg3, F2),
273274
% lose first delivery
@@ -297,6 +298,7 @@ returns_after_down(Config) ->
297298
_Pid = spawn(fun () ->
298299
F = rabbit_fifo_client:init(ClusterName, [ServerId]),
299300
{ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 10,
301+
simple_prefetch,
300302
#{}, F),
301303
Self ! checkout_done
302304
end),
@@ -376,7 +378,8 @@ discard(Config) ->
376378
_ = ra:members(ServerId),
377379

378380
F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
379-
{ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, #{}, F0),
381+
{ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10,
382+
simple_prefetch, #{}, F0),
380383
{ok, F2} = rabbit_fifo_client:enqueue(msg1, F1),
381384
F3 = discard_next_delivery(F2, 5000),
382385
{empty, _F4} = rabbit_fifo_client:dequeue(<<"tag1">>, settled, F3),
@@ -397,7 +400,7 @@ cancel_checkout(Config) ->
397400
ok = start_cluster(ClusterName, [ServerId]),
398401
F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4),
399402
{ok, F1} = rabbit_fifo_client:enqueue(m1, F0),
400-
{ok, F2} = rabbit_fifo_client:checkout(<<"tag">>, 10, #{}, F1),
403+
{ok, F2} = rabbit_fifo_client:checkout(<<"tag">>, 10, simple_prefetch, #{}, F1),
401404
{_, _, F3} = process_ra_events(receive_ra_events(1, 1), F2, [], [], fun (_, S) -> S end),
402405
{ok, F4} = rabbit_fifo_client:cancel_checkout(<<"tag">>, F3),
403406
{F5, _} = rabbit_fifo_client:return(<<"tag">>, [0], F4),
@@ -490,7 +493,7 @@ test_queries(Config) ->
490493
exit(ready_timeout)
491494
end,
492495
F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4),
493-
{ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 1, #{}, F0),
496+
{ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 1, simple_prefetch, #{}, F0),
494497
{ok, {_, Ready}, _} = ra:local_query(ServerId,
495498
fun rabbit_fifo:query_messages_ready/1),
496499
?assertEqual(1, Ready),

0 commit comments

Comments
 (0)