Skip to content

Commit 685f8ea

Browse files
authored
Merge pull request #1801 from rabbitmq/qq-list-consumers
Implement consumer listing for quorum queues
2 parents b914cdb + 2376e9b commit 685f8ea

8 files changed

+480
-296
lines changed

src/lqueue.erl

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -25,27 +25,32 @@
2525

2626
-define(QUEUE, queue).
2727

28-
-export_type([?MODULE/0]).
28+
-export_type([
29+
?MODULE/0,
30+
?MODULE/1
31+
]).
2932

30-
-opaque ?MODULE() :: {non_neg_integer(), queue:queue()}.
33+
-opaque ?MODULE() :: {non_neg_integer(), queue:queue(term())}.
34+
-opaque ?MODULE(T) :: {non_neg_integer(), queue:queue(T)}.
3135
-type value() :: any().
32-
-type result() :: 'empty' | {'value', value()}.
36+
-type result(T) :: 'empty' | {'value', T}.
3337

34-
-spec new() -> ?MODULE().
35-
-spec drop(?MODULE()) -> ?MODULE().
36-
-spec is_empty(?MODULE()) -> boolean().
37-
-spec len(?MODULE()) -> non_neg_integer().
38-
-spec in(value(), ?MODULE()) -> ?MODULE().
38+
-spec new() -> ?MODULE(_).
39+
-spec drop(?MODULE(T)) -> ?MODULE(T).
40+
-spec is_empty(?MODULE(_)) -> boolean().
41+
-spec len(?MODULE(_)) -> non_neg_integer().
42+
-spec in(T, ?MODULE(T)) -> ?MODULE(T).
3943
-spec in_r(value(), ?MODULE()) -> ?MODULE().
40-
-spec out(?MODULE()) -> {result(), ?MODULE()}.
41-
-spec out_r(?MODULE()) -> {result(), ?MODULE()}.
42-
-spec join(?MODULE(), ?MODULE()) -> ?MODULE().
43-
-spec foldl(fun ((value(), B) -> B), B, ?MODULE()) -> B.
44-
-spec foldr(fun ((value(), B) -> B), B, ?MODULE()) -> B.
45-
-spec from_list([value()]) -> ?MODULE().
46-
-spec to_list(?MODULE()) -> [value()].
47-
-spec peek(?MODULE()) -> result().
48-
-spec peek_r(?MODULE()) -> result().
44+
-spec out(?MODULE(T)) -> {result(T), ?MODULE()}.
45+
-spec out_r(?MODULE(T)) -> {result(T), ?MODULE()}.
46+
-spec join(?MODULE(A), ?MODULE(B)) -> ?MODULE(A | B).
47+
-spec foldl(fun ((T, B) -> B), B, ?MODULE(T)) -> B.
48+
-spec foldr(fun ((T, B) -> B), B, ?MODULE(T)) -> B.
49+
-spec from_list([T]) -> ?MODULE(T).
50+
-spec to_list(?MODULE(T)) -> [T].
51+
% -spec peek(?MODULE()) -> result().
52+
-spec peek(?MODULE(T)) -> result(T).
53+
-spec peek_r(?MODULE(T)) -> result(T).
4954

5055
new() -> {0, ?QUEUE:new()}.
5156

src/rabbit_amqqueue.erl

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -919,8 +919,12 @@ notify_policy_changed(#amqqueue{pid = QPid,
919919
name = QName}) when ?IS_QUORUM(QPid) ->
920920
rabbit_quorum_queue:policy_changed(QName, QPid).
921921

922-
consumers(#amqqueue{ pid = QPid }) ->
923-
delegate:invoke(QPid, {gen_server2, call, [consumers, infinity]}).
922+
consumers(#amqqueue{pid = QPid}) when ?IS_CLASSIC(QPid) ->
923+
delegate:invoke(QPid, {gen_server2, call, [consumers, infinity]});
924+
consumers(#amqqueue{pid = QPid}) when ?IS_QUORUM(QPid) ->
925+
{ok, {_, Result}, _} = ra:local_query(QPid,
926+
fun rabbit_fifo:query_consumers/1),
927+
maps:values(Result).
924928

925929
consumer_info_keys() -> ?CONSUMER_INFO_KEYS.
926930

@@ -1147,14 +1151,15 @@ basic_get(#amqqueue{pid = {Name, _} = Id, type = quorum, name = QName} = Q, _ChP
11471151
[rabbit_misc:rs(QName), Reason])
11481152
end.
11491153

1150-
basic_consume(#amqqueue{pid = QPid, name = QName, type = classic}, NoAck, ChPid, LimiterPid,
1151-
LimiterActive, ConsumerPrefetchCount, ConsumerTag,
1154+
basic_consume(#amqqueue{pid = QPid, name = QName, type = classic}, NoAck, ChPid,
1155+
LimiterPid, LimiterActive, ConsumerPrefetchCount, ConsumerTag,
11521156
ExclusiveConsume, Args, OkMsg, ActingUser, QState) ->
11531157
ok = check_consume_arguments(QName, Args),
1154-
case delegate:invoke(QPid, {gen_server2, call,
1155-
[{basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
1156-
ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume,
1157-
Args, OkMsg, ActingUser}, infinity]}) of
1158+
case delegate:invoke(QPid,
1159+
{gen_server2, call,
1160+
[{basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
1161+
ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume,
1162+
Args, OkMsg, ActingUser}, infinity]}) of
11581163
ok ->
11591164
{ok, QState};
11601165
Err ->
@@ -1164,15 +1169,17 @@ basic_consume(#amqqueue{type = quorum}, _NoAck, _ChPid,
11641169
_LimiterPid, true, _ConsumerPrefetchCount, _ConsumerTag,
11651170
_ExclusiveConsume, _Args, _OkMsg, _ActingUser, _QStates) ->
11661171
{error, global_qos_not_supported_for_queue_type};
1167-
basic_consume(#amqqueue{pid = {Name, _} = Id, name = QName, type = quorum} = Q, NoAck, ChPid,
1168-
_LimiterPid, _LimiterActive, ConsumerPrefetchCount, ConsumerTag,
1169-
ExclusiveConsume, Args, OkMsg, _ActingUser, QStates) ->
1172+
basic_consume(#amqqueue{pid = {Name, _} = Id, name = QName, type = quorum} = Q,
1173+
NoAck, ChPid, _LimiterPid, _LimiterActive, ConsumerPrefetchCount,
1174+
ConsumerTag, ExclusiveConsume, Args, OkMsg,
1175+
ActingUser, QStates) ->
11701176
ok = check_consume_arguments(QName, Args),
11711177
QState0 = get_quorum_state(Id, QName, QStates),
11721178
{ok, QState} = rabbit_quorum_queue:basic_consume(Q, NoAck, ChPid,
11731179
ConsumerPrefetchCount,
11741180
ConsumerTag,
11751181
ExclusiveConsume, Args,
1182+
ActingUser,
11761183
OkMsg, QState0),
11771184
{ok, maps:put(Name, QState, QStates)}.
11781185

0 commit comments

Comments
 (0)