Skip to content

Commit f4b5fb4

Browse files
Merge pull request #1833 from rabbitmq/forward-compatible-default-bindings
v3.7.x: backport implicit bindings for forward-compatibility with 3.8.
2 parents e7a4778 + 2f881b1 commit f4b5fb4

File tree

3 files changed

+93
-55
lines changed

3 files changed

+93
-55
lines changed

src/rabbit_amqqueue.erl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2,
3030
emit_info_all/5, list_local/1, info_local/1,
3131
emit_info_local/4, emit_info_down/4]).
32-
-export([list_down/1, count/1, list_names/0, list_local_names/0]).
32+
-export([list_down/1, count/1, list_names/0, list_names/1, list_local_names/0]).
3333
-export([force_event_refresh/1, notify_policy_changed/1]).
3434
-export([consumers/1, consumers_all/1, emit_consumers_all/4, consumer_info_keys/0]).
3535
-export([basic_get/4, basic_consume/11, basic_cancel/5, notify_decorators/1]).
@@ -116,6 +116,7 @@
116116
-spec list() -> [rabbit_types:amqqueue()].
117117
-spec list(rabbit_types:vhost()) -> [rabbit_types:amqqueue()].
118118
-spec list_names() -> [rabbit_amqqueue:name()].
119+
-spec list_names(rabbit_types:vhost()) -> [rabbit_amqqueue:name()].
119120
-spec list_down(rabbit_types:vhost()) -> [rabbit_types:amqqueue()].
120121
-spec info_keys() -> rabbit_types:info_keys().
121122
-spec info(rabbit_types:amqqueue()) -> rabbit_types:infos().
@@ -659,6 +660,8 @@ list() -> mnesia:dirty_match_object(rabbit_queue, #amqqueue{_ = '_'}).
659660

660661
list_names() -> mnesia:dirty_all_keys(rabbit_queue).
661662

663+
list_names(VHost) -> [Q#amqqueue.name || Q <- list(VHost)].
664+
662665
list_local_names() ->
663666
[ Q#amqqueue.name || #amqqueue{state = State, pid = QPid} = Q <- list(),
664667
State =/= crashed,

src/rabbit_binding.erl

Lines changed: 60 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@
2727
-export([has_for_source/1, remove_for_source/1,
2828
remove_for_destination/2, remove_transient_for_destination/1]).
2929

30+
-define(DEFAULT_EXCHANGE(VHostPath), #resource{virtual_host = VHostPath,
31+
kind = exchange,
32+
name = <<>>}).
33+
3034
%%----------------------------------------------------------------------------
3135

3236
-export_type([key/0, deletions/0]).
@@ -156,6 +160,14 @@ recover_semi_durable_route_txn(R = #route{binding = B}, X) ->
156160
(Serial, false) -> x_callback(Serial, X, add_binding, B)
157161
end).
158162

163+
exists(#binding{source = ?DEFAULT_EXCHANGE(_),
164+
destination = #resource{kind = queue, name = QName} = Queue,
165+
key = QName,
166+
args = []}) ->
167+
case rabbit_amqqueue:lookup(Queue) of
168+
{ok, _} -> true;
169+
{error, not_found} -> false
170+
end;
159171
exists(Binding) ->
160172
binding_action(
161173
Binding, fun (_Src, _Dst, B) ->
@@ -243,9 +255,17 @@ list(VHostPath) ->
243255
destination = VHostResource,
244256
_ = '_'},
245257
_ = '_'},
246-
[B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route,
247-
Route)].
248-
258+
%% if there are any default exchange bindings left after an upgrade
259+
%% of a pre-3.8 database, filter them out
260+
AllBindings = [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route,
261+
Route)],
262+
Filtered = lists:filter(fun(#binding{source = S}) ->
263+
S =/= ?DEFAULT_EXCHANGE(VHostPath)
264+
end, AllBindings),
265+
implicit_bindings(VHostPath) ++ Filtered.
266+
267+
list_for_source(?DEFAULT_EXCHANGE(VHostPath)) ->
268+
implicit_bindings(VHostPath);
249269
list_for_source(SrcName) ->
250270
mnesia:async_dirty(
251271
fun() ->
@@ -255,16 +275,43 @@ list_for_source(SrcName) ->
255275
end).
256276

257277
list_for_destination(DstName) ->
258-
mnesia:async_dirty(
259-
fun() ->
260-
Route = #route{binding = #binding{destination = DstName,
261-
_ = '_'}},
262-
[reverse_binding(B) ||
263-
#reverse_route{reverse_binding = B} <-
264-
mnesia:match_object(rabbit_reverse_route,
265-
reverse_route(Route), read)]
266-
end).
267-
278+
implicit_for_destination(DstName) ++
279+
mnesia:async_dirty(
280+
fun() ->
281+
Route = #route{binding = #binding{destination = DstName,
282+
_ = '_'}},
283+
[reverse_binding(B) ||
284+
#reverse_route{reverse_binding = B} <-
285+
mnesia:match_object(rabbit_reverse_route,
286+
reverse_route(Route), read)]
287+
end).
288+
289+
implicit_bindings(VHostPath) ->
290+
DstQueues = rabbit_amqqueue:list_names(VHostPath),
291+
[ #binding{source = ?DEFAULT_EXCHANGE(VHostPath),
292+
destination = DstQueue,
293+
key = QName,
294+
args = []}
295+
|| DstQueue = #resource{name = QName} <- DstQueues ].
296+
297+
implicit_for_destination(DstQueue = #resource{kind = queue,
298+
virtual_host = VHostPath,
299+
name = QName}) ->
300+
[#binding{source = ?DEFAULT_EXCHANGE(VHostPath),
301+
destination = DstQueue,
302+
key = QName,
303+
args = []}];
304+
implicit_for_destination(_) ->
305+
[].
306+
307+
list_for_source_and_destination(?DEFAULT_EXCHANGE(VHostPath),
308+
#resource{kind = queue,
309+
virtual_host = VHostPath,
310+
name = QName} = DstQueue) ->
311+
[#binding{source = ?DEFAULT_EXCHANGE(VHostPath),
312+
destination = DstQueue,
313+
key = QName,
314+
args = []}];
268315
list_for_source_and_destination(SrcName, DstName) ->
269316
mnesia:async_dirty(
270317
fun() ->

src/rabbit_queue_location_min_masters.erl

Lines changed: 29 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -38,45 +38,33 @@ description() ->
3838
<<"Locate queue master node from cluster node with least bound queues">>}].
3939

4040
queue_master_location(#amqqueue{} = Q) ->
41-
Cluster = rabbit_queue_master_location_misc:all_nodes(Q),
42-
VHosts = rabbit_vhost:list(),
43-
BoundQueueMasters = get_bound_queue_masters_per_vhost(VHosts, []),
44-
{_Count, MinMaster}= get_min_master(Cluster, BoundQueueMasters),
45-
{ok, MinMaster}.
41+
Cluster = rabbit_queue_master_location_misc:all_nodes(Q),
42+
QueueNames = rabbit_amqqueue:list_names(),
43+
MastersPerNode = lists:foldl(
44+
fun(#resource{virtual_host = VHost, name = QueueName}, NodeMasters) ->
45+
case rabbit_queue_master_location_misc:lookup_master(QueueName, VHost) of
46+
{ok, Master} when is_atom(Master) ->
47+
case maps:is_key(Master, NodeMasters) of
48+
true -> maps:update_with(Master,
49+
fun(N) -> N + 1 end,
50+
NodeMasters);
51+
false -> NodeMasters
52+
end;
53+
_ -> NodeMasters
54+
end
55+
end,
56+
maps:from_list([{N, 0} || N <- Cluster]),
57+
QueueNames),
4658

47-
%%---------------------------------------------------------------------------
48-
%% Private helper functions
49-
%%---------------------------------------------------------------------------
50-
get_min_master(Cluster, BoundQueueMasters) ->
51-
lists:min([ {count_masters(Node, BoundQueueMasters), Node} ||
52-
Node <- Cluster ]).
53-
54-
count_masters(Node, Masters) ->
55-
length([ X || X <- Masters, X == Node ]).
56-
57-
get_bound_queue_masters_per_vhost([], Acc) ->
58-
lists:flatten(Acc);
59-
get_bound_queue_masters_per_vhost([VHost|RemVHosts], Acc) ->
60-
BoundQueueNames =
61-
lists:filtermap(
62-
fun(#binding{destination =#resource{kind = queue,
63-
name = QueueName}}) ->
64-
{true, QueueName};
65-
(_) ->
66-
false
67-
end,
68-
rabbit_binding:list(VHost)),
69-
UniqQueueNames = lists:usort(BoundQueueNames),
70-
BoundQueueMasters = get_queue_masters(VHost, UniqQueueNames, []),
71-
get_bound_queue_masters_per_vhost(RemVHosts, [BoundQueueMasters|Acc]).
72-
73-
74-
get_queue_masters(_VHost, [], BoundQueueNodes) -> BoundQueueNodes;
75-
get_queue_masters(VHost, [QueueName | RemQueueNames], QueueMastersAcc) ->
76-
QueueMastersAcc0 = case rabbit_queue_master_location_misc:lookup_master(
77-
QueueName, VHost) of
78-
{ok, Master} when is_atom(Master) ->
79-
[Master|QueueMastersAcc];
80-
_ -> QueueMastersAcc
81-
end,
82-
get_queue_masters(VHost, RemQueueNames, QueueMastersAcc0).
59+
{MinNode, _NMasters} = maps:fold(
60+
fun(Node, NMasters, init) ->
61+
{Node, NMasters};
62+
(Node, NMasters, {MinNode, MinMasters}) ->
63+
case NMasters < MinMasters of
64+
true -> {Node, NMasters};
65+
false -> {MinNode, MinMasters}
66+
end
67+
end,
68+
init,
69+
MastersPerNode),
70+
{ok, MinNode}.

0 commit comments

Comments
 (0)