Skip to content

Commit 82fb30b

Browse files
Merge pull request #1302 from rabbitmq/stable-gm-mem-usage-during-constant-redelivery
Stable GM memory usage during constant redelivery
2 parents 46eb2e5 + 7d0e49c commit 82fb30b

File tree

1 file changed

+24
-10
lines changed

1 file changed

+24
-10
lines changed

src/gm.erl

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -395,9 +395,8 @@
395395

396396
-define(GROUP_TABLE, gm_group).
397397
-define(MAX_BUFFER_SIZE, 100000000). %% 100MB
398-
-define(HIBERNATE_AFTER_MIN, 1000).
399-
-define(DESIRED_HIBERNATE, 10000).
400398
-define(BROADCAST_TIMER, 25).
399+
-define(FORCE_GC_TIMER, 250).
401400
-define(VERSION_START, 0).
402401
-define(SETS, ordsets).
403402
-define(DICT, orddict).
@@ -416,6 +415,7 @@
416415
broadcast_buffer,
417416
broadcast_buffer_sz,
418417
broadcast_timer,
418+
force_gc_timer,
419419
txn_executor,
420420
shutting_down
421421
}).
@@ -508,7 +508,8 @@ table_definitions() ->
508508
[{Name, [?TABLE_MATCH | Attributes]}].
509509

510510
start_link(GroupName, Module, Args, TxnFun) ->
511-
gen_server2:start_link(?MODULE, [GroupName, Module, Args, TxnFun], []).
511+
gen_server2:start_link(?MODULE, [GroupName, Module, Args, TxnFun],
512+
[{spawn_opt, [{fullsweep_after, 0}]}]).
512513

513514
leave(Server) ->
514515
gen_server2:cast(Server, leave).
@@ -551,9 +552,9 @@ init([GroupName, Module, Args, TxnFun]) ->
551552
broadcast_buffer = [],
552553
broadcast_buffer_sz = 0,
553554
broadcast_timer = undefined,
555+
force_gc_timer = undefined,
554556
txn_executor = TxnFun,
555-
shutting_down = false }, hibernate,
556-
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
557+
shutting_down = false }}.
557558

558559

559560
handle_call({confirmed_broadcast, _Msg}, _From,
@@ -708,6 +709,10 @@ handle_cast(leave, State) ->
708709
{stop, normal, State}.
709710

710711

712+
handle_info(force_gc, State) ->
713+
garbage_collect(),
714+
noreply(State #state { force_gc_timer = undefined });
715+
711716
handle_info(flush, State) ->
712717
noreply(
713718
flush_broadcast_buffer(State #state { broadcast_timer = undefined }));
@@ -883,14 +888,24 @@ handle_msg({activity, _NotLeft, _Activity}, State) ->
883888

884889

885890
noreply(State) ->
886-
{noreply, ensure_broadcast_timer(State), flush_timeout(State)}.
891+
{noreply, ensure_timers(State), flush_timeout(State)}.
887892

888893
reply(Reply, State) ->
889-
{reply, Reply, ensure_broadcast_timer(State), flush_timeout(State)}.
894+
{reply, Reply, ensure_timers(State), flush_timeout(State)}.
895+
896+
ensure_timers(State) ->
897+
ensure_force_gc_timer(ensure_broadcast_timer(State)).
890898

891-
flush_timeout(#state{broadcast_buffer = []}) -> hibernate;
899+
flush_timeout(#state{broadcast_buffer = []}) -> infinity;
892900
flush_timeout(_) -> 0.
893901

902+
ensure_force_gc_timer(State = #state { force_gc_timer = TRef })
903+
when is_reference(TRef) ->
904+
State;
905+
ensure_force_gc_timer(State = #state { force_gc_timer = undefined }) ->
906+
TRef = erlang:send_after(?FORCE_GC_TIMER, self(), force_gc),
907+
State #state { force_gc_timer = TRef }.
908+
894909
ensure_broadcast_timer(State = #state { broadcast_buffer = [],
895910
broadcast_timer = undefined }) ->
896911
State;
@@ -958,8 +973,7 @@ flush_broadcast_buffer(State = #state { self = Self,
958973
end, Self, MembersState),
959974
State #state { members_state = MembersState1,
960975
broadcast_buffer = [],
961-
broadcast_buffer_sz = 0}.
962-
976+
broadcast_buffer_sz = 0 }.
963977

964978
%% ---------------------------------------------------------------------------
965979
%% View construction and inspection

0 commit comments

Comments
 (0)