Skip to content

Add basic satellite coms #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 135 additions & 12 deletions adafruit_rockblock.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@

"""

import struct

__version__ = "0.0.0-auto.0"
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_RockBlock.git"

Expand All @@ -52,25 +54,146 @@ class RockBlock:
def __init__(self, uart, baudrate=19200):
self._uart = uart
self._uart.baudrate = baudrate
self._buf_out = None
self.reset()

def _uart_xfer(self, cmd):
"""Send AT command and return response."""
# response = ATCMD\r\nRESP\r\n\r\nOK\r\n
# or = ATCMD\r\nERROR\r\n

cmd = str.encode("AT" + cmd)
"""Send AT command and return response as tuple of lines read."""

self._uart.reset_input_buffer()
self._uart.write(cmd + "\r")
self._uart.write(str.encode("AT" + cmd + "\r"))

resp = None
if self._uart.readline().strip() == cmd:
resp = self._uart.readline().strip().decode()
resp = []
line = self._uart.readline()
resp.append(line)
while not any(EOM in line for EOM in (b"OK\r\n", b"ERROR\r\n")):
line = self._uart.readline()
resp.append(line)

self._uart.reset_input_buffer()
return resp

return tuple(resp)

def reset(self):
"""Perform a software reset."""
self._uart_xfer("&F0") # factory defaults
self._uart_xfer("&K0") # flow control off

@property
def data_out(self):
"The binary data in the outbound buffer."
return self._buf_out

@data_out.setter
def data_out(self, buf):
if buf is None:
# clear the buffer
resp = self._uart_xfer("+SBDD0")
resp = int(resp[1].strip().decode())
if resp == 1:
raise RuntimeError("Error clearing buffer.")
else:
# set the buffer
if len(buf) > 340:
raise RuntimeError("Maximum length of 340 bytes.")
self._uart.write(str.encode("AT+SBDWB={}\r".format(len(buf))))
line = self._uart.readline()
while line != b"READY\r\n":
line = self._uart.readline()
# binary data plus checksum
self._uart.write(buf + struct.pack(">H", sum(buf)))
line = self._uart.readline() # blank line
line = self._uart.readline() # status response
resp = int(line)
if resp != 0:
raise RuntimeError("Write error", resp)
self._buf_out = buf

@property
def text_out(self):
"""The text in the outbound buffer."""
text = None
try:
text = self._buf_out.decode()
except UnicodeDecodeError:
pass
return text

@text_out.setter
def text_out(self, text):
if not isinstance(text, str):
raise ValueError("Only strings allowed.")
if len(text) > 120:
raise ValueError("Text size limited to 120 bytes.")
self.data_out = str.encode(text)

@property
def data_in(self):
"""The binary data in the inbound buffer."""
data = None
if self.status[2] == 1:
resp = self._uart_xfer("+SBDRB")
data = resp[0].splitlines()[1]
data = data[2:-2]
return data

@data_in.setter
def data_in(self, buf):
if buf is not None:
raise ValueError("Can only set in buffer to None to clear.")
resp = self._uart_xfer("+SBDD1")
resp = int(resp[1].strip().decode())
if resp == 1:
raise RuntimeError("Error clearing buffer.")

@property
def text_in(self):
"""The text in the inbound buffer."""
text = None
if self.status[2] == 1:
resp = self._uart_xfer("+SBDRT")
try:
text = resp[2].strip().decode()
except UnicodeDecodeError:
pass
return text

@text_in.setter
def text_in(self, text):
self.data_in = text

def satellite_transfer(self, location=None):
"""Initiate a Short Burst Data transfer with satellites."""
status = (None,) * 6
if location:
resp = self._uart_xfer("+SBDIX=" + location)
else:
resp = self._uart_xfer("+SBDIX")
if resp[-1].strip().decode() == "OK":
status = resp[1].strip().decode().split(":")[1]
status = [int(s) for s in status.split(",")]
if status[0] <= 8:
# outgoing message sent successfully
self.data_out = None
return tuple(status)

@property
def status(self):
"""Return tuple of Short Burst Data status."""
resp = self._uart_xfer("+SBDSX")
if resp[-1].strip().decode() == "OK":
status = resp[1].strip().decode().split(":")[1]
return tuple([int(a) for a in status.split(",")])
return (None,) * 6

@property
def model(self):
"""Return phone model."""
return self._uart_xfer("+GMM")
"""Return modem model."""
resp = self._uart_xfer("+GMM")
if resp[-1].strip().decode() == "OK":
return resp[1].strip().decode()
return None

def _transfer_buffer(self):
"""Copy out buffer to in buffer to simulate receiving a message."""
self._uart_xfer("+SBDTC")
47 changes: 47 additions & 0 deletions examples/rockblock_recv_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# pylint: disable=wrong-import-position
import time
import struct

# CircuitPython / Blinka
import board

uart = board.UART()
uart.baudrate = 19200

# via USB cable
# import serial
# uart = serial.Serial("/dev/ttyUSB0", 19200)

from adafruit_rockblock import RockBlock

rb = RockBlock(uart)

# try a satellite Short Burst Data transfer
print("Talking to satellite...")
status = rb.satellite_transfer()
# loop as needed
retry = 0
while status[0] > 8:
time.sleep(10)
status = rb.satellite_transfer()
print(retry, status)
retry += 1
print("\nDONE.")

# get the raw data
data = rb.data_in
print("Raw data = ", data)

# unpack data (see send example)
some_int = struct.unpack("i", data[0:4])[0]
some_float = struct.unpack("f", data[4:8])[0]
text_len = struct.unpack("i", data[8:12])[0]
some_text = struct.unpack("{}s".format(text_len), data[12:])[0]

# turn text into string
some_text = some_text.decode()

# print results
print("some_int =", some_int)
print("some_float =", some_float)
print("some_text =", some_text)
31 changes: 31 additions & 0 deletions examples/rockblock_recv_text.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# pylint: disable=wrong-import-position
import time

# CircuitPython / Blinka
import board

uart = board.UART()
uart.baudrate = 19200

# via USB cable
# import serial
# uart = serial.Serial("/dev/ttyUSB0", 19200)

from adafruit_rockblock import RockBlock

rb = RockBlock(uart)

# try a satellite Short Burst Data transfer
print("Talking to satellite...")
status = rb.satellite_transfer()
# loop as needed
retry = 0
while status[0] > 8:
time.sleep(10)
status = rb.satellite_transfer()
print(retry, status)
retry += 1
print("\nDONE.")

# get the text
print(rb.text_in)
45 changes: 45 additions & 0 deletions examples/rockblock_send_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# pylint: disable=wrong-import-position
import time
import struct

# CircuitPython / Blinka
import board

uart = board.UART()
uart.baudrate = 19200

# via USB cable
# import serial
# uart = serial.Serial("/dev/ttyUSB0", 19200)

from adafruit_rockblock import RockBlock

rb = RockBlock(uart)

# create some data
some_int = 2112
some_float = 42.123456789
some_text = "hello world"
text_len = len(some_text)

# create binary data
data = struct.pack("i", some_int)
data += struct.pack("f", some_float)
data += struct.pack("i", len(some_text))
data += struct.pack("{}s".format(text_len), some_text.encode())

# put data in outbound buffer
rb.data_out = data

# try a satellite Short Burst Data transfer
print("Talking to satellite...")
status = rb.satellite_transfer()
# loop as needed
retry = 0
while status[0] > 8:
time.sleep(10)
status = rb.satellite_transfer()
print(retry, status)
retry += 1

print("\nDONE.")
32 changes: 32 additions & 0 deletions examples/rockblock_send_text.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# pylint: disable=wrong-import-position
import time

# CircuitPython / Blinka
import board

uart = board.UART()
uart.baudrate = 19200

# via USB cable
# import serial
# uart = serial.Serial("/dev/ttyUSB0", 19200)

from adafruit_rockblock import RockBlock

rb = RockBlock(uart)

# set the text
rb.out_text = "hello world"

# try a satellite Short Burst Data transfer
print("Talking to satellite...")
status = rb.satellite_transfer()
# loop as needed
retry = 0
while status[0] > 8:
time.sleep(10)
status = rb.satellite_transfer()
print(retry, status)
retry += 1

print("\nDONE.")
16 changes: 16 additions & 0 deletions examples/rockblock_simpletest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# pylint: disable=wrong-import-position
# CircuitPython / Blinka
import board

uart = board.UART()
uart.baudrate = 19200

# via USB cable
# import serial
# uart = serial.Serial("/dev/ttyUSB0", 19200)

from adafruit_rockblock import RockBlock

rb = RockBlock(uart)

print(rb.model)