Skip to content

Commit a364fbd

Browse files
committed
Reduce Khepri test flakes
Test case rabbit_mqtt_qos0_queue_kill_node flaked because, after an MQTT client subscribes on node 0, RabbitMQ returns success once the new binding has been replicated to node 0 and node 1, but not necessarily to node 2 yet. Another MQTT client then publishes on node 2 before the binding is present on node 2, and the message therefore isn't routed. This commit attempts to eliminate this flake. It adds a function to rabbit_ct_broker_helpers that waits until a given node has caught up with the leader node. We can reuse that function in the future to eliminate more test flakes. (cherry picked from commit 8ba3649)
1 parent 7811eec commit a364fbd

File tree

3 files changed

+30
-2
lines changed

3 files changed

+30
-2
lines changed

deps/rabbitmq_ct_helpers/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ rabbitmq_app(
4545
"//deps/rabbit_common:erlang_app",
4646
"@meck//:erlang_app",
4747
"@proper//:erlang_app",
48+
"@ra//:erlang_app",
4849
],
4950
)
5051

deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,8 @@
170170
test_writer/1,
171171
user/1,
172172

173-
configured_metadata_store/1
173+
configured_metadata_store/1,
174+
await_metadata_store_consistent/2
174175
]).
175176

176177
%% Internal functions exported to be used by rpc:call/4.
@@ -990,6 +991,30 @@ enable_khepri_metadata_store(Config, FFs0) ->
990991
end
991992
end, Config, FFs).
992993

994+
%% Waits until the metadata store replica on Node is up to date with the leader.
995+
await_metadata_store_consistent(Config, Node) ->
996+
case configured_metadata_store(Config) of
997+
mnesia ->
998+
ok;
999+
{khepri, _} ->
1000+
RaClusterName = rabbit_khepri:get_ra_cluster_name(),
1001+
Leader = rpc(Config, Node, ra_leaderboard, lookup_leader, [RaClusterName]),
1002+
LastAppliedLeader = ra_last_applied(Leader),
1003+
1004+
NodeName = get_node_config(Config, Node, nodename),
1005+
ServerId = {RaClusterName, NodeName},
1006+
rabbit_ct_helpers:eventually(
1007+
?_assert(
1008+
begin
1009+
LastApplied = ra_last_applied(ServerId),
1010+
is_integer(LastApplied) andalso LastApplied >= LastAppliedLeader
1011+
end))
1012+
end.
1013+
1014+
ra_last_applied(ServerId) ->
1015+
#{last_applied := LastApplied} = ra:key_metrics(ServerId),
1016+
LastApplied.
1017+
9931018
rewrite_node_config_file(Config, Node) ->
9941019
NodeConfig = get_node_config(Config, Node),
9951020
I = if

deps/rabbitmq_mqtt/test/shared_SUITE.erl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@
2727
rpc_all/4,
2828
get_node_config/3,
2929
drain_node/2,
30-
revive_node/2
30+
revive_node/2,
31+
await_metadata_store_consistent/2
3132
]).
3233
-import(rabbit_ct_helpers,
3334
[eventually/3,
@@ -1128,6 +1129,7 @@ rabbit_mqtt_qos0_queue_kill_node(Config) ->
11281129
SubscriberId = <<"subscriber">>,
11291130
Sub0 = connect(SubscriberId, Config, 0, []),
11301131
{ok, _, [0]} = emqtt:subscribe(Sub0, Topic1, qos0),
1132+
ok = await_metadata_store_consistent(Config, 2),
11311133
ok = emqtt:publish(Pub, Topic1, <<"m0">>, qos0),
11321134
ok = expect_publishes(Sub0, Topic1, [<<"m0">>]),
11331135

0 commit comments

Comments
 (0)