@@ -41,26 +41,30 @@ start_child({VHost, ShovelName} = Name, Def) ->
41
41
LockId = rabbit_shovel_locks :lock (Name ),
42
42
cleanup_specs (),
43
43
rabbit_log_shovel :debug (" Starting a mirrored supervisor named '~ts ' in virtual host '~ts '" , [ShovelName , VHost ]),
44
- Result = case mirrored_supervisor :start_child (
44
+ case child_exists (Name )
45
+ orelse mirrored_supervisor :start_child (
45
46
? SUPERVISOR ,
46
47
{id (Name ), {rabbit_shovel_dyn_worker_sup , start_link , [Name , obfuscated_uris_parameters (Def )]},
47
48
transient , ? WORKER_WAIT , worker , [rabbit_shovel_dyn_worker_sup ]}) of
49
+ true -> ok ;
48
50
{ok , _Pid } -> ok ;
49
51
{error , {already_started , _Pid }} -> ok
50
52
end ,
51
53
% % release the lock if we managed to acquire one
52
54
rabbit_shovel_locks :unlock (LockId ),
53
- Result .
55
+ ok .
54
56
55
57
obfuscated_uris_parameters (Def ) when is_map (Def ) ->
56
58
to_map (rabbit_shovel_parameters :obfuscate_uris_in_definition (to_list (Def )));
57
59
obfuscated_uris_parameters (Def ) when is_list (Def ) ->
58
60
rabbit_shovel_parameters :obfuscate_uris_in_definition (Def ).
59
61
60
62
child_exists (Name ) ->
61
- lists :any (fun ({{_ , N }, _ , _ , _ }) -> N =:= Name ;
62
- % % older format, pre 3.13.0 and 3.12.8. See rabbitmq/rabbitmq-server#9894.
63
- ({N , _ , _ , _ }) -> N =:= Name
63
+ Id = id (Name ),
64
+ % % older format, pre 3.13.0 and 3.12.8. See rabbitmq/rabbitmq-server#9894.
65
+ OldId = old_id (Name ),
66
+ lists :any (fun ({ChildId , _ , _ , _ }) ->
67
+ ChildId =:= Id orelse ChildId =:= OldId
64
68
end ,
65
69
mirrored_supervisor :which_children (? SUPERVISOR )).
66
70
@@ -70,13 +74,34 @@ stop_child({VHost, ShovelName} = Name) ->
70
74
case get ({shovel_worker_autodelete , Name }) of
71
75
true -> ok ; % % [1]
72
76
_ ->
73
- ok = mirrored_supervisor :terminate_child (? SUPERVISOR , id (Name )),
74
- ok = mirrored_supervisor :delete_child (? SUPERVISOR , id (Name )),
77
+ case stop_and_delete_child (id (Name )) of
78
+ ok ->
79
+ ok ;
80
+ {error , not_found } ->
81
+ case rabbit_khepri :is_enabled () of
82
+ true ->
83
+ % % Old id format is not supported by and cannot exist in Khepri
84
+ ok ;
85
+ false ->
86
+ % % try older format, pre 3.13.0 and 3.12.8.
87
+ % % See rabbitmq/rabbitmq-server#9894.
88
+ _ = stop_and_delete_child (old_id (Name )),
89
+ ok
90
+ end
91
+ end ,
75
92
rabbit_shovel_status :remove (Name )
76
93
end ,
77
94
rabbit_shovel_locks :unlock (LockId ),
78
95
ok .
79
96
97
+ stop_and_delete_child (Id ) ->
98
+ case mirrored_supervisor :terminate_child (? SUPERVISOR , Id ) of
99
+ ok ->
100
+ ok = mirrored_supervisor :delete_child (? SUPERVISOR , Id );
101
+ {error , not_found } = Error ->
102
+ Error
103
+ end .
104
+
80
105
% % [1] An autodeleting worker removes its own parameter, and thus ends
81
106
% % up here via the parameter callback. It is a transient worker that
82
107
% % is just about to terminate normally - so we don't need to tell the
@@ -88,41 +113,47 @@ stop_child({VHost, ShovelName} = Name) ->
88
113
cleanup_specs () ->
89
114
Children = mirrored_supervisor :which_children (? SUPERVISOR ),
90
115
91
- % % older format, pre 3.13.0 and 3.12.8. See rabbitmq/rabbitmq-server#9894
92
- OldStyleSpecsSet = sets :from_list ([element (1 , S ) || S <- Children ]),
93
- NewStyleSpecsSet = sets :from_list ([element (2 , element (1 , S )) || S <- Children ]),
94
- ParamsSet = sets :from_list ([ {proplists :get_value (vhost , S ), proplists :get_value (name , S )}
95
- || S <- rabbit_runtime_parameters :list_component (<<" shovel" >>) ]),
96
- F = fun (Name , ok ) ->
116
+ ChildIdSet = sets :from_list ([element (1 , S ) || S <- Children ]),
117
+ ParamsSet = params_to_child_ids (rabbit_khepri :is_enabled ()),
118
+ F = fun (ChildId , ok ) ->
97
119
try
98
120
% % The supervisor operation is very unlikely to fail, it's the schema
99
121
% % data stores that can make a fuss about a non-existent or non-standard value passed in.
100
122
% % For example, an old style Shovel name is an invalid Khepri query path element. MK.
101
- _ = mirrored_supervisor :delete_child (? SUPERVISOR , id ( Name ) )
123
+ _ = mirrored_supervisor :delete_child (? SUPERVISOR , ChildId )
102
124
catch _ :_ :_Stacktrace ->
103
125
ok
104
126
end ,
105
127
ok
106
128
end ,
107
- % % Khepri won't handle values in OldStyleSpecsSet in its path well. At the same time,
108
- % % those older elements simply cannot exist in Khepri because having Khepri enabled
109
- % % means a cluster-wide move to 3.13+, so we can conditionally compute what specs we care about. MK.
110
- AllSpecs =
111
- case rabbit_khepri :is_enabled () of
112
- true -> NewStyleSpecsSet ;
113
- false -> sets :union (NewStyleSpecsSet , OldStyleSpecsSet )
114
- end ,
115
129
% % Delete any supervisor children that do not have their respective runtime parameters in the database.
116
- SetToCleanUp = sets :subtract (AllSpecs , ParamsSet ),
130
+ SetToCleanUp = sets :subtract (ChildIdSet , ParamsSet ),
117
131
ok = sets :fold (F , ok , SetToCleanUp ).
118
132
133
+ params_to_child_ids (_KhepriEnabled = true ) ->
134
+ % % Old id format simply cannot exist in Khepri because having Khepri enabled
135
+ % % means a cluster-wide move to 3.13+, so we can conditionally compute what specs we care about. MK.
136
+ sets :from_list ([id ({proplists :get_value (vhost , S ), proplists :get_value (name , S )})
137
+ || S <- rabbit_runtime_parameters :list_component (<<" shovel" >>)]);
138
+ params_to_child_ids (_KhepriEnabled = false ) ->
139
+ sets :from_list (
140
+ lists :flatmap (
141
+ fun (S ) ->
142
+ Name = {proplists :get_value (vhost , S ), proplists :get_value (name , S )},
143
+ % % Supervisor Id format was different pre 3.13.0 and 3.12.8.
144
+ % % Try both formats to cover the transitionary mixed version cluster period.
145
+ [id (Name ), old_id (Name )]
146
+ end ,
147
+ rabbit_runtime_parameters :list_component (<<" shovel" >>))).
148
+
119
149
% %----------------------------------------------------------------------------
120
150
121
151
init ([]) ->
122
152
{ok , {{one_for_one , 3 , 10 }, []}}.
123
153
124
154
id ({V , S } = Name ) ->
125
- {[V , S ], Name };
155
+ {[V , S ], Name }.
156
+
126
157
% % older format, pre 3.13.0 and 3.12.8. See rabbitmq/rabbitmq-server#9894
127
- id ( Other ) ->
128
- Other .
158
+ old_id ({ _V , _S } = Name ) ->
159
+ Name .
0 commit comments