Skip to content

Add type annotations #23

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Mar 14, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 66 additions & 34 deletions adafruit_rplidar.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@
import warnings
from collections import namedtuple

try:
from typing import Tuple, Dict, Any, Optional, List, Iterator, Union
from busio import UART
from digitalio import DigitalInOut
except ImportError:
pass

# pylint:disable=invalid-name,undefined-variable,global-variable-not-assigned
# pylint:disable=too-many-arguments,raise-missing-from,too-many-instance-attributes

Expand Down Expand Up @@ -83,7 +90,7 @@ class RPLidarException(Exception):
"""Basic exception class for RPLidar"""


def _process_scan(raw):
def _process_scan(raw: bytes) -> Tuple[bool, int, float, float]:
"""Processes input raw data and returns measurement data"""
new_scan = bool(raw[0] & 0b1)
inversed_new_scan = bool((raw[0] >> 1) & 0b1)
Expand All @@ -98,7 +105,9 @@ def _process_scan(raw):
return new_scan, quality, angle, distance


def _process_express_scan(data, new_angle, frame):
def _process_express_scan(
data: "ExpressPacket", new_angle: float, frame: int
) -> Tuple[bool, None, float, float]:
new_scan = (new_angle < data.start_angle) & (frame == 1)
angle = (
data.start_angle
Expand All @@ -125,18 +134,28 @@ class RPLidar:
express_data = False
express_old_data = None

def __init__(self, motor_pin, port, baudrate=115200, timeout=1, logging=False):
def __init__(
self,
motor_pin: DigitalInOut,
port: UART,
baudrate: int = 115200,
timeout: float = 1,
logging: bool = False,
) -> None:
"""Initialize RPLidar object for communicating with the sensor.

Parameters

motor_pin : digitalio.DigitalInOut
Pin controlling the motor
port : busio.UART or str
Serial port instance or name of the port to which the sensor is connected
baudrate : int, optional
Baudrate for serial connection (the default is 115200)
timeout : float, optional
Serial port connection timeout in seconds (the default is 1)
logging : whether to output logging information
logging : bool, optional
Whether to output logging information
"""
self.motor_pin = motor_pin
self.port = port
Expand All @@ -156,17 +175,17 @@ def __init__(self, motor_pin, port, baudrate=115200, timeout=1, logging=False):
self.connect()
self.start_motor()

def log(self, level, msg):
def log(self, level: str, msg: str) -> None:
"""Output the level and a message if logging is enabled."""
if self.logging:
sys.stdout.write("{0}: {1}\n".format(level, msg))

def log_bytes(self, level, msg, ba):
def log_bytes(self, level: str, msg: str, ba: bytes) -> None:
"""Log and output a byte array in a readable way."""
bs = ["%02x" % b for b in ba]
self.log(level, msg + " ".join(bs))

def connect(self):
def connect(self) -> None:
"""Connects to the serial port named by the port instance var. If it was
connected to another serial port disconnects from it first."""
if not self.is_CP:
Expand All @@ -185,26 +204,26 @@ def connect(self):
"Failed to connect to the sensor " "due to: %s" % err
)

def disconnect(self):
def disconnect(self) -> None:
"""Disconnects from the serial port"""
if self._serial_port is None:
return
self._serial_port.close()

def set_pwm(self, pwm):
def set_pwm(self, pwm: int) -> None:
"""Set the motor PWM"""
assert 0 <= pwm <= MAX_MOTOR_PWM
payload = struct.pack("<H", pwm)
self._send_payload_cmd(SET_PWM_BYTE, payload)

def _control_motor(self, val):
def _control_motor(self, val: bool) -> None:
"""Manipulate the motor"""
if self.is_CP:
self.motor_pin.value = val
else:
self._serial_port.dtr = not val

def start_motor(self):
def start_motor(self) -> None:
"""Starts sensor motor"""
self.log("info", "Starting motor")
# For A1
Expand All @@ -214,7 +233,7 @@ def start_motor(self):
self.set_pwm(DEFAULT_MOTOR_PWM)
self.motor_running = True

def stop_motor(self):
def stop_motor(self) -> None:
"""Stops sensor motor"""
self.log("info", "Stopping motor")
# For A2
Expand All @@ -224,7 +243,7 @@ def stop_motor(self):
self._control_motor(False)
self.motor_running = False

def _send_payload_cmd(self, cmd, payload):
def _send_payload_cmd(self, cmd: bytes, payload: bytes) -> None:
"""Sends `cmd` command with `payload` to the sensor"""
size = struct.pack("B", len(payload))
req = SYNC_BYTE + cmd + size + payload
Expand All @@ -235,13 +254,13 @@ def _send_payload_cmd(self, cmd, payload):
self._serial_port.write(req)
self.log_bytes("debug", "Command sent: ", req)

def _send_cmd(self, cmd):
def _send_cmd(self, cmd: bytes) -> None:
"""Sends `cmd` command to the sensor"""
req = SYNC_BYTE + cmd
self._serial_port.write(req)
self.log_bytes("debug", "Command sent: ", req)

def _read_descriptor(self):
def _read_descriptor(self) -> Tuple[int, bool, int]:
"""Reads descriptor packet"""
descriptor = self._serial_port.read(DESCRIPTOR_LEN)
self.log_bytes("debug", "Received descriptor:", descriptor)
Expand All @@ -252,7 +271,7 @@ def _read_descriptor(self):
is_single = descriptor[-2] == 0
return descriptor[2], is_single, descriptor[-1]

def _read_response(self, dsize):
def _read_response(self, dsize: int) -> bytes:
"""Reads response packet with length of `dsize` bytes"""
self.log("debug", "Trying to read response: %d bytes" % dsize)
data = self._serial_port.read(dsize)
Expand All @@ -262,7 +281,7 @@ def _read_response(self, dsize):
return data

@property
def info(self):
def info(self) -> Dict[str, Any]:
"""Get device information

Returns
Expand Down Expand Up @@ -290,7 +309,7 @@ def info(self):
return data

@property
def health(self):
def health(self) -> Tuple[str, int]:
"""Get device health state. When the core system detects some
potential risk that may cause hardware failure in the future,
the returned status value will be 'Warning'. But sensor can still work
Expand Down Expand Up @@ -318,19 +337,21 @@ def health(self):
error_code = (raw[1] << 8) + raw[2]
return (status, error_code)

def clear_input(self):
def clear_input(self) -> None:
"""Clears input buffer by reading all available data"""
if self.scanning:
raise RPLidarException("Clearing not allowed during active scanning!")
self._serial_port.flushInput()
self.express_frame = 32
self.express_data = False

def start(self, scan_type=SCAN_TYPE_NORMAL):
def start(self, scan_type: int = SCAN_TYPE_NORMAL) -> None:
"""Start the scanning process

Parameters
----------
scan : normal, force or express.

scan_type : int, optional
Normal, force or express; default is normal
"""
if self.scanning:
raise RPLidarException("Scanning already running!")
Expand Down Expand Up @@ -374,7 +395,7 @@ def start(self, scan_type=SCAN_TYPE_NORMAL):
self.scan_type = scan_type
self.scanning = True

def stop(self):
def stop(self) -> None:
"""Stops scanning process, disables laser diode and the measurement
system, moves sensor to the idle state."""
self.log("info", "Stopping scanning")
Expand All @@ -383,30 +404,35 @@ def stop(self):
self.scanning = False
self.clear_input()

def reset(self):
def reset(self) -> None:
"""Resets sensor core, reverting it to a similar state as it has
just been powered up."""
self.log("info", "Resetting the sensor")
self._send_cmd(RESET_BYTE)
time.sleep(0.002)
self.clear_input()

def iter_measurements(self, max_buf_meas=500, scan_type=SCAN_TYPE_NORMAL):
def iter_measurements(
self, max_buf_meas: int = 500, scan_type: int = SCAN_TYPE_NORMAL
) -> Iterator[Tuple[bool, Optional[int], float, float]]:
"""Iterate over measurements. Note that consumer must be fast enough,
otherwise data will be accumulated inside buffer and consumer will get
data with increasing lag.

Parameters

max_buf_meas : int
max_buf_meas : int, optional
Maximum number of measurements to be stored inside the buffer. Once
number exceeds this limit buffer will be emptied out.
number exceeds this limit buffer will be emptied out. Default is
500.
scan_type : int, optional
Normal, force or express; default is normal

Yields

new_scan : bool
True if measurement belongs to a new scan
quality : int
quality : int | None
Reflected laser pulse strength
angle : float
The measurement heading angle in degree unit [0, 360)
Expand Down Expand Up @@ -474,7 +500,9 @@ def iter_measurements(self, max_buf_meas=500, scan_type=SCAN_TYPE_NORMAL):
self.express_frame,
)

def iter_measurments(self, max_buf_meas=500):
def iter_measurments(
self, max_buf_meas: int = 500
) -> Iterator[Tuple[bool, int, float, float]]:
"""For compatibility, this method wraps `iter_measurements`"""
warnings.warn(
"The method `iter_measurments` has been renamed "
Expand All @@ -483,18 +511,22 @@ def iter_measurments(self, max_buf_meas=500):
)
self.iter_measurements(max_buf_meas=max_buf_meas)

def iter_scans(self, max_buf_meas=500, min_len=5):
def iter_scans(
self, max_buf_meas: int = 500, min_len: int = 5
) -> List[Union[int, float]]:
"""Iterate over scans. Note that consumer must be fast enough,
otherwise data will be accumulated inside buffer and consumer will get
data with increasing lag.

Parameters

max_buf_meas : int
max_buf_meas : int, optional
Maximum number of measurements to be stored inside the buffer. Once
number exceeds this limit buffer will be emptied out.
min_len : int
number exceeds this limit buffer will be emptied out. Default is
500.
min_len : int, optional
Minimum number of measurements in the scan for it to be yielded.
Default is 5.

Yields

Expand Down Expand Up @@ -522,7 +554,7 @@ class ExpressPacket(express_packet):
sign = {0: 1, 1: -1}

@classmethod
def from_string(cls, data):
def from_string(cls, data: bytes) -> "ExpressPacket":
"""Decode and Instantiate the class from a string packet"""
packet = bytearray(data)

Expand Down