Skip to content

Commit 2adec32

Browse files
Merge pull request #9909 from rabbitmq/rabbitmq-server-9894
Dynamic Shovels: support old (pre-3.13.0, 3.12.8) and new supervisor child ID formats
2 parents 61dac76 + 28741cd commit 2adec32

File tree

1 file changed

+32
-5
lines changed

1 file changed

+32
-5
lines changed

deps/rabbitmq_shovel/src/rabbit_shovel_dyn_worker_sup_sup.erl

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,10 @@ obfuscated_uris_parameters(Def) when is_list(Def) ->
5858
rabbit_shovel_parameters:obfuscate_uris_in_definition(Def).
5959

6060
child_exists(Name) ->
61-
lists:any(fun ({{_, N}, _, _, _}) -> N =:= Name end,
61+
lists:any(fun ({{_, N}, _, _, _}) -> N =:= Name;
62+
%% older format, pre 3.13.0 and 3.12.8. See rabbitmq/rabbitmq-server#9894.
63+
({N, _, _, _}) -> N =:= Name
64+
end,
6265
mirrored_supervisor:which_children(?SUPERVISOR)).
6366

6467
stop_child({VHost, ShovelName} = Name) ->
@@ -83,19 +86,43 @@ stop_child({VHost, ShovelName} = Name) ->
8386
%% See rabbit_shovel_worker:terminate/2
8487

8588
cleanup_specs() ->
86-
SpecsSet = sets:from_list([element(2, element(1, S)) || S <- mirrored_supervisor:which_children(?SUPERVISOR)]),
89+
Children = mirrored_supervisor:which_children(?SUPERVISOR),
90+
91+
%% older format, pre 3.13.0 and 3.12.8. See rabbitmq/rabbitmq-server#9894
92+
OldStyleSpecsSet = sets:from_list([element(1, S) || S <- Children]),
93+
NewStyleSpecsSet = sets:from_list([element(2, element(1, S)) || S <- Children]),
8794
ParamsSet = sets:from_list([ {proplists:get_value(vhost, S), proplists:get_value(name, S)}
8895
|| S <- rabbit_runtime_parameters:list_component(<<"shovel">>) ]),
8996
F = fun(Name, ok) ->
90-
_ = mirrored_supervisor:delete_child(?SUPERVISOR, id(Name)),
97+
try
98+
%% The supervisor operation is very unlikely to fail, it's the schema
99+
%% data stores that can make a fuss about a non-existent or non-standard value passed in.
100+
%% For example, an old style Shovel name is an invalid Khepri query path element. MK.
101+
_ = mirrored_supervisor:delete_child(?SUPERVISOR, id(Name))
102+
catch _:_:_Stacktrace ->
103+
ok
104+
end,
91105
ok
92106
end,
93-
ok = sets:fold(F, ok, sets:subtract(SpecsSet, ParamsSet)).
107+
%% Khepri won't handle values in OldStyleSpecsSet in its path well. At the same time,
108+
%% those older elements simply cannot exist in Khepri because having Khepri enabled
109+
%% means a cluster-wide move to 3.13+, so we can conditionally compute what specs we care about. MK.
110+
AllSpecs =
111+
case rabbit_khepri:is_enabled() of
112+
true -> NewStyleSpecsSet;
113+
false -> sets:union(NewStyleSpecsSet, OldStyleSpecsSet)
114+
end,
115+
%% Delete any supervisor children that do not have their respective runtime parameters in the database.
116+
SetToCleanUp = sets:subtract(AllSpecs, ParamsSet),
117+
ok = sets:fold(F, ok, SetToCleanUp).
94118

95119
%%----------------------------------------------------------------------------
96120

97121
init([]) ->
98122
{ok, {{one_for_one, 3, 10}, []}}.
99123

100124
id({V, S} = Name) ->
101-
{[V, S], Name}.
125+
{[V, S], Name};
126+
%% older format, pre 3.13.0 and 3.12.8. See rabbitmq/rabbitmq-server#9894
127+
id(Other) ->
128+
Other.

0 commit comments

Comments
 (0)