Skip to content

Commit 7e1022d

Browse files
Merge pull request #533 from rabbitmq/rabbitmq-federation-7
Set deleting exchange status
2 parents 87e68fe + 630beb3 commit 7e1022d

File tree

3 files changed

+117
-31
lines changed

3 files changed

+117
-31
lines changed

src/rabbit_exchange.erl

Lines changed: 56 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -166,24 +166,37 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
166166
XT = type_to_module(Type),
167167
%% We want to upset things if it isn't ok
168168
ok = XT:validate(X),
169-
rabbit_misc:execute_mnesia_transaction(
170-
fun () ->
171-
case mnesia:wread({rabbit_exchange, XName}) of
172-
[] ->
173-
{new, store(X)};
174-
[ExistingX] ->
175-
{existing, ExistingX}
176-
end
177-
end,
178-
fun ({new, Exchange}, Tx) ->
179-
ok = callback(X, create, map_create_tx(Tx), [Exchange]),
180-
rabbit_event:notify_if(not Tx, exchange_created, info(Exchange)),
181-
Exchange;
182-
({existing, Exchange}, _Tx) ->
183-
Exchange;
184-
(Err, _Tx) ->
185-
Err
186-
end).
169+
%% Avoid a channel exception if there's a race condition
170+
%% with an exchange.delete operation.
171+
%%
172+
%% See rabbitmq/rabbitmq-federation#7.
173+
case rabbit_runtime_parameters:lookup(XName#resource.virtual_host,
174+
?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT,
175+
XName#resource.name) of
176+
not_found ->
177+
rabbit_misc:execute_mnesia_transaction(
178+
fun () ->
179+
case mnesia:wread({rabbit_exchange, XName}) of
180+
[] ->
181+
{new, store(X)};
182+
[ExistingX] ->
183+
{existing, ExistingX}
184+
end
185+
end,
186+
fun ({new, Exchange}, Tx) ->
187+
ok = callback(X, create, map_create_tx(Tx), [Exchange]),
188+
rabbit_event:notify_if(not Tx, exchange_created, info(Exchange)),
189+
Exchange;
190+
({existing, Exchange}, _Tx) ->
191+
Exchange;
192+
(Err, _Tx) ->
193+
Err
194+
end);
195+
_ ->
196+
rabbit_log:warning("ignoring exchange.declare for exchange ~p,
197+
exchange.delete in progress~n.", [XName]),
198+
X
199+
end.
187200

188201
map_create_tx(true) -> transaction;
189202
map_create_tx(false) -> none.
@@ -427,18 +440,31 @@ delete(XName, IfUnused) ->
427440
true -> fun conditional_delete/2;
428441
false -> fun unconditional_delete/2
429442
end,
430-
call_with_exchange(
431-
XName,
432-
fun (X) ->
433-
case Fun(X, false) of
434-
{deleted, X, Bs, Deletions} ->
435-
rabbit_binding:process_deletions(
436-
rabbit_binding:add_deletion(
437-
XName, {X, deleted, Bs}, Deletions));
438-
{error, _InUseOrNotFound} = E ->
439-
rabbit_misc:const(E)
440-
end
441-
end).
443+
try
444+
%% guard exchange.declare operations from failing when there's
445+
%% a race condition between it and an exchange.delete.
446+
%%
447+
%% see rabbitmq/rabbitmq-federation#7
448+
rabbit_runtime_parameters:set(XName#resource.virtual_host,
449+
?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT,
450+
XName#resource.name, true, none),
451+
call_with_exchange(
452+
XName,
453+
fun (X) ->
454+
case Fun(X, false) of
455+
{deleted, X, Bs, Deletions} ->
456+
rabbit_binding:process_deletions(
457+
rabbit_binding:add_deletion(
458+
XName, {X, deleted, Bs}, Deletions));
459+
{error, _InUseOrNotFound} = E ->
460+
rabbit_misc:const(E)
461+
end
462+
end)
463+
after
464+
rabbit_runtime_parameters:clear(XName#resource.virtual_host,
465+
?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT,
466+
XName#resource.name)
467+
end.
442468

443469
validate_binding(X = #exchange{type = XType}, Binding) ->
444470
Module = type_to_module(XType),

src/rabbit_exchange_parameters.erl

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
%% The contents of this file are subject to the Mozilla Public License
2+
%% Version 1.1 (the "License"); you may not use this file except in
3+
%% compliance with the License. You may obtain a copy of the License
4+
%% at http://www.mozilla.org/MPL/
5+
%%
6+
%% Software distributed under the License is distributed on an "AS IS"
7+
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8+
%% the License for the specific language governing rights and
9+
%% limitations under the License.
10+
%%
11+
%% The Original Code is RabbitMQ.
12+
%%
13+
%% The Initial Developer of the Original Code is GoPivotal, Inc.
14+
%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
15+
%%
16+
17+
-module(rabbit_exchange_parameters).
18+
19+
-behaviour(rabbit_runtime_parameter).
20+
21+
-include("rabbit.hrl").
22+
23+
-export([register/0]).
24+
-export([validate/5, notify/4, notify_clear/3]).
25+
26+
-import(rabbit_misc, [pget/2]).
27+
28+
-rabbit_boot_step({?MODULE,
29+
[{description, "exchange parameters"},
30+
{mfa, {rabbit_exchange_parameters, register, []}},
31+
{requires, rabbit_registry},
32+
{enables, recovery}]}).
33+
34+
register() ->
35+
rabbit_registry:register(runtime_parameter,
36+
?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT, ?MODULE),
37+
%% ensure there are no leftovers from before node restart/crash
38+
rabbit_runtime_parameters:clear_component(
39+
?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT),
40+
ok.
41+
42+
validate(_VHost, ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT, _Name, _Term, _User) ->
43+
ok.
44+
45+
notify(_VHost, ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT, _Name, _Term) ->
46+
ok.
47+
48+
notify_clear(_VHost, ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT, _Name) ->
49+
ok.

src/rabbit_runtime_parameters.erl

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151

5252
-export([parse_set/5, set/5, set_any/5, clear/3, clear_any/3, list/0, list/1,
5353
list_component/1, list/2, list_formatted/1, list_formatted/3,
54-
lookup/3, value/3, value/4, info_keys/0]).
54+
lookup/3, value/3, value/4, info_keys/0, clear_component/1]).
5555

5656
-export([set_global/2, value_global/1, value_global/2]).
5757

@@ -171,6 +171,17 @@ clear(_, <<"policy">> , _) ->
171171
clear(VHost, Component, Name) ->
172172
clear_any(VHost, Component, Name).
173173

174+
clear_component(Component) ->
175+
case rabbit_runtime_parameters:list_component(Component) of
176+
[] ->
177+
ok;
178+
Xs ->
179+
[rabbit_runtime_parameters:clear(pget(vhost, X),
180+
pget(component, X),
181+
pget(name, X))|| X <- Xs],
182+
ok
183+
end.
184+
174185
clear_any(VHost, Component, Name) ->
175186
Notify = fun () ->
176187
case lookup_component(Component) of

0 commit comments

Comments
 (0)