Skip to content

Commit f44a33c

Browse files
author
Paul Sokolovsky
committed
umqtt.simple: Initial implementation of a simple MQTT client.
1 parent 0b52419 commit f44a33c

File tree

1 file changed

+147
-0
lines changed

1 file changed

+147
-0
lines changed

umqtt.simple/umqtt/simple.py

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
import usocket as socket
2+
import ustruct as struct
3+
from ubinascii import hexlify
4+
import time
5+
6+
class MQTTException(Exception):
7+
pass
8+
9+
class MQTTClient:
10+
11+
def __init__(self, client_id, server, port=1883):
12+
self.client_id = client_id
13+
self.sock = None
14+
self.addr = socket.getaddrinfo(server, port)[0][-1]
15+
self.pid = 0
16+
self.cb = None
17+
18+
def _send_str(self, s):
19+
self.sock.write(struct.pack("!H", len(s)))
20+
self.sock.write(s)
21+
22+
def _recv_len(self):
23+
n = 0
24+
sh = 0
25+
while 1:
26+
b = self.sock.read(1)[0]
27+
n |= (b & 0x7f) << sh
28+
if not b & 0x80:
29+
return n
30+
sh += 7
31+
32+
def set_callback(self, f):
33+
self.cb = f
34+
35+
def connect(self, clean_session=True):
36+
self.sock = socket.socket()
37+
self.sock.connect(self.addr)
38+
msg = bytearray(b"\x10\0\0\x04MQTT\x04\x02\0\0")
39+
msg[1] = 10 + 2 + len(self.client_id)
40+
msg[9] = clean_session << 1
41+
self.sock.write(msg)
42+
#print(hex(len(msg)), hexlify(msg, ":"))
43+
self._send_str(self.client_id)
44+
resp = self.sock.read(4)
45+
assert resp[0] == 0x20 and resp[1] == 0x02
46+
if resp[3] != 0:
47+
raise MQTTException(resp[3])
48+
return resp[2] & 1
49+
50+
def disconnect(self):
51+
self.sock.write(b"\xe0\0")
52+
self.sock.close()
53+
54+
def ping(self):
55+
self.sock.write(b"\xc0\0")
56+
self.sock.close()
57+
58+
def publish(self, topic, msg, retain=False, qos=0):
59+
pkt = bytearray(b"\x30\0\0")
60+
pkt[0] |= qos << 1 | retain
61+
sz = 2 + len(topic) + len(msg)
62+
if qos > 0:
63+
sz += 2
64+
assert sz <= 16383
65+
pkt[1] = (sz & 0x7f) | 0x80
66+
pkt[2] = sz >> 7
67+
#print(hex(len(pkt)), hexlify(pkt, ":"))
68+
self.sock.write(pkt)
69+
self._send_str(topic)
70+
if qos > 0:
71+
self.pid += 1
72+
pid = self.pid
73+
buf = bytearray(b"\0\0")
74+
struct.pack_into("!H", buf, 0, pid)
75+
self.sock.write(buf)
76+
self.sock.write(msg)
77+
if qos == 1:
78+
while 1:
79+
op = self.wait_msg()
80+
if op == 0x40:
81+
sz = self.sock.read(1)
82+
assert sz == b"\x02"
83+
rcv_pid = self.sock.read(2)
84+
rcv_pid = rcv_pid[0] << 8 | rcv_pid[1]
85+
if pid == rcv_pid:
86+
return
87+
elif qos == 2:
88+
assert 0
89+
90+
def subscribe(self, topic, qos=0):
91+
assert self.cb is not None, "Subscribe callback is not set"
92+
pkt = bytearray(b"\x82\0\0\0")
93+
self.pid += 1
94+
struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic) + 1, self.pid)
95+
#print(hex(len(pkt)), hexlify(pkt, ":"))
96+
self.sock.write(pkt)
97+
self._send_str(topic)
98+
self.sock.write(qos.to_bytes(1))
99+
resp = self.sock.read(5)
100+
#print(resp)
101+
assert resp[0] == 0x90
102+
assert resp[2] == pkt[2] and resp[3] == pkt[3]
103+
if resp[4] == 0x80:
104+
raise MQTTException(resp[4])
105+
106+
# Wait for a single incoming MQTT message and process it.
107+
# Subscribed messages are delivered to a callback previously
108+
# set by .set_callback() method. Other (internal) MQTT
109+
# messages processed internally.
110+
def wait_msg(self):
111+
res = self.sock.read(1)
112+
if res is None:
113+
return None
114+
self.sock.setblocking(True)
115+
if res == b"":
116+
raise OSError(-1)
117+
if res == b"\xd0": # PINGRESP
118+
sz = self.sock.read(1)[0]
119+
assert sz == 0
120+
return None
121+
op = res[0]
122+
if op & 0xf0 != 0x30:
123+
return op
124+
sz = self._recv_len()
125+
topic_len = self.sock.read(2)
126+
topic_len = (topic_len[0] << 8) | topic_len[1]
127+
topic = self.sock.read(topic_len)
128+
sz -= topic_len + 2
129+
if op & 6:
130+
pid = self.sock.read(2)
131+
pid = pid[0] << 8 | pid[1]
132+
sz -= 2
133+
msg = self.sock.read(sz)
134+
self.cb(topic, msg)
135+
if op & 6 == 2:
136+
pkt = bytearray(b"\x40\x02\0\0")
137+
struct.pack_into("!H", pkt, 2, pid)
138+
self.sock.write(pkt)
139+
elif op & 6 == 4:
140+
assert 0
141+
142+
# Checks whether a pending message from server is available.
143+
# If not, returns immediately with None. Otherwise, does
144+
# the same processing as wait_msg.
145+
def check_msg(self):
146+
self.sock.setblocking(False)
147+
return self.wait_msg()

0 commit comments

Comments
 (0)