Skip to content

Commit 2760707

Browse files
Merge pull request #1903 from rabbitmq/single-active-noconnection
rabbit_fifo: change single active consumer on noconnection
2 parents 564bca6 + 933c176 commit 2760707

8 files changed

+438
-199
lines changed

src/rabbit_fifo.erl

Lines changed: 153 additions & 95 deletions
Large diffs are not rendered by default.

src/rabbit_fifo.hrl

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@
8888

8989
-type consumer() :: #consumer{}.
9090

91+
-type consumer_strategy() :: competing | single_active.
92+
9193
-record(enqueuer,
9294
{next_seqno = 1 :: msg_seqno(),
9395
% out of order enqueues - sorted list
@@ -104,7 +106,8 @@
104106
max_length :: maybe(non_neg_integer()),
105107
max_bytes :: maybe(non_neg_integer()),
106108
%% whether single active consumer is on or not for this queue
107-
consumer_strategy = default :: default | single_active,
109+
consumer_strategy = competing :: consumer_strategy(),
110+
%% the maximum number of unsuccessful delivery attempts permitted
108111
delivery_limit :: maybe(non_neg_integer())
109112
}).
110113

@@ -114,7 +117,7 @@
114117
messages = #{} :: #{msg_in_id() => indexed_msg()},
115118
% defines the lowest message in id available in the messages map
116119
% that isn't a return
117-
low_msg_num :: msg_in_id() | undefined,
120+
low_msg_num :: maybe(msg_in_id()),
118121
% defines the next message in id to be added to the messages map
119122
next_msg_num = 1 :: msg_in_id(),
120123
% list of returned msg_in_ids - when checking out it picks from

src/rabbit_quorum_queue.erl

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ declare(Q) when ?amqqueue_is_quorum(Q) ->
120120
RaMachine = ra_machine(NewQ),
121121
ServerIds = [{RaName, Node} || Node <- Nodes],
122122
ClusterName = RaName,
123+
TickTimeout = application:get_env(rabbit, quorum_tick_interval, ?TICK_TIMEOUT),
123124
RaConfs = [begin
124125
UId = ra:new_uid(ra_lib:to_binary(ClusterName)),
125126
FName = rabbit_misc:rs(QName),
@@ -129,7 +130,7 @@ declare(Q) when ?amqqueue_is_quorum(Q) ->
129130
friendly_name => FName,
130131
initial_members => ServerIds,
131132
log_init_args => #{uid => UId},
132-
tick_timeout => ?TICK_TIMEOUT,
133+
tick_timeout => TickTimeout,
133134
machine => RaMachine}
134135
end || ServerId <- ServerIds],
135136

@@ -190,7 +191,8 @@ update_consumer(QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, Act
190191
QName, Prefetch, Active, ActivityStatus, Args).
191192

192193
cancel_consumer_handler(QName, {ConsumerTag, ChPid}) ->
193-
local_or_remote_handler(ChPid, rabbit_quorum_queue, cancel_consumer, [QName, ChPid, ConsumerTag]).
194+
local_or_remote_handler(ChPid, rabbit_quorum_queue, cancel_consumer,
195+
[QName, ChPid, ConsumerTag]).
194196

195197
cancel_consumer(QName, ChPid, ConsumerTag) ->
196198
catch rabbit_core_metrics:consumer_deleted(ChPid, ConsumerTag, QName),

test/quorum_queue_SUITE.erl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,10 @@ memory_tests() ->
126126
%% Testsuite setup/teardown.
127127
%% -------------------------------------------------------------------
128128

129-
init_per_suite(Config) ->
129+
init_per_suite(Config0) ->
130130
rabbit_ct_helpers:log_environment(),
131+
Config = rabbit_ct_helpers:merge_app_env(
132+
Config0, {rabbit, [{quorum_tick_interval, 1000}]}),
131133
rabbit_ct_helpers:run_setup_steps(
132134
Config,
133135
[fun rabbit_ct_broker_helpers:enable_dist_proxy_manager/1]).

test/rabbit_fifo_SUITE.erl

Lines changed: 250 additions & 80 deletions
Large diffs are not rendered by default.

test/rabbit_fifo_prop_SUITE.erl

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -271,21 +271,6 @@ scenario16(_Config) ->
271271
delivery_limit => 1}, Commands),
272272
ok.
273273

274-
fake_pid(_Config) ->
275-
Pid = fake_external_pid(<<"mynode@banana">>),
276-
?assertNotEqual(node(Pid), node()),
277-
?assert(is_pid(Pid)),
278-
ok.
279-
280-
fake_external_pid(Node) when is_binary(Node) ->
281-
ThisNodeSize = size(term_to_binary(node())) + 1,
282-
Pid = spawn(fun () -> ok end),
283-
%% drop the local node data from a local pid
284-
<<_:ThisNodeSize/binary, LocalPidData/binary>> = term_to_binary(Pid),
285-
S = size(Node),
286-
%% replace it with the incoming node binary
287-
Final = <<131,103, 100, 0, S, Node/binary, LocalPidData/binary>>,
288-
binary_to_term(Final).
289274

290275
snapshots(_Config) ->
291276
run_proper(
@@ -352,7 +337,7 @@ log_gen(Size) ->
352337

353338
pid_gen(Nodes) ->
354339
?LET(Node, oneof(Nodes),
355-
fake_external_pid(atom_to_binary(Node, utf8))).
340+
test_util:fake_pid(atom_to_binary(Node, utf8))).
356341

357342
down_gen(Pid) ->
358343
?LET(E, {down, Pid, oneof([noconnection, noproc])}, E).

test/single_active_consumer_SUITE.erl

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,14 @@ groups() ->
4545
]}
4646
].
4747

48-
init_per_suite(Config) ->
48+
init_per_suite(Config0) ->
4949
rabbit_ct_helpers:log_environment(),
50-
Config1 = rabbit_ct_helpers:set_config(Config, [
50+
Config1 = rabbit_ct_helpers:set_config(Config0, [
5151
{rmq_nodename_suffix, ?MODULE}
5252
]),
53-
rabbit_ct_helpers:run_setup_steps(Config1,
53+
Config = rabbit_ct_helpers:merge_app_env(
54+
Config1, {rabbit, [{quorum_tick_interval, 1000}]}),
55+
rabbit_ct_helpers:run_setup_steps(Config,
5456
rabbit_ct_broker_helpers:setup_steps() ++
5557
rabbit_ct_client_helpers:setup_steps()).
5658

test/test_util.erl

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
-module(test_util).
2+
3+
-export([
4+
fake_pid/1
5+
]).
6+
7+
8+
fake_pid(Node) ->
9+
NodeBin = rabbit_data_coercion:to_binary(Node),
10+
ThisNodeSize = size(term_to_binary(node())) + 1,
11+
Pid = spawn(fun () -> ok end),
12+
%% drop the local node data from a local pid
13+
<<_:ThisNodeSize/binary, LocalPidData/binary>> = term_to_binary(Pid),
14+
S = size(NodeBin),
15+
%% replace it with the incoming node binary
16+
Final = <<131,103, 100, S:16/unsigned, NodeBin/binary, LocalPidData/binary>>,
17+
binary_to_term(Final).

0 commit comments

Comments
 (0)