Skip to content

Commit b806634

Browse files
authored
Merge pull request #3 from caternuson/basic_sat
Add basic satellite coms
2 parents c95a2da + 1a9aa95 commit b806634

File tree

6 files changed

+306
-12
lines changed

6 files changed

+306
-12
lines changed

adafruit_rockblock.py

Lines changed: 135 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@
4242
4343
"""
4444

45+
import struct
46+
4547
__version__ = "0.0.0-auto.0"
4648
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_RockBlock.git"
4749

@@ -52,25 +54,146 @@ class RockBlock:
5254
def __init__(self, uart, baudrate=19200):
5355
self._uart = uart
5456
self._uart.baudrate = baudrate
57+
self._buf_out = None
58+
self.reset()
5559

5660
def _uart_xfer(self, cmd):
57-
"""Send AT command and return response."""
58-
# response = ATCMD\r\nRESP\r\n\r\nOK\r\n
59-
# or = ATCMD\r\nERROR\r\n
60-
61-
cmd = str.encode("AT" + cmd)
61+
"""Send AT command and return response as tuple of lines read."""
6262

6363
self._uart.reset_input_buffer()
64-
self._uart.write(cmd + "\r")
64+
self._uart.write(str.encode("AT" + cmd + "\r"))
6565

66-
resp = None
67-
if self._uart.readline().strip() == cmd:
68-
resp = self._uart.readline().strip().decode()
66+
resp = []
67+
line = self._uart.readline()
68+
resp.append(line)
69+
while not any(EOM in line for EOM in (b"OK\r\n", b"ERROR\r\n")):
70+
line = self._uart.readline()
71+
resp.append(line)
6972

7073
self._uart.reset_input_buffer()
71-
return resp
74+
75+
return tuple(resp)
76+
77+
def reset(self):
78+
"""Perform a software reset."""
79+
self._uart_xfer("&F0") # factory defaults
80+
self._uart_xfer("&K0") # flow control off
81+
82+
@property
83+
def data_out(self):
84+
"The binary data in the outbound buffer."
85+
return self._buf_out
86+
87+
@data_out.setter
88+
def data_out(self, buf):
89+
if buf is None:
90+
# clear the buffer
91+
resp = self._uart_xfer("+SBDD0")
92+
resp = int(resp[1].strip().decode())
93+
if resp == 1:
94+
raise RuntimeError("Error clearing buffer.")
95+
else:
96+
# set the buffer
97+
if len(buf) > 340:
98+
raise RuntimeError("Maximum length of 340 bytes.")
99+
self._uart.write(str.encode("AT+SBDWB={}\r".format(len(buf))))
100+
line = self._uart.readline()
101+
while line != b"READY\r\n":
102+
line = self._uart.readline()
103+
# binary data plus checksum
104+
self._uart.write(buf + struct.pack(">H", sum(buf)))
105+
line = self._uart.readline() # blank line
106+
line = self._uart.readline() # status response
107+
resp = int(line)
108+
if resp != 0:
109+
raise RuntimeError("Write error", resp)
110+
self._buf_out = buf
111+
112+
@property
113+
def text_out(self):
114+
"""The text in the outbound buffer."""
115+
text = None
116+
try:
117+
text = self._buf_out.decode()
118+
except UnicodeDecodeError:
119+
pass
120+
return text
121+
122+
@text_out.setter
123+
def text_out(self, text):
124+
if not isinstance(text, str):
125+
raise ValueError("Only strings allowed.")
126+
if len(text) > 120:
127+
raise ValueError("Text size limited to 120 bytes.")
128+
self.data_out = str.encode(text)
129+
130+
@property
131+
def data_in(self):
132+
"""The binary data in the inbound buffer."""
133+
data = None
134+
if self.status[2] == 1:
135+
resp = self._uart_xfer("+SBDRB")
136+
data = resp[0].splitlines()[1]
137+
data = data[2:-2]
138+
return data
139+
140+
@data_in.setter
141+
def data_in(self, buf):
142+
if buf is not None:
143+
raise ValueError("Can only set in buffer to None to clear.")
144+
resp = self._uart_xfer("+SBDD1")
145+
resp = int(resp[1].strip().decode())
146+
if resp == 1:
147+
raise RuntimeError("Error clearing buffer.")
148+
149+
@property
150+
def text_in(self):
151+
"""The text in the inbound buffer."""
152+
text = None
153+
if self.status[2] == 1:
154+
resp = self._uart_xfer("+SBDRT")
155+
try:
156+
text = resp[2].strip().decode()
157+
except UnicodeDecodeError:
158+
pass
159+
return text
160+
161+
@text_in.setter
162+
def text_in(self, text):
163+
self.data_in = text
164+
165+
def satellite_transfer(self, location=None):
166+
"""Initiate a Short Burst Data transfer with satellites."""
167+
status = (None,) * 6
168+
if location:
169+
resp = self._uart_xfer("+SBDIX=" + location)
170+
else:
171+
resp = self._uart_xfer("+SBDIX")
172+
if resp[-1].strip().decode() == "OK":
173+
status = resp[1].strip().decode().split(":")[1]
174+
status = [int(s) for s in status.split(",")]
175+
if status[0] <= 8:
176+
# outgoing message sent successfully
177+
self.data_out = None
178+
return tuple(status)
179+
180+
@property
181+
def status(self):
182+
"""Return tuple of Short Burst Data status."""
183+
resp = self._uart_xfer("+SBDSX")
184+
if resp[-1].strip().decode() == "OK":
185+
status = resp[1].strip().decode().split(":")[1]
186+
return tuple([int(a) for a in status.split(",")])
187+
return (None,) * 6
72188

73189
@property
74190
def model(self):
75-
"""Return phone model."""
76-
return self._uart_xfer("+GMM")
191+
"""Return modem model."""
192+
resp = self._uart_xfer("+GMM")
193+
if resp[-1].strip().decode() == "OK":
194+
return resp[1].strip().decode()
195+
return None
196+
197+
def _transfer_buffer(self):
198+
"""Copy out buffer to in buffer to simulate receiving a message."""
199+
self._uart_xfer("+SBDTC")

examples/rockblock_recv_data.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
# pylint: disable=wrong-import-position
2+
import time
3+
import struct
4+
5+
# CircuitPython / Blinka
6+
import board
7+
8+
uart = board.UART()
9+
uart.baudrate = 19200
10+
11+
# via USB cable
12+
# import serial
13+
# uart = serial.Serial("/dev/ttyUSB0", 19200)
14+
15+
from adafruit_rockblock import RockBlock
16+
17+
rb = RockBlock(uart)
18+
19+
# try a satellite Short Burst Data transfer
20+
print("Talking to satellite...")
21+
status = rb.satellite_transfer()
22+
# loop as needed
23+
retry = 0
24+
while status[0] > 8:
25+
time.sleep(10)
26+
status = rb.satellite_transfer()
27+
print(retry, status)
28+
retry += 1
29+
print("\nDONE.")
30+
31+
# get the raw data
32+
data = rb.data_in
33+
print("Raw data = ", data)
34+
35+
# unpack data (see send example)
36+
some_int = struct.unpack("i", data[0:4])[0]
37+
some_float = struct.unpack("f", data[4:8])[0]
38+
text_len = struct.unpack("i", data[8:12])[0]
39+
some_text = struct.unpack("{}s".format(text_len), data[12:])[0]
40+
41+
# turn text into string
42+
some_text = some_text.decode()
43+
44+
# print results
45+
print("some_int =", some_int)
46+
print("some_float =", some_float)
47+
print("some_text =", some_text)

examples/rockblock_recv_text.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# pylint: disable=wrong-import-position
2+
import time
3+
4+
# CircuitPython / Blinka
5+
import board
6+
7+
uart = board.UART()
8+
uart.baudrate = 19200
9+
10+
# via USB cable
11+
# import serial
12+
# uart = serial.Serial("/dev/ttyUSB0", 19200)
13+
14+
from adafruit_rockblock import RockBlock
15+
16+
rb = RockBlock(uart)
17+
18+
# try a satellite Short Burst Data transfer
19+
print("Talking to satellite...")
20+
status = rb.satellite_transfer()
21+
# loop as needed
22+
retry = 0
23+
while status[0] > 8:
24+
time.sleep(10)
25+
status = rb.satellite_transfer()
26+
print(retry, status)
27+
retry += 1
28+
print("\nDONE.")
29+
30+
# get the text
31+
print(rb.text_in)

examples/rockblock_send_data.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
# pylint: disable=wrong-import-position
2+
import time
3+
import struct
4+
5+
# CircuitPython / Blinka
6+
import board
7+
8+
uart = board.UART()
9+
uart.baudrate = 19200
10+
11+
# via USB cable
12+
# import serial
13+
# uart = serial.Serial("/dev/ttyUSB0", 19200)
14+
15+
from adafruit_rockblock import RockBlock
16+
17+
rb = RockBlock(uart)
18+
19+
# create some data
20+
some_int = 2112
21+
some_float = 42.123456789
22+
some_text = "hello world"
23+
text_len = len(some_text)
24+
25+
# create binary data
26+
data = struct.pack("i", some_int)
27+
data += struct.pack("f", some_float)
28+
data += struct.pack("i", len(some_text))
29+
data += struct.pack("{}s".format(text_len), some_text.encode())
30+
31+
# put data in outbound buffer
32+
rb.data_out = data
33+
34+
# try a satellite Short Burst Data transfer
35+
print("Talking to satellite...")
36+
status = rb.satellite_transfer()
37+
# loop as needed
38+
retry = 0
39+
while status[0] > 8:
40+
time.sleep(10)
41+
status = rb.satellite_transfer()
42+
print(retry, status)
43+
retry += 1
44+
45+
print("\nDONE.")

examples/rockblock_send_text.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# pylint: disable=wrong-import-position
2+
import time
3+
4+
# CircuitPython / Blinka
5+
import board
6+
7+
uart = board.UART()
8+
uart.baudrate = 19200
9+
10+
# via USB cable
11+
# import serial
12+
# uart = serial.Serial("/dev/ttyUSB0", 19200)
13+
14+
from adafruit_rockblock import RockBlock
15+
16+
rb = RockBlock(uart)
17+
18+
# set the text
19+
rb.out_text = "hello world"
20+
21+
# try a satellite Short Burst Data transfer
22+
print("Talking to satellite...")
23+
status = rb.satellite_transfer()
24+
# loop as needed
25+
retry = 0
26+
while status[0] > 8:
27+
time.sleep(10)
28+
status = rb.satellite_transfer()
29+
print(retry, status)
30+
retry += 1
31+
32+
print("\nDONE.")

examples/rockblock_simpletest.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# pylint: disable=wrong-import-position
2+
# CircuitPython / Blinka
3+
import board
4+
5+
uart = board.UART()
6+
uart.baudrate = 19200
7+
8+
# via USB cable
9+
# import serial
10+
# uart = serial.Serial("/dev/ttyUSB0", 19200)
11+
12+
from adafruit_rockblock import RockBlock
13+
14+
rb = RockBlock(uart)
15+
16+
print(rb.model)

0 commit comments

Comments
 (0)