Skip to content

Commit 4da1431

Browse files
Merge pull request #1647 from rabbitmq/cancel-sync-slaves
Cancel synchronisation when memory alarm is triggered
2 parents ddbc864 + 4f762b9 commit 4da1431

File tree

2 files changed

+18
-6
lines changed

2 files changed

+18
-6
lines changed

src/rabbit_mirror_queue_master.erl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,8 @@ sync_mirrors(HandleInfo, EmitStats,
165165
S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end,
166166
case rabbit_mirror_queue_sync:master_go(
167167
Syncer, Ref, Log, HandleInfo, EmitStats, SyncBatchSize, BQ, BQS) of
168+
{cancelled, BQS1} -> Log(" synchronisation cancelled ", []),
169+
{ok, S(BQS1)};
168170
{shutdown, R, BQS1} -> {stop, R, S(BQS1)};
169171
{sync_died, R, BQS1} -> Log("~p", [R]),
170172
{ok, S(BQS1)};

src/rabbit_mirror_queue_sync.erl

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -150,10 +150,20 @@ master_send_receive(SyncMsg, NewAcc, Syncer, Ref, Parent) ->
150150

151151
master_done({Syncer, Ref, _Log, _HandleInfo, _EmitStats, Parent}, BQS) ->
152152
receive
153-
{next, Ref} -> stop_syncer(Syncer, {done, Ref}),
154-
{ok, BQS};
155-
{'EXIT', Parent, Reason} -> {shutdown, Reason, BQS};
156-
{'EXIT', Syncer, Reason} -> {sync_died, Reason, BQS}
153+
{'$gen_call', From,
154+
cancel_sync_mirrors} ->
155+
stop_syncer(Syncer, {cancel, Ref}),
156+
gen_server2:reply(From, ok),
157+
{cancelled, BQS};
158+
{cancelled, Ref} ->
159+
{cancelled, BQS};
160+
{next, Ref} ->
161+
stop_syncer(Syncer, {done, Ref}),
162+
{ok, BQS};
163+
{'EXIT', Parent, Reason} ->
164+
{shutdown, Reason, BQS};
165+
{'EXIT', Syncer, Reason} ->
166+
{sync_died, Reason, BQS}
157167
end.
158168

159169
stop_syncer(Syncer, Msg) ->
@@ -230,7 +240,7 @@ syncer_check_resources(Ref, MPid, SPids) ->
230240
syncer_loop(Ref, MPid, SPids);
231241
true ->
232242
case wait_for_resources(Ref, SPids) of
233-
cancel -> ok;
243+
cancel -> MPid ! {cancelled, Ref};
234244
SPids1 -> MPid ! {next, Ref},
235245
syncer_loop(Ref, MPid, SPids1)
236246
end
@@ -240,7 +250,7 @@ syncer_loop(Ref, MPid, SPids) ->
240250
receive
241251
{conserve_resources, memory, true} ->
242252
case wait_for_resources(Ref, SPids) of
243-
cancel -> ok;
253+
cancel -> MPid ! {cancelled, Ref};
244254
SPids1 -> syncer_loop(Ref, MPid, SPids1)
245255
end;
246256
{conserve_resources, _, _} ->

0 commit comments

Comments
 (0)