Skip to content

Commit 4a1f61f

Browse files
committed
Bugfixes
1 parent 418af7a commit 4a1f61f

File tree

6 files changed

+90
-11
lines changed

6 files changed

+90
-11
lines changed

deps/rabbit/src/rabbit_db_rtparams.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ get_all_in_mnesia() ->
231231
rabbit_mnesia:dirty_read_all(?MNESIA_TABLE).
232232

233233
get_all_in_khepri() ->
234-
Path = khepri_vhost_rp_path(?KHEPRI_WILDCARD_STAR, ?KHEPRI_WILDCARD_STAR, ?KHEPRI_WILDCARD_STAR),
234+
Path = khepri_rp_path() ++ [rabbit_khepri:if_has_data_wildcard()],
235235
{ok, Map} = rabbit_khepri:match(Path),
236236
maps:values(Map).
237237

deps/rabbit/src/rabbit_db_user.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -782,7 +782,7 @@ set_topic_permissions_in_khepri(Username, VHostName, TopicPermission) ->
782782
VHostName,
783783
fun() ->
784784
set_topic_permissions_in_khepri_tx(Username, VHostName, TopicPermission)
785-
end))).
785+
end)), rw).
786786

787787
set_topic_permissions_in_khepri_tx(Username, VHostName, TopicPermission) ->
788788
#topic_permission{topic_permission_key =

deps/rabbit/test/per_user_connection_channel_tracking_SUITE.erl

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -628,7 +628,6 @@ cluster_node_removed(Config) ->
628628

629629
rabbit_ct_broker_helpers:forget_cluster_node(Config, 0, 1),
630630
timer:sleep(200),
631-
NodeName = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename),
632631

633632
?assertEqual(false, is_process_alive(Conn2)),
634633
[?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2],
@@ -756,7 +755,6 @@ exists_in_tracked_channel_per_user_table(Config, NodeIndex, Username) ->
756755
exists_in_tracking_table(Config, NodeIndex, TableNameFun, Table, Key) ->
757756
Node = rabbit_ct_broker_helpers:get_node_config(
758757
Config, NodeIndex, nodename),
759-
Tab = TableNameFun(Node),
760758
All = rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
761759
ets, lookup, [Table, Key]),
762760
lists:keymember(Key, 1, All).

deps/rabbit/test/rabbit_db_topic_exchange_SUITE.erl

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@
1717

1818
all() ->
1919
[
20-
{group, all_tests}
20+
{group, all_tests},
21+
{group, mnesia_store}
2122
].
2223

2324
groups() ->
2425
[
26+
{mnesia_store, [], mnesia_tests()},
2527
{all_tests, [], all_tests()},
2628
{benchmarks, [], benchmarks()}
2729
].
@@ -31,7 +33,11 @@ all_tests() ->
3133
set,
3234
delete,
3335
delete_all_for_exchange,
34-
match,
36+
match
37+
].
38+
39+
mnesia_tests() ->
40+
[
3541
build_key_from_topic_trie_binding_record,
3642
build_key_from_deletion_events,
3743
build_key_from_binding_deletion_event,
@@ -50,7 +56,13 @@ init_per_suite(Config) ->
5056
end_per_suite(Config) ->
5157
rabbit_ct_helpers:run_teardown_steps(Config).
5258

59+
init_per_group(mnesia_store = Group, Config0) ->
60+
Config = rabbit_ct_helpers:set_config(Config0, [{metadata_store, mnesia}]),
61+
init_per_group_common(Group, Config);
5362
init_per_group(Group, Config) ->
63+
init_per_group_common(Group, Config).
64+
65+
init_per_group_common(Group, Config) ->
5466
Config1 = rabbit_ct_helpers:set_config(Config, [
5567
{rmq_nodename_suffix, Group},
5668
{rmq_nodes_count, 1}

deps/rabbitmq_mqtt/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ rabbitmq_integration_suite(
230230

231231
rabbitmq_integration_suite(
232232
name = "shared_SUITE",
233-
shard_count = 6,
233+
shard_count = 8,
234234
size = "large",
235235
runtime_deps = [
236236
"@emqtt//:erlang_app",

deps/rabbitmq_mqtt/test/shared_SUITE.erl

Lines changed: 73 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,15 +100,20 @@ subgroups() ->
100100
{cluster_size_3, [],
101101
[
102102
queue_down_qos1,
103-
consuming_classic_mirrored_queue_down,
104103
consuming_classic_queue_down,
105-
flow_classic_mirrored_queue,
106104
flow_quorum_queue,
107105
flow_stream,
108106
rabbit_mqtt_qos0_queue,
109107
cli_list_queues,
110108
maintenance,
111109
delete_create_queue,
110+
publish_to_all_non_deprecated_queue_types_qos0,
111+
publish_to_all_non_deprecated_queue_types_qos1
112+
]},
113+
{mnesia_store, [],
114+
[
115+
consuming_classic_mirrored_queue_down,
116+
flow_classic_mirrored_queue,
112117
publish_to_all_queue_types_qos0,
113118
publish_to_all_queue_types_qos1
114119
]}
@@ -138,6 +143,11 @@ init_per_group(cluster_size_1, Config) ->
138143
init_per_group(cluster_size_3 = Group, Config) ->
139144
init_per_group0(Group,
140145
rabbit_ct_helpers:set_config(Config, [{rmq_nodes_count, 3}]));
146+
init_per_group(mnesia_store = Group, Config) ->
147+
init_per_group0(
148+
Group,
149+
rabbit_ct_helpers:set_config(Config, [{metadata_store, mnesia},
150+
{rmq_nodes_count, 3}]));
141151
init_per_group(Group, Config)
142152
when Group =:= global_counters orelse
143153
Group =:= tests ->
@@ -369,6 +379,65 @@ publish_to_all_queue_types(Config, QoS) ->
369379
?awaitMatch([],
370380
all_connection_pids(Config), 10_000, 1000).
371381

382+
publish_to_all_non_deprecated_queue_types_qos0(Config) ->
383+
publish_to_all_non_deprecated_queue_types(Config, qos0).
384+
385+
publish_to_all_non_deprecated_queue_types_qos1(Config) ->
386+
publish_to_all_non_deprecated_queue_types(Config, qos1).
387+
388+
publish_to_all_non_deprecated_queue_types(Config, QoS) ->
389+
Ch = rabbit_ct_client_helpers:open_channel(Config),
390+
391+
CQ = <<"classic-queue">>,
392+
QQ = <<"quorum-queue">>,
393+
SQ = <<"stream-queue">>,
394+
Topic = <<"mytopic">>,
395+
396+
declare_queue(Ch, CQ, []),
397+
bind(Ch, CQ, Topic),
398+
399+
declare_queue(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
400+
bind(Ch, QQ, Topic),
401+
402+
declare_queue(Ch, SQ, [{<<"x-queue-type">>, longstr, <<"stream">>}]),
403+
bind(Ch, SQ, Topic),
404+
405+
NumMsgs = 2000,
406+
C = connect(?FUNCTION_NAME, Config, [{retry_interval, 2}]),
407+
lists:foreach(fun(N) ->
408+
case emqtt:publish(C, Topic, integer_to_binary(N), QoS) of
409+
ok ->
410+
ok;
411+
{ok, _} ->
412+
ok;
413+
Other ->
414+
ct:fail("Failed to publish: ~p", [Other])
415+
end
416+
end, lists:seq(1, NumMsgs)),
417+
418+
eventually(?_assert(
419+
begin
420+
L = rabbitmqctl_list(Config, 0, ["list_queues", "messages", "--no-table-headers"]),
421+
length(L) =:= 3 andalso
422+
lists:all(fun([Bin]) ->
423+
N = binary_to_integer(Bin),
424+
case QoS of
425+
qos0 ->
426+
N =:= NumMsgs;
427+
qos1 ->
428+
%% Allow for some duplicates when client resends
429+
%% a message that gets acked at roughly the same time.
430+
N >= NumMsgs andalso
431+
N < NumMsgs * 2
432+
end
433+
end, L)
434+
end), 2000, 10),
435+
436+
delete_queue(Ch, [CQ, QQ, SQ]),
437+
ok = emqtt:disconnect(C),
438+
?awaitMatch([],
439+
all_connection_pids(Config), 10_000, 1000).
440+
372441
flow_classic_mirrored_queue(Config) ->
373442
QueueName = <<"flow">>,
374443
ok = rabbit_ct_broker_helpers:set_ha_policy(Config, 0, QueueName, <<"all">>),
@@ -1248,15 +1317,15 @@ clean_session_kill_node(Config) ->
12481317
?assertEqual(0, length(QsQos0)),
12491318
?assertEqual(2, length(QsClassic))
12501319
end,
1251-
?assertEqual(2, rpc(Config, ets, info, [rabbit_durable_queue, size])),
1320+
?assertEqual(2, rpc(Config, rabbit_amqqueue, count, [])),
12521321

12531322
unlink(C),
12541323
ok = rabbit_ct_broker_helpers:kill_node(Config, 0),
12551324
ok = rabbit_ct_broker_helpers:start_node(Config, 0),
12561325

12571326
%% After terminating a clean session by a node crash, we expect any session
12581327
%% state to be cleaned up on the server once the server comes back up.
1259-
?assertEqual(0, rpc(Config, ets, info, [rabbit_durable_queue, size])).
1328+
?assertEqual(0, rpc(Config, rabbit_amqqueue, count, [])).
12601329

12611330
rabbit_status_connection_count(Config) ->
12621331
_ = rabbit_ct_client_helpers:open_connection(Config, 0),

0 commit comments

Comments
 (0)