Skip to content

Commit eb7634c

Browse files
Merge pull request #13905 from rabbitmq/ik-queues-with-plugins
Queues with plugins - Core
2 parents 551a300 + bcdb0b7 commit eb7634c

24 files changed

+739
-301
lines changed

deps/rabbit/src/amqqueue.erl

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@
2828
set_decorators/2,
2929
% exclusive_owner
3030
get_exclusive_owner/1,
31-
get_leader/1,
31+
get_leader_node/1,
32+
get_nodes/1,
3233
% name (#resource)
3334
get_name/1,
3435
set_name/2,
@@ -387,9 +388,21 @@ set_decorators(#amqqueue{} = Queue, Decorators) ->
387388
get_exclusive_owner(#amqqueue{exclusive_owner = Owner}) ->
388389
Owner.
389390

390-
-spec get_leader(amqqueue_v2()) -> node().
391+
-spec get_leader_node(amqqueue_v2()) -> node() | none.
391392

392-
get_leader(#amqqueue{type = rabbit_quorum_queue, pid = {_, Leader}}) -> Leader.
393+
get_leader_node(#amqqueue{pid = {_, Leader}}) -> Leader;
394+
get_leader_node(#amqqueue{pid = none}) -> none;
395+
get_leader_node(#amqqueue{pid = Pid}) -> node(Pid).
396+
397+
-spec get_nodes(amqqueue_v2()) -> [node(),...].
398+
399+
get_nodes(Q) ->
400+
case amqqueue:get_type_state(Q) of
401+
#{nodes := Nodes} ->
402+
Nodes;
403+
_ ->
404+
[get_leader_node(Q)]
405+
end.
393406

394407
% operator_policy
395408

deps/rabbit/src/rabbit_amqp_management.erl

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -463,20 +463,8 @@ encode_queue(Q, NumMsgs, NumConsumers) ->
463463
-spec queue_topology(amqqueue:amqqueue()) ->
464464
{Leader :: node() | none, Replicas :: [node(),...]}.
465465
queue_topology(Q) ->
466-
Leader = case amqqueue:get_pid(Q) of
467-
{_RaName, Node} ->
468-
Node;
469-
none ->
470-
none;
471-
Pid ->
472-
node(Pid)
473-
end,
474-
Replicas = case amqqueue:get_type_state(Q) of
475-
#{nodes := Nodes} ->
476-
Nodes;
477-
_ ->
478-
[Leader]
479-
end,
466+
Leader = amqqueue:get_leader_node(Q),
467+
Replicas = amqqueue:get_nodes(Q),
480468
{Leader, Replicas}.
481469

482470
decode_exchange({map, KVList}) ->

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 38 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
-export([update/2, store_queue/1, update_decorators/2, policy_changed/2]).
3838
-export([emit_unresponsive/6, emit_unresponsive_local/5, is_unresponsive/2]).
3939
-export([is_match/2, is_in_virtual_host/2]).
40-
-export([is_replicated/1, is_exclusive/1, is_not_exclusive/1, is_dead_exclusive/1]).
40+
-export([is_replicable/1, is_exclusive/1, is_not_exclusive/1, is_dead_exclusive/1]).
4141
-export([list_local_quorum_queues/0, list_local_quorum_queue_names/0,
4242
list_local_stream_queues/0, list_stream_queues_on/1,
4343
list_local_leaders/0, list_local_followers/0, get_quorum_nodes/1,
@@ -150,11 +150,7 @@ filter_pid_per_type(QPids) ->
150150

151151
-spec stop(rabbit_types:vhost()) -> 'ok'.
152152
stop(VHost) ->
153-
%% Classic queues
154-
ok = rabbit_amqqueue_sup_sup:stop_for_vhost(VHost),
155-
{ok, BQ} = application:get_env(rabbit, backing_queue_module),
156-
ok = BQ:stop(VHost),
157-
rabbit_quorum_queue:stop(VHost).
153+
rabbit_queue_type:stop(VHost).
158154

159155
-spec start([amqqueue:amqqueue()]) -> 'ok'.
160156

@@ -424,14 +420,16 @@ rebalance(Type, VhostSpec, QueueSpec) ->
424420
%% We have not yet acquired the rebalance_queues global lock.
425421
maybe_rebalance(get_rebalance_lock(self()), Type, VhostSpec, QueueSpec).
426422

423+
%% TODO: classic queues do not support rebalancing, it looks like they are simply
424+
%% filtered out with is_replicable(Q). Maybe error instead?
427425
maybe_rebalance({true, Id}, Type, VhostSpec, QueueSpec) ->
428426
rabbit_log:info("Starting queue rebalance operation: '~ts' for vhosts matching '~ts' and queues matching '~ts'",
429427
[Type, VhostSpec, QueueSpec]),
430428
Running = rabbit_maintenance:filter_out_drained_nodes_consistent_read(rabbit_nodes:list_running()),
431429
NumRunning = length(Running),
432430
ToRebalance = [Q || Q <- list(),
433431
filter_per_type(Type, Q),
434-
is_replicated(Q),
432+
is_replicable(Q),
435433
is_match(amqqueue:get_vhost(Q), VhostSpec) andalso
436434
is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)],
437435
NumToRebalance = length(ToRebalance),
@@ -459,10 +457,20 @@ filter_per_type(stream, Q) ->
459457
filter_per_type(classic, Q) ->
460458
?amqqueue_is_classic(Q).
461459

462-
rebalance_module(Q) when ?amqqueue_is_quorum(Q) ->
463-
rabbit_quorum_queue;
464-
rebalance_module(Q) when ?amqqueue_is_stream(Q) ->
465-
rabbit_stream_queue.
460+
%% TODO: note that it can return {error, not_supported}.
461+
%% this will result in a badmatch. However that's fine
462+
%% for now because the original function will fail with
463+
%% bad clause if called with classical queue.
464+
%% The assumption is all non-replicated queues
465+
%% are filtered before calling this with is_replicable/0
466+
rebalance_module(Q) ->
467+
case rabbit_queue_type:rebalance_module(Q) of
468+
undefined ->
469+
rabbit_log:error("Undefined rebalance module for queue type: ~s", [amqqueue:get_type(Q)]),
470+
{error, not_supported};
471+
RBModule ->
472+
RBModule
473+
end.
466474

467475
get_resource_name(#resource{name = Name}) ->
468476
Name.
@@ -487,13 +495,19 @@ iterative_rebalance(ByNode, MaxQueuesDesired) ->
487495
maybe_migrate(ByNode, MaxQueuesDesired) ->
488496
maybe_migrate(ByNode, MaxQueuesDesired, maps:keys(ByNode)).
489497

498+
%% TODO: unfortunate part - UI bits mixed deep inside logic.
499+
%% I will not be moving this inside queue type. Instead
500+
%% an attempt to generate something more readable than
501+
%% Other made.
490502
column_name(rabbit_classic_queue) -> <<"Number of replicated classic queues">>;
491503
column_name(rabbit_quorum_queue) -> <<"Number of quorum queues">>;
492504
column_name(rabbit_stream_queue) -> <<"Number of streams">>;
493-
column_name(Other) -> Other.
505+
column_name(TypeModule) ->
506+
Alias = rabbit_queue_type:short_alias_of(TypeModule),
507+
<<"Number of \"", Alias/binary, "\" queues">>.
494508

495509
maybe_migrate(ByNode, _, []) ->
496-
ByNodeAndType = maps:map(fun(_Node, Queues) -> maps:groups_from_list(fun({_, Q, _}) -> column_name(?amqqueue_v2_field_type(Q)) end, Queues) end, ByNode),
510+
ByNodeAndType = maps:map(fun(_Node, Queues) -> maps:groups_from_list(fun({_, Q, _}) -> column_name(amqqueue:get_type(Q)) end, Queues) end, ByNode),
497511
CountByNodeAndType = maps:map(fun(_Node, Type) -> maps:map(fun (_, Qs)-> length(Qs) end, Type) end, ByNodeAndType),
498512
{ok, maps:values(maps:map(fun(Node,Counts) -> [{<<"Node name">>, Node} | maps:to_list(Counts)] end, CountByNodeAndType))};
499513
maybe_migrate(ByNode, MaxQueuesDesired, [N | Nodes]) ->
@@ -1281,14 +1295,12 @@ list_durable() ->
12811295

12821296
-spec list_by_type(atom()) -> [amqqueue:amqqueue()].
12831297

1284-
list_by_type(classic) -> list_by_type(rabbit_classic_queue);
1285-
list_by_type(quorum) -> list_by_type(rabbit_quorum_queue);
1286-
list_by_type(stream) -> list_by_type(rabbit_stream_queue);
1287-
list_by_type(Type) ->
1288-
rabbit_db_queue:get_all_durable_by_type(Type).
1298+
list_by_type(TypeDescriptor) ->
1299+
TypeModule = rabbit_queue_type:discover(TypeDescriptor),
1300+
rabbit_db_queue:get_all_durable_by_type(TypeModule).
12891301

1302+
%% TODO: looks unused
12901303
-spec list_local_quorum_queue_names() -> [name()].
1291-
12921304
list_local_quorum_queue_names() ->
12931305
[ amqqueue:get_name(Q) || Q <- list_by_type(quorum),
12941306
amqqueue:get_state(Q) =/= crashed,
@@ -1313,18 +1325,19 @@ list_stream_queues_on(Node) when is_atom(Node) ->
13131325
list_local_leaders() ->
13141326
[ Q || Q <- list(),
13151327
amqqueue:is_quorum(Q),
1316-
amqqueue:get_state(Q) =/= crashed, amqqueue:get_leader(Q) =:= node()].
1328+
amqqueue:get_state(Q) =/= crashed, amqqueue:get_leader_node(Q) =:= node()].
13171329

13181330
-spec list_local_followers() -> [amqqueue:amqqueue()].
13191331
list_local_followers() ->
13201332
[Q
13211333
|| Q <- list(),
13221334
amqqueue:is_quorum(Q),
1323-
amqqueue:get_leader(Q) =/= node(),
1335+
amqqueue:get_leader_node(Q) =/= node(),
13241336
lists:member(node(), get_quorum_nodes(Q)),
13251337
rabbit_quorum_queue:is_recoverable(Q)
13261338
].
13271339

1340+
%% TODO: looks unused
13281341
-spec list_local_quorum_queues_with_name_matching(binary()) -> [amqqueue:amqqueue()].
13291342
list_local_quorum_queues_with_name_matching(Pattern) ->
13301343
[ Q || Q <- list_by_type(quorum),
@@ -1909,13 +1922,10 @@ forget_node_for_queue(Q) ->
19091922
run_backing_queue(QPid, Mod, Fun) ->
19101923
gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}).
19111924

1912-
-spec is_replicated(amqqueue:amqqueue()) -> boolean().
1925+
-spec is_replicable(amqqueue:amqqueue()) -> boolean().
19131926

1914-
is_replicated(Q) when ?amqqueue_is_classic(Q) ->
1915-
false;
1916-
is_replicated(_Q) ->
1917-
%% streams and quorum queues are all replicated
1918-
true.
1927+
is_replicable(Q) ->
1928+
rabbit_queue_type:is_replicable(Q).
19191929

19201930
is_exclusive(Q) when ?amqqueue_exclusive_owner_is(Q, none) ->
19211931
false;
@@ -1985,7 +1995,7 @@ filter_transient_queues_to_delete(Node) ->
19851995
amqqueue:qnode(Q) == Node andalso
19861996
not rabbit_process:is_process_alive(amqqueue:get_pid(Q))
19871997
andalso (not amqqueue:is_classic(Q) orelse not amqqueue:is_durable(Q))
1988-
andalso (not is_replicated(Q)
1998+
andalso (not is_replicable(Q)
19891999
orelse is_dead_exclusive(Q))
19902000
andalso amqqueue:get_type(Q) =/= rabbit_mqtt_qos0_queue
19912001
end.

deps/rabbit/src/rabbit_boot_steps.erl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
%%
55
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
66
%%
7+
%% README: https://github.com/rabbitmq/internals/blob/master/rabbit_boot_process.md
8+
%%
79

810
-module(rabbit_boot_steps).
911

deps/rabbit/src/rabbit_channel.erl

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1279,11 +1279,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck},
12791279
?INCR_STATS(queue_stats, QueueName, 1, get_empty, State),
12801280
{reply, #'basic.get_empty'{}, State#ch{queue_states = QueueStates}};
12811281
{error, {unsupported, single_active_consumer}} ->
1282-
rabbit_misc:protocol_error(
1283-
resource_locked,
1284-
"cannot obtain access to locked ~ts. basic.get operations "
1285-
"are not supported by quorum queues with single active consumer",
1286-
[rabbit_misc:rs(QueueName)]);
1282+
rabbit_amqqueue:with_or_die(QueueName, fun unsupported_single_active_consumer_error/1);
12871283
{error, Reason} ->
12881284
%% TODO add queue type to error message
12891285
rabbit_misc:protocol_error(internal_error,
@@ -1996,6 +1992,7 @@ foreach_per_queue(_F, [], Acc) ->
19961992
foreach_per_queue(F, [#pending_ack{tag = CTag,
19971993
queue = QName,
19981994
msg_id = MsgId}], Acc) ->
1995+
%% TODO: fix this abstraction leak
19991996
%% quorum queue, needs the consumer tag
20001997
F({QName, CTag}, [MsgId], Acc);
20011998
foreach_per_queue(F, UAL, Acc) ->
@@ -2023,6 +2020,7 @@ notify_limiter(Limiter, Acked) ->
20232020
case rabbit_limiter:is_active(Limiter) of
20242021
false -> ok;
20252022
true -> case lists:foldl(fun (#pending_ack{tag = CTag}, Acc) when is_integer(CTag) ->
2023+
%% TODO: fix absctraction leak
20262024
%% Quorum queues use integer CTags
20272025
%% classic queues use binaries
20282026
%% Quorum queues do not interact
@@ -2787,3 +2785,12 @@ maybe_decrease_global_publishers(#ch{publishing_mode = true}) ->
27872785

27882786
is_global_qos_permitted() ->
27892787
rabbit_deprecated_features:is_permitted(global_qos).
2788+
2789+
-spec unsupported_single_active_consumer_error(amqqueue:amqqueue()) -> no_return().
2790+
unsupported_single_active_consumer_error(Q) ->
2791+
rabbit_misc:protocol_error(
2792+
resource_locked,
2793+
"cannot obtain access to locked ~ts. basic.get operations "
2794+
"are not supported by ~p queues with single active consumer",
2795+
[rabbit_misc:rs(amqqueue:get_name(Q)),
2796+
rabbit_queue_type:short_alias_of(amqqueue:get_type(Q))]).

deps/rabbit/src/rabbit_classic_queue.erl

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,26 @@
6464
send_drained_credit_api_v1/4,
6565
send_credit_reply/7]).
6666

67+
-export([policy_apply_to_name/0,
68+
stop/1,
69+
list_with_minimum_quorum/0,
70+
drain/1,
71+
revive/0,
72+
queue_vm_stats_sups/0,
73+
queue_vm_ets/0]).
74+
6775
-export([validate_policy/1]).
6876

77+
-rabbit_boot_step(
78+
{rabbit_classic_queue_type,
79+
[{description, "Classic queue: queue type"},
80+
{mfa, {rabbit_registry, register,
81+
[queue, <<"classic">>, ?MODULE]}},
82+
{cleanup, {rabbit_registry, unregister,
83+
[queue, <<"classic">>]}},
84+
{requires, rabbit_registry},
85+
{enables, ?MODULE}]}).
86+
6987
-rabbit_boot_step(
7088
{?MODULE,
7189
[{description, "Deprecated queue-master-locator support."
@@ -74,7 +92,7 @@
7492
[policy_validator, <<"queue-master-locator">>, ?MODULE]}},
7593
{mfa, {rabbit_registry, register,
7694
[operator_policy_validator, <<"queue-master-locator">>, ?MODULE]}},
77-
{requires, rabbit_registry},
95+
{requires, [rabbit_classic_queue_type]},
7896
{enables, recovery}]}).
7997

8098
validate_policy(Args) ->
@@ -590,7 +608,11 @@ capabilities() ->
590608
false -> []
591609
end,
592610
consumer_arguments => [<<"x-priority">>],
593-
server_named => true}.
611+
server_named => true,
612+
rebalance_module => undefined,
613+
can_redeliver => false,
614+
is_replicable => false
615+
}.
594616

595617
notify_decorators(Q) when ?is_amqqueue(Q) ->
596618
QPid = amqqueue:get_pid(Q),
@@ -678,3 +700,33 @@ send_credit_reply(Pid, QName, Ctag, DeliveryCount, Credit, Available, Drain) ->
678700

679701
send_queue_event(Pid, QName, Event) ->
680702
gen_server:cast(Pid, {queue_event, QName, Event}).
703+
704+
policy_apply_to_name() ->
705+
<<"classic_queues">>.
706+
707+
stop(VHost) ->
708+
ok = rabbit_amqqueue_sup_sup:stop_for_vhost(VHost),
709+
{ok, BQ} = application:get_env(rabbit, backing_queue_module),
710+
ok = BQ:stop(VHost).
711+
712+
list_with_minimum_quorum() ->
713+
[].
714+
715+
drain(_TransferCandidates) ->
716+
ok.
717+
718+
revive() ->
719+
ok.
720+
721+
queue_vm_stats_sups() ->
722+
{[queue_procs], [rabbit_vm:all_vhosts_children(rabbit_amqqueue_sup_sup)]}.
723+
724+
%% return nothing because of this line in rabbit_vm:
725+
%% {msg_index, MsgIndexETS + MsgIndexProc},
726+
%% it mixes procs and ets,
727+
%% TODO: maybe instead of separating sups and ets
728+
%% I need vm_memory callback that just
729+
%% returns proplist? And rabbit_vm calculates
730+
%% Other as usual by substraction.
731+
queue_vm_ets() ->
732+
{[], []}.

deps/rabbit/src/rabbit_definitions.erl

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1045,16 +1045,11 @@ list_queues() ->
10451045

10461046
queue_definition(Q) ->
10471047
#resource{virtual_host = VHost, name = Name} = amqqueue:get_name(Q),
1048-
Type = case amqqueue:get_type(Q) of
1049-
rabbit_classic_queue -> classic;
1050-
rabbit_quorum_queue -> quorum;
1051-
rabbit_stream_queue -> stream;
1052-
T -> T
1053-
end,
1048+
TypeModule = amqqueue:get_type(Q),
10541049
#{
10551050
<<"vhost">> => VHost,
10561051
<<"name">> => Name,
1057-
<<"type">> => Type,
1052+
<<"type">> => rabbit_registry:lookup_type_name(queue, TypeModule),
10581053
<<"durable">> => amqqueue:is_durable(Q),
10591054
<<"auto_delete">> => amqqueue:is_auto_delete(Q),
10601055
<<"arguments">> => rabbit_misc:amqp_table(amqqueue:get_arguments(Q))

deps/rabbit/src/rabbit_fifo_dlx_worker.erl

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -537,16 +537,7 @@ redeliver0(#pending{delivery = Msg0,
537537
[rabbit_amqqueue:name()].
538538
clients_redeliver(Qs, QTypeState) ->
539539
lists:filter(fun(Q) ->
540-
case rabbit_queue_type:module(Q, QTypeState) of
541-
{ok, rabbit_quorum_queue} ->
542-
% If #enqueue{} Raft command does not get applied
543-
% rabbit_fifo_client will resend.
544-
true;
545-
{ok, rabbit_stream_queue} ->
546-
true;
547-
_ ->
548-
false
549-
end
540+
rabbit_queue_type:can_redeliver(Q, QTypeState)
550541
end, Qs).
551542

552543
maybe_set_timer(#state{timer = TRef} = State)

deps/rabbit/src/rabbit_global_counters.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -266,8 +266,8 @@ messages_dead_lettered(Reason, QueueType, DeadLetterStrategy, Num) ->
266266
end,
267267
counters:add(fetch(QueueType, DeadLetterStrategy), Index, Num).
268268

269-
messages_dead_lettered_confirmed(rabbit_quorum_queue, at_least_once, Num) ->
270-
counters:add(fetch(rabbit_quorum_queue, at_least_once), ?MESSAGES_DEAD_LETTERED_CONFIRMED, Num).
269+
messages_dead_lettered_confirmed(QTypeModule, at_least_once, Num) ->
270+
counters:add(fetch(QTypeModule, at_least_once), ?MESSAGES_DEAD_LETTERED_CONFIRMED, Num).
271271

272272
fetch(Protocol) ->
273273
persistent_term:get({?MODULE, Protocol}).

0 commit comments

Comments
 (0)