Skip to content

Commit 0b2d8d7

Browse files
committed
Cancel synchronisation when memory alarm is triggered
The cancellation used to block if the memory alarm was set on any of the slaves during sync. This patch solves it. [#158828073]
1 parent 7c941af commit 0b2d8d7

File tree

2 files changed

+19
-7
lines changed

2 files changed

+19
-7
lines changed

src/rabbit_mirror_queue_master.erl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,8 @@ sync_mirrors(HandleInfo, EmitStats,
165165
S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end,
166166
case rabbit_mirror_queue_sync:master_go(
167167
Syncer, Ref, Log, HandleInfo, EmitStats, SyncBatchSize, BQ, BQS) of
168+
{cancelled, BQS1} -> Log(" synchronisation cancelled ", []),
169+
{ok, S(BQS1)};
168170
{shutdown, R, BQS1} -> {stop, R, S(BQS1)};
169171
{sync_died, R, BQS1} -> Log("~p", [R]),
170172
{ok, S(BQS1)};

src/rabbit_mirror_queue_sync.erl

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -148,12 +148,22 @@ master_send_receive(SyncMsg, NewAcc, Syncer, Ref, Parent) ->
148148
{'EXIT', Syncer, Reason} -> {stop, {sync_died, Reason}}
149149
end.
150150

151-
master_done({Syncer, Ref, _Log, _HandleInfo, _EmitStats, Parent}, BQS) ->
151+
master_done({Syncer, Ref, _Log, _HandleInfo, _EmitStats, Parent} = I, BQS) ->
152152
receive
153-
{next, Ref} -> stop_syncer(Syncer, {done, Ref}),
154-
{ok, BQS};
155-
{'EXIT', Parent, Reason} -> {shutdown, Reason, BQS};
156-
{'EXIT', Syncer, Reason} -> {sync_died, Reason, BQS}
153+
{'$gen_call', From,
154+
cancel_sync_mirrors} ->
155+
stop_syncer(Syncer, {cancel, Ref}),
156+
gen_server2:reply(From, ok),
157+
{cancelled, BQS};
158+
{cancelled, Ref} ->
159+
{cancelled, BQS};
160+
{next, Ref} ->
161+
stop_syncer(Syncer, {done, Ref}),
162+
{ok, BQS};
163+
{'EXIT', Parent, Reason} ->
164+
{shutdown, Reason, BQS};
165+
{'EXIT', Syncer, Reason} ->
166+
{sync_died, Reason, BQS}
157167
end.
158168

159169
stop_syncer(Syncer, Msg) ->
@@ -230,7 +240,7 @@ syncer_check_resources(Ref, MPid, SPids) ->
230240
syncer_loop(Ref, MPid, SPids);
231241
true ->
232242
case wait_for_resources(Ref, SPids) of
233-
cancel -> ok;
243+
cancel -> MPid ! {cancelled, Ref};
234244
SPids1 -> MPid ! {next, Ref},
235245
syncer_loop(Ref, MPid, SPids1)
236246
end
@@ -240,7 +250,7 @@ syncer_loop(Ref, MPid, SPids) ->
240250
receive
241251
{conserve_resources, memory, true} ->
242252
case wait_for_resources(Ref, SPids) of
243-
cancel -> ok;
253+
cancel -> MPid ! {cancelled, Ref};
244254
SPids1 -> syncer_loop(Ref, MPid, SPids1)
245255
end;
246256
{conserve_resources, _, _} ->

0 commit comments

Comments
 (0)