Skip to content

Commit 077c027

Browse files
kjnilssonansd
andauthored
MQTT: speed up shared_SUITE:many_qos1_messages (#11477)
* MQTT: speed up shared_SUITE:many_qos1_messages * speed up block_only_publisher * MQTT: reorganise tests groups To avoid starting a new broker for each protocol group (v3,v4,v5). Instead we run all protocol groups under a single cluster configuration group. * MQTT: speed up publish_to_all_queue_types_qos* tests. * Remove separate mnesia_store group to speed up the test suite * Fix wrong_shard_count * Remove unused field * Run subset of v3 tests The code being tested under v3 and v4 is almost identical. To save time in CI, we therefore run only a very small subset of tests in v3. This cuts the total time reported by CT for the shared_SUITE from 898 seconds to 614 seconds. Also, the java_SUITE tests in v3. * Fix wrong_shard_count * Fix mixed version failure --------- Co-authored-by: David Ansari <[email protected]>
1 parent 806efd8 commit 077c027

File tree

3 files changed

+76
-72
lines changed

3 files changed

+76
-72
lines changed

deps/rabbitmq_mqtt/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ rabbitmq_integration_suite(
250250
":test_util_beam",
251251
":test_event_recorder_beam",
252252
],
253-
shard_count = 18,
253+
shard_count = 10,
254254
runtime_deps = [
255255
"//deps/rabbitmq_management_agent:erlang_app",
256256
"@emqtt//:erlang_app",

deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@
6666
published = false :: boolean(),
6767
ssl_login_name :: none | binary(),
6868
retainer_pid :: pid(),
69-
delivery_flow, %% Deprecated since removal of CMQ in 4.0
7069
trace_state :: rabbit_trace:state(),
7170
prefetch :: non_neg_integer(),
7271
vhost :: rabbit_types:vhost(),

deps/rabbitmq_mqtt/test/shared_SUITE.erl

Lines changed: 75 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -56,38 +56,35 @@ all() ->
5656
[{group, mqtt},
5757
{group, web_mqtt}].
5858

59+
%% The code being tested under v3 and v4 is almost identical.
60+
%% To save time in CI, we therefore run only a very small subset of tests in v3.
5961
groups() ->
6062
[
6163
{mqtt, [],
62-
[{v3, [],
63-
[{cluster_size_1, [], cluster_size_1_tests()},
64-
{cluster_size_3, [], cluster_size_3_tests()},
65-
{mnesia_store, [], mnesia_store_tests()}]},
66-
{v4, [],
67-
[{cluster_size_1, [], cluster_size_1_tests()},
68-
{cluster_size_3, [], cluster_size_3_tests()},
69-
{mnesia_store, [], mnesia_store_tests()}]},
70-
{v5, [],
71-
[{cluster_size_1, [], cluster_size_1_tests()},
72-
{cluster_size_3, [], cluster_size_3_tests()},
73-
{mnesia_store, [], mnesia_store_tests()}]}
64+
[{cluster_size_1, [],
65+
[{v3, [], cluster_size_1_tests_v3()},
66+
{v4, [], cluster_size_1_tests()},
67+
{v5, [], cluster_size_1_tests()}]},
68+
{cluster_size_3, [],
69+
[{v4, [], cluster_size_3_tests()},
70+
{v5, [], cluster_size_3_tests()}]}
7471
]},
7572
{web_mqtt, [],
76-
[{v3, [],
77-
[{cluster_size_1, [], cluster_size_1_tests()},
78-
{cluster_size_3, [], cluster_size_3_tests()},
79-
{mnesia_store, [], mnesia_store_tests()}]},
80-
{v4, [],
81-
[{cluster_size_1, [], cluster_size_1_tests()},
82-
{cluster_size_3, [], cluster_size_3_tests()},
83-
{mnesia_store, [], mnesia_store_tests()}]},
84-
{v5, [],
85-
[{cluster_size_1, [], cluster_size_1_tests()},
86-
{cluster_size_3, [], cluster_size_3_tests()},
87-
{mnesia_store, [], mnesia_store_tests()}]}
73+
[{cluster_size_1, [],
74+
[{v3, [], cluster_size_1_tests_v3()},
75+
{v4, [], cluster_size_1_tests()},
76+
{v5, [], cluster_size_1_tests()}]},
77+
{cluster_size_3, [],
78+
[{v4, [], cluster_size_3_tests()},
79+
{v5, [], cluster_size_3_tests()}]}
8880
]}
8981
].
9082

83+
cluster_size_1_tests_v3() ->
84+
[global_counters,
85+
events
86+
].
87+
9188
cluster_size_1_tests() ->
9289
[
9390
global_counters %% must be the 1st test case
@@ -148,13 +145,9 @@ cluster_size_3_tests() ->
148145
session_reconnect,
149146
session_takeover,
150147
duplicate_client_id,
151-
maintenance
152-
].
153-
154-
mnesia_store_tests() ->
155-
[
156148
publish_to_all_queue_types_qos0,
157-
publish_to_all_queue_types_qos1
149+
publish_to_all_queue_types_qos1,
150+
maintenance
158151
].
159152

160153
suite() ->
@@ -166,7 +159,12 @@ suite() ->
166159

167160
init_per_suite(Config) ->
168161
rabbit_ct_helpers:log_environment(),
169-
rabbit_ct_helpers:run_setup_steps(Config).
162+
Config1 = rabbit_ct_helpers:merge_app_env(
163+
Config, {rabbit, [
164+
{quorum_tick_interval, 1000},
165+
{stream_tick_interval, 1000}
166+
]}),
167+
rabbit_ct_helpers:run_setup_steps(Config1).
170168

171169
end_per_suite(Config) ->
172170
rabbit_ct_helpers:run_teardown_steps(Config).
@@ -176,39 +174,31 @@ init_per_group(mqtt, Config) ->
176174
init_per_group(web_mqtt, Config) ->
177175
rabbit_ct_helpers:set_config(Config, {websocket, true});
178176

179-
init_per_group(Group, Config)
177+
init_per_group(Group, Config0)
180178
when Group =:= v3;
181179
Group =:= v4;
182180
Group =:= v5 ->
183-
rabbit_ct_helpers:set_config(Config, {mqtt_version, Group});
181+
Config = rabbit_ct_helpers:set_config(Config0, {mqtt_version, Group}),
182+
util:maybe_skip_v5(Config);
184183

185184
init_per_group(Group, Config0) ->
186185
Nodes = case Group of
187186
cluster_size_1 -> 1;
188-
cluster_size_3 -> 3;
189-
mnesia_store -> 3
187+
cluster_size_3 -> 3
190188
end,
191189
Suffix = rabbit_ct_helpers:testcase_absname(Config0, "", "-"),
192-
Config1 = case Group of
193-
mnesia_store ->
194-
rabbit_ct_helpers:set_config(Config0, {metadata_store, mnesia});
195-
_ ->
196-
Config0
197-
end,
198-
Config2 = rabbit_ct_helpers:set_config(
199-
Config1,
200-
[{rmq_nodes_count, Nodes},
201-
{rmq_nodename_suffix, Suffix}]),
202-
Config = rabbit_ct_helpers:run_steps(
203-
Config2,
204-
rabbit_ct_broker_helpers:setup_steps() ++
205-
rabbit_ct_client_helpers:setup_steps()),
206-
util:maybe_skip_v5(Config).
190+
Config = rabbit_ct_helpers:set_config(
191+
Config0,
192+
[{rmq_nodes_count, Nodes},
193+
{rmq_nodename_suffix, Suffix}]),
194+
rabbit_ct_helpers:run_steps(
195+
Config,
196+
rabbit_ct_broker_helpers:setup_steps() ++
197+
rabbit_ct_client_helpers:setup_steps()).
207198

208199
end_per_group(G, Config)
209200
when G =:= cluster_size_1;
210-
G =:= cluster_size_3;
211-
G =:= mnesia_store ->
201+
G =:= cluster_size_3 ->
212202
rabbit_ct_helpers:run_steps(
213203
Config,
214204
rabbit_ct_client_helpers:teardown_steps() ++
@@ -410,19 +400,21 @@ publish_to_all_queue_types(Config, QoS) ->
410400
declare_queue(Ch, SQ, [{<<"x-queue-type">>, longstr, <<"stream">>}]),
411401
bind(Ch, SQ, Topic),
412402

413-
NumMsgs = 2000,
414-
C = connect(?FUNCTION_NAME, Config, [{retry_interval, 2}]),
415-
lists:foreach(fun(N) ->
416-
case emqtt:publish(C, Topic, integer_to_binary(N), QoS) of
417-
ok ->
418-
ok;
419-
{ok, _} ->
420-
ok;
421-
Other ->
422-
ct:fail("Failed to publish: ~p", [Other])
423-
end
424-
end, lists:seq(1, NumMsgs)),
425-
403+
NumMsgs = 1000,
404+
C = connect(?FUNCTION_NAME, Config, [{max_inflight, 200},
405+
{retry_interval, 2}]),
406+
Self = self(),
407+
lists:foreach(
408+
fun(N) ->
409+
%% Publish async all messages at once to trigger flow control
410+
ok = emqtt:publish_async(C, Topic, integer_to_binary(N), QoS,
411+
{fun(N0, {ok, #{reason_code_name := success}}) ->
412+
Self ! {self(), N0};
413+
(N0, ok) ->
414+
Self ! {self(), N0}
415+
end, [N]})
416+
end, lists:seq(1, NumMsgs)),
417+
ok = await_confirms_ordered(C, 1, NumMsgs),
426418
eventually(?_assert(
427419
begin
428420
L = rabbitmqctl_list(Config, 0, ["list_queues", "messages", "--no-table-headers"]),
@@ -439,7 +431,7 @@ publish_to_all_queue_types(Config, QoS) ->
439431
N < NumMsgs * 2
440432
end
441433
end, L)
442-
end), 2000, 10),
434+
end), 1000, 20),
443435

444436
delete_queue(Ch, [CQ, QQ, SQ]),
445437
ok = emqtt:disconnect(C),
@@ -1126,13 +1118,26 @@ amqp_to_mqtt_qos0(Config) ->
11261118
%% Test that the server wraps around the packet identifier.
11271119
many_qos1_messages(Config) ->
11281120
Topic = ClientId = atom_to_binary(?FUNCTION_NAME),
1129-
C = connect(ClientId, Config, 0, [{retry_interval, 600}]),
1130-
{ok, _, [1]} = emqtt:subscribe(C, {Topic, qos1}),
11311121
NumMsgs = 16#ffff + 100,
1122+
C = connect(ClientId, Config, 0, [{retry_interval, 600},
1123+
{max_inflight, NumMsgs div 8}]),
1124+
{ok, _, [1]} = emqtt:subscribe(C, {Topic, qos1}),
11321125
Payloads = lists:map(fun integer_to_binary/1, lists:seq(1, NumMsgs)),
1126+
Self = self(),
1127+
Target = lists:last(Payloads),
11331128
lists:foreach(fun(P) ->
1134-
{ok, _} = emqtt:publish(C, Topic, P, qos1)
1129+
Cb = {fun(T, _) when T == Target ->
1130+
Self ! proceed;
1131+
(_, _) ->
1132+
ok
1133+
end, [P]},
1134+
ok = emqtt:publish_async(C, Topic, P, qos1, Cb)
11351135
end, Payloads),
1136+
receive
1137+
proceed -> ok
1138+
after 30000 ->
1139+
ct:fail("message to proceed never received")
1140+
end,
11361141
ok = expect_publishes(C, Topic, Payloads),
11371142
ok = emqtt:disconnect(C).
11381143

@@ -1453,7 +1458,7 @@ block(Config) ->
14531458
block_only_publisher(Config) ->
14541459
Topic = atom_to_binary(?FUNCTION_NAME),
14551460

1456-
Opts = [{ack_timeout, 2}],
1461+
Opts = [{ack_timeout, 1}],
14571462
Con = connect(<<"background-connection">>, Config, Opts),
14581463
Sub = connect(<<"subscriber-connection">>, Config, Opts),
14591464
Pub = connect(<<"publisher-connection">>, Config, Opts),

0 commit comments

Comments
 (0)