Skip to content

Commit 1ac9ea3

Browse files
dumbbellgerhard
authored andcommitted
Force GM garbage collection every 250ms
In high throughput scenarios, e.g. basic.reject or basic.nack, messages which belong to a mirrored queue and replicated in a GM group are promoted to the old heap quickly. This means that garbage collection happens only when the VM is under memory pressure, which might be too late. When the system is under pressure, garbage collection slows it down even further, to the point that RabbitMQ nodes can run out of memory and crash. We want the GM process to garbage collect binaries a lot more often, i.e. every 250ms. There is precedence for explicit GC in the variable queue, which we do for the same reason. Initially, we wanted to use the number of messages as the trigger for garbage collection, but we soon discovered that different workloads (e.g. small vs large messages) would result in unpredictable and sub-optimal garbage collections. Before setting fullsweep_after to 0, memory usage was 2x higher (400MB vs 200MB) and throughput was 0.1x lower (45k vs 50k). With this setting, there is a single heap generation resulting in a quicker GC. The RabbitMQ deployment that this fix was tested against is: * AWS, c4.2xlarge * 3 nodes of RabbitMQ 3.6.11 with OTP 20.0.1 * 3 durable & auto-delete queues with 3 replicas each * each queue master was defined on a different RabbitMQ node * 1 consumer per queue with QOS 100 * 100 durable messages @ 1KiB each * basic.reject operations | Node | Message throughput | Memory usage | |------|--------------------|--------------| | rmq0 | 20K msg/s | 700 MB | | rmq1 | 14K msg/s | 900 MB | | rmq2 | 15K msg/s | 800 MB | [#148892851] Signed-off-by: Gerhard Lazu <[email protected]>
1 parent 5f03dcc commit 1ac9ea3

File tree

1 file changed

+22
-7
lines changed

1 file changed

+22
-7
lines changed

src/gm.erl

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -395,9 +395,8 @@
395395

396396
-define(GROUP_TABLE, gm_group).
397397
-define(MAX_BUFFER_SIZE, 100000000). %% 100MB
398-
-define(HIBERNATE_AFTER_MIN, 1000).
399-
-define(DESIRED_HIBERNATE, 10000).
400398
-define(BROADCAST_TIMER, 25).
399+
-define(FORCE_GC_TIMER, 250).
401400
-define(VERSION_START, 0).
402401
-define(SETS, ordsets).
403402
-define(DICT, orddict).
@@ -416,6 +415,7 @@
416415
broadcast_buffer,
417416
broadcast_buffer_sz,
418417
broadcast_timer,
418+
force_gc_timer,
419419
txn_executor,
420420
shutting_down
421421
}).
@@ -508,7 +508,8 @@ table_definitions() ->
508508
[{Name, [?TABLE_MATCH | Attributes]}].
509509

510510
start_link(GroupName, Module, Args, TxnFun) ->
511-
gen_server2:start_link(?MODULE, [GroupName, Module, Args, TxnFun], []).
511+
gen_server2:start_link(?MODULE, [GroupName, Module, Args, TxnFun],
512+
[{spawn_opt, [{fullsweep_after, 0}]}]).
512513

513514
leave(Server) ->
514515
gen_server2:cast(Server, leave).
@@ -551,6 +552,7 @@ init([GroupName, Module, Args, TxnFun]) ->
551552
broadcast_buffer = [],
552553
broadcast_buffer_sz = 0,
553554
broadcast_timer = undefined,
555+
force_gc_timer = undefined,
554556
txn_executor = TxnFun,
555557
shutting_down = false }}.
556558

@@ -707,6 +709,10 @@ handle_cast(leave, State) ->
707709
{stop, normal, State}.
708710

709711

712+
handle_info(force_gc, State) ->
713+
garbage_collect(),
714+
noreply(State #state { force_gc_timer = undefined });
715+
710716
handle_info(flush, State) ->
711717
noreply(
712718
flush_broadcast_buffer(State #state { broadcast_timer = undefined }));
@@ -882,14 +888,24 @@ handle_msg({activity, _NotLeft, _Activity}, State) ->
882888

883889

884890
noreply(State) ->
885-
{noreply, ensure_broadcast_timer(State), flush_timeout(State)}.
891+
{noreply, ensure_timers(State), flush_timeout(State)}.
886892

887893
reply(Reply, State) ->
888-
{reply, Reply, ensure_broadcast_timer(State), flush_timeout(State)}.
894+
{reply, Reply, ensure_timers(State), flush_timeout(State)}.
895+
896+
ensure_timers(State) ->
897+
ensure_force_gc_timer(ensure_broadcast_timer(State)).
889898

890899
flush_timeout(#state{broadcast_buffer = []}) -> infinity;
891900
flush_timeout(_) -> 0.
892901

902+
ensure_force_gc_timer(State = #state { force_gc_timer = TRef })
903+
when TRef =/= undefined ->
904+
State;
905+
ensure_force_gc_timer(State = #state { force_gc_timer = undefined }) ->
906+
TRef = erlang:send_after(?FORCE_GC_TIMER, self(), force_gc),
907+
State #state { force_gc_timer = TRef }.
908+
893909
ensure_broadcast_timer(State = #state { broadcast_buffer = [],
894910
broadcast_timer = undefined }) ->
895911
State;
@@ -957,8 +973,7 @@ flush_broadcast_buffer(State = #state { self = Self,
957973
end, Self, MembersState),
958974
State #state { members_state = MembersState1,
959975
broadcast_buffer = [],
960-
broadcast_buffer_sz = 0}.
961-
976+
broadcast_buffer_sz = 0 }.
962977

963978
%% ---------------------------------------------------------------------------
964979
%% View construction and inspection

0 commit comments

Comments
 (0)