@@ -66,42 +66,87 @@ start_link() ->
66
66
% % Note that the next supervisor down, rabbit_federation_link_sup, is common
67
67
% % between exchanges and queues.
68
68
start_child (X ) ->
69
- case mirrored_supervisor :start_child (
70
- ? SUPERVISOR ,
71
- {id (X ), {rabbit_federation_link_sup , start_link , [X ]},
72
- transient , ? SUPERVISOR_WAIT , supervisor ,
73
- [rabbit_federation_link_sup ]}) of
74
- {ok , _Pid } -> ok ;
75
- {error , {already_started , _Pid }} ->
76
- # exchange {name = ExchangeName } = X ,
77
- rabbit_log_federation :debug (" Federation link for exchange ~tp was already started" ,
78
- [rabbit_misc :rs (ExchangeName )]),
79
- ok ;
80
- % % A link returned {stop, gone}, the link_sup shut down, that's OK.
81
- {error , {shutdown , _ }} -> ok
69
+ Result =
70
+ case child_exists (X ) orelse
71
+ mirrored_supervisor :start_child (
72
+ ? SUPERVISOR ,
73
+ {id (X ), {rabbit_federation_link_sup , start_link , [X ]},
74
+ transient , ? SUPERVISOR_WAIT , supervisor ,
75
+ [rabbit_federation_link_sup ]}) of
76
+ true ->
77
+ already_started ;
78
+ {ok , _Pid } ->
79
+ ok ;
80
+ {error , {already_started , _Pid }} ->
81
+ already_started ;
82
+ % % A link returned {stop, gone}, the link_sup shut down, that's OK.
83
+ {error , {shutdown , _ }} ->
84
+ ok
85
+ end ,
86
+ case Result of
87
+ ok ->
88
+ ok ;
89
+ already_started ->
90
+ # exchange {name = ExchangeName } = X ,
91
+ rabbit_log_federation :debug (" Federation link for exchange ~tp was already started" ,
92
+ [rabbit_misc :rs (ExchangeName )]),
93
+ ok
82
94
end .
83
95
96
+
97
+ child_exists (Name ) ->
98
+ Id = id (Name ),
99
+ % % older format, pre-3.13.0
100
+ OldId = old_id (Name ),
101
+ lists :any (fun ({ChildId , _ , _ , _ }) ->
102
+ ChildId =:= Id orelse ChildId =:= OldId
103
+ end ,
104
+ mirrored_supervisor :which_children (? SUPERVISOR )).
105
+
84
106
adjust ({clear_upstream , VHost , UpstreamName }) ->
85
- _ = [rabbit_federation_link_sup :adjust (Pid , X , {clear_upstream , UpstreamName }) ||
86
- {{_ , # exchange {name = Name } = X }, Pid , _ , _ } <- mirrored_supervisor :which_children (? SUPERVISOR ),
87
- Name # resource .virtual_host == VHost ],
107
+ _ = [rabbit_federation_link_sup :adjust (Pid , exchange_record_from_child_id (Id ), {clear_upstream , UpstreamName }) ||
108
+ {Id , Pid , _ , _ } <- mirrored_supervisor :which_children (? SUPERVISOR ),
109
+ virtual_host_name_from_child_id (Id ) =:= VHost
110
+ ],
88
111
ok ;
89
112
adjust (Reason ) ->
90
- _ = [rabbit_federation_link_sup :adjust (Pid , X , Reason ) ||
91
- {{_ , X }, Pid , _ , _ } <- mirrored_supervisor :which_children (? SUPERVISOR )],
113
+ _ = [rabbit_federation_link_sup :adjust (Pid , exchange_record_from_child_id (Id ), Reason ) ||
114
+ {Id , Pid , _ , _ } <- mirrored_supervisor :which_children (? SUPERVISOR )
115
+ ],
92
116
ok .
93
117
94
118
stop_child (X ) ->
95
- case mirrored_supervisor :terminate_child (? SUPERVISOR , id (X )) of
96
- ok -> ok ;
97
- {error , Err } ->
98
- # exchange {name = ExchangeName } = X ,
99
- rabbit_log_federation :warning (
100
- " Attempt to stop a federation link for exchange ~tp failed: ~tp " ,
101
- [rabbit_misc :rs (ExchangeName ), Err ]),
102
- ok
103
- end ,
104
- ok = mirrored_supervisor :delete_child (? SUPERVISOR , id (X )).
119
+ Result =
120
+ case stop_and_delete_child (id (X )) of
121
+ ok -> ok ;
122
+ {error , not_found } = Error ->
123
+ case rabbit_khepri :is_enabled () of
124
+ true ->
125
+ % % Old id format is not supported by Khepri and cannot exist there
126
+ Error ;
127
+ false ->
128
+ % % try old format, pre-3.13.0
129
+ stop_and_delete_child (old_id (X ))
130
+ end
131
+ end ,
132
+ case Result of
133
+ ok ->
134
+ ok ;
135
+ {error , Err } ->
136
+ # exchange {name = ExchangeName } = X ,
137
+ rabbit_log_federation :warning (
138
+ " Attempt to stop a federation link for exchange ~tp failed: ~tp " ,
139
+ [rabbit_misc :rs (ExchangeName ), Err ]),
140
+ ok
141
+ end .
142
+
143
+ stop_and_delete_child (Id ) ->
144
+ case mirrored_supervisor :terminate_child (? SUPERVISOR , Id ) of
145
+ ok ->
146
+ ok = mirrored_supervisor :delete_child (? SUPERVISOR , Id );
147
+ {error , not_found } = Error ->
148
+ Error
149
+ end .
105
150
106
151
% %----------------------------------------------------------------------------
107
152
@@ -115,3 +160,20 @@ id(X = #exchange{policy = Policy}) ->
115
160
116
161
simple_id (# exchange {name = # resource {virtual_host = VHost , name = Name }}) ->
117
162
[exchange , VHost , Name ].
163
+
164
+ % % Old child id format, pre 3.13.0
165
+ old_id (X = # exchange {policy = Policy }) ->
166
+ X1 = rabbit_exchange :immutable (X ),
167
+ X1 # exchange {policy = Policy }.
168
+
169
+ % % New child id format, introduced in 3.13.0 for Khepri
170
+ exchange_record_from_child_id ({_ , # exchange {} = XR }) ->
171
+ XR ;
172
+ % % Old child id format, pre-3.13.0
173
+ exchange_record_from_child_id (# exchange {} = XR ) ->
174
+ XR .
175
+
176
+ virtual_host_name_from_child_id ({_ , # exchange {name = Res }}) ->
177
+ Res # resource .virtual_host ;
178
+ virtual_host_name_from_child_id (# exchange {name = Res }) ->
179
+ Res # resource .virtual_host .
0 commit comments