
Commit d721d33

Remove consumer bias & allow queues under max load to drain quickly
Given a queue process under max load, with both publishers & consumers, if consumers are not **always** prioritised over publishers, a queue can take up to 1 day to fully drain. Even without consumer bias, queues can drain fast (i.e. 10 minutes in our case) or slow (i.e. 1 hour or more). To put it differently, this is what a slow drain looks like:

```
    ___                               <- 2,000,000 messages
   /   \__
  /       \___     _        _
 /            \___/ \_____/ \___
/                               \
|-------------- 1h --------------|
```

And this is what a fast drain looks like:

```
   _           <- 1,500,000 messages
  / \_
 /    \___
/         \
|- 10 min -|
```

We are still trying to understand the reason behind this, but without removing consumer bias, this would **always** happen:

```
    ______________                                      <- 2,000,000 messages
   /              \_______________
  /                               \______________    ________
 /                                               \__/        \______
/                                                                   \
|------------------------------ 1 day ------------------------------|
```

Other observations worth capturing:

```
| PUBLISHERS | CONSUMERS | READY MESSAGES | PUBLISH MSG/S   | CONSUME ACK MSG/S |
| ---------- | --------- | -------------- | --------------- | ----------------- |
| 3          | 3         | 0              | 22,000 - 23,000 | 22,000 - 23,000   |
| 3          | 3         | 1 - 2,000,000  | 5,000 - 8,000   | 7,000 - 11,000    |
| 3          | 0         | 1 - 2,000,000  | 21,000 - 25,000 | 0                 |
| 3          | 0         | 2,000,000      | 5,000 - 15,000  | 0                 |
```

* Empty queues are the fastest, since messages are delivered straight to consuming channels.
* With 3 publishing channels, a single queue process gets saturated at 22,000 msg/s. The client that we used for this benchmark would max out at 10,000 msg/s, meaning that we needed 3 clients, each with 1 connection & 1 channel, to max out the queue process. It is possible that a single fast client using 1 connection & 1 channel would achieve slightly higher throughput, but we didn't measure that on this occasion. It's highly unrealistic for a production, high-throughput RabbitMQ deployment to use 1 publisher running 1 connection & 1 channel; if anything, there would be many publishers with many connections & channels.
* When a queue process gets saturated, publishing channels & their connections enter flow state, meaning that publishing rates are throttled. This allows the consuming channels to keep up with the publishing ones.
* Adding more publishers or consumers slows down both publishing & consuming. The queue process, and ultimately the Erlang VM's schedulers (typically 1 per CPU core), have more work to do, so it's expected for throughput to suffer.

Most relevant properties that we used for this benchmark (a consumer sketch matching these settings follows below):

```
| erlang               | 19.3.6.2      |
| rabbitmq             | 3.6.12        |
| gcp instance type    | n1-standard-4 |
| -------------------- | ------------- |
| queue                | non-durable   |
| max-length           | 2,000,000     |
| -------------------- | ------------- |
| publishers           | 3             |
| publisher rate msg/s | 10,000        |
| msg size             | 1KB           |
| -------------------- | ------------- |
| consumers            | 3             |
| prefetch             | 100           |
| multi-ack            | every 10 msg  |
```

Worth mentioning: `vm_memory_high_watermark_paging_ratio` was set to a really high value so that messages would not be paged to disc (an illustrative config snippet follows below). When messages are paged out, all other queue operations are blocked, including all publishes and consumes.

More screenshots, RabbitMQ definitions, BOSH & CF manifests can be found on the PR itself.

[#151499632]
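As a concrete companion to the benchmark properties above, here is a minimal consumer sketch using the RabbitMQ Erlang client (`amqp_client`). It is an illustration only: the queue name `<<"benchmark">>` and the localhost connection are assumptions, while the declare arguments, prefetch, and multi-ack cadence mirror the table.

```erlang
%% A minimal sketch, assuming the amqp_client library and a broker on
%% localhost. The queue name <<"benchmark">> is hypothetical; max-length,
%% prefetch, and the multi-ack cadence match the benchmark table above.
-module(bench_consumer).
-include_lib("amqp_client/include/amqp_client.hrl").
-export([start/0]).

start() ->
    {ok, Conn} = amqp_connection:start(#amqp_params_network{}),
    {ok, Ch}   = amqp_connection:open_channel(Conn),
    %% Non-durable queue capped at 2,000,000 messages, as in the benchmark.
    #'queue.declare_ok'{} =
        amqp_channel:call(Ch, #'queue.declare'{
            queue     = <<"benchmark">>,
            durable   = false,
            arguments = [{<<"x-max-length">>, long, 2000000}]}),
    %% prefetch = 100, as in the benchmark.
    #'basic.qos_ok'{} =
        amqp_channel:call(Ch, #'basic.qos'{prefetch_count = 100}),
    amqp_channel:subscribe(Ch, #'basic.consume'{queue = <<"benchmark">>},
                           self()),
    loop(Ch, 0).

%% Ack every 10th delivery with multiple = true ("multi-ack every 10 msg").
loop(Ch, Unacked) ->
    receive
        #'basic.consume_ok'{} ->
            loop(Ch, Unacked);
        {#'basic.deliver'{delivery_tag = Tag}, #amqp_msg{}} ->
            case Unacked + 1 of
                10 -> amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = Tag,
                                                         multiple = true}),
                      loop(Ch, 0);
                N  -> loop(Ch, N)
            end
    end.
```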
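And this is roughly what the paging setting could look like in the classic `rabbitmq.config` format used by RabbitMQ 3.6.x. The exact value we used is not recorded here; `100.0` is an illustrative stand-in for "really high" (any ratio above 1.0 means the memory alarm fires before paging would start).

```erlang
%% rabbitmq.config -- classic Erlang-terms config format (RabbitMQ 3.6.x).
%% Paging begins at (paging_ratio x vm_memory_high_watermark); pushing the
%% ratio far above 1.0 means the memory alarm fires before paging would,
%% so messages stay in RAM. 100.0 is an illustrative value.
[
  {rabbit, [
    {vm_memory_high_watermark_paging_ratio, 100.0}
  ]}
].
```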
1 parent: 1c81095

1 file changed (+5 additions, −13 deletions)


src/rabbit_amqqueue_process.erl

```diff
@@ -22,7 +22,6 @@
 
 -define(SYNC_INTERVAL,                 200). %% milliseconds
 -define(RAM_DURATION_UPDATE_INTERVAL,  5000).
--define(CONSUMER_BIAS_RATIO,           1.1). %% i.e. consume 10% faster
 
 -export([info_keys/0]).
 
@@ -969,26 +968,26 @@ emit_consumer_deleted(ChPid, ConsumerTag, QName) ->
 
 %%----------------------------------------------------------------------------
 
-prioritise_call(Msg, _From, _Len, State) ->
+prioritise_call(Msg, _From, _Len, _State) ->
     case Msg of
         info                                       -> 9;
         {info, _Items}                             -> 9;
         consumers                                  -> 9;
         stat                                       -> 7;
-        {basic_consume, _, _, _, _, _, _, _, _, _} -> consumer_bias(State);
-        {basic_cancel, _, _, _}                    -> consumer_bias(State);
+        {basic_consume, _, _, _, _, _, _, _, _, _} -> 1;
+        {basic_cancel, _, _, _}                    -> 1;
         _                                          -> 0
     end.
 
-prioritise_cast(Msg, _Len, State) ->
+prioritise_cast(Msg, _Len, _State) ->
     case Msg of
         delete_immediately                   -> 8;
         {set_ram_duration_target, _Duration} -> 8;
         {set_maximum_since_use, _Age}        -> 8;
         {run_backing_queue, _Mod, _Fun}      -> 6;
         {ack, _AckTags, _ChPid}              -> 3; %% [1]
         {resume, _ChPid}                     -> 2;
-        {notify_sent, _ChPid, _Credit}       -> consumer_bias(State);
+        {notify_sent, _ChPid, _Credit}       -> 1;
         _                                    -> 0
     end.
 
@@ -1001,13 +1000,6 @@ prioritise_cast(Msg, _Len, State) ->
 %% about. Finally, we prioritise ack over resume since it should
 %% always reduce memory use.
 
-consumer_bias(#q{backing_queue = BQ, backing_queue_state = BQS}) ->
-    case BQ:msg_rates(BQS) of
-        {0.0,          _} -> 0;
-        {Ingress, Egress} when Egress / Ingress < ?CONSUMER_BIAS_RATIO -> 1;
-        {_,            _} -> 0
-    end.
-
 prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
     case Msg of
         {'DOWN', _, process, DownPid, _} -> 8;
```
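For context on the diff above: `rabbit_amqqueue_process` is a `gen_server2` process, and the `prioritise_call`/`prioritise_cast` callbacks return small integer priorities that `gen_server2` uses to handle higher-priority pending messages first. Below is a standalone sketch of that idea; the message shapes are simplified stand-ins, not the real `rabbit_amqqueue_process` messages.

```erlang
%% Standalone illustration of priority-ordered message handling, in the
%% spirit of gen_server2's prioritise_* callbacks. Message shapes are
%% simplified stand-ins for the real queue-process messages.
-module(prio_sketch).
-export([order/1]).

%% After this commit: consumer-related messages always outrank publishes.
priority({basic_consume, _Args}) -> 1;
priority({basic_cancel, _Args})  -> 1;
priority({notify_sent, _Args})   -> 1;
priority({deliver, _Publish})    -> 0;  %% publishes stay at the default
priority(_Other)                 -> 0.

%% Sort a pending-message list so higher priorities are handled first.
order(Pending) ->
    lists:sort(fun(A, B) -> priority(A) >= priority(B) end, Pending).
```

With the old `consumer_bias/1`, the first three clauses would have returned 1 only while the egress rate was below `?CONSUMER_BIAS_RATIO` times the ingress rate; the commit makes them unconditionally 1, so consumer operations always take priority over publishes.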
