Skip to content

Commit da8b299

Browse files
Merge pull request #10080 from cloudamqp/fix_shovel_3_11
Dynamic Shovels: support old and new supervisor child ID formats on 3.11.x
2 parents 00ce450 + b1d54fd commit da8b299

File tree

1 file changed

+48
-12
lines changed

1 file changed

+48
-12
lines changed

deps/rabbitmq_shovel/src/rabbit_shovel_dyn_worker_sup_sup.erl

Lines changed: 48 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -41,24 +41,31 @@ start_child({VHost, ShovelName} = Name, Def) ->
4141
LockId = rabbit_shovel_locks:lock(Name),
4242
cleanup_specs(),
4343
rabbit_log_shovel:debug("Starting a mirrored supervisor named '~s' in virtual host '~s'", [ShovelName, VHost]),
44-
Result = case mirrored_supervisor:start_child(
44+
case child_exists(Name)
45+
orelse mirrored_supervisor:start_child(
4546
?SUPERVISOR,
4647
{id(Name), {rabbit_shovel_dyn_worker_sup, start_link, [Name, obfuscated_uris_parameters(Def)]},
4748
transient, ?WORKER_WAIT, worker, [rabbit_shovel_dyn_worker_sup]}) of
49+
true -> ok;
4850
{ok, _Pid} -> ok;
4951
{error, {already_started, _Pid}} -> ok
5052
end,
5153
%% release the lock if we managed to acquire one
5254
rabbit_shovel_locks:unlock(LockId),
53-
Result.
55+
ok.
5456

5557
obfuscated_uris_parameters(Def) when is_map(Def) ->
5658
to_map(rabbit_shovel_parameters:obfuscate_uris_in_definition(to_list(Def)));
5759
obfuscated_uris_parameters(Def) when is_list(Def) ->
5860
rabbit_shovel_parameters:obfuscate_uris_in_definition(Def).
5961

6062
child_exists(Name) ->
61-
lists:any(fun ({{_, N}, _, _, _}) -> N =:= Name end,
63+
Id = id(Name),
64+
%% older format, pre 3.13.0, 3.12.8 and 3.11.25. See rabbitmq/rabbitmq-server#9894.
65+
OldId = old_id(Name),
66+
lists:any(fun ({ChildId, _, _, _}) ->
67+
ChildId =:= Id orelse ChildId =:= OldId
68+
end,
6269
mirrored_supervisor:which_children(?SUPERVISOR)).
6370

6471
stop_child({VHost, ShovelName} = Name) ->
@@ -67,8 +74,18 @@ stop_child({VHost, ShovelName} = Name) ->
6774
case get({shovel_worker_autodelete, Name}) of
6875
true -> ok; %% [1]
6976
_ ->
70-
ok = mirrored_supervisor:terminate_child(?SUPERVISOR, id(Name)),
71-
ok = mirrored_supervisor:delete_child(?SUPERVISOR, id(Name)),
77+
case mirrored_supervisor:terminate_child(?SUPERVISOR, id(Name)) of
78+
ok ->
79+
ok = mirrored_supervisor:delete_child(?SUPERVISOR, id(Name));
80+
{error, not_found} ->
81+
%% try older format, pre 3.13.0, 3.12.8 and 3.11.25. See rabbitmq/rabbitmq-server#9894.
82+
case mirrored_supervisor:terminate_child(?SUPERVISOR, old_id(Name)) of
83+
ok ->
84+
ok = mirrored_supervisor:delete_child(?SUPERVISOR, old_id(Name));
85+
{error, not_found} ->
86+
ok
87+
end
88+
end,
7289
rabbit_shovel_status:remove(Name)
7390
end,
7491
rabbit_shovel_locks:unlock(LockId),
@@ -83,19 +100,38 @@ stop_child({VHost, ShovelName} = Name) ->
83100
%% See rabbit_shovel_worker:terminate/2
84101

85102
cleanup_specs() ->
86-
SpecsSet = sets:from_list([element(2, element(1, S)) || S <- mirrored_supervisor:which_children(?SUPERVISOR)]),
87-
ParamsSet = sets:from_list([ {proplists:get_value(vhost, S), proplists:get_value(name, S)}
88-
|| S <- rabbit_runtime_parameters:list_component(<<"shovel">>) ]),
89-
F = fun(Name, ok) ->
90-
_ = mirrored_supervisor:delete_child(?SUPERVISOR, id(Name)),
103+
Children = mirrored_supervisor:which_children(?SUPERVISOR),
104+
105+
ChildIdSet = sets:from_list([element(1, S) || S <- Children]),
106+
ParamsSet = sets:from_list(
107+
lists:flatmap(
108+
fun(S) ->
109+
Name = {proplists:get_value(vhost, S), proplists:get_value(name, S)},
110+
%% Supervisor Id format was different pre 3.13.0, 3.12.8 and 3.11.25.
111+
%% Try both formats to cover the transitionary mixed version cluster period.
112+
[id(Name), old_id(Name)]
113+
end,
114+
rabbit_runtime_parameters:list_component(<<"shovel">>))),
115+
F = fun(ChildId, ok) ->
116+
try
117+
_ = mirrored_supervisor:delete_child(?SUPERVISOR, ChildId)
118+
catch _:_:_Stacktrace ->
119+
ok
120+
end,
91121
ok
92122
end,
93-
ok = sets:fold(F, ok, sets:subtract(SpecsSet, ParamsSet)).
123+
%% Delete any supervisor children that do not have their respective runtime parameters in the database.
124+
SetToCleanUp = sets:subtract(ChildIdSet, ParamsSet),
125+
ok = sets:fold(F, ok, SetToCleanUp).
94126

95127
%%----------------------------------------------------------------------------
96128

97129
init([]) ->
98130
{ok, {{one_for_one, 3, 10}, []}}.
99131

100132
id({V, S} = Name) ->
101-
{[V, S], Name}.
133+
{[V, S], Name}.
134+
135+
%% older format, pre 3.13.0, 3.12.8 and 3.11.25. See rabbitmq/rabbitmq-server#9894
136+
old_id({_V, _S} = Name) ->
137+
Name.

0 commit comments

Comments
 (0)