33
33
import warnings
34
34
from collections import namedtuple
35
35
36
+ try :
37
+ from typing import Tuple , Dict , Any , Optional , List , Iterator , Union
38
+ from busio import UART
39
+ from digitalio import DigitalInOut
40
+ except ImportError :
41
+ pass
42
+
36
43
# pylint:disable=invalid-name,undefined-variable,global-variable-not-assigned
37
44
# pylint:disable=too-many-arguments,raise-missing-from,too-many-instance-attributes
38
45
@@ -83,7 +90,7 @@ class RPLidarException(Exception):
83
90
"""Basic exception class for RPLidar"""
84
91
85
92
86
- def _process_scan (raw ) :
93
+ def _process_scan (raw : bytes ) -> Tuple [ bool , int , float , float ] :
87
94
"""Processes input raw data and returns measurement data"""
88
95
new_scan = bool (raw [0 ] & 0b1 )
89
96
inversed_new_scan = bool ((raw [0 ] >> 1 ) & 0b1 )
@@ -98,7 +105,9 @@ def _process_scan(raw):
98
105
return new_scan , quality , angle , distance
99
106
100
107
101
- def _process_express_scan (data , new_angle , frame ):
108
+ def _process_express_scan (
109
+ data : "ExpressPacket" , new_angle : float , frame : int
110
+ ) -> Tuple [bool , None , float , float ]:
102
111
new_scan = (new_angle < data .start_angle ) & (frame == 1 )
103
112
angle = (
104
113
data .start_angle
@@ -125,18 +134,28 @@ class RPLidar:
125
134
express_data = False
126
135
express_old_data = None
127
136
128
- def __init__ (self , motor_pin , port , baudrate = 115200 , timeout = 1 , logging = False ):
137
+ def __init__ (
138
+ self ,
139
+ motor_pin : DigitalInOut ,
140
+ port : UART ,
141
+ baudrate : int = 115200 ,
142
+ timeout : float = 1 ,
143
+ logging : bool = False ,
144
+ ) -> None :
129
145
"""Initialize RPLidar object for communicating with the sensor.
130
146
131
147
Parameters
132
148
149
+ motor_pin : digitalio.DigitalInOut
150
+ Pin controlling the motor
133
151
port : busio.UART or str
134
152
Serial port instance or name of the port to which the sensor is connected
135
153
baudrate : int, optional
136
154
Baudrate for serial connection (the default is 115200)
137
155
timeout : float, optional
138
156
Serial port connection timeout in seconds (the default is 1)
139
- logging : whether to output logging information
157
+ logging : bool, optional
158
+ Whether to output logging information
140
159
"""
141
160
self .motor_pin = motor_pin
142
161
self .port = port
@@ -156,17 +175,17 @@ def __init__(self, motor_pin, port, baudrate=115200, timeout=1, logging=False):
156
175
self .connect ()
157
176
self .start_motor ()
158
177
159
- def log (self , level , msg ) :
178
+ def log (self , level : str , msg : str ) -> None :
160
179
"""Output the level and a message if logging is enabled."""
161
180
if self .logging :
162
181
sys .stdout .write ("{0}: {1}\n " .format (level , msg ))
163
182
164
- def log_bytes (self , level , msg , ba ) :
183
+ def log_bytes (self , level : str , msg : str , ba : bytes ) -> None :
165
184
"""Log and output a byte array in a readable way."""
166
185
bs = ["%02x" % b for b in ba ]
167
186
self .log (level , msg + " " .join (bs ))
168
187
169
- def connect (self ):
188
+ def connect (self ) -> None :
170
189
"""Connects to the serial port named by the port instance var. If it was
171
190
connected to another serial port disconnects from it first."""
172
191
if not self .is_CP :
@@ -185,26 +204,26 @@ def connect(self):
185
204
"Failed to connect to the sensor " "due to: %s" % err
186
205
)
187
206
188
- def disconnect (self ):
207
+ def disconnect (self ) -> None :
189
208
"""Disconnects from the serial port"""
190
209
if self ._serial_port is None :
191
210
return
192
211
self ._serial_port .close ()
193
212
194
- def set_pwm (self , pwm ) :
213
+ def set_pwm (self , pwm : int ) -> None :
195
214
"""Set the motor PWM"""
196
215
assert 0 <= pwm <= MAX_MOTOR_PWM
197
216
payload = struct .pack ("<H" , pwm )
198
217
self ._send_payload_cmd (SET_PWM_BYTE , payload )
199
218
200
- def _control_motor (self , val ) :
219
+ def _control_motor (self , val : bool ) -> None :
201
220
"""Manipulate the motor"""
202
221
if self .is_CP :
203
222
self .motor_pin .value = val
204
223
else :
205
224
self ._serial_port .dtr = not val
206
225
207
- def start_motor (self ):
226
+ def start_motor (self ) -> None :
208
227
"""Starts sensor motor"""
209
228
self .log ("info" , "Starting motor" )
210
229
# For A1
@@ -214,7 +233,7 @@ def start_motor(self):
214
233
self .set_pwm (DEFAULT_MOTOR_PWM )
215
234
self .motor_running = True
216
235
217
- def stop_motor (self ):
236
+ def stop_motor (self ) -> None :
218
237
"""Stops sensor motor"""
219
238
self .log ("info" , "Stopping motor" )
220
239
# For A2
@@ -224,7 +243,7 @@ def stop_motor(self):
224
243
self ._control_motor (False )
225
244
self .motor_running = False
226
245
227
- def _send_payload_cmd (self , cmd , payload ) :
246
+ def _send_payload_cmd (self , cmd : bytes , payload : bytes ) -> None :
228
247
"""Sends `cmd` command with `payload` to the sensor"""
229
248
size = struct .pack ("B" , len (payload ))
230
249
req = SYNC_BYTE + cmd + size + payload
@@ -235,13 +254,13 @@ def _send_payload_cmd(self, cmd, payload):
235
254
self ._serial_port .write (req )
236
255
self .log_bytes ("debug" , "Command sent: " , req )
237
256
238
- def _send_cmd (self , cmd ) :
257
+ def _send_cmd (self , cmd : bytes ) -> None :
239
258
"""Sends `cmd` command to the sensor"""
240
259
req = SYNC_BYTE + cmd
241
260
self ._serial_port .write (req )
242
261
self .log_bytes ("debug" , "Command sent: " , req )
243
262
244
- def _read_descriptor (self ):
263
+ def _read_descriptor (self ) -> Tuple [ int , bool , int ] :
245
264
"""Reads descriptor packet"""
246
265
descriptor = self ._serial_port .read (DESCRIPTOR_LEN )
247
266
self .log_bytes ("debug" , "Received descriptor:" , descriptor )
@@ -252,7 +271,7 @@ def _read_descriptor(self):
252
271
is_single = descriptor [- 2 ] == 0
253
272
return descriptor [2 ], is_single , descriptor [- 1 ]
254
273
255
- def _read_response (self , dsize ) :
274
+ def _read_response (self , dsize : int ) -> bytes :
256
275
"""Reads response packet with length of `dsize` bytes"""
257
276
self .log ("debug" , "Trying to read response: %d bytes" % dsize )
258
277
data = self ._serial_port .read (dsize )
@@ -262,7 +281,7 @@ def _read_response(self, dsize):
262
281
return data
263
282
264
283
@property
265
- def info (self ):
284
+ def info (self ) -> Dict [ str , Any ] :
266
285
"""Get device information
267
286
268
287
Returns
@@ -290,7 +309,7 @@ def info(self):
290
309
return data
291
310
292
311
@property
293
- def health (self ):
312
+ def health (self ) -> Tuple [ str , int ] :
294
313
"""Get device health state. When the core system detects some
295
314
potential risk that may cause hardware failure in the future,
296
315
the returned status value will be 'Warning'. But sensor can still work
@@ -318,19 +337,21 @@ def health(self):
318
337
error_code = (raw [1 ] << 8 ) + raw [2 ]
319
338
return (status , error_code )
320
339
321
- def clear_input (self ):
340
+ def clear_input (self ) -> None :
322
341
"""Clears input buffer by reading all available data"""
323
342
if self .scanning :
324
343
raise RPLidarException ("Clearing not allowed during active scanning!" )
325
344
self ._serial_port .flushInput ()
326
345
self .express_frame = 32
327
346
self .express_data = False
328
347
329
- def start (self , scan_type = SCAN_TYPE_NORMAL ):
348
+ def start (self , scan_type : int = SCAN_TYPE_NORMAL ) -> None :
330
349
"""Start the scanning process
350
+
331
351
Parameters
332
- ----------
333
- scan : normal, force or express.
352
+
353
+ scan_type : int, optional
354
+ Normal, force or express; default is normal
334
355
"""
335
356
if self .scanning :
336
357
raise RPLidarException ("Scanning already running!" )
@@ -374,7 +395,7 @@ def start(self, scan_type=SCAN_TYPE_NORMAL):
374
395
self .scan_type = scan_type
375
396
self .scanning = True
376
397
377
- def stop (self ):
398
+ def stop (self ) -> None :
378
399
"""Stops scanning process, disables laser diode and the measurement
379
400
system, moves sensor to the idle state."""
380
401
self .log ("info" , "Stopping scanning" )
@@ -383,30 +404,35 @@ def stop(self):
383
404
self .scanning = False
384
405
self .clear_input ()
385
406
386
- def reset (self ):
407
+ def reset (self ) -> None :
387
408
"""Resets sensor core, reverting it to a similar state as it has
388
409
just been powered up."""
389
410
self .log ("info" , "Resetting the sensor" )
390
411
self ._send_cmd (RESET_BYTE )
391
412
time .sleep (0.002 )
392
413
self .clear_input ()
393
414
394
- def iter_measurements (self , max_buf_meas = 500 , scan_type = SCAN_TYPE_NORMAL ):
415
+ def iter_measurements (
416
+ self , max_buf_meas : int = 500 , scan_type : int = SCAN_TYPE_NORMAL
417
+ ) -> Iterator [Tuple [bool , Optional [int ], float , float ]]:
395
418
"""Iterate over measurements. Note that consumer must be fast enough,
396
419
otherwise data will be accumulated inside buffer and consumer will get
397
420
data with increasing lag.
398
421
399
422
Parameters
400
423
401
- max_buf_meas : int
424
+ max_buf_meas : int, optional
402
425
Maximum number of measurements to be stored inside the buffer. Once
403
- number exceeds this limit buffer will be emptied out.
426
+ number exceeds this limit buffer will be emptied out. Default is
427
+ 500.
428
+ scan_type : int, optional
429
+ Normal, force or express; default is normal
404
430
405
431
Yields
406
432
407
433
new_scan : bool
408
434
True if measurement belongs to a new scan
409
- quality : int
435
+ quality : int | None
410
436
Reflected laser pulse strength
411
437
angle : float
412
438
The measurement heading angle in degree unit [0, 360)
@@ -474,7 +500,9 @@ def iter_measurements(self, max_buf_meas=500, scan_type=SCAN_TYPE_NORMAL):
474
500
self .express_frame ,
475
501
)
476
502
477
- def iter_measurments (self , max_buf_meas = 500 ):
503
+ def iter_measurments (
504
+ self , max_buf_meas : int = 500
505
+ ) -> Iterator [Tuple [bool , int , float , float ]]:
478
506
"""For compatibility, this method wraps `iter_measurements`"""
479
507
warnings .warn (
480
508
"The method `iter_measurments` has been renamed "
@@ -483,18 +511,22 @@ def iter_measurments(self, max_buf_meas=500):
483
511
)
484
512
self .iter_measurements (max_buf_meas = max_buf_meas )
485
513
486
- def iter_scans (self , max_buf_meas = 500 , min_len = 5 ):
514
+ def iter_scans (
515
+ self , max_buf_meas : int = 500 , min_len : int = 5
516
+ ) -> List [Union [int , float ]]:
487
517
"""Iterate over scans. Note that consumer must be fast enough,
488
518
otherwise data will be accumulated inside buffer and consumer will get
489
519
data with increasing lag.
490
520
491
521
Parameters
492
522
493
- max_buf_meas : int
523
+ max_buf_meas : int, optional
494
524
Maximum number of measurements to be stored inside the buffer. Once
495
- number exceeds this limit buffer will be emptied out.
496
- min_len : int
525
+ number exceeds this limit buffer will be emptied out. Default is
526
+ 500.
527
+ min_len : int, optional
497
528
Minimum number of measurements in the scan for it to be yielded.
529
+ Default is 5.
498
530
499
531
Yields
500
532
@@ -522,7 +554,7 @@ class ExpressPacket(express_packet):
522
554
sign = {0 : 1 , 1 : - 1 }
523
555
524
556
@classmethod
525
- def from_string (cls , data ) :
557
+ def from_string (cls , data : bytes ) -> "ExpressPacket" :
526
558
"""Decode and Instantiate the class from a string packet"""
527
559
packet = bytearray (data )
528
560
0 commit comments