Skip to content

Commit 3a33163

Browse files
committed
Queues with plugins - remove queue_topology callback
1 parent 1eeaef4 commit 3a33163

File tree

7 files changed

+24
-62
lines changed

7 files changed

+24
-62
lines changed

deps/rabbit/src/amqqueue.erl

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@
2828
set_decorators/2,
2929
% exclusive_owner
3030
get_exclusive_owner/1,
31-
get_leader/1,
31+
get_leader_node/1,
32+
get_nodes/1,
3233
% name (#resource)
3334
get_name/1,
3435
set_name/2,
@@ -387,10 +388,21 @@ set_decorators(#amqqueue{} = Queue, Decorators) ->
387388
get_exclusive_owner(#amqqueue{exclusive_owner = Owner}) ->
388389
Owner.
389390

390-
-spec get_leader(amqqueue_v2()) -> node().
391+
-spec get_leader_node(amqqueue_v2()) -> node() | none.
391392

392-
%% TODO: not only qqs can have leaders, dispatch via queue type
393-
get_leader(#amqqueue{type = rabbit_quorum_queue, pid = {_, Leader}}) -> Leader.
393+
get_leader_node(#amqqueue{pid = {_, Leader}}) -> Leader;
394+
get_leader_node(#amqqueue{pid = none}) -> none;
395+
get_leader_node(#amqqueue{pid = Pid}) -> node(Pid).
396+
397+
-spec get_nodes(amqqueue_v2()) -> [node(),...].
398+
399+
get_nodes(Q) ->
400+
case amqqueue:get_type_state(Q) of
401+
#{nodes := Nodes} ->
402+
Nodes;
403+
_ ->
404+
[get_leader_node(Q)]
405+
end.
394406

395407
% operator_policy
396408

deps/rabbit/src/rabbit_amqp_management.erl

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -463,8 +463,9 @@ encode_queue(Q, NumMsgs, NumConsumers) ->
463463
-spec queue_topology(amqqueue:amqqueue()) ->
464464
{Leader :: node() | none, Replicas :: [node(),...]}.
465465
queue_topology(Q) ->
466-
Type = amqqueue:get_type(Q),
467-
Type:queue_topology(Q).
466+
Leader = amqqueue:get_leader_node(Q),
467+
Replicas = amqqueue:get_nodes(Q),
468+
{Leader, Replicas}.
468469

469470
decode_exchange({map, KVList}) ->
470471
M = lists:foldl(

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1325,14 +1325,14 @@ list_stream_queues_on(Node) when is_atom(Node) ->
13251325
list_local_leaders() ->
13261326
[ Q || Q <- list(),
13271327
amqqueue:is_quorum(Q),
1328-
amqqueue:get_state(Q) =/= crashed, amqqueue:get_leader(Q) =:= node()].
1328+
amqqueue:get_state(Q) =/= crashed, amqqueue:get_leader_node(Q) =:= node()].
13291329

13301330
-spec list_local_followers() -> [amqqueue:amqqueue()].
13311331
list_local_followers() ->
13321332
[Q
13331333
|| Q <- list(),
13341334
amqqueue:is_quorum(Q),
1335-
amqqueue:get_leader(Q) =/= node(),
1335+
amqqueue:get_leader_node(Q) =/= node(),
13361336
lists:member(node(), get_quorum_nodes(Q)),
13371337
rabbit_quorum_queue:is_recoverable(Q)
13381338
].

deps/rabbit/src/rabbit_classic_queue.erl

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,7 @@
6464
send_drained_credit_api_v1/4,
6565
send_credit_reply/7]).
6666

67-
-export([queue_topology/1,
68-
policy_apply_to_name/0,
67+
-export([policy_apply_to_name/0,
6968
stop/1,
7069
list_with_minimum_quorum/0,
7170
drain/1,
@@ -702,13 +701,6 @@ send_credit_reply(Pid, QName, Ctag, DeliveryCount, Credit, Available, Drain) ->
702701
send_queue_event(Pid, QName, Event) ->
703702
gen_server:cast(Pid, {queue_event, QName, Event}).
704703

705-
-spec queue_topology(amqqueue:amqqueue()) ->
706-
{Leader :: node() | none, Replicas :: [node(),...]}.
707-
queue_topology(Q) ->
708-
Pid = amqqueue:get_pid(Q),
709-
Node = node(Pid),
710-
{Node, [Node]}.
711-
712704
policy_apply_to_name() ->
713705
<<"classic_queues">>.
714706

deps/rabbit/src/rabbit_queue_type.erl

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -284,9 +284,6 @@
284284
-callback notify_decorators(amqqueue:amqqueue()) ->
285285
ok.
286286

287-
-callback queue_topology(amqqueue:amqqueue()) ->
288-
{Leader :: undefined | node(), Replicas :: undefined | [node(),...]}.
289-
290287
-callback policy_apply_to_name() -> binary().
291288

292289
%% -callback on_node_up(node()) -> ok.

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,7 @@
7777
force_vhost_queues_shrink_member_to_current_member/1,
7878
force_all_queues_shrink_member_to_current_member/0]).
7979

80-
-export([queue_topology/1,
81-
policy_apply_to_name/0,
80+
-export([policy_apply_to_name/0,
8281
drain/1,
8382
revive/0,
8483
queue_vm_stats_sups/0,
@@ -2253,25 +2252,6 @@ maybe_log_leader_health_check_result(Result) ->
22532252
Qs = lists:map(fun(R) -> catch maps:get(<<"readable_name">>, R) end, Result),
22542253
rabbit_log:warning("Leader health check result (unhealthy leaders detected): ~tp", [Qs]).
22552254

2256-
-spec queue_topology(amqqueue:amqqueue()) ->
2257-
{Leader :: node() | none, Replicas :: [node(),...]}.
2258-
queue_topology(Q) ->
2259-
Leader = case amqqueue:get_pid(Q) of
2260-
{_RaName, Node} ->
2261-
Node;
2262-
none ->
2263-
none;
2264-
Pid ->
2265-
node(Pid)
2266-
end,
2267-
Replicas = case amqqueue:get_type_state(Q) of
2268-
#{nodes := Nodes} ->
2269-
Nodes;
2270-
_ ->
2271-
[Leader]
2272-
end,
2273-
{Leader, Replicas}.
2274-
22752255
policy_apply_to_name() ->
22762256
<<"quorum_queues">>.
22772257

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,7 @@
5959

6060
-export([check_max_segment_size_bytes/1]).
6161

62-
-export([queue_topology/1,
63-
policy_apply_to_name/0,
62+
-export([policy_apply_to_name/0,
6463
stop/1,
6564
drain/1,
6665
revive/0,
@@ -1430,25 +1429,6 @@ delivery_count_add(none, _) ->
14301429
delivery_count_add(Count, N) ->
14311430
serial_number:add(Count, N).
14321431

1433-
-spec queue_topology(amqqueue:amqqueue()) ->
1434-
{Leader :: node() | none, Replicas :: [node(),...]}.
1435-
queue_topology(Q) ->
1436-
Leader = case amqqueue:get_pid(Q) of
1437-
{_RaName, Node} ->
1438-
Node;
1439-
none ->
1440-
none;
1441-
Pid ->
1442-
node(Pid)
1443-
end,
1444-
Replicas = case amqqueue:get_type_state(Q) of
1445-
#{nodes := Nodes} ->
1446-
Nodes;
1447-
_ ->
1448-
[Leader]
1449-
end,
1450-
{Leader, Replicas}.
1451-
14521432
policy_apply_to_name() ->
14531433
<<"streams">>.
14541434

0 commit comments

Comments
 (0)