Skip to content

Commit eb07565

Browse files
authored
Merge pull request #42 from brentru/variable-headers-large-payload
Fix publishing large payloads (>127 bytes)
2 parents 6ed8b95 + 2ce4f09 commit eb07565

File tree

1 file changed

+46
-36
lines changed

1 file changed

+46
-36
lines changed

adafruit_minimqtt/adafruit_minimqtt.py

Lines changed: 46 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,13 @@
6262
MQTT_PINGRESP = const(0xD0)
6363
MQTT_SUB = b"\x82"
6464
MQTT_UNSUB = b"\xA2"
65-
MQTT_PUB = bytearray(b"\x30\0")
66-
# Variable CONNECT header [MQTT 3.1.2]
67-
MQTT_VAR_HEADER = bytearray(b"\x04MQTT\x04\x02\0\0")
65+
MQTT_PUB = bytearray(b"\x30")
6866
MQTT_DISCONNECT = b"\xe0\0"
6967

68+
# Variable CONNECT header [MQTT 3.1.2]
69+
MQTT_HDR_CONNECT = bytearray(b"\x04MQTT\x04\x02\0\0")
70+
71+
7072
CONNACK_ERRORS = {
7173
const(0x01): "Connection Refused - Incorrect Protocol Version",
7274
const(0x02): "Connection Refused - ID Rejected",
@@ -301,8 +303,11 @@ def connect(self, clean_session=True):
301303
fixed_header = bytearray()
302304
fixed_header.append(0x10)
303305

306+
# NOTE: Variable header is
307+
# MQTT_HDR_CONNECT = bytearray(b"\x04MQTT\x04\x02\0\0")
308+
# because final 4 bytes are 4, 2, 0, 0
304309
# Variable Header
305-
var_header = MQTT_VAR_HEADER
310+
var_header = MQTT_HDR_CONNECT
306311
var_header[6] = clean_session << 1
307312

308313
# Set up variable header and remaining_length
@@ -427,7 +432,6 @@ def publish(self, topic, msg, retain=False, qos=0):
427432
.. code-block:: python
428433
429434
mqtt_client.publish('topics/piVal', 'threepointonefour')
430-
431435
"""
432436
self.is_connected()
433437
self._check_topic(topic)
@@ -443,42 +447,52 @@ def publish(self, topic, msg, retain=False, qos=0):
443447
else:
444448
raise MMQTTException("Invalid message data type.")
445449
if len(msg) > MQTT_MSG_MAX_SZ:
446-
raise MMQTTException("Message size larger than %db." % MQTT_MSG_MAX_SZ)
447-
self._check_qos(qos)
448-
pkt = MQTT_PUB
449-
pkt[0] |= qos << 1 | retain
450-
sz = 2 + len(topic) + len(msg)
450+
raise MMQTTException("Message size larger than %d bytes." % MQTT_MSG_MAX_SZ)
451+
assert (
452+
0 <= qos <= 1
453+
), "Quality of Service Level 2 is unsupported by this library."
454+
455+
pub_hdr_fixed = bytearray() # fixed header
456+
pub_hdr_fixed.extend(MQTT_PUB)
457+
pub_hdr_fixed[0] |= retain | qos << 1 # [3.3.1.2], [3.3.1.3]
458+
459+
pub_hdr_var = bytearray() # variable header
460+
pub_hdr_var.append(len(topic) >> 8) # Topic length, MSB
461+
pub_hdr_var.append(len(topic) & 0xFF) # Topic length, LSB
462+
pub_hdr_var.extend(topic.encode("utf-8")) # Topic name
463+
464+
remaining_length = 2 + len(msg) + len(topic)
451465
if qos > 0:
452-
sz += 2
453-
assert sz < 2097152
454-
i = 1
455-
while sz > 0x7F:
456-
pkt[i] = (sz & 0x7F) | 0x80
457-
sz >>= 7
458-
i += 1
459-
pkt[i] = sz
466+
# packet identifier where QoS level is 1 or 2. [3.3.2.2]
467+
pid = self._pid
468+
remaining_length += 2
469+
pub_hdr_var.append(0x00)
470+
pub_hdr_var.append(self._pid)
471+
self._pid += 1
472+
473+
# Calculate remaining length [2.2.3]
474+
if remaining_length > 0x7F:
475+
while remaining_length > 0:
476+
encoded_byte = remaining_length % 0x80
477+
remaining_length = remaining_length // 0x80
478+
if remaining_length > 0:
479+
encoded_byte |= 0x80
480+
pub_hdr_fixed.append(encoded_byte)
481+
else:
482+
pub_hdr_fixed.append(remaining_length)
483+
460484
if self.logger is not None:
461485
self.logger.debug(
462486
"Sending PUBLISH\nTopic: {0}\nMsg: {1}\
463487
\nQoS: {2}\nRetain? {3}".format(
464488
topic, msg, qos, retain
465489
)
466490
)
467-
self._sock.send(pkt)
468-
self._send_str(topic)
469-
if qos == 0:
470-
if self.on_publish is not None:
471-
self.on_publish(self, self.user_data, topic, self._pid)
472-
if qos > 0:
473-
self._pid += 1
474-
pid = self._pid
475-
struct.pack_into("!H", pkt, 0, pid)
476-
self._sock.send(pkt)
477-
if self.on_publish is not None:
478-
self.on_publish(self, self.user_data, topic, pid)
479-
if self.logger is not None:
480-
self.logger.debug("Sending PUBACK")
491+
self._sock.send(pub_hdr_fixed)
492+
self._sock.send(pub_hdr_var)
481493
self._sock.send(msg)
494+
if qos == 0 and self.on_publish is not None:
495+
self.on_publish(self, self.user_data, topic, self._pid)
482496
if qos == 1:
483497
while True:
484498
op = self._wait_for_msg()
@@ -491,10 +505,6 @@ def publish(self, topic, msg, retain=False, qos=0):
491505
if self.on_publish is not None:
492506
self.on_publish(self, self.user_data, topic, rcv_pid)
493507
return
494-
elif qos == 2:
495-
assert 0
496-
if self.on_publish is not None:
497-
self.on_publish(self, self.user_data, topic, rcv_pid)
498508

499509
def subscribe(self, topic, qos=0):
500510
"""Subscribes to a topic on the MQTT Broker.

0 commit comments

Comments
 (0)