Skip to content

Added timeouts (delays) on serial port and mounting point #986

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 24, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 28 additions & 6 deletions workspace_tools/host_tests/host_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,21 +126,43 @@ def init_serial(self, serial_baud=None, serial_timeout=None):
serial_baud = serial_baud if serial_baud is not None else self.serial_baud
serial_timeout = serial_timeout if serial_timeout is not None else self.serial_timeout

# Clear serial port
if self.serial:
self.serial.close()
self.serial = None

result = True
try:
self.serial = Serial(self.port, baudrate=serial_baud, timeout=serial_timeout)
except Exception as e:
print "MBED: %s"% str(e)
result = False
# We will pool for serial to be re-mounted if it was unmounted after device reset
result = self.pool_for_serial_init(serial_baud, serial_timeout) # Blocking

# Port can be opened
if result:
self.flush()
return result

def pool_for_serial_init(self, serial_baud, serial_timeout, pooling_loops=40, init_delay=0.5, loop_delay=0.25):
""" Functions pools for serial port readiness
"""
result = True
last_error = None
# This loop is used to check for serial port availability due to
# some delays and remounting when devices are being flashed with new software.
for i in range(pooling_loops):
sleep(loop_delay if i else init_delay)
try:
self.serial = Serial(self.port, baudrate=serial_baud, timeout=serial_timeout)
except Exception as e:
result = False
last_error = "MBED: %s"% str(e)
stdout.write('.')
stdout.flush()
else:
print "...port ready!"
result = True
break
if not result and last_error:
print last_error
return result

def set_serial_timeout(self, timeout):
""" Wraps self.mbed.serial object timeout property
"""
Expand Down
31 changes: 31 additions & 0 deletions workspace_tools/host_tests/host_tests_plugins/host_test_plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
limitations under the License.
"""

from os import access, F_OK
from sys import stdout
from time import sleep
from subprocess import call


Expand Down Expand Up @@ -58,6 +61,34 @@ def print_plugin_error(self, text):
print "Plugin error: %s::%s: %s"% (self.name, self.type, text)
return False

def print_plugin_info(self, text, NL=True):
""" Function prints notification in console and exits always with True
"""
if NL:
print "Plugin info: %s::%s: %s"% (self.name, self.type, text)
else:
print "Plugin info: %s::%s: %s"% (self.name, self.type, text),
return True

def print_plugin_char(self, char):
""" Function prints char on stdout
"""
stdout.write(char)
stdout.flush()
return True

def check_mount_point_ready(self, destination_disk, init_delay=0.2, loop_delay=0.25):
""" Checks if destination_disk is ready and can be accessed by e.g. copy commands
@init_delay - Initial delay time before first access check
@loop_delay - pooling delay for access check
"""
if not access(destination_disk, F_OK):
self.print_plugin_info("Waiting for mount point '%s' to be ready..."% destination_disk, NL=False)
sleep(init_delay)
while not access(destination_disk, F_OK):
sleep(loop_delay)
self.print_plugin_char('.')

def check_parameters(self, capabilitity, *args, **kwargs):
""" This function should be ran each time we call execute()
to check if none of the required parameters is missing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ def execute(self, capabilitity, *args, **kwargs):
if capabilitity == 'default':
image_path = kwargs['image_path']
destination_disk = kwargs['destination_disk']
# Wait for mount point to be ready
self.check_mount_point_ready(destination_disk) # Blocking
result = self.generic_mbed_copy(image_path, destination_disk)
return result

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ def execute(self, capabilitity, *args, **kwargs):
if self.check_parameters(capabilitity, *args, **kwargs) is True:
image_path = kwargs['image_path']
destination_disk = kwargs['destination_disk']
# Wait for mount point to be ready
self.check_mount_point_ready(destination_disk) # Blocking
# Prepare correct command line parameter values
image_base_name = basename(image_path)
destination_path = join(destination_disk, image_base_name)
Expand Down