@@ -94,7 +94,7 @@ end_per_testcase(Testcase, Config) ->
94
94
95
95
block_connack_timeout (Config ) ->
96
96
P = rabbit_ct_broker_helpers :get_node_config (Config , 0 , tcp_port_mqtt ),
97
- Ports0 = rpc (Config , erlang , ports , []),
97
+ Ports = rpc (Config , erlang , ports , []),
98
98
99
99
ok = rpc (Config , vm_memory_monitor , set_vm_memory_high_watermark , [0 ]),
100
100
% % Let connection block.
@@ -109,34 +109,35 @@ block_connack_timeout(Config) ->
109
109
unlink (Client ),
110
110
ClientMRef = monitor (process , Client ),
111
111
{error , connack_timeout } = emqtt :connect (Client ),
112
- receive
113
- {'DOWN' , ClientMRef , process , Client , connack_timeout } ->
114
- ok
115
- after 200 ->
116
- ct :fail (" missing connack_timeout in client" )
112
+ receive {'DOWN' , ClientMRef , process , Client , connack_timeout } -> ok
113
+ after 200 -> ct :fail (" missing connack_timeout in client" )
117
114
end ,
118
115
119
- Ports = rpc (Config , erlang , ports , []),
120
- % % Server creates 1 new port to handle our MQTT connection.
121
- [NewPort ] = Ports -- Ports0 ,
122
- {connected , MqttReader } = rpc (Config , erlang , port_info , [NewPort , connected ]),
116
+ MqttReader = rpc (Config , ? MODULE , mqtt_connection_pid , [Ports ]),
123
117
MqttReaderMRef = monitor (process , MqttReader ),
124
118
125
119
% % Unblock connection. CONNECT packet will be processed on the server.
126
120
rpc (Config , vm_memory_monitor , set_vm_memory_high_watermark , [0.4 ]),
127
121
128
- receive
129
- {'DOWN' , MqttReaderMRef , process , MqttReader , {shutdown , {socket_ends , einval }}} ->
130
- % % We expect that MQTT reader process exits (without crashing)
131
- % % because our client already disconnected.
132
- ok
133
- after 2000 ->
134
- ct :fail (" missing peername_not_known from server" )
122
+ receive {'DOWN' , MqttReaderMRef , process , MqttReader , {shutdown , {socket_ends , einval }}} ->
123
+ % % We expect that MQTT reader process exits (without crashing)
124
+ % % because our client already disconnected.
125
+ ok
126
+ after 2000 -> ct :fail (" missing peername_not_known from server" )
135
127
end ,
136
128
% % Ensure that our client is not registered.
137
129
? assertEqual ([], all_connection_pids (Config )),
138
130
ok .
139
131
132
+ mqtt_connection_pid (ExistingPorts ) ->
133
+ NewPorts = erlang :ports () -- ExistingPorts ,
134
+ % % Server creates 1 new TCP port to handle our MQTT connection.
135
+ [MqttConnectionPort ] = lists :filter (fun (P ) ->
136
+ erlang :port_info (P , name ) =:= {name , " tcp_inet" }
137
+ end , NewPorts ),
138
+ {connected , MqttConnectionPid } = erlang :port_info (MqttConnectionPort , connected ),
139
+ MqttConnectionPid .
140
+
140
141
handle_invalid_packets (Config ) ->
141
142
N = rpc (Config , ets , info , [connection_metrics , size ]),
142
143
P = rabbit_ct_broker_helpers :get_node_config (Config , 0 , tcp_port_mqtt ),
0 commit comments