@@ -894,34 +894,30 @@ session_expiry(Config) ->
894
894
ok = rpc (Config , application , set_env , [App , Par , DefaultVal ]).
895
895
896
896
non_clean_sess_reconnect_qos1 (Config ) ->
897
- non_clean_sess_reconnect (Config , qos1 ).
897
+ non_clean_sess_reconnect (Config , 1 ).
898
898
899
899
non_clean_sess_reconnect_qos0 (Config ) ->
900
- non_clean_sess_reconnect (Config , qos0 ).
900
+ non_clean_sess_reconnect (Config , 0 ).
901
901
902
902
non_clean_sess_reconnect (Config , SubscriptionQoS ) ->
903
903
Pub = connect (<<" publisher" >>, Config ),
904
904
Topic = ClientId = atom_to_binary (? FUNCTION_NAME ),
905
905
906
906
C1 = connect (ClientId , Config , non_clean_sess_opts ()),
907
- {ok , _ , _ } = emqtt :subscribe (C1 , Topic , SubscriptionQoS ),
908
- ? assertMatch (#{consumers := 1 },
909
- get_global_counters (Config )),
907
+ {ok , _ , [SubscriptionQoS ]} = emqtt :subscribe (C1 , Topic , SubscriptionQoS ),
908
+ ok = await_consumer_count (1 , ClientId , SubscriptionQoS , Config ),
910
909
911
910
ok = emqtt :disconnect (C1 ),
912
- eventually (? _assertMatch (#{consumers := 0 },
913
- get_global_counters (Config ))),
911
+ ok = await_consumer_count (0 , ClientId , SubscriptionQoS , Config ),
914
912
915
- timer :sleep (20 ),
916
913
ok = emqtt :publish (Pub , Topic , <<" msg-3-qos0" >>, qos0 ),
917
914
{ok , _ } = emqtt :publish (Pub , Topic , <<" msg-4-qos1" >>, qos1 ),
918
915
919
916
C2 = connect (ClientId , Config , non_clean_sess_opts ()),
920
917
% % Server should reply in CONNACK that it has session state.
921
918
? assertEqual ({session_present , 1 },
922
919
proplists :lookup (session_present , emqtt :info (C2 ))),
923
- ? assertMatch (#{consumers := 1 },
924
- get_global_counters (Config )),
920
+ ok = await_consumer_count (1 , ClientId , SubscriptionQoS , Config ),
925
921
926
922
ok = emqtt :publish (Pub , Topic , <<" msg-5-qos0" >>, qos0 ),
927
923
{ok , _ } = emqtt :publish (Pub , Topic , <<" msg-6-qos1" >>, qos1 ),
@@ -954,21 +950,20 @@ non_clean_sess_reconnect_qos0_and_qos1(Config) ->
954
950
ClientId = ? FUNCTION_NAME ,
955
951
956
952
C1 = connect (ClientId , Config , non_clean_sess_opts ()),
957
- {ok , _ , [1 , 0 ]} = emqtt :subscribe (C1 , [{Topic1 , qos1 }, {Topic0 , qos0 }]),
958
- ? assertMatch (#{consumers := 1 },
959
- get_global_counters (Config )),
953
+ {ok , _ , [1 , 0 ]} = emqtt :subscribe (C1 , [{Topic1 , qos1 },
954
+ {Topic0 , qos0 }]),
955
+ ok = await_consumer_count (1 , ClientId , 0 , Config ),
956
+ ok = await_consumer_count (1 , ClientId , 1 , Config ),
960
957
961
958
ok = emqtt :disconnect (C1 ),
962
- eventually (? _assertMatch (#{consumers := 0 },
963
- get_global_counters (Config ))),
964
-
959
+ ok = await_consumer_count (0 , ClientId , 0 , Config ),
960
+ ok = await_consumer_count (0 , ClientId , 1 , Config ),
965
961
{ok , _ } = emqtt :publish (Pub , Topic0 , <<" msg-0" >>, qos1 ),
966
962
{ok , _ } = emqtt :publish (Pub , Topic1 , <<" msg-1" >>, qos1 ),
967
963
968
964
C2 = connect (ClientId , Config , non_clean_sess_opts ()),
969
- ? assertMatch (#{consumers := 1 },
970
- get_global_counters (Config )),
971
-
965
+ ok = await_consumer_count (1 , ClientId , 0 , Config ),
966
+ ok = await_consumer_count (1 , ClientId , 1 , Config ),
972
967
ok = expect_publishes (C2 , Topic0 , [<<" msg-0" >>]),
973
968
ok = expect_publishes (C2 , Topic1 , [<<" msg-1" >>]),
974
969
@@ -1884,6 +1879,17 @@ await_confirms_unordered(From, Left) ->
1884
1879
ct :fail (" ~b confirms are missing" , [Left ])
1885
1880
end .
1886
1881
1882
+ await_consumer_count (ConsumerCount , ClientId , QoS , Config ) ->
1883
+ Ch = rabbit_ct_client_helpers :open_channel (Config ),
1884
+ QueueName = rabbit_mqtt_util :queue_name_bin (
1885
+ rabbit_data_coercion :to_binary (ClientId ), QoS ),
1886
+ eventually (
1887
+ ? _assertMatch (
1888
+ # 'queue.declare_ok' {consumer_count = ConsumerCount },
1889
+ amqp_channel :call (Ch , # 'queue.declare' {queue = QueueName ,
1890
+ passive = true })), 500 , 10 ),
1891
+ ok = rabbit_ct_client_helpers :close_channel (Ch ).
1892
+
1887
1893
declare_queue (Ch , QueueName , Args )
1888
1894
when is_pid (Ch ), is_binary (QueueName ), is_list (Args ) ->
1889
1895
# 'queue.declare_ok' {} = amqp_channel :call (
0 commit comments