Skip to content

Commit 501027a

Browse files
committed
Backport implicit bindings for forward-compatibility with 3.8.
Backport of c7d107d 10013f5 e01dc98 7ba04a6 In 3.8 queues will not create default bindings. Because default bindings are only used in list function, it should be safe to run in a mixed mode with 3.8. List functions should show implicit bindings for queues created in 3.8. Min master locator logic is simplified and with or without default bindings will have the same behaviour as before. Follow-up to #1721
1 parent 66705eb commit 501027a

File tree

3 files changed

+93
-55
lines changed

3 files changed

+93
-55
lines changed

src/rabbit_amqqueue.erl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2,
3030
emit_info_all/5, list_local/1, info_local/1,
3131
emit_info_local/4, emit_info_down/4]).
32-
-export([list_down/1, count/1, list_names/0, list_local_names/0]).
32+
-export([list_down/1, count/1, list_names/0, list_names/1, list_local_names/0]).
3333
-export([force_event_refresh/1, notify_policy_changed/1]).
3434
-export([consumers/1, consumers_all/1, emit_consumers_all/4, consumer_info_keys/0]).
3535
-export([basic_get/4, basic_consume/11, basic_cancel/5, notify_decorators/1]).
@@ -116,6 +116,7 @@
116116
-spec list() -> [rabbit_types:amqqueue()].
117117
-spec list(rabbit_types:vhost()) -> [rabbit_types:amqqueue()].
118118
-spec list_names() -> [rabbit_amqqueue:name()].
119+
-spec list_names(rabbit_types:vhost()) -> [rabbit_amqqueue:name()].
119120
-spec list_down(rabbit_types:vhost()) -> [rabbit_types:amqqueue()].
120121
-spec info_keys() -> rabbit_types:info_keys().
121122
-spec info(rabbit_types:amqqueue()) -> rabbit_types:infos().
@@ -659,6 +660,8 @@ list() -> mnesia:dirty_match_object(rabbit_queue, #amqqueue{_ = '_'}).
659660

660661
list_names() -> mnesia:dirty_all_keys(rabbit_queue).
661662

663+
list_names(VHost) -> [Q#amqqueue.name || Q <- list(VHost)].
664+
662665
list_local_names() ->
663666
[ Q#amqqueue.name || #amqqueue{state = State, pid = QPid} = Q <- list(),
664667
State =/= crashed,

src/rabbit_binding.erl

Lines changed: 60 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@
2727
-export([has_for_source/1, remove_for_source/1,
2828
remove_for_destination/2, remove_transient_for_destination/1]).
2929

30+
-define(DEFAULT_EXCHANGE(VHostPath), #resource{virtual_host = VHostPath,
31+
kind = exchange,
32+
name = <<>>}).
33+
3034
%%----------------------------------------------------------------------------
3135

3236
-export_type([key/0, deletions/0]).
@@ -156,6 +160,14 @@ recover_semi_durable_route_txn(R = #route{binding = B}, X) ->
156160
(Serial, false) -> x_callback(Serial, X, add_binding, B)
157161
end).
158162

163+
exists(#binding{source = ?DEFAULT_EXCHANGE(_),
164+
destination = #resource{kind = queue, name = QName} = Queue,
165+
key = QName,
166+
args = []}) ->
167+
case rabbit_amqqueue:lookup(Queue) of
168+
{ok, _} -> true;
169+
{error, not_found} -> false
170+
end;
159171
exists(Binding) ->
160172
binding_action(
161173
Binding, fun (_Src, _Dst, B) ->
@@ -243,9 +255,17 @@ list(VHostPath) ->
243255
destination = VHostResource,
244256
_ = '_'},
245257
_ = '_'},
246-
[B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route,
247-
Route)].
248-
258+
%% if there are any default exchange bindings left after an upgrade
259+
%% of a pre-3.8 database, filter them out
260+
AllBindings = [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route,
261+
Route)],
262+
Filtered = lists:filter(fun(#binding{source = S}) ->
263+
S =/= ?DEFAULT_EXCHANGE(VHostPath)
264+
end, AllBindings),
265+
implicit_bindings(VHostPath) ++ Filtered.
266+
267+
list_for_source(?DEFAULT_EXCHANGE(VHostPath)) ->
268+
implicit_bindings(VHostPath);
249269
list_for_source(SrcName) ->
250270
mnesia:async_dirty(
251271
fun() ->
@@ -255,16 +275,43 @@ list_for_source(SrcName) ->
255275
end).
256276

257277
list_for_destination(DstName) ->
258-
mnesia:async_dirty(
259-
fun() ->
260-
Route = #route{binding = #binding{destination = DstName,
261-
_ = '_'}},
262-
[reverse_binding(B) ||
263-
#reverse_route{reverse_binding = B} <-
264-
mnesia:match_object(rabbit_reverse_route,
265-
reverse_route(Route), read)]
266-
end).
267-
278+
implicit_for_destination(DstName) ++
279+
mnesia:async_dirty(
280+
fun() ->
281+
Route = #route{binding = #binding{destination = DstName,
282+
_ = '_'}},
283+
[reverse_binding(B) ||
284+
#reverse_route{reverse_binding = B} <-
285+
mnesia:match_object(rabbit_reverse_route,
286+
reverse_route(Route), read)]
287+
end).
288+
289+
implicit_bindings(VHostPath) ->
290+
DstQueues = rabbit_amqqueue:list_names(VHostPath),
291+
[ #binding{source = ?DEFAULT_EXCHANGE(VHostPath),
292+
destination = DstQueue,
293+
key = QName,
294+
args = []}
295+
|| DstQueue = #resource{name = QName} <- DstQueues ].
296+
297+
implicit_for_destination(DstQueue = #resource{kind = queue,
298+
virtual_host = VHostPath,
299+
name = QName}) ->
300+
[#binding{source = ?DEFAULT_EXCHANGE(VHostPath),
301+
destination = DstQueue,
302+
key = QName,
303+
args = []}];
304+
implicit_for_destination(_) ->
305+
[].
306+
307+
list_for_source_and_destination(?DEFAULT_EXCHANGE(VHostPath),
308+
#resource{kind = queue,
309+
virtual_host = VHostPath,
310+
name = QName} = DstQueue) ->
311+
[#binding{source = ?DEFAULT_EXCHANGE(VHostPath),
312+
destination = DstQueue,
313+
key = QName,
314+
args = []}];
268315
list_for_source_and_destination(SrcName, DstName) ->
269316
mnesia:async_dirty(
270317
fun() ->

src/rabbit_queue_location_min_masters.erl

Lines changed: 29 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -38,45 +38,33 @@ description() ->
3838
<<"Locate queue master node from cluster node with least bound queues">>}].
3939

4040
queue_master_location(#amqqueue{} = Q) ->
41-
Cluster = rabbit_queue_master_location_misc:all_nodes(Q),
42-
VHosts = rabbit_vhost:list(),
43-
BoundQueueMasters = get_bound_queue_masters_per_vhost(VHosts, []),
44-
{_Count, MinMaster}= get_min_master(Cluster, BoundQueueMasters),
45-
{ok, MinMaster}.
41+
Cluster = rabbit_queue_master_location_misc:all_nodes(Q),
42+
QueueNames = rabbit_amqqueue:list_names(),
43+
MastersPerNode = lists:foldl(
44+
fun(#resource{virtual_host = VHost, name = QueueName}, NodeMasters) ->
45+
case rabbit_queue_master_location_misc:lookup_master(QueueName, VHost) of
46+
{ok, Master} when is_atom(Master) ->
47+
case maps:is_key(Master, NodeMasters) of
48+
true -> maps:update_with(Master,
49+
fun(N) -> N + 1 end,
50+
NodeMasters);
51+
false -> NodeMasters
52+
end;
53+
_ -> NodeMasters
54+
end
55+
end,
56+
maps:from_list([{N, 0} || N <- Cluster]),
57+
QueueNames),
4658

47-
%%---------------------------------------------------------------------------
48-
%% Private helper functions
49-
%%---------------------------------------------------------------------------
50-
get_min_master(Cluster, BoundQueueMasters) ->
51-
lists:min([ {count_masters(Node, BoundQueueMasters), Node} ||
52-
Node <- Cluster ]).
53-
54-
count_masters(Node, Masters) ->
55-
length([ X || X <- Masters, X == Node ]).
56-
57-
get_bound_queue_masters_per_vhost([], Acc) ->
58-
lists:flatten(Acc);
59-
get_bound_queue_masters_per_vhost([VHost|RemVHosts], Acc) ->
60-
BoundQueueNames =
61-
lists:filtermap(
62-
fun(#binding{destination =#resource{kind = queue,
63-
name = QueueName}}) ->
64-
{true, QueueName};
65-
(_) ->
66-
false
67-
end,
68-
rabbit_binding:list(VHost)),
69-
UniqQueueNames = lists:usort(BoundQueueNames),
70-
BoundQueueMasters = get_queue_masters(VHost, UniqQueueNames, []),
71-
get_bound_queue_masters_per_vhost(RemVHosts, [BoundQueueMasters|Acc]).
72-
73-
74-
get_queue_masters(_VHost, [], BoundQueueNodes) -> BoundQueueNodes;
75-
get_queue_masters(VHost, [QueueName | RemQueueNames], QueueMastersAcc) ->
76-
QueueMastersAcc0 = case rabbit_queue_master_location_misc:lookup_master(
77-
QueueName, VHost) of
78-
{ok, Master} when is_atom(Master) ->
79-
[Master|QueueMastersAcc];
80-
_ -> QueueMastersAcc
81-
end,
82-
get_queue_masters(VHost, RemQueueNames, QueueMastersAcc0).
59+
{MinNode, _NMasters} = maps:fold(
60+
fun(Node, NMasters, init) ->
61+
{Node, NMasters};
62+
(Node, NMasters, {MinNode, MinMasters}) ->
63+
case NMasters < MinMasters of
64+
true -> {Node, NMasters};
65+
false -> {MinNode, MinMasters}
66+
end
67+
end,
68+
init,
69+
MastersPerNode),
70+
{ok, MinNode}.

0 commit comments

Comments
 (0)