Skip to content

Commit c0e5eff

Browse files
committed
More fixes to transient queue handling
1 parent 816246e commit c0e5eff

File tree

2 files changed

+10
-65
lines changed

2 files changed

+10
-65
lines changed

deps/rabbit/src/rabbit_db_queue.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -989,7 +989,7 @@ delete_transient_in_khepri(FilterFun) ->
989989
case rabbit_khepri:get_many(PathPattern) of
990990
{ok, Qs} ->
991991
Items = maps:fold(
992-
fun(Path, #{data := Queue}, Acc) ->
992+
fun(Path, Queue, Acc) when ?is_amqqueue(Queue) ->
993993
case FilterFun(Queue) of
994994
true ->
995995
QueueName = khepri_queue_path_to_name(

deps/rabbit/test/bindings_SUITE.erl

Lines changed: 9 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,10 @@ groups() ->
3535
from_mnesia_to_khepri
3636
]},
3737
{mnesia_cluster, [], [
38-
transient_queue_on_node_down_mnesia
38+
transient_queue_on_node_down
3939
]},
4040
{khepri_cluster, [], [
41-
transient_queue_on_node_down_khepri
41+
transient_queue_on_node_down
4242
]}
4343
].
4444

@@ -815,7 +815,7 @@ bind_and_delete_exchange_source(Config) ->
815815
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_binding, list, [<<"/">>])),
816816
ok.
817817

818-
transient_queue_on_node_down_mnesia(Config) ->
818+
transient_queue_on_node_down(Config) ->
819819
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
820820

821821
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
@@ -846,88 +846,33 @@ transient_queue_on_node_down_mnesia(Config) ->
846846
QResource, Q, []),
847847
DirectAltBinding = binding_record(rabbit_misc:r(<<"/">>, exchange, <<"amq.direct">>),
848848
QAltResource, QAlt, []),
849-
Bindings = lists:sort([DefaultBinding, DirectBinding, DefaultAltBinding, DirectAltBinding]),
850-
851-
?assertEqual(Bindings,
852-
lists:sort(
853-
rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_binding, list, [<<"/">>]))),
854-
855-
rabbit_ct_broker_helpers:stop_node(Config, Server),
856-
857-
Bindings1 = lists:sort([DefaultBinding, DirectBinding]),
858-
?assertEqual([DirectBinding],
859-
lists:sort(rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_binding, list, [<<"/">>]))),
860-
?assertMatch([],
861-
rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_amqqueue, list, [<<"/">>])),
862-
863-
rabbit_ct_broker_helpers:start_node(Config, Server),
864849

850+
Bindings1 = lists:sort([DefaultBinding, DirectBinding, DefaultAltBinding, DirectAltBinding]),
865851
?awaitMatch(Bindings1,
866852
lists:sort(
867853
rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_binding, list, [<<"/">>])),
868854
30000),
869-
?awaitMatch([_], rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_amqqueue, list, [<<"/">>]),
870-
30000),
871-
ok.
872-
873-
transient_queue_on_node_down_khepri(Config) ->
874-
%% All entities are durable in khepri
875-
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
876-
877-
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
878-
Q = ?config(queue_name, Config),
879-
QAlt = ?config(alt_queue_name, Config),
880-
?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [])),
881-
?assertEqual({'queue.declare_ok', QAlt, 0, 0}, declare(Ch, QAlt, [], false)),
882-
883-
DefaultExchange = rabbit_misc:r(<<"/">>, exchange, <<>>),
884-
QResource = rabbit_misc:r(<<"/">>, queue, Q),
885-
QAltResource = rabbit_misc:r(<<"/">>, queue, QAlt),
886-
DefaultBinding = binding_record(DefaultExchange, QResource, Q, []),
887-
DefaultAltBinding = binding_record(DefaultExchange, QAltResource, QAlt, []),
888-
889-
%% Binding to the default exchange, it's always present
890-
?assertEqual(lists:sort([DefaultBinding, DefaultAltBinding]),
891-
lists:sort(rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_binding, list, [<<"/">>]))),
892-
893-
%% Let's bind to other exchange
894-
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = <<"amq.direct">>,
895-
queue = Q,
896-
routing_key = Q}),
897-
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = <<"amq.direct">>,
898-
queue = QAlt,
899-
routing_key = QAlt}),
900-
901-
DirectBinding = binding_record(rabbit_misc:r(<<"/">>, exchange, <<"amq.direct">>),
902-
QResource, Q, []),
903-
DirectAltBinding = binding_record(rabbit_misc:r(<<"/">>, exchange, <<"amq.direct">>),
904-
QAltResource, QAlt, []),
905-
Bindings = lists:sort([DefaultBinding, DirectBinding, DefaultAltBinding, DirectAltBinding]),
906-
907-
?assertEqual(Bindings,
908-
lists:sort(
909-
rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_binding, list, [<<"/">>]))),
910855

911856
rabbit_ct_broker_helpers:stop_node(Config, Server),
912857

913-
Bindings1 = lists:sort([DirectBinding]),
914-
?awaitMatch(Bindings1,
858+
?awaitMatch([DirectBinding],
915859
lists:sort(
916860
rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_binding, list, [<<"/">>])),
917861
30000),
918862
?awaitMatch([],
919-
rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_amqqueue, list, [<<"/">>]),
863+
rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_amqqueue, list, [<<"/">>]),
920864
30000),
921865

922866
rabbit_ct_broker_helpers:start_node(Config, Server),
923867

924868
Bindings2 = lists:sort([DefaultBinding, DirectBinding]),
925-
?awaitMatch([_], rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_amqqueue, list, [<<"/">>]),
926-
30000),
927869
?awaitMatch(Bindings2,
928870
lists:sort(
929871
rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_binding, list, [<<"/">>])),
930872
30000),
873+
?awaitMatch([_],
874+
rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_amqqueue, list, [<<"/">>]),
875+
30000),
931876
ok.
932877

933878
%% Internal

0 commit comments

Comments
 (0)