Skip to content

Usb msd tests #9444

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
240 changes: 240 additions & 0 deletions TESTS/host_tests/pyusb_msd.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
"""
Copyright (c) 2019, Arm Limited and affiliates.
SPDX-License-Identifier: Apache-2.0

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

from mbed_host_tests import BaseHostTest
import time
import psutil
import tempfile
import uuid
import os
import platform
import subprocess
import sys
system_name = platform.system()
if system_name == "Windows":
import wmi


class PyusbMSDTest(BaseHostTest):
"""Host side test for USB MSD class."""

__result = None
MOUNT_WAIT_TIME = 25 # in [s]
initial_disk_list = None
msd_disk = None
serial_number = None

def _callback_device_ready(self, key, value, timestamp):
"""Send a unique USB SN to the device.
DUT uses this SN every time it connects to host as a USB device.
"""
self.serial_number = uuid.uuid4().hex # 32 hex digit string
self.send_kv("serial_number", self.serial_number)

def _callback_check_file_exist(self, key, value, timestamp):
"""Check if file exist.

"""
folder_name, file_name, file_content = value.split(' ')
msd_disk = MSDUtils.disk_path(self.serial_number)
file_path = os.path.join(msd_disk, folder_name, file_name)
try:
file = open(file_path, 'r')
line = file.readline()
file.close()
time.sleep(2) # wait for msd communication done
if line == file_content:
self.send_kv("exist", "0")
return
self.report_error("file content invalid")
except IOError as err:
self.log('{} !!!'.format(err))
self.send_kv("non-exist", "0")

def _callback_delete_files(self, key, value, timestamp):
"""Delete test file.

"""
dir_name, file_name = value.split(' ')
msd_disk = MSDUtils.disk_path(self.serial_number)
try:
os.remove(os.path.join(msd_disk, dir_name, file_name))
except:
self.report_error("delete files")
return
time.sleep(2) # wait for msd communication done
self.report_success()

def _callback_check_if_mounted(self, key, value, timestamp):
"""Check if disk was mounted.

"""
wait_time = self.MOUNT_WAIT_TIME
while wait_time != 0:
msd_disk = MSDUtils.disk_path(self.serial_number)
if msd_disk is not None:
# MSD disk found
time.sleep(2) # wait for msd communication done
self.report_success()
return
wait_time -= 1
time.sleep(1) # wait 1s and try again
self.report_error("mount check")

def _callback_check_if_not_mounted(self, key, value, timestamp):
"""Check if disk was unmouted.

"""
wait_time = self.MOUNT_WAIT_TIME
while wait_time != 0:
msd_disk = MSDUtils.disk_path(self.serial_number)
if msd_disk is None:
#self.msd_disk = None
time.sleep(2) # wait for msd communication done
self.report_success()
return
wait_time -= 1
time.sleep(1) # wait 1s and try again
self.report_error("unmount check")

def _callback_get_mounted_fs_size(self, key, value, timestamp):
"""Record visible filesystem size.

"""
stats = psutil.disk_usage(MSDUtils.disk_path(self.serial_number))
self.send_kv("{}".format(stats.total), "0")

def _callback_unmount(self, key, value, timestamp):
"""Disk unmount.

"""
if MSDUtils.unmount(serial=self.serial_number):
self.report_success()
else:
self.report_error("unmount")

def setup(self):
self.register_callback("get_serial_number", self._callback_device_ready)
self.register_callback('check_if_mounted', self._callback_check_if_mounted)
self.register_callback('check_if_not_mounted', self._callback_check_if_not_mounted)
self.register_callback('get_mounted_fs_size', self._callback_get_mounted_fs_size)
self.register_callback('check_file_exist', self._callback_check_file_exist)
self.register_callback('delete_files', self._callback_delete_files)
self.register_callback('unmount', self._callback_unmount)

def report_success(self):
self.send_kv("passed", "0")

def report_error(self, msg):
self.log('{} failed !!!'.format(msg))
self.send_kv("failed", "0")

def result(self):
return self.__result

def teardown(self):
pass


class MSDUtils(object):

@staticmethod
def disk_path(serial):
system_name = platform.system()
if system_name == "Windows":
return MSDUtils._disk_path_windows(serial)
elif system_name == "Linux":
return MSDUtils._disk_path_linux(serial)
elif system_name == "Darwin":
return MSDUtils._disk_path_mac(serial)
return None

@staticmethod
def unmount(serial):
system_name = platform.system()
if system_name == "Windows":
return MSDUtils._unmount_windows(serial)
elif system_name == "Linux":
return MSDUtils._unmount_linux(serial)
elif system_name == "Darwin":
return MSDUtils._unmount_mac(serial)
return False

@staticmethod
def _disk_path_windows(serial):
serial_decoded = serial.encode("ascii")
c = wmi.WMI()
for physical_disk in c.Win32_DiskDrive():
if serial_decoded == physical_disk.SerialNumber:
for partition in physical_disk.associators("Win32_DiskDriveToDiskPartition"):
for logical_disk in partition.associators("Win32_LogicalDiskToPartition"):
return logical_disk.Caption
return None

@staticmethod
def _disk_path_linux(serial):
output = subprocess.check_output(['lsblk', '-dnoserial,mountpoint']).split('\n')
for line in output:
serial_and_mount_point = line.split()
if len(serial_and_mount_point) == 2:
if serial_and_mount_point[0] == str(serial):
return serial_and_mount_point[1]
return None

@staticmethod
def _disk_path_mac(serial):
# TODO:
# add implementation
return None

@staticmethod
def _unmount_windows(serial):
disk_path = MSDUtils._disk_path_windows(serial)
tmp_file = tempfile.NamedTemporaryFile(suffix='.ps1', delete=False)
try:
# create unmount script
tmp_file.write('$disk_leter=$args[0]\n')
tmp_file.write('$driveEject = New-Object -comObject Shell.Application\n')
tmp_file.write('$driveEject.Namespace(17).ParseName($disk_leter).InvokeVerb("Eject")\n')
# close to allow open by other process
tmp_file.close()

try_count = 10
while try_count:
p = subprocess.Popen(["powershell.exe", tmp_file.name + " " + disk_path], stdout=sys.stdout)
p.communicate()
try_count -= 1
if MSDUtils._disk_path_windows(serial) is None:
return True
time.sleep(1)
finally:
os.remove(tmp_file.name)

return False

@staticmethod
def _unmount_linux(serial):
disk_path = MSDUtils._disk_path_linux(serial)
os.system("umount " + disk_path)
return MSDUtils._disk_path_linux(serial) is None

@staticmethod
def _unmount_mac(serial):
disk_path = MSDUtils._disk_path_mac(serial)
os.system("diskutil unmount " + disk_path)
disks = set(MSDUtils._disks_mac())
return MSDUtils._disk_path_mac(serial) is None
12 changes: 12 additions & 0 deletions TESTS/usb_device/msd/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# USB mass storage test user guide

To run the tests-usb_device-msd test device with at least *70kB* of RAM is required.
Test creates 64kB `HeapBlockDevice` as block device and mounts FAT32 filesystem on it.
64kB block device is the smallest one that can mount FAT32 filesystem.

Test can be easily extended to use any block device available in Mbed

Test run command:
```bash
mbed test -t COMPILER -m TARGET -n tests-usb_device-msd
```
133 changes: 133 additions & 0 deletions TESTS/usb_device/msd/TestUSBMSD.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Copyright (c) 2019, Arm Limited and affiliates.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef Test_USBMSD_H
#define Test_USBMSD_H

#include "USBMSD.h"


#define USB_DEV_SN_LEN (32) // 32 hex digit UUID
#define USB_DEV_SN_DESC_SIZE (USB_DEV_SN_LEN * 2 + 2)

/**
* Convert a C style ASCII to a USB string descriptor
*
* @param usb_desc output buffer for the USB string descriptor
* @param str ASCII string
* @param n size of usb_desc buffer, even number
* @returns number of bytes returned in usb_desc or -1 on failure
*/
int ascii2usb_string_desc(uint8_t *usb_desc, const char *str, size_t n)
{
if (str == NULL || usb_desc == NULL || n < 4) {
return -1;
}
if (n % 2 != 0) {
return -1;
}
size_t s, d;
// set bString (@ offset 2 onwards) as a UNICODE UTF-16LE string
memset(usb_desc, 0, n);
for (s = 0, d = 2; str[s] != '\0' && d < n; s++, d += 2) {
usb_desc[d] = str[s];
}
// set bLength @ offset 0
usb_desc[0] = d;
// set bDescriptorType @ offset 1
usb_desc[1] = STRING_DESCRIPTOR;
return d;
}

class TestUSBMSD: public USBMSD {
public:
TestUSBMSD(BlockDevice *bd, bool connect_blocking = true, uint16_t vendor_id = 0x0703, uint16_t product_id = 0x0104,
uint16_t product_release = 0x0001)
: USBMSD(bd, connect_blocking, vendor_id, product_id, product_release)
{

}

virtual ~TestUSBMSD()
{

}

uint32_t get_read_counter()
{
return read_counter;
}

uint32_t get_program_counter()
{
return program_counter;
}

void reset_counters()
{
read_counter = program_counter = erase_counter = 0;
}

static void setup_serial_number()
{
char _key[128] = { 0 };
char _value[128] = { 0 };

greentea_send_kv("get_serial_number", 0);
greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value));
TEST_ASSERT_EQUAL_STRING("serial_number", _key);
usb_dev_sn[USB_DEV_SN_LEN] = '\0';
memcpy(usb_dev_sn, _value, USB_DEV_SN_LEN);
ascii2usb_string_desc(_serial_num_descriptor, usb_dev_sn, USB_DEV_SN_DESC_SIZE);
}

virtual const uint8_t *string_iserial_desc()
{
return (const uint8_t *)_serial_num_descriptor;
}

static volatile uint32_t read_counter;
static volatile uint32_t program_counter;
static volatile uint32_t erase_counter;

protected:
virtual int disk_read(uint8_t *data, uint64_t block, uint8_t count)
{
read_counter++;
return USBMSD::disk_read(data, block, count);
}

virtual int disk_write(const uint8_t *data, uint64_t block, uint8_t count)
{
erase_counter++;
program_counter++;

return USBMSD::disk_write(data, block, count);
}
private:
static uint8_t _serial_num_descriptor[USB_DEV_SN_DESC_SIZE];
static char usb_dev_sn[USB_DEV_SN_LEN + 1];
};

uint8_t TestUSBMSD::_serial_num_descriptor[USB_DEV_SN_DESC_SIZE] = { 0 };
char TestUSBMSD::usb_dev_sn[USB_DEV_SN_LEN + 1] = { 0 };


volatile uint32_t TestUSBMSD::read_counter = 0;
volatile uint32_t TestUSBMSD::program_counter = 0;
volatile uint32_t TestUSBMSD::erase_counter = 0;

#endif // Test_USBMSD_H
Loading