Skip to content

Commit 1972f89

Browse files
Merge pull request #1408 from rabbitmq/rabbitmq-server-1367
Configurable default consumer prefetch for new channels
2 parents 32c48cd + 2879048 commit 1972f89

File tree

2 files changed

+21
-6
lines changed

2 files changed

+21
-6
lines changed

Makefile

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,9 @@ define PROJECT_ENV
124124
%% either "stop_node" or "continue".
125125
%% by default we choose to not terminate the entire node if one
126126
%% vhost had to shut down, see server#1158 and server#1280
127-
{vhost_restart_strategy, continue}
127+
{vhost_restart_strategy, continue},
128+
%% {global, prefetch count}
129+
{default_consumer_prefetch, {false, 0}}
128130
]
129131
endef
130132

src/rabbit_channel.erl

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -386,15 +386,24 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
386386
true -> flow;
387387
false -> noflow
388388
end,
389-
389+
{ok, {Global, Prefetch}} = application:get_env(rabbit, default_consumer_prefetch),
390+
Limiter0 = rabbit_limiter:new(LimiterPid),
391+
Limiter = case {Global, Prefetch} of
392+
{true, 0} ->
393+
rabbit_limiter:unlimit_prefetch(Limiter0);
394+
{true, _} ->
395+
rabbit_limiter:limit_prefetch(Limiter0, Prefetch, 0);
396+
_ ->
397+
Limiter0
398+
end,
390399
State = #ch{state = starting,
391400
protocol = Protocol,
392401
channel = Channel,
393402
reader_pid = ReaderPid,
394403
writer_pid = WriterPid,
395404
conn_pid = ConnPid,
396405
conn_name = ConnName,
397-
limiter = rabbit_limiter:new(LimiterPid),
406+
limiter = Limiter,
398407
tx = none,
399408
next_tag = 1,
400409
unacked_message_q = queue:new(),
@@ -414,7 +423,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
414423
mandatory = dtree:empty(),
415424
capabilities = Capabilities,
416425
trace_state = rabbit_trace:init(VHost),
417-
consumer_prefetch = 0,
426+
consumer_prefetch = Prefetch,
418427
reply_consumer = none,
419428
delivery_flow = Flow,
420429
interceptor_state = undefined},
@@ -1248,8 +1257,12 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 ->
12481257
"prefetch_size!=0 (~w)", [Size]);
12491258

12501259
handle_method(#'basic.qos'{global = false,
1251-
prefetch_count = PrefetchCount}, _, State) ->
1252-
{reply, #'basic.qos_ok'{}, State#ch{consumer_prefetch = PrefetchCount}};
1260+
prefetch_count = PrefetchCount},
1261+
_, State = #ch{limiter = Limiter}) ->
1262+
%% Ensures that if default was set, it's overriden
1263+
Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter),
1264+
{reply, #'basic.qos_ok'{}, State#ch{consumer_prefetch = PrefetchCount,
1265+
limiter = Limiter1}};
12531266

12541267
handle_method(#'basic.qos'{global = true,
12551268
prefetch_count = 0},

0 commit comments

Comments
 (0)