|
64 | 64 | MQTT_SUB = b'\x82'
|
65 | 65 | MQTT_UNSUB = b'\xA2'
|
66 | 66 | MQTT_PUB = bytearray(b'\x30\0')
|
67 |
| -MQTT_CON = bytearray(b'\x10\0\0') |
68 | 67 | # Variable CONNECT header [MQTT 3.1.2]
|
69 |
| -MQTT_CON_HEADER = bytearray(b"\x04MQTT\x04\x02\0\0") |
| 68 | +MQTT_VAR_HEADER = bytearray(b"\x04MQTT\x04\x02\0\0") |
70 | 69 | MQTT_DISCONNECT = b'\xe0\0'
|
71 | 70 |
|
72 | 71 | CONNACK_ERRORS = {const(0x01) : 'Connection Refused - Incorrect Protocol Version',
|
@@ -138,7 +137,7 @@ def __init__(self, socket, broker, port=None, username=None,
|
138 | 137 | self._is_connected = False
|
139 | 138 | self._msg_size_lim = MQTT_MSG_SZ_LIM
|
140 | 139 | self.packet_id = 0
|
141 |
| - self._keep_alive = 0 |
| 140 | + self._keep_alive = 60 |
142 | 141 | self._pid = 0
|
143 | 142 | self._user_data = None
|
144 | 143 | self._subscribed_topics = []
|
@@ -238,31 +237,53 @@ def connect(self, clean_session=True):
|
238 | 237 | self._sock.connect(addr, TCP_MODE)
|
239 | 238 | except RuntimeError as e:
|
240 | 239 | raise MMQTTException("Invalid broker address defined.", e)
|
241 |
| - premsg = MQTT_CON |
242 |
| - msg = MQTT_CON_HEADER |
243 |
| - msg[6] = clean_session << 1 |
244 |
| - sz = 12 + len(self._client_id) |
| 240 | + |
| 241 | + # Fixed Header |
| 242 | + fixed_header = bytearray() |
| 243 | + fixed_header.append(0x10) |
| 244 | + |
| 245 | + # Variable Header |
| 246 | + var_header = MQTT_VAR_HEADER |
| 247 | + var_header[6] = clean_session << 1 |
| 248 | + |
| 249 | + # Set up variable header and remaining_length |
| 250 | + remaining_length = 12 + len(self._client_id) |
245 | 251 | if self._user is not None:
|
246 |
| - sz += 2 + len(self._user) + 2 + len(self._pass) |
247 |
| - msg[6] |= 0xC0 |
| 252 | + remaining_length += 2 + len(self._user) + 2 + len(self._pass) |
| 253 | + var_header[6] |= 0xC0 |
248 | 254 | if self._keep_alive:
|
249 | 255 | assert self._keep_alive < MQTT_TOPIC_LENGTH_LIMIT
|
250 |
| - msg[7] |= self._keep_alive >> 8 |
251 |
| - msg[8] |= self._keep_alive & 0x00FF |
| 256 | + var_header[7] |= self._keep_alive >> 8 |
| 257 | + var_header[8] |= self._keep_alive & 0x00FF |
252 | 258 | if self._lw_topic:
|
253 |
| - sz += 2 + len(self._lw_topic) + 2 + len(self._lw_msg) |
254 |
| - msg[6] |= 0x4 | (self._lw_qos & 0x1) << 3 | (self._lw_qos & 0x2) << 3 |
255 |
| - msg[6] |= self._lw_retain << 5 |
256 |
| - i = 1 |
257 |
| - while sz > 0x7f: |
258 |
| - premsg[i] = (sz & 0x7f) | 0x80 |
259 |
| - sz >>= 7 |
260 |
| - i += 1 |
261 |
| - premsg[i] = sz |
| 259 | + remaining_length += 2 + len(self._lw_topic) + 2 + len(self._lw_msg) |
| 260 | + var_header[6] |= 0x4 | (self._lw_qos & 0x1) << 3 | (self._lw_qos & 0x2) << 3 |
| 261 | + var_header[6] |= self._lw_retain << 5 |
| 262 | + |
| 263 | + # Remaining length calculation |
| 264 | + large_rel_length = False |
| 265 | + if remaining_length > 0x7f: |
| 266 | + large_rel_length = True |
| 267 | + # Calculate Remaining Length [2.2.3] |
| 268 | + while remaining_length > 0: |
| 269 | + encoded_byte = remaining_length % 0x80 |
| 270 | + remaining_length = remaining_length // 0x80 |
| 271 | + # if there is more data to encode, set the top bit of the byte |
| 272 | + if remaining_length > 0: |
| 273 | + encoded_byte |= 0x80 |
| 274 | + fixed_header.append(encoded_byte) |
| 275 | + if large_rel_length: |
| 276 | + fixed_header.append(0x00) |
| 277 | + else: |
| 278 | + fixed_header.append(remaining_length) |
| 279 | + fixed_header.append(0x00) |
| 280 | + |
262 | 281 | if self._logger is not None:
|
263 |
| - self._logger.debug('Sending CONNECT packet to broker') |
264 |
| - self._sock.write(premsg) |
265 |
| - self._sock.write(msg) |
| 282 | + self._logger.debug('Sending CONNECT to broker') |
| 283 | + self._logger.debug('Fixed Header: {}\nVariable Header: {}'.format(fixed_header, |
| 284 | + var_header)) |
| 285 | + self._sock.write(fixed_header) |
| 286 | + self._sock.write(var_header) |
266 | 287 | # [MQTT-3.1.3-4]
|
267 | 288 | self._send_str(self._client_id)
|
268 | 289 | if self._lw_topic:
|
|
0 commit comments