Skip to content

Commit 7f84083

Browse files
dumbbellgerhard
authored and committed
Run garbage collection in GM every 250ms
In high throughput scenarios, e.g. `basic.reject` or `basic.nack`, messages which belong to a mirrored queue and are replicated within a GM group, are quickly promoted to the old heap. This means that garbage collection happens only when the Erlang VM is under memory pressure, which might be too late. When a process is under pressure, garbage collection slows it down even further, to the point of RabbitMQ nodes running out of memory and crashing. To avoid this scenario, We want the GM process to garbage collect binaries regularly, i.e. every 250ms. The variable queue does the same for a similar reason: #289 Initially, we wanted to use the number of messages as the trigger for garbage collection, but we soon discovered that different workloads (e.g. small vs large messages) would result in unpredictable and sub-optimal GC schedules. Before setting `fullsweep_after` to 0, memory usage was 2x higher (400MB vs 200MB) and throughput was 0.1x lower (45k vs 50k). With this `spawn_opt`, there is a single heap generation resulting in a quicker GC: http://erlang.org/doc/man/erlang.html#spawn_opt-3 The RabbitMQ deployment that this was tested against: * AWS, c4.2xlarge, bosh-aws-xen-hvm-ubuntu-trusty-go_agent 3421.11 * 3 RabbitMQ nodes running OTP 20.0.1 * 3 durable & auto-delete queues with 3 replicas each * each queue master was defined on a different RabbitMQ node * 1 consumer per queue with QOS 100 * 100 durable messages @ 1KiB each * `basic.reject` operations ``` | Node | Message throughput | Memory usage | | ------ | -------------------- | -------------- | | rmq0 | 12K - 20K msg/s | 400 - 900 MB | | rmq1 | 12K - 20K msg/s | 500 - 1000 MB | | rmq2 | 12K - 20K msg/s | 500 - 800 MB | ``` [#148892851] Signed-off-by: Gerhard Lazu <[email protected]>
1 parent 5f03dcc commit 7f84083

File tree

1 file changed

+22
-7
lines changed

1 file changed

+22
-7
lines changed

src/gm.erl

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -395,9 +395,8 @@
395395

396396
-define(GROUP_TABLE, gm_group).
397397
-define(MAX_BUFFER_SIZE, 100000000). %% 100MB
398-
-define(HIBERNATE_AFTER_MIN, 1000).
399-
-define(DESIRED_HIBERNATE, 10000).
400398
-define(BROADCAST_TIMER, 25).
399+
-define(FORCE_GC_TIMER, 250).
401400
-define(VERSION_START, 0).
402401
-define(SETS, ordsets).
403402
-define(DICT, orddict).
@@ -416,6 +415,7 @@
416415
broadcast_buffer,
417416
broadcast_buffer_sz,
418417
broadcast_timer,
418+
force_gc_timer,
419419
txn_executor,
420420
shutting_down
421421
}).
@@ -508,7 +508,8 @@ table_definitions() ->
508508
[{Name, [?TABLE_MATCH | Attributes]}].
509509

510510
start_link(GroupName, Module, Args, TxnFun) ->
511-
gen_server2:start_link(?MODULE, [GroupName, Module, Args, TxnFun], []).
511+
gen_server2:start_link(?MODULE, [GroupName, Module, Args, TxnFun],
512+
[{spawn_opt, [{fullsweep_after, 0}]}]).
512513

513514
leave(Server) ->
514515
gen_server2:cast(Server, leave).
@@ -551,6 +552,7 @@ init([GroupName, Module, Args, TxnFun]) ->
551552
broadcast_buffer = [],
552553
broadcast_buffer_sz = 0,
553554
broadcast_timer = undefined,
555+
force_gc_timer = undefined,
554556
txn_executor = TxnFun,
555557
shutting_down = false }}.
556558

@@ -707,6 +709,10 @@ handle_cast(leave, State) ->
707709
{stop, normal, State}.
708710

709711

712+
handle_info(force_gc, State) ->
713+
garbage_collect(),
714+
noreply(State #state { force_gc_timer = undefined });
715+
710716
handle_info(flush, State) ->
711717
noreply(
712718
flush_broadcast_buffer(State #state { broadcast_timer = undefined }));
@@ -882,14 +888,24 @@ handle_msg({activity, _NotLeft, _Activity}, State) ->
882888

883889

884890
noreply(State) ->
885-
{noreply, ensure_broadcast_timer(State), flush_timeout(State)}.
891+
{noreply, ensure_timers(State), flush_timeout(State)}.
886892

887893
reply(Reply, State) ->
888-
{reply, Reply, ensure_broadcast_timer(State), flush_timeout(State)}.
894+
{reply, Reply, ensure_timers(State), flush_timeout(State)}.
895+
896+
ensure_timers(State) ->
897+
ensure_force_gc_timer(ensure_broadcast_timer(State)).
889898

890899
flush_timeout(#state{broadcast_buffer = []}) -> infinity;
891900
flush_timeout(_) -> 0.
892901

902+
ensure_force_gc_timer(State = #state { force_gc_timer = TRef })
903+
when is_reference(TRef) ->
904+
State;
905+
ensure_force_gc_timer(State = #state { force_gc_timer = undefined }) ->
906+
TRef = erlang:send_after(?FORCE_GC_TIMER, self(), force_gc),
907+
State #state { force_gc_timer = TRef }.
908+
893909
ensure_broadcast_timer(State = #state { broadcast_buffer = [],
894910
broadcast_timer = undefined }) ->
895911
State;
@@ -957,8 +973,7 @@ flush_broadcast_buffer(State = #state { self = Self,
957973
end, Self, MembersState),
958974
State #state { members_state = MembersState1,
959975
broadcast_buffer = [],
960-
broadcast_buffer_sz = 0}.
961-
976+
broadcast_buffer_sz = 0 }.
962977

963978
%% ---------------------------------------------------------------------------
964979
%% View construction and inspection

0 commit comments

Comments
 (0)