Skip to content

Commit 410925b

Browse files
Merge pull request #3263 from rabbitmq/rabbitmq-server-3260
Partially reintroduce locking to mirrored_supervisor (cherry picked from commit cf2c609). Adapted for Erlang 24 compatibility of rabbit_log call sites. Conflicts: deps/rabbit_common/src/mirrored_supervisor.erl
1 parent 6e5553c commit 410925b

File tree

2 files changed

+98
-18
lines changed

2 files changed

+98
-18
lines changed

deps/rabbit_common/src/mirrored_supervisor.erl

Lines changed: 65 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -277,10 +277,15 @@ handle_call({init, Overall}, _From,
277277
tx_fun = TxFun,
278278
initial_childspecs = ChildSpecs}) ->
279279
process_flag(trap_exit, true),
280+
LockId = mirrored_supervisor_locks:lock(Group),
281+
maybe_log_lock_acquisition_failure(LockId, Group),
280282
ok = pg:join(Group, Overall),
283+
_ = rabbit_log:debug("Mirrored supervisor: initializing, overall supervisor ~p joined group ~p", [Overall, Group]),
281284
Rest = pg:get_members(Group) -- [Overall],
282285
case Rest of
283-
[] -> TxFun(fun() -> delete_all(Group) end);
286+
[] ->
287+
_ = rabbit_log:debug("Mirrored supervisor: no known peer members in group ~p, will delete all child records for it", [Group]),
288+
TxFun(fun() -> delete_all(Group) end);
284289
_ -> ok
285290
end,
286291
[begin
@@ -290,8 +295,9 @@ handle_call({init, Overall}, _From,
290295
Delegate = delegate(Overall),
291296
erlang:monitor(process, Delegate),
292297
State1 = State#state{overall = Overall, delegate = Delegate},
293-
case errors([maybe_start(Group, TxFun, Overall, Delegate, S)
294-
|| S <- ChildSpecs]) of
298+
Results = [maybe_start(Group, TxFun, Overall, Delegate, S) || S <- ChildSpecs],
299+
mirrored_supervisor_locks:unlock(LockId),
300+
case errors(Results) of
295301
[] -> {reply, ok, State1};
296302
Errors -> {stop, {shutdown, Errors}, State1}
297303
end;
@@ -301,11 +307,25 @@ handle_call({start_child, ChildSpec}, _From,
301307
delegate = Delegate,
302308
group = Group,
303309
tx_fun = TxFun}) ->
304-
{reply, case maybe_start(Group, TxFun, Overall, Delegate, ChildSpec) of
305-
already_in_mnesia -> {error, already_present};
306-
{already_in_mnesia, Pid} -> {error, {already_started, Pid}};
307-
Else -> Else
308-
end, State};
310+
LockId = mirrored_supervisor_locks:lock(Group),
311+
maybe_log_lock_acquisition_failure(LockId, Group),
312+
_ = rabbit_log:debug("Mirrored supervisor: asked to consider starting a child, group: ~p", [Group]),
313+
Result = case maybe_start(Group, TxFun, Overall, Delegate, ChildSpec) of
314+
already_in_mnesia ->
315+
_ = rabbit_log:debug("Mirrored supervisor: maybe_start for group ~p,"
316+
" overall ~p returned 'record already present'", [Group, Overall]),
317+
{error, already_present};
318+
{already_in_mnesia, Pid} ->
319+
_ = rabbit_log:debug("Mirrored supervisor: maybe_start for group ~p,"
320+
" overall ~p returned 'already running: ~p'", [Group, Overall, Pid]),
321+
{error, {already_started, Pid}};
322+
Else ->
323+
_ = rabbit_log:debug("Mirrored supervisor: maybe_start for group ~p,"
324+
" overall ~p returned ~p", [Group, Overall, Else]),
325+
Else
326+
end,
327+
mirrored_supervisor_locks:unlock(LockId),
328+
{reply, Result, State};
309329

310330
handle_call({delete_child, Id}, _From, State = #state{delegate = Delegate,
311331
group = Group,
@@ -381,28 +401,50 @@ tell_all_peers_to_die(Group, Reason) ->
381401
[cast(P, {die, Reason}) || P <- pg:get_members(Group) -- [self()]].
382402

383403
%% Runs check_start/4 through TxFun and acts on its verdict.
%%
%% Returns:
%%   * whatever start(Delegate, ChildSpec) returns when the check says `start';
%%   * `already_in_mnesia' when the check yields `undefined';
%%   * `{already_in_mnesia, Pid}' for any other check result;
%%   * `{error, E}' when TxFun throws `{error, E}'.
%%
%% Only the TxFun call itself is covered by the `catch'; the handling of
%% its result happens in the unprotected `of' section.
maybe_start(Group, TxFun, Overall, Delegate, ChildSpec) ->
    _ = rabbit_log:debug("Mirrored supervisor: asked to consider starting, group: ~p", [Group]),
    CheckFun = fun() -> check_start(Group, Overall, Delegate, ChildSpec) end,
    try TxFun(CheckFun) of
        Verdict ->
            handle_check_start_verdict(Verdict, Group, Overall, Delegate, ChildSpec)
    catch
        %% If we are torn down while in the transaction...
        {error, E} -> {error, E}
    end.

%% Logs the verdict produced by check_start/4 and maps it onto the value
%% maybe_start/5 hands back to its caller.
handle_check_start_verdict(start, Group, Overall, Delegate, ChildSpec) ->
    _ = rabbit_log:debug("Mirrored supervisor: check_start for group ~p,"
                         " overall ~p returned 'do start'", [Group, Overall]),
    start(Delegate, ChildSpec);
handle_check_start_verdict(undefined, Group, Overall, _Delegate, _ChildSpec) ->
    _ = rabbit_log:debug("Mirrored supervisor: check_start for group ~p,"
                         " overall ~p returned 'undefined'", [Group, Overall]),
    already_in_mnesia;
handle_check_start_verdict(Pid, Group, Overall, _Delegate, _ChildSpec) ->
    _ = rabbit_log:debug("Mirrored supervisor: check_start for group ~p,"
                         " overall ~p returned 'already running (~p)'", [Group, Overall, Pid]),
    {already_in_mnesia, Pid}.
392422

393423
%% Decides whether this mirrored supervisor should start the child
%% described by ChildSpec. maybe_start/5 invokes it inside TxFun.
%%
%% Reads the record keyed by {Group, id(ChildSpec)} from ?TABLE:
%%   * no record: write one for Overall and return `start';
%%   * record whose mirroring pid is Overall: return child(Delegate, Id);
%%   * record held by another pid: ask supervisor(Pid) for that peer's
%%     delegate; if it answers `dead', take the record over (write) and
%%     return `start', otherwise return child(PeerDelegate, Id).
%%
%% supervisor(Pid) is evaluated exactly once, so the value written to the
%% debug log is the same value the decision is based on and the peer is
%% not queried twice.
check_start(Group, Overall, Delegate, ChildSpec) ->
    ChildId = id(ChildSpec),
    Key = {Group, ChildId},
    _ = rabbit_log:debug("Mirrored supervisor: check_start for group ~p, id: ~p, overall: ~p",
                         [Group, ChildId, Overall]),
    ReadResult = mnesia:wread({?TABLE, Key}),
    _ = rabbit_log:debug("Mirrored supervisor: check_start table ~s read for key ~p returned ~p",
                         [?TABLE, Key, ReadResult]),
    case ReadResult of
        [] ->
            _ = write(Group, Overall, ChildSpec),
            start;
        [S] ->
            #mirrored_sup_childspec{key = {Group, Id},
                                    mirroring_pid = Pid} = S,
            case Overall of
                Pid ->
                    _ = rabbit_log:debug("Mirrored supervisor: overall matched mirrored pid ~p", [Pid]),
                    child(Delegate, Id);
                _ ->
                    _ = rabbit_log:debug("Mirrored supervisor: overall ~p did not match mirrored pid ~p", [Overall, Pid]),
                    %% Query the peer once and reuse the answer for both
                    %% the log line and the decision below.
                    PeerDelegate = supervisor(Pid),
                    _ = rabbit_log:debug("Mirrored supervisor: supervisor(~p) returned ~p", [Pid, PeerDelegate]),
                    case PeerDelegate of
                        dead ->
                            _ = write(Group, Overall, ChildSpec),
                            start;
                        _ ->
                            child(PeerDelegate, Id)
                    end
            end
    end.
408450

@@ -507,3 +549,8 @@ restore_child_order(ChildSpecs, ChildOrder) ->
507549
proplists:get_value(id(A), ChildOrder)
508550
< proplists:get_value(id(B), ChildOrder)
509551
end, ChildSpecs).
552+
553+
%% Logs a warning when acquiring the lock for Group failed, which is
%% signalled by a lock id of `undefined'. Any other lock id is treated as
%% success and nothing is logged. Always returns the logger's result or `ok'.
%%
%% Group is formatted with ~p, like every other log call in this module,
%% so that a group name that is not an atom or string (e.g. a tuple) is
%% still rendered instead of breaking the format call.
maybe_log_lock_acquisition_failure(undefined = _LockId, Group) ->
    rabbit_log:warning("Mirrored supervisor: could not acquire lock for group ~p", [Group]);
maybe_log_lock_acquisition_failure(_, _) ->
    ok.
Lines changed: 33 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,33 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
6+
%%
7+
8+
-module(mirrored_supervisor_locks).
9+
10+
-export([lock/1, unlock/1]).
11+
12+
-define(KEY_PREFIX, mirrored_supervisor).
13+
14+
%%
15+
%% API
16+
%%
17+
18+
%% Tries to take the global lock {?KEY_PREFIX, Group} on the nodes
%% returned by nodes(), retrying up to 80 times.
%%
%% Returns Group as the lock id on success and `undefined' when the lock
%% could not be acquired; pass the result to unlock/1.
%%
%% NOTE(review): nodes() does not include the local node, and global
%% interprets the id tuple as {ResourceId, LockRequesterId} - confirm
%% that both choices are intended.
lock(Group) ->
    %% about 300s, same as rabbit_nodes:lock_retries/0 default
    Acquired = global:set_lock({?KEY_PREFIX, Group}, nodes(), 80),
    lock_id(Acquired, Group).

%% Translates the boolean returned by global:set_lock/3 into a lock id.
lock_id(true, Group)   -> Group;
lock_id(false, _Group) -> undefined.
26+
27+
%% Releases a lock previously obtained with lock/1. Always returns `ok'.
%%
%% A lock id of `undefined' (acquisition failed) is a no-op. Otherwise the
%% lock {?KEY_PREFIX, LockId} is deleted on the nodes returned by nodes()
%% at the time of this call.
%%
%% NOTE(review): the node list is recomputed here rather than remembered
%% from lock/1, so it may differ if cluster membership changed - confirm
%% this is acceptable.
unlock(undefined) ->
    ok;
unlock(LockId) ->
    _ = global:del_lock({?KEY_PREFIX, LockId}, nodes()),
    ok.

0 commit comments

Comments
 (0)