@@ -92,7 +92,7 @@ end_per_testcase(Testcase, Config) ->
92
92
93
93
block_connack_timeout (Config ) ->
94
94
P = rabbit_ct_broker_helpers :get_node_config (Config , 0 , tcp_port_mqtt ),
95
- Ports0 = rpc (Config , erlang , ports , []),
95
+ Ports = rpc (Config , erlang , ports , []),
96
96
97
97
ok = rpc (Config , vm_memory_monitor , set_vm_memory_high_watermark , [0 ]),
98
98
% % Let connection block.
@@ -107,34 +107,35 @@ block_connack_timeout(Config) ->
107
107
unlink (Client ),
108
108
ClientMRef = monitor (process , Client ),
109
109
{error , connack_timeout } = emqtt :connect (Client ),
110
- receive
111
- {'DOWN' , ClientMRef , process , Client , connack_timeout } ->
112
- ok
113
- after 200 ->
114
- ct :fail (" missing connack_timeout in client" )
110
+ receive {'DOWN' , ClientMRef , process , Client , connack_timeout } -> ok
111
+ after 200 -> ct :fail (" missing connack_timeout in client" )
115
112
end ,
116
113
117
- Ports = rpc (Config , erlang , ports , []),
118
- % % Server creates 1 new port to handle our MQTT connection.
119
- [NewPort ] = Ports -- Ports0 ,
120
- {connected , MqttReader } = rpc (Config , erlang , port_info , [NewPort , connected ]),
114
+ MqttReader = rpc (Config , ? MODULE , mqtt_connection_pid , [Ports ]),
121
115
MqttReaderMRef = monitor (process , MqttReader ),
122
116
123
117
% % Unblock connection. CONNECT packet will be processed on the server.
124
118
rpc (Config , vm_memory_monitor , set_vm_memory_high_watermark , [0.4 ]),
125
119
126
- receive
127
- {'DOWN' , MqttReaderMRef , process , MqttReader , {shutdown , {socket_ends , einval }}} ->
128
- % % We expect that MQTT reader process exits (without crashing)
129
- % % because our client already disconnected.
130
- ok
131
- after 2000 ->
132
- ct :fail (" missing peername_not_known from server" )
120
+ receive {'DOWN' , MqttReaderMRef , process , MqttReader , {shutdown , {socket_ends , einval }}} ->
121
+ % % We expect that MQTT reader process exits (without crashing)
122
+ % % because our client already disconnected.
123
+ ok
124
+ after 2000 -> ct :fail (" missing peername_not_known from server" )
133
125
end ,
134
126
% % Ensure that our client is not registered.
135
127
? assertEqual ([], all_connection_pids (Config )),
136
128
ok .
137
129
130
+ mqtt_connection_pid (ExistingPorts ) ->
131
+ NewPorts = erlang :ports () -- ExistingPorts ,
132
+ % % Server creates 1 new TCP port to handle our MQTT connection.
133
+ [MqttConnectionPort ] = lists :filter (fun (P ) ->
134
+ erlang :port_info (P , name ) =:= {name , " tcp_inet" }
135
+ end , NewPorts ),
136
+ {connected , MqttConnectionPid } = erlang :port_info (MqttConnectionPort , connected ),
137
+ MqttConnectionPid .
138
+
138
139
handle_invalid_packets (Config ) ->
139
140
N = rpc (Config , ets , info , [connection_metrics , size ]),
140
141
P = rabbit_ct_broker_helpers :get_node_config (Config , 0 , tcp_port_mqtt ),
0 commit comments