Skip to content

Commit cf2c609

Browse files
Merge pull request #3263 from rabbitmq/rabbitmq-server-3260
Partially reintroduce locking to mirrored_supervisor
2 parents 04371b0 + c84115f commit cf2c609

File tree

2 files changed

+98
-20
lines changed

2 files changed

+98
-20
lines changed

deps/rabbit_common/src/mirrored_supervisor.erl

Lines changed: 65 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -277,13 +277,17 @@ handle_call({init, Overall}, _From,
277277
tx_fun = TxFun,
278278
initial_childspecs = ChildSpecs}) ->
279279
process_flag(trap_exit, true),
280+
LockId = mirrored_supervisor_locks:lock(Group),
281+
maybe_log_lock_acquisition_failure(LockId, Group),
280282
ok = pg:join(Group, Overall),
281-
rabbit_log:debug("Mirrored supervisor: initializing, joined group ~p", [Group]),
283+
rabbit_log:debug("Mirrored supervisor: initializing, overall supervisor ~p joined group ~p", [Overall, Group]),
282284
Rest = pg:get_members(Group) -- [Overall],
283285
Nodes = [node(M) || M <- Rest],
284286
rabbit_log:debug("Mirrored supervisor: known group ~p members: ~p on nodes ~p", [Group, Rest, Nodes]),
285287
case Rest of
286-
[] -> TxFun(fun() -> delete_all(Group) end);
288+
[] ->
289+
rabbit_log:debug("Mirrored supervisor: no known peer members in group ~p, will delete all child records for it", [Group]),
290+
TxFun(fun() -> delete_all(Group) end);
287291
_ -> ok
288292
end,
289293
[begin
@@ -293,8 +297,9 @@ handle_call({init, Overall}, _From,
293297
Delegate = delegate(Overall),
294298
erlang:monitor(process, Delegate),
295299
State1 = State#state{overall = Overall, delegate = Delegate},
296-
case errors([maybe_start(Group, TxFun, Overall, Delegate, S)
297-
|| S <- ChildSpecs]) of
300+
Results = [maybe_start(Group, TxFun, Overall, Delegate, S) || S <- ChildSpecs],
301+
mirrored_supervisor_locks:unlock(LockId),
302+
case errors(Results) of
298303
[] -> {reply, ok, State1};
299304
Errors -> {stop, {shutdown, Errors}, State1}
300305
end;
@@ -304,11 +309,25 @@ handle_call({start_child, ChildSpec}, _From,
304309
delegate = Delegate,
305310
group = Group,
306311
tx_fun = TxFun}) ->
307-
{reply, case maybe_start(Group, TxFun, Overall, Delegate, ChildSpec) of
308-
already_in_mnesia -> {error, already_present};
309-
{already_in_mnesia, Pid} -> {error, {already_started, Pid}};
310-
Else -> Else
311-
end, State};
312+
LockId = mirrored_supervisor_locks:lock(Group),
313+
maybe_log_lock_acquisition_failure(LockId, Group),
314+
rabbit_log:debug("Mirrored supervisor: asked to consider starting a child, group: ~p", [Group]),
315+
Result = case maybe_start(Group, TxFun, Overall, Delegate, ChildSpec) of
316+
already_in_mnesia ->
317+
rabbit_log:debug("Mirrored supervisor: maybe_start for group ~p,"
318+
" overall ~p returned 'record already present'", [Group, Overall]),
319+
{error, already_present};
320+
{already_in_mnesia, Pid} ->
321+
rabbit_log:debug("Mirrored supervisor: maybe_start for group ~p,"
322+
" overall ~p returned 'already running: ~p'", [Group, Overall, Pid]),
323+
{error, {already_started, Pid}};
324+
Else ->
325+
rabbit_log:debug("Mirrored supervisor: maybe_start for group ~p,"
326+
" overall ~p returned ~p", [Group, Overall, Else]),
327+
Else
328+
end,
329+
mirrored_supervisor_locks:unlock(LockId),
330+
{reply, Result, State};
312331

313332
handle_call({delete_child, Id}, _From, State = #state{delegate = Delegate,
314333
group = Group,
@@ -384,28 +403,50 @@ tell_all_peers_to_die(Group, Reason) ->
384403
[cast(P, {die, Reason}) || P <- pg:get_members(Group) -- [self()]].
385404

386405
%% Decides, inside a transaction run by TxFun, whether this mirrored
%% supervisor should start the child described by ChildSpec, and starts
%% it through Delegate when check_start/4 says so.
%%
%% Returns:
%%   * whatever start/2 returns when check_start/4 answers `start';
%%   * `already_in_mnesia' when check_start/4 answers `undefined';
%%   * `{already_in_mnesia, Pid}' for any other check_start/4 answer;
%%   * `{error, E}' when `{error, E}' is thrown out of the transaction.
maybe_start(Group, TxFun, Overall, Delegate, ChildSpec) ->
    rabbit_log:debug("Mirrored supervisor: asked to consider starting, group: ~p", [Group]),
    CheckFun = fun() -> check_start(Group, Overall, Delegate, ChildSpec) end,
    %% Only the transaction itself is protected by the try; the result
    %% handling in the `of' section (including start/2) is not.
    try TxFun(CheckFun) of
        start ->
            rabbit_log:debug("Mirrored supervisor: check_start for group ~p,"
                             " overall ~p returned 'do start'", [Group, Overall]),
            start(Delegate, ChildSpec);
        undefined ->
            rabbit_log:debug("Mirrored supervisor: check_start for group ~p,"
                             " overall ~p returned 'undefined'", [Group, Overall]),
            already_in_mnesia;
        RunningPid ->
            rabbit_log:debug("Mirrored supervisor: check_start for group ~p,"
                             " overall ~p returned 'already running (~p)'", [Group, Overall, RunningPid]),
            {already_in_mnesia, RunningPid}
    catch
        %% If we are torn down while in the transaction...
        {error, _} = Error -> Error
    end.
395424

396425
%% Looks up the child record for {Group, id(ChildSpec)} with a write lock
%% and decides what the calling mirrored supervisor (Overall) should do.
%% maybe_start/5 runs this inside the TxFun transaction.
%%
%% Returns:
%%   * `start' when no record exists, or when the recorded mirroring
%%     supervisor is reported `dead' by supervisor/1; in both cases a
%%     record for Overall is written first via write/3;
%%   * the result of child/2 otherwise: asked of Delegate when the record
%%     already points at Overall, or of the delegate returned by
%%     supervisor/1 when it points at a live peer.
%%
%% NOTE(review): this body may be re-run if the enclosing transaction is
%% restarted, so the debug lines can appear more than once -- confirm
%% against TxFun.
check_start(Group, Overall, Delegate, ChildSpec) ->
    ChildId = id(ChildSpec),
    Key = {Group, ChildId},
    rabbit_log:debug("Mirrored supervisor: check_start for group ~p, id: ~p, overall: ~p",
                     [Group, ChildId, Overall]),
    ReadResult = mnesia:wread({?TABLE, Key}),
    rabbit_log:debug("Mirrored supervisor: check_start table ~s read for key ~p returned ~p",
                     [?TABLE, Key, ReadResult]),
    case ReadResult of
        [] ->
            _ = write(Group, Overall, ChildSpec),
            start;
        [S] ->
            #mirrored_sup_childspec{key           = {Group, Id},
                                    mirroring_pid = Pid} = S,
            case Overall of
                Pid ->
                    rabbit_log:debug("Mirrored supervisor: overall matched mirrored pid ~p", [Pid]),
                    child(Delegate, Id);
                _ ->
                    rabbit_log:debug("Mirrored supervisor: overall ~p did not match mirrored pid ~p", [Overall, Pid]),
                    %% Ask the recorded supervisor exactly once, so that the
                    %% value we log is the value we act on and the peer is
                    %% not queried twice inside the transaction.
                    SupervisorResult = supervisor(Pid),
                    rabbit_log:debug("Mirrored supervisor: supervisor(~p) returned ~p", [Pid, SupervisorResult]),
                    case SupervisorResult of
                        dead ->
                            _ = write(Group, Overall, ChildSpec),
                            start;
                        Delegate0 ->
                            child(Delegate0, Id)
                    end
            end
    end.
411452

@@ -423,7 +464,6 @@ delete(Group, Id) ->
423464
ok = mnesia:delete({?TABLE, {Group, Id}}).
424465

425466
%% Asks the ?SUPERVISOR module to start ChildSpec under the Delegate
%% supervisor and returns its start_child result unchanged.
start(Delegate, ChildSpec) ->
    Args = [Delegate, ChildSpec],
    erlang:apply(?SUPERVISOR, start_child, Args).
428468

429469
stop(Group, TxFun, Delegate, Id) ->
@@ -511,3 +551,8 @@ restore_child_order(ChildSpecs, ChildOrder) ->
511551
proplists:get_value(id(A), ChildOrder)
512552
< proplists:get_value(id(B), ChildOrder)
513553
end, ChildSpecs).
554+
555+
%% Logs a warning when a lock could not be acquired for Group, i.e. when
%% the lock id is `undefined'. Any other lock id is treated as success and
%% nothing is logged. Always returns whatever the taken branch yields
%% (`ok' in the non-logging branch).
maybe_log_lock_acquisition_failure(undefined = _LockId, Group) ->
    %% ~p rather than ~s: the other log calls in this module print the
    %% group with ~p, and ~s cannot format a group name that is not an
    %% atom or chardata (e.g. a tuple).
    rabbit_log:warning("Mirrored supervisor: could not acquire lock for group ~p", [Group]);
maybe_log_lock_acquisition_failure(_LockId, _Group) ->
    ok.
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
6+
%%
7+
8+
-module(mirrored_supervisor_locks).
9+
10+
-export([lock/1, unlock/1]).
11+
12+
-define(KEY_PREFIX, mirrored_supervisor).
13+
14+
%%
15+
%% API
16+
%%
17+
18+
%% Tries to set a `global' lock keyed by {?KEY_PREFIX, Group} on the nodes
%% returned by nodes(). Returns Group (used as the lock id to pass to
%% unlock/1) when the lock was set, or `undefined' when it was not.
%%
%% NOTE(review): nodes() does not include the local node, and `global'
%% interprets the second element of the lock id tuple (Group here) as the
%% lock requester id -- confirm this yields the intended exclusion.
lock(Group) ->
    %% about 300s, same as rabbit_nodes:lock_retries/0 default
    Retries = 80,
    case global:set_lock({?KEY_PREFIX, Group}, nodes(), Retries) of
        true  -> Group;
        false -> undefined
    end.
26+
27+
%% Releases a lock previously taken with lock/1. A lock id of `undefined'
%% (lock/1 did not acquire the lock) is a no-op. Always returns `ok'.
unlock(undefined) ->
    ok;
unlock(LockId) ->
    %% The lock is deleted on the nodes visible now, which may differ from
    %% the set of nodes seen when the lock was taken.
    global:del_lock({?KEY_PREFIX, LockId}, nodes()),
    ok.

0 commit comments

Comments
 (0)