Skip to content

Commit 22add3c

Browse files
Acquire locks when starting mirrored supervisor children
Unlike pg2, pg in Erlang 24 is eventually consistent, so this commit reintroduces some of the same kind of locking that mirrored_supervisor used to rely on implicitly via pg2. Per discussion with @lhoguin. Closes #3260. References #3132, #3154.
1 parent 9a0f4b1 commit 22add3c

File tree

2 files changed

+54
-20
lines changed

2 files changed

+54
-20
lines changed

deps/rabbit_common/src/mirrored_supervisor.erl

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,7 @@ handle_call({init, Overall}, _From,
277277
tx_fun = TxFun,
278278
initial_childspecs = ChildSpecs}) ->
279279
process_flag(trap_exit, true),
280+
LockId = mirrored_supervisor_locks:lock(Group),
280281
ok = pg:join(Group, Overall),
281282
rabbit_log:debug("Mirrored supervisor: initializing, overall supervisor ~p joined group ~p", [Overall, Group]),
282283
Rest = pg:get_members(Group) -- [Overall],
@@ -295,8 +296,9 @@ handle_call({init, Overall}, _From,
295296
Delegate = delegate(Overall),
296297
erlang:monitor(process, Delegate),
297298
State1 = State#state{overall = Overall, delegate = Delegate},
298-
case errors([maybe_start(Group, TxFun, Overall, Delegate, S)
299-
|| S <- ChildSpecs]) of
299+
Results = [maybe_start(Group, TxFun, Overall, Delegate, S) || S <- ChildSpecs],
300+
mirrored_supervisor_locks:unlock(LockId),
301+
case errors(Results) of
300302
[] -> {reply, ok, State1};
301303
Errors -> {stop, {shutdown, Errors}, State1}
302304
end;
@@ -306,21 +308,24 @@ handle_call({start_child, ChildSpec}, _From,
306308
delegate = Delegate,
307309
group = Group,
308310
tx_fun = TxFun}) ->
311+
LockId = mirrored_supervisor_locks:lock(Group),
309312
rabbit_log:debug("Mirrored supervisor: asked to consider starting a child, group: ~p", [Group]),
310-
{reply, case maybe_start(Group, TxFun, Overall, Delegate, ChildSpec) of
311-
already_in_mnesia ->
312-
rabbit_log:debug("Mirrored supervisor: maybe_start for group ~p,"
313-
" overall ~p returned 'record already present'", [Group, Overall]),
314-
{error, already_present};
315-
{already_in_mnesia, Pid} ->
316-
rabbit_log:debug("Mirrored supervisor: maybe_start for group ~p,"
317-
" overall ~p returned 'already running: ~p'", [Group, Overall, Pid]),
318-
{error, {already_started, Pid}};
319-
Else ->
320-
rabbit_log:debug("Mirrored supervisor: maybe_start for group ~p,"
321-
" overall ~p returned ~p", [Group, Overall, Else]),
322-
Else
323-
end, State};
313+
Result = case maybe_start(Group, TxFun, Overall, Delegate, ChildSpec) of
314+
already_in_mnesia ->
315+
rabbit_log:debug("Mirrored supervisor: maybe_start for group ~p,"
316+
" overall ~p returned 'record already present'", [Group, Overall]),
317+
{error, already_present};
318+
{already_in_mnesia, Pid} ->
319+
rabbit_log:debug("Mirrored supervisor: maybe_start for group ~p,"
320+
" overall ~p returned 'already running: ~p'", [Group, Overall, Pid]),
321+
{error, {already_started, Pid}};
322+
Else ->
323+
rabbit_log:debug("Mirrored supervisor: maybe_start for group ~p,"
324+
" overall ~p returned ~p", [Group, Overall, Else]),
325+
Else
326+
end,
327+
mirrored_supervisor_locks:unlock(LockId),
328+
{reply, Result, State};
324329

325330
handle_call({delete_child, Id}, _From, State = #state{delegate = Delegate,
326331
group = Group,
@@ -457,11 +462,9 @@ delete(Group, Id) ->
457462
ok = mnesia:delete({?TABLE, {Group, Id}}).
458463

459464
start(Delegate, ChildSpec) ->
460-
rabbit_log:debug("Mirrored supervisor: asked to start with delegate: ~p, child spec: ~p", [Delegate, ChildSpec]),
461465
apply(?SUPERVISOR, start_child, [Delegate, ChildSpec]).
462466

463467
stop(Group, TxFun, Delegate, Id) ->
464-
rabbit_log:debug("Mirrored supervisor: asked to stop, group: ~p, child ID: ~p", [Group, Id]),
465468
try TxFun(fun() -> check_stop(Group, Delegate, Id) end) of
466469
deleted -> apply(?SUPERVISOR, delete_child, [Delegate, Id]);
467470
running -> {error, running}
@@ -470,8 +473,6 @@ stop(Group, TxFun, Delegate, Id) ->
470473
end.
471474

472475
check_stop(Group, Delegate, Id) ->
473-
rabbit_log:debug("Mirrored supervisor: checking if child ~p in group ~p should be stopped: ~p",
474-
[Id, Group, child(Delegate, Id)]),
475476
case child(Delegate, Id) of
476477
undefined -> delete(Group, Id),
477478
deleted;
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
6+
%%
7+
8+
-module(mirrored_supervisor_locks).
9+
10+
-export([lock/1, unlock/1]).
11+
12+
-define(KEY_PREFIX, mirrored_supervisor).
13+
14+
%%
15+
%% API
16+
%%
17+
18+
%% Attempts to set a global lock with Id {?KEY_PREFIX, Group} on the nodes
%% returned by nodes(), retrying up to 80 times.
%%
%% Returns a token to hand to unlock/1: Group when the lock was set, or
%% 'undefined' when global:set_lock/3 returned false.
%%
%% NOTE(review): global:set_lock/3 documents Id as
%% {ResourceId, LockRequesterId}; with this Id the resource would be
%% ?KEY_PREFIX and the requester would be Group -- confirm this is the
%% intended exclusion scope. Also confirm whether the local node, which
%% nodes() does not list, is meant to be left out of the lock.
lock(Group) ->
    %% about 300s, same as rabbit_nodes:lock_retries/0 default
    case global:set_lock({?KEY_PREFIX, Group}, nodes(), 80) of
        true ->
            Group;
        false ->
            undefined
    end.
26+
27+
%% Releases a lock previously requested through lock/1.
%%
%% LockId is the token lock/1 returned. 'undefined' (no lock was set) is
%% a no-op; any other value is used to rebuild the Id {?KEY_PREFIX, LockId}
%% that is passed to global:del_lock/2 together with the current nodes().
%% Always returns ok; the result of global:del_lock/2 is ignored.
unlock(undefined) ->
    ok;
unlock(LockId) ->
    global:del_lock({?KEY_PREFIX, LockId}, nodes()),
    ok.

0 commit comments

Comments
 (0)