Skip to content

Exposing attributes used by client libraries #6

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 23, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 88 additions & 89 deletions adafruit_minimqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,33 +115,32 @@ def __init__(self, socket, broker, port=None, username=None,
if port is not None:
self.port = port
# session identifiers
self._user = username
self.user = username
# [MQTT-3.1.3.5]
self._pass = password
if self._pass is not None and len(password.encode('utf-8')) > MQTT_TOPIC_LENGTH_LIMIT:
self.password = password
if self.password is not None and len(password.encode('utf-8')) > MQTT_TOPIC_LENGTH_LIMIT:
raise MMQTTException('Password length is too large.')
if client_id is not None:
# user-defined client_id MAY allow client_id's > 23 bytes or
# non-alpha-numeric characters
self._client_id = client_id
self.client_id = client_id
else:
# assign a unique client_id
self._client_id = 'cpy{0}{1}'.format(microcontroller.cpu.uid[randint(0, 15)],
randint(0, 9))
self.client_id = 'cpy{0}{1}'.format(microcontroller.cpu.uid[randint(0, 15)],
randint(0, 9))
# generated client_id's enforce spec.'s length rules
if len(self._client_id) > 23 or not self._client_id:
if len(self.client_id) > 23 or not self.client_id:
raise ValueError('MQTT Client ID must be between 1 and 23 bytes')
self._logger = None
self.keep_alive = keep_alive
self.user_data = None
self.logger = None
if log is True:
self._logger = logging.getLogger('log')
self._logger.setLevel(logging.INFO)
self.logger = logging.getLogger('log')
self.logger.setLevel(logging.INFO)
self._sock = None
self._is_connected = False
self._msg_size_lim = MQTT_MSG_SZ_LIM
self.packet_id = 0
self._keep_alive = keep_alive
self._pid = 0
self._user_data = None
self._timestamp = 0
# List of subscribed topics, used for tracking
self._subscribed_topics = []
Expand Down Expand Up @@ -177,8 +176,8 @@ def last_will(self, topic=None, message=None, qos=0, retain=False):
raise MMQTTException('Last Will should be defined before connect() is called.')
if qos < 0 or qos > 2:
raise MMQTTException("Invalid QoS level, must be between 0 and 2.")
if self._logger is not None:
self._logger.debug('Setting last will properties')
if self.logger is not None:
self.logger.debug('Setting last will properties')
self._lw_qos = qos
self._lw_topic = topic
self._lw_msg = message
Expand All @@ -190,14 +189,14 @@ def connect(self, clean_session=True):
:param bool clean_session: Establishes a persistent session.
"""
self._set_interface()
if self._logger is not None:
self._logger.debug('Creating new socket')
if self.logger is not None:
self.logger.debug('Creating new socket')
self._sock = self._socket.socket()
self._sock.settimeout(10)
if self.port == 8883:
try:
if self._logger is not None:
self._logger.debug('Attempting to establish secure MQTT connection...')
if self.logger is not None:
self.logger.debug('Attempting to establish secure MQTT connection...')
self._sock.connect((self.broker, self.port), TLS_MODE)
except RuntimeError:
raise MMQTTException("Invalid broker address defined.")
Expand All @@ -207,8 +206,8 @@ def connect(self, clean_session=True):
else:
addr = (self.broker, self.port)
try:
if self._logger is not None:
self._logger.debug('Attempting to establish insecure MQTT connection...')
if self.logger is not None:
self.logger.debug('Attempting to establish insecure MQTT connection...')
#self._sock.connect((self.broker, self.port), TCP_MODE)
self._sock.connect(addr, TCP_MODE)
except RuntimeError as e:
Expand All @@ -223,14 +222,14 @@ def connect(self, clean_session=True):
var_header[6] = clean_session << 1

# Set up variable header and remaining_length
remaining_length = 12 + len(self._client_id)
if self._user is not None:
remaining_length += 2 + len(self._user) + 2 + len(self._pass)
remaining_length = 12 + len(self.client_id)
if self.user is not None:
remaining_length += 2 + len(self.user) + 2 + len(self.password)
var_header[6] |= 0xC0
if self._keep_alive:
assert self._keep_alive < MQTT_TOPIC_LENGTH_LIMIT
var_header[7] |= self._keep_alive >> 8
var_header[8] |= self._keep_alive & 0x00FF
if self.keep_alive:
assert self.keep_alive < MQTT_TOPIC_LENGTH_LIMIT
var_header[7] |= self.keep_alive >> 8
var_header[8] |= self.keep_alive & 0x00FF
if self._lw_topic:
remaining_length += 2 + len(self._lw_topic) + 2 + len(self._lw_msg)
var_header[6] |= 0x4 | (self._lw_qos & 0x1) << 3 | (self._lw_qos & 0x2) << 3
Expand All @@ -254,25 +253,25 @@ def connect(self, clean_session=True):
fixed_header.append(remaining_length)
fixed_header.append(0x00)

if self._logger is not None:
self._logger.debug('Sending CONNECT to broker')
self._logger.debug('Fixed Header: {}\nVariable Header: {}'.format(fixed_header,
var_header))
if self.logger is not None:
self.logger.debug('Sending CONNECT to broker')
self.logger.debug('Fixed Header: {}\nVariable Header: {}'.format(fixed_header,
var_header))
self._sock.write(fixed_header)
self._sock.write(var_header)
# [MQTT-3.1.3-4]
self._send_str(self._client_id)
self._send_str(self.client_id)
if self._lw_topic:
# [MQTT-3.1.3-11]
self._send_str(self._lw_topic)
self._send_str(self._lw_msg)
if self._user is None:
self._user = None
if self.user is None:
self.user = None
else:
self._send_str(self._user)
self._send_str(self._pass)
if self._logger is not None:
self._logger.debug('Receiving CONNACK packet from broker')
self._send_str(self.user)
self._send_str(self.password)
if self.logger is not None:
self.logger.debug('Receiving CONNACK packet from broker')
while True:
op = self._wait_for_msg()
if op == 32:
Expand All @@ -283,34 +282,34 @@ def connect(self, clean_session=True):
self._is_connected = True
result = rc[0] & 1
if self.on_connect is not None:
self.on_connect(self, self._user_data, result, rc[2])
self.on_connect(self, self.user_data, result, rc[2])
return result

def disconnect(self):
"""Disconnects the MiniMQTT client from the MQTT broker.
"""
self.is_connected()
if self._logger is not None:
self._logger.debug('Sending DISCONNECT packet to broker')
if self.logger is not None:
self.logger.debug('Sending DISCONNECT packet to broker')
self._sock.write(MQTT_DISCONNECT)
if self._logger is not None:
self._logger.debug('Closing socket')
if self.logger is not None:
self.logger.debug('Closing socket')
self._sock.close()
self._is_connected = False
self._subscribed_topics = None
if self.on_disconnect is not None:
self.on_disconnect(self, self._user_data, 0)
self.on_disconnect(self, self.user_data, 0)

def ping(self):
"""Pings the MQTT Broker to confirm if the broker is alive or if
there is an active network connection.
"""
self.is_connected()
if self._logger is not None:
self._logger.debug('Sending PINGREQ')
if self.logger is not None:
self.logger.debug('Sending PINGREQ')
self._sock.write(MQTT_PINGREQ)
if self._logger is not None:
self._logger.debug('Checking PINGRESP')
if self.logger is not None:
self.logger.debug('Checking PINGRESP')
while True:
op = self._wait_for_msg(0.5)
if op == 208:
Expand Down Expand Up @@ -373,23 +372,23 @@ def publish(self, topic, msg, retain=False, qos=0):
sz >>= 7
i += 1
pkt[i] = sz
if self._logger is not None:
self._logger.debug('Sending PUBLISH\nTopic: {0}\nMsg: {1}\
if self.logger is not None:
self.logger.debug('Sending PUBLISH\nTopic: {0}\nMsg: {1}\
\nQoS: {2}\nRetain? {3}'.format(topic, msg, qos, retain))
self._sock.write(pkt)
self._send_str(topic)
if qos == 0:
if self.on_publish is not None:
self.on_publish(self, self._user_data, topic, self._pid)
self.on_publish(self, self.user_data, topic, self._pid)
if qos > 0:
self._pid += 1
pid = self._pid
struct.pack_into("!H", pkt, 0, pid)
self._sock.write(pkt)
if self.on_publish is not None:
self.on_publish(self, self._user_data, topic, pid)
if self._logger is not None:
self._logger.debug('Sending PUBACK')
self.on_publish(self, self.user_data, topic, pid)
if self.logger is not None:
self.logger.debug('Sending PUBACK')
self._sock.write(msg)
if qos == 1:
while True:
Expand All @@ -401,12 +400,12 @@ def publish(self, topic, msg, retain=False, qos=0):
rcv_pid = rcv_pid[0] << 0x08 | rcv_pid[1]
if pid == rcv_pid:
if self.on_publish is not None:
self.on_publish(self, self._user_data, topic, rcv_pid)
self.on_publish(self, self.user_data, topic, rcv_pid)
return
elif qos == 2:
assert 0
if self.on_publish is not None:
self.on_publish(self, self._user_data, topic, rcv_pid)
self.on_publish(self, self.user_data, topic, rcv_pid)

def subscribe(self, topic, qos=0):
"""Subscribes to a topic on the MQTT Broker.
Expand Down Expand Up @@ -466,9 +465,9 @@ def subscribe(self, topic, qos=0):
topic_size = len(t).to_bytes(2, 'big')
qos_byte = q.to_bytes(1, 'big')
packet += topic_size + t + qos_byte
if self._logger is not None:
if self.logger is not None:
for t, q in topics:
self._logger.debug('SUBSCRIBING to topic {0} with QoS {1}'.format(t, q))
self.logger.debug('SUBSCRIBING to topic {0} with QoS {1}'.format(t, q))
self._sock.write(packet)
while True:
op = self._wait_for_msg()
Expand All @@ -479,7 +478,7 @@ def subscribe(self, topic, qos=0):
raise MMQTTException('SUBACK Failure!')
for t, q in topics:
if self.on_subscribe is not None:
self.on_subscribe(self, self._user_data, t, q)
self.on_subscribe(self, self.user_data, t, q)
self._subscribed_topics.append(t)
return

Expand Down Expand Up @@ -521,12 +520,12 @@ def unsubscribe(self, topic):
for t in topics:
topic_size = len(t).to_bytes(2, 'big')
packet += topic_size + t
if self._logger is not None:
if self.logger is not None:
for t in topics:
self._logger.debug('UNSUBSCRIBING from topic {0}.'.format(t))
self.logger.debug('UNSUBSCRIBING from topic {0}.'.format(t))
self._sock.write(packet)
if self._logger is not None:
self._logger.debug('Waiting for UNSUBACK...')
if self.logger is not None:
self.logger.debug('Waiting for UNSUBACK...')
while True:
op = self._wait_for_msg()
if op == 176:
Expand All @@ -536,7 +535,7 @@ def unsubscribe(self, topic):
assert return_code[1] == packet_id_bytes[0] and return_code[2] == packet_id_bytes[1]
for t in topics:
if self.on_unsubscribe is not None:
self.on_unsubscribe(self, self._user_data, t, self._pid)
self.on_unsubscribe(self, self.user_data, t, self._pid)
self._subscribed_topics.remove(t)
return

Expand All @@ -558,12 +557,12 @@ def reconnect_socket(self):
"""Re-establishes the socket's connection with the MQTT broker.
"""
try:
if self._logger is not None:
self._logger.debug("Attempting to reconnect with MQTT Broker...")
if self.logger is not None:
self.logger.debug("Attempting to reconnect with MQTT Broker...")
self.reconnect()
except RuntimeError as err:
if self._logger is not None:
self._logger.debug('Failed to reconnect with MQTT Broker, retrying...', err)
if self.logger is not None:
self.logger.debug('Failed to reconnect with MQTT Broker, retrying...', err)
time.sleep(1)
self.reconnect_socket()

Expand All @@ -572,12 +571,12 @@ def reconnect_wifi(self):
"""
while not self.is_wifi_connected:
try:
if self._logger is not None:
self._logger.debug('Connecting to WiFi AP...')
if self.logger is not None:
self.logger.debug('Connecting to WiFi AP...')
self._wifi.connect()
except (RuntimeError, ValueError):
if self._logger is not None:
self._logger.debug('Failed to reset WiFi module, retrying...')
if self.logger is not None:
self.logger.debug('Failed to reset WiFi module, retrying...')
time.sleep(1)
# we just reconnected, is the socket still connected?
if not self.is_sock_connected:
Expand All @@ -587,14 +586,14 @@ def reconnect(self, resub_topics=True):
"""Attempts to reconnect to the MQTT broker.
:param bool resub_topics: Resubscribe to previously subscribed topics.
"""
if self._logger is not None:
self._logger.debug('Attempting to reconnect with MQTT broker')
if self.logger is not None:
self.logger.debug('Attempting to reconnect with MQTT broker')
self.connect()
if self._logger is not None:
self._logger.debug('Reconnected with broker')
if self.logger is not None:
self.logger.debug('Reconnected with broker')
if resub_topics:
if self._logger is not None:
self._logger.debug('Attempting to resubscribe to previously subscribed topics.')
if self.logger is not None:
self.logger.debug('Attempting to resubscribe to previously subscribed topics.')
while self._subscribed_topics:
feed = self._subscribed_topics.pop()
self.subscribe(feed)
Expand Down Expand Up @@ -628,10 +627,10 @@ def loop(self):
if self._timestamp == 0:
self._timestamp = time.monotonic()
current_time = time.monotonic()
if current_time - self._timestamp >= self._keep_alive:
if current_time - self._timestamp >= self.keep_alive:
# Handle KeepAlive by expecting a PINGREQ/PINGRESP from the server
if self._logger is not None:
self._logger.debug('KeepAlive period elapsed - requesting a PINGRESP from the server...')
if self.logger is not None:
self.logger.debug('KeepAlive period elapsed - requesting a PINGRESP from the server...')
self.ping()
self._timestamp = 0
self._sock.settimeout(0.1)
Expand Down Expand Up @@ -753,22 +752,22 @@ def attach_logger(self, logger_name='log'):
"""Initializes and attaches a logger to the MQTTClient.
:param str logger_name: Name of the logger instance
"""
self._logger = logging.getLogger(logger_name)
self._logger.setLevel(logging.INFO)
self.logger = logging.getLogger(logger_name)
self.logger.setLevel(logging.INFO)

def set_logger_level(self, log_level):
"""Sets the level of the logger, if defined during init.
:param string log_level: Level of logging to output to the REPL.
"""
if self._logger is None:
if self.logger is None:
raise MMQTTException('No logger attached - did you create it during initialization?')
if log_level == 'DEBUG':
self._logger.setLevel(logging.DEBUG)
self.logger.setLevel(logging.DEBUG)
elif log_level == 'INFO':
self._logger.setLevel(logging.INFO)
self.logger.setLevel(logging.INFO)
elif log_level == 'WARNING':
self._logger.setLevel(logging.WARNING)
self.logger.setLevel(logging.WARNING)
elif log_level == 'ERROR':
self._logger.setLevel(logging.CRITICIAL)
self.logger.setLevel(logging.CRITICIAL)
else:
raise MMQTTException('Incorrect logging level provided!')