61
61
const (0x05 ): "Connection Refused - Unauthorized" ,
62
62
}
63
63
64
- _the_interface = None # pylint: disable=invalid-name
65
- _the_sock = None # pylint: disable=invalid-name
64
+ _default_sock = None # pylint: disable=invalid-name
65
+ _fake_context = None # pylint: disable=invalid-name
66
66
67
67
68
68
class MMQTTException (Exception ):
@@ -74,17 +74,17 @@ class MMQTTException(Exception):
74
74
75
75
# Legacy ESP32SPI Socket API
76
76
def set_socket (sock , iface = None ):
77
- """Legacy API for setting the socket and network interface, use a Session instead.
78
-
77
+ """Legacy API for setting the socket and network interface.
79
78
:param sock: socket object.
80
79
:param iface: internet interface object
80
+
81
81
"""
82
- global _the_sock # pylint: disable=invalid-name, global-statement
83
- _the_sock = sock
82
+ global _default_sock # pylint: disable=invalid-name, global-statement
83
+ global _fake_context # pylint: disable=invalid-name, global-statement
84
+ _default_sock = sock
84
85
if iface :
85
- global _the_interface # pylint: disable=invalid-name, global-statement
86
- _the_interface = iface
87
- _the_sock .set_interface (iface )
86
+ _default_sock .set_interface (iface )
87
+ _fake_context = _FakeSSLContext (iface )
88
88
89
89
90
90
class _FakeSSLSocket :
@@ -144,18 +144,7 @@ def __init__(
144
144
):
145
145
146
146
self ._socket_pool = socket_pool
147
- # Legacy API - if we do not have a socket pool, use default socket
148
- if self ._socket_pool is None :
149
- self ._socket_pool = _the_sock
150
-
151
147
self ._ssl_context = ssl_context
152
- # Legacy API - if we do not have SSL context, fake it
153
- if self ._ssl_context is None :
154
- self ._ssl_context = _FakeSSLContext (_the_interface )
155
-
156
- # Hang onto open sockets so that we can reuse them
157
- self ._socket_free = {}
158
- self ._open_sockets = {}
159
148
self ._sock = None
160
149
self ._backwards_compatible_sock = False
161
150
@@ -214,62 +203,37 @@ def __init__(
214
203
self .on_subscribe = None
215
204
self .on_unsubscribe = None
216
205
217
- # Socket helpers
218
- def _free_socket (self , socket ):
219
- """Frees a socket for re-use."""
220
- if socket not in self ._open_sockets .values ():
221
- raise RuntimeError ("Socket not from MQTT client." )
222
- self ._socket_free [socket ] = True
223
-
224
- def _close_socket (self , socket ):
225
- """Closes a slocket."""
226
- socket .close ()
227
- del self ._socket_free [socket ]
228
- key = None
229
- for k in self ._open_sockets :
230
- if self ._open_sockets [k ] == socket :
231
- key = k
232
- break
233
- if key :
234
- del self ._open_sockets [key ]
235
-
236
- def _free_sockets (self ):
237
- """Closes all free sockets."""
238
- free_sockets = []
239
- for sock in self ._socket_free :
240
- if self ._socket_free [sock ]:
241
- free_sockets .append (sock )
242
- for sock in free_sockets :
243
- self ._close_socket (sock )
244
-
245
- # pylint: disable=too-many-branches
246
- def _get_socket (self , host , port , * , timeout = 1 ):
247
- key = (host , port )
248
- if key in self ._open_sockets :
249
- sock = self ._open_sockets [key ]
250
- if self ._socket_free [sock ]:
251
- self ._socket_free [sock ] = False
252
- return sock
206
+ def _get_connect_socket (self , host , port , * , timeout = 1 ):
207
+ """Obtains a new socket and connects to a broker.
208
+ :param str host: Desired broker hostname
209
+ :param int port: Desired broker port
210
+ :param int timeout: Desired socket timeout
211
+ """
212
+ # For reconnections - check if we're using a socket already and close it
213
+ if self ._sock :
214
+ self ._sock .close ()
215
+ self ._sock = None
216
+
217
+ # Legacy API - use the interface's socket instead of a passed socket pool
218
+ if self ._socket_pool is None :
219
+ self ._socket_pool = _default_sock
220
+
221
+ # Legacy API - fake the ssl context
222
+ if self ._ssl_context is None :
223
+ self ._ssl_context = _fake_context
224
+
253
225
if port == 8883 and not self ._ssl_context :
254
226
raise RuntimeError (
255
227
"ssl_context must be set before using adafruit_mqtt for secure MQTT."
256
228
)
257
229
258
- # Legacy API - use a default socket instead of socket pool
259
- if self ._socket_pool is None :
260
- self ._socket_pool = _the_sock
261
-
262
230
addr_info = self ._socket_pool .getaddrinfo (
263
231
host , port , 0 , self ._socket_pool .SOCK_STREAM
264
232
)[0 ]
265
- retry_count = 0
233
+
266
234
sock = None
235
+ retry_count = 0
267
236
while retry_count < 5 and sock is None :
268
- if retry_count > 0 :
269
- if any (self ._socket_free .items ()):
270
- self ._free_sockets ()
271
- else :
272
- raise RuntimeError ("Sending request failed" )
273
237
retry_count += 1
274
238
275
239
try :
@@ -298,9 +262,6 @@ def _get_socket(self, host, port, *, timeout=1):
298
262
raise RuntimeError ("Repeated socket failures" )
299
263
300
264
self ._backwards_compatible_sock = not hasattr (sock , "recv_into" )
301
-
302
- self ._open_sockets [key ] = sock
303
- self ._socket_free [sock ] = False
304
265
return sock
305
266
306
267
def __enter__ (self ):
@@ -463,8 +424,8 @@ def connect(self, clean_session=True, host=None, port=None, keep_alive=None):
463
424
if self .logger :
464
425
self .logger .debug ("Attempting to establish MQTT connection..." )
465
426
466
- # Attempt to get a new socket
467
- self ._sock = self ._get_socket (self .broker , self .port )
427
+ # Get a new socket
428
+ self ._sock = self ._get_connect_socket (self .broker , self .port )
468
429
469
430
# Fixed Header
470
431
fixed_header = bytearray ([0x10 ])
0 commit comments