@@ -66,42 +66,98 @@ start_link() ->
66
66
% % Note that the next supervisor down, rabbit_federation_link_sup, is common
67
67
% % between exchanges and queues.
68
68
start_child (X ) ->
69
- case mirrored_supervisor :start_child (
70
- ? SUPERVISOR ,
71
- {id (X ), {rabbit_federation_link_sup , start_link , [X ]},
72
- transient , ? SUPERVISOR_WAIT , supervisor ,
73
- [rabbit_federation_link_sup ]}) of
74
- {ok , _Pid } -> ok ;
75
- {error , {already_started , _Pid }} ->
76
- # exchange {name = ExchangeName } = X ,
77
- rabbit_log_federation :debug (" Federation link for exchange ~tp was already started" ,
78
- [rabbit_misc :rs (ExchangeName )]),
79
- ok ;
80
- % % A link returned {stop, gone}, the link_sup shut down, that's OK.
81
- {error , {shutdown , _ }} -> ok
69
+ Result =
70
+ case child_exists (X ) orelse
71
+ mirrored_supervisor :start_child (
72
+ ? SUPERVISOR ,
73
+ {id (X ), {rabbit_federation_link_sup , start_link , [X ]},
74
+ transient , ? SUPERVISOR_WAIT , supervisor ,
75
+ [rabbit_federation_link_sup ]}) of
76
+ true ->
77
+ already_started ;
78
+ {ok , _Pid } ->
79
+ ok ;
80
+ {error , {already_started , _Pid }} ->
81
+ already_started ;
82
+ % % A link returned {stop, gone}, the link_sup shut down, that's OK.
83
+ {error , {shutdown , _ }} ->
84
+ ok
85
+ end ,
86
+ case Result of
87
+ ok ->
88
+ ok ;
89
+ already_started ->
90
+ # exchange {name = ExchangeName } = X ,
91
+ rabbit_log_federation :debug (" Federation link for exchange ~tp was already started" ,
92
+ [rabbit_misc :rs (ExchangeName )]),
93
+ ok
82
94
end .
83
95
96
+
97
+ child_exists (Name ) ->
98
+ Id = id (Name ),
99
+ % % older format, pre 3.13.0
100
+ OldId = old_id (Name ),
101
+ lists :any (fun ({ChildId , _ , _ , _ }) ->
102
+ ChildId =:= Id orelse ChildId =:= OldId
103
+ end ,
104
+ mirrored_supervisor :which_children (? SUPERVISOR )).
105
+
84
106
adjust ({clear_upstream , VHost , UpstreamName }) ->
85
107
_ = [rabbit_federation_link_sup :adjust (Pid , X , {clear_upstream , UpstreamName }) ||
86
- {{_ , # exchange {name = Name } = X }, Pid , _ , _ } <- mirrored_supervisor :which_children (? SUPERVISOR ),
87
- Name # resource .virtual_host == VHost ],
108
+ {Id , Pid , _ , _ } <- mirrored_supervisor :which_children (? SUPERVISOR ),
109
+ case Id of
110
+ {_ , # exchange {name = Name } = X } ->
111
+ Name # resource .virtual_host == VHost ;
112
+ # exchange {name = Name } = X ->
113
+ % % Old child id format, pre 3.13.0
114
+ Name # resource .virtual_host == VHost
115
+ end
116
+ ],
88
117
ok ;
89
118
adjust (Reason ) ->
90
- _ = [rabbit_federation_link_sup :adjust (Pid , X , Reason ) ||
91
- {{_ , X }, Pid , _ , _ } <- mirrored_supervisor :which_children (? SUPERVISOR )],
119
+ _ = [case Id of
120
+ {_ , # exchange {} = X } ->
121
+ rabbit_federation_link_sup :adjust (Pid , X , Reason );
122
+ # exchange {} = X ->
123
+ % % Old child id format, pre 3.13.0
124
+ rabbit_federation_link_sup :adjust (Pid , X , Reason )
125
+ end
126
+ || {Id , Pid , _ , _ } <- mirrored_supervisor :which_children (? SUPERVISOR )],
92
127
ok .
93
128
94
129
stop_child (X ) ->
95
- case mirrored_supervisor :terminate_child (? SUPERVISOR , id (X )) of
96
- ok -> ok ;
97
- {error , Err } ->
98
- # exchange {name = ExchangeName } = X ,
99
- rabbit_log_federation :warning (
100
- " Attempt to stop a federation link for exchange ~tp failed: ~tp " ,
101
- [rabbit_misc :rs (ExchangeName ), Err ]),
102
- ok
103
- end ,
104
- ok = mirrored_supervisor :delete_child (? SUPERVISOR , id (X )).
130
+ Result =
131
+ case stop_and_delete_child (id (X )) of
132
+ ok -> ok ;
133
+ {error , not_found } = Error ->
134
+ case rabbit_khepri :is_enabled () of
135
+ true ->
136
+ % % Old id format is not supported by and cannot exist in Khepri
137
+ Error ;
138
+ false ->
139
+ % % try old format, pre 3.13.0
140
+ stop_and_delete_child (old_id (X ))
141
+ end
142
+ end ,
143
+ case Result of
144
+ ok ->
145
+ ok ;
146
+ {error , Err } ->
147
+ # exchange {name = ExchangeName } = X ,
148
+ rabbit_log_federation :warning (
149
+ " Attempt to stop a federation link for exchange ~tp failed: ~tp " ,
150
+ [rabbit_misc :rs (ExchangeName ), Err ]),
151
+ ok
152
+ end .
153
+
154
+ stop_and_delete_child (Id ) ->
155
+ case mirrored_supervisor :terminate_child (? SUPERVISOR , Id ) of
156
+ ok ->
157
+ ok = mirrored_supervisor :delete_child (? SUPERVISOR , Id );
158
+ {error , not_found } = Error ->
159
+ Error
160
+ end .
105
161
106
162
% %----------------------------------------------------------------------------
107
163
@@ -115,3 +171,8 @@ id(X = #exchange{policy = Policy}) ->
115
171
116
172
simple_id (# exchange {name = # resource {virtual_host = VHost , name = Name }}) ->
117
173
[exchange , VHost , Name ].
174
+
175
+ % % Old child id format, pre 3.13.0
176
+ old_id (X = # exchange {policy = Policy }) ->
177
+ X1 = rabbit_exchange :immutable (X ),
178
+ X1 # exchange {policy = Policy }.
0 commit comments