Skip to content

Commit d3dcd48

Browse files
committed
Always place exclusive queues on the local node
Prior to this change, exclusive queues have been subject to the queue location process, just like other queues. Therefore, if queue_master_locator was not client-local and x-queue-master-locator was not set to client-local, an exclusive queue was likely to be located on a different node than the connection it is exclusive to. This is suboptimal and may lead to inconsistencies when the queue's node goes down while the connection's node is still up.
1 parent a809e55 commit d3dcd48

File tree

2 files changed

+6
-42
lines changed

2 files changed

+6
-42
lines changed

deps/rabbit/src/rabbit_classic_queue.erl

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,13 @@ is_enabled() -> true.
5555
declare(Q, Node) when ?amqqueue_is_classic(Q) ->
5656
QName = amqqueue:get_name(Q),
5757
VHost = amqqueue:get_vhost(Q),
58-
Node1 = case Node of
59-
{ignore_location, Node0} ->
58+
Node1 = case {Node, rabbit_amqqueue:is_exclusive(Q)} of
59+
{{ignore_location, Node0}, _} ->
6060
Node0;
61+
{_, true} ->
62+
Node;
6163
_ ->
62-
case rabbit_queue_master_location_misc:get_location(Q) of
64+
case rabbit_queue_master_location_misc:get_location(Q) of
6365
{ok, Node0} -> Node0;
6466
_ -> Node
6567
end

deps/rabbit/test/simple_ha_SUITE.erl

Lines changed: 1 addition & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,7 @@ groups() ->
3030
{cluster_size_2, [], [
3131
rapid_redeclare,
3232
declare_synchrony,
33-
clean_up_exclusive_queues,
34-
clean_up_and_redeclare_exclusive_queues_on_other_nodes
33+
clean_up_exclusive_queues
3534
]},
3635
{cluster_size_3, [], [
3736
consume_survives_stop,
@@ -150,43 +149,6 @@ clean_up_exclusive_queues(Config) ->
150149
[[],[]] = rabbit_ct_broker_helpers:rpc_all(Config, rabbit_amqqueue, list, []),
151150
ok.
152151

153-
clean_up_and_redeclare_exclusive_queues_on_other_nodes(Config) ->
154-
QueueCount = 10,
155-
QueueNames = lists:map(fun(N) ->
156-
NBin = erlang:integer_to_binary(N),
157-
<<"exclusive-q-", NBin/binary>>
158-
end, lists:seq(1, QueueCount)),
159-
[A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
160-
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, A),
161-
{ok, Ch} = amqp_connection:open_channel(Conn),
162-
163-
LocationMinMasters = [
164-
{<<"x-queue-master-locator">>, longstr, <<"min-masters">>}
165-
],
166-
lists:foreach(fun(QueueName) ->
167-
declare_exclusive(Ch, QueueName, LocationMinMasters),
168-
subscribe(Ch, QueueName)
169-
end, QueueNames),
170-
171-
ok = rabbit_ct_broker_helpers:kill_node(Config, B),
172-
173-
Cancels = receive_cancels([]),
174-
?assert(length(Cancels) > 0),
175-
176-
RemaniningQueues = rabbit_ct_broker_helpers:rpc(Config, A, rabbit_amqqueue, list, []),
177-
178-
?assertEqual(length(RemaniningQueues), QueueCount - length(Cancels)),
179-
180-
lists:foreach(fun(QueueName) ->
181-
declare_exclusive(Ch, QueueName, LocationMinMasters),
182-
true = rabbit_ct_client_helpers:publish(Ch, QueueName, 1),
183-
subscribe(Ch, QueueName)
184-
end, QueueNames),
185-
Messages = receive_messages([]),
186-
?assertEqual(10, length(Messages)),
187-
ok = rabbit_ct_client_helpers:close_connection(Conn).
188-
189-
190152
consume_survives_stop(Cf) -> consume_survives(Cf, fun stop/2, true).
191153
consume_survives_sigkill(Cf) -> consume_survives(Cf, fun sigkill/2, true).
192154
consume_survives_policy(Cf) -> consume_survives(Cf, fun policy/2, true).

0 commit comments

Comments
 (0)