@@ -90,10 +90,13 @@ class MQTT:
90
90
:param str client_id: Optional client identifier, defaults to a unique, generated string.
91
91
:param bool is_ssl: Sets a secure or insecure connection with the broker.
92
92
:param bool log: Attaches a logger to the MQTT client, defaults to logging level INFO.
93
+ :param int keep_alive: KeepAlive interval between the broker and the
94
+ MiniMQTT client, in seconds.
93
95
"""
94
96
# pylint: disable=too-many-arguments,too-many-instance-attributes, not-callable, invalid-name, no-member
95
97
def __init__ (self , socket , broker , port = None , username = None ,
96
- password = None , network_manager = None , client_id = None , is_ssl = True , log = False ):
98
+ password = None , network_manager = None , client_id = None ,
99
+ is_ssl = True , log = False , keep_alive = 60 ):
97
100
# network management
98
101
self ._socket = socket
99
102
network_manager_type = str (type (network_manager ))
@@ -137,9 +140,11 @@ def __init__(self, socket, broker, port=None, username=None,
137
140
self ._is_connected = False
138
141
self ._msg_size_lim = MQTT_MSG_SZ_LIM
139
142
self .packet_id = 0
140
- self ._keep_alive = 60
143
+ self ._keep_alive = keep_alive
141
144
self ._pid = 0
142
145
self ._user_data = None
146
+ self ._timestamp = 0
147
+ # List of subscribed topics, used for tracking
143
148
self ._subscribed_topics = []
144
149
# Server callbacks
145
150
self .on_message = None
@@ -180,34 +185,6 @@ def last_will(self, topic=None, message=None, qos=0, retain=False):
180
185
self ._lw_msg = message
181
186
self ._lw_retain = retain
182
187
183
- def reconnect (self , retries = 30 , resub_topics = True ):
184
- """Attempts to reconnect to the MQTT broker.
185
- :param int retries: Amount of retries before resetting the network interface.
186
- :param bool resub_topics: Resubscribe to previously subscribed topics.
187
- """
188
- retries = 0
189
- while not self ._is_connected :
190
- if self ._logger is not None :
191
- self ._logger .debug ('Attempting to reconnect to broker' )
192
- try :
193
- self .connect ()
194
- if self ._logger is not None :
195
- self ._logger .debug ('Reconnected to broker' )
196
- if resub_topics :
197
- if self ._logger is not None :
198
- self ._logger .debug ('Attempting to resubscribe to prv. subscribed topics.' )
199
- while self ._subscribed_topics :
200
- feed = self ._subscribed_topics .pop ()
201
- self .subscribe (feed )
202
- except OSError as e :
203
- if self ._logger is not None :
204
- self ._logger .debug ('Lost connection, reconnecting and resubscribing...' , e )
205
- retries += 1
206
- if retries >= 30 :
207
- retries = 0
208
- time .sleep (1 )
209
- continue
210
-
211
188
# pylint: disable=too-many-branches, too-many-statements
212
189
def connect (self , clean_session = True ):
213
190
"""Initiates connection with the MQTT Broker.
@@ -564,29 +541,100 @@ def unsubscribe(self, topic):
564
541
self ._subscribed_topics .remove (t )
565
542
return
566
543
544
+ @property
545
+ def is_wifi_connected (self ):
546
+ """Returns if the ESP module is connected to
547
+ an access point, resets module if False"""
548
+ if self ._wifi :
549
+ return self ._wifi .esp .is_connected
550
+ raise MMQTTException ("MiniMQTT Client does not use a WiFi NetworkManager." )
551
+
552
+ # pylint: disable=line-too-long, protected-access
553
+ @property
554
+ def is_sock_connected (self ):
555
+ """Returns if the socket is connected."""
556
+ return self .is_wifi_connected and self ._sock and self ._wifi .esp .socket_connected (self ._sock ._socknum )
557
+
558
+ def reconnect_socket (self ):
559
+ """Re-establishes the socket's connection with the MQTT broker.
560
+ """
561
+ try :
562
+ if self ._logger is not None :
563
+ self ._logger .debug ("Attempting to reconnect with MQTT Broker..." )
564
+ self .reconnect ()
565
+ except RuntimeError as err :
566
+ if self ._logger is not None :
567
+ self ._logger .debug ('Failed to reconnect with MQTT Broker, retrying...' , err )
568
+ time .sleep (1 )
569
+ self .reconnect_socket ()
570
+
571
+ def reconnect_wifi (self ):
572
+ """Reconnects to WiFi Access Point and socket, if disconnected.
573
+ """
574
+ while not self .is_wifi_connected :
575
+ try :
576
+ if self ._logger is not None :
577
+ self ._logger .debug ('Connecting to WiFi AP...' )
578
+ self ._wifi .connect ()
579
+ except (RuntimeError , ValueError ):
580
+ if self ._logger is not None :
581
+ self ._logger .debug ('Failed to reset WiFi module, retrying...' )
582
+ time .sleep (1 )
583
+ # we just reconnected, is the socket still connected?
584
+ if not self .is_sock_connected :
585
+ self .reconnect_socket ()
586
+
587
+ def reconnect (self , resub_topics = True ):
588
+ """Attempts to reconnect to the MQTT broker.
589
+ :param bool resub_topics: Resubscribe to previously subscribed topics.
590
+ """
591
+ if self ._logger is not None :
592
+ self ._logger .debug ('Attempting to reconnect with MQTT broker' )
593
+ self .connect ()
594
+ if self ._logger is not None :
595
+ self ._logger .debug ('Reconnected with broker' )
596
+ if resub_topics :
597
+ if self ._logger is not None :
598
+ self ._logger .debug ('Attempting to resubscribe to previously subscribed topics.' )
599
+ while self ._subscribed_topics :
600
+ feed = self ._subscribed_topics .pop ()
601
+ self .subscribe (feed )
602
+
567
603
def loop_forever (self ):
568
604
"""Starts a blocking message loop. Use this
569
605
method if you want to run a program forever.
606
+ Code below a call to this method will NOT execute.
570
607
Network reconnection is handled within this call.
571
- Your code will not execute anything below this call.
608
+
572
609
"""
573
- run = True
574
- while run :
575
- if self ._is_connected :
576
- self . _wait_for_msg ( 0.0 )
577
- else :
578
- if self . _logger is not None :
579
- self ._logger . debug ( 'Lost connection, reconnecting and resubscribing...' )
580
- self . reconnect ( resub_topics = True )
581
- if self . _logger is not None :
582
- self . _logger . debug ( 'Connection restored, continuing to loop forever...' )
610
+ while True :
611
+ # Check WiFi and socket status
612
+ if self .is_sock_connected :
613
+ try :
614
+ self . loop ()
615
+ except ( RuntimeError , ValueError ) :
616
+ if self ._wifi :
617
+ # Reconnect the WiFi module and the socket
618
+ self . reconnect_wifi ()
619
+ continue
583
620
584
621
def loop (self ):
585
622
"""Non-blocking message loop. Use this method to
586
- check incoming subscription messages. Does not handle
587
- network reconnection like loop_forever - reconnection must
588
- be handled within your code.
623
+ check incoming subscription messages.
624
+
625
+ This method does NOT handle networking or
626
+ network hardware management, use loop_forever
627
+ or handle in code instead.
589
628
"""
629
+ if self ._timestamp == 0 :
630
+ self ._timestamp = time .monotonic ()
631
+ current_time = time .monotonic ()
632
+ if current_time - self ._timestamp >= self ._keep_alive :
633
+ # Handle KeepAlive by expecting a PINGREQ/PINGRESP from the server
634
+ if self ._logger is not None :
635
+ self ._logger .debug ('KeepAlive period elapsed - requesting a PINGRESP from the server...' )
636
+ self .ping ()
637
+ self ._timestamp = 0
590
638
self ._sock .settimeout (0.1 )
591
639
return self ._wait_for_msg ()
592
640
0 commit comments