Skip to content

Tests: USB: Fix endpoint halt test #9373

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 18 additions & 20 deletions TESTS/host_tests/pyusb_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def get_interface(dev, interface, alternate=0):
VENDOR_TEST_CTRL_OUT_STATUS_DELAY = 8
VENDOR_TEST_CTRL_IN_SIZES = 9
VENDOR_TEST_CTRL_OUT_SIZES = 10
VENDOR_TEST_READ_START = 11
VENDOR_TEST_RW_RESTART = 11
VENDOR_TEST_ABORT_BUFF_CHECK = 12
VENDOR_TEST_UNSUPPORTED_REQUEST = 32

Expand Down Expand Up @@ -984,7 +984,7 @@ def random_size_loopback_ep_test(ep_out, ep_in, failure, error, seconds, log, mi
time.sleep(0.01)


def halt_ep_test(dev, ep_out, ep_in, ep_to_halt, log):
def halt_ep_test(dev, ep_out, ep_in, log):
"""OUT/IN endpoint halt test.

Verify that halting an endpoint at a random point of OUT or IN transfer
Expand All @@ -1003,6 +1003,8 @@ def halt_ep_test(dev, ep_out, ep_in, ep_to_halt, log):
except usb.core.USBError as err:
raise_unconditionally(lineno(), 'Unable to get endpoint status ({!r}).'.format(err))

ep_to_halt = random.choice([ep_out, ep_in])

def timer_handler():
"""Halt an endpoint using a USB control request."""
try:
Expand Down Expand Up @@ -1044,17 +1046,19 @@ def timer_handler():
finally:
# Always wait for the Timer thread created above.
delayed_halt.join()
ep_out.clear_halt()
ep_in.clear_halt()
raise_unconditionally(lineno(), 'Halting endpoint {0.bEndpointAddress:#04x}'
' during transmission did not raise USBError.'
.format(ep_to_halt))


def request_endpoint_read_start(dev, ep):
def request_endpoint_loops_restart(dev):
ctrl_kwargs = {
'bmRequestType': build_request_type(CTRL_OUT, CTRL_TYPE_VENDOR, CTRL_RECIPIENT_ENDPOINT),
'bRequest': VENDOR_TEST_READ_START,
'bmRequestType': build_request_type(CTRL_OUT, CTRL_TYPE_VENDOR, CTRL_RECIPIENT_DEVICE),
'bRequest': VENDOR_TEST_RW_RESTART,
'wValue': 0,
'wIndex': ep.bEndpointAddress}
'wIndex': 0}
dev.ctrl_transfer(**ctrl_kwargs)


Expand Down Expand Up @@ -1149,35 +1153,26 @@ def ep_test_halt(dev, log, verbose=False):

bulk_out, bulk_in = find_ep_pair(intf, usb.ENDPOINT_TYPE_BULK)
interrupt_out, interrupt_in = find_ep_pair(intf, usb.ENDPOINT_TYPE_INTERRUPT)
iso_out, iso_in = find_ep_pair(intf, usb.ENDPOINT_TYPE_ISOCHRONOUS)

if verbose:
log('\tbulk_out {0.bEndpointAddress:#04x}, {0.wMaxPacketSize:02} B'.format(bulk_out))
log('\tbulk_in {0.bEndpointAddress:#04x}, {0.wMaxPacketSize:02} B'.format(bulk_in))
log('\tinterrupt_out {0.bEndpointAddress:#04x}, {0.wMaxPacketSize:02} B'.format(interrupt_out))
log('\tinterrupt_in {0.bEndpointAddress:#04x}, {0.wMaxPacketSize:02} B'.format(interrupt_in))
log('\tiso_out {0.bEndpointAddress:#04x}, {0.wMaxPacketSize:02} B'.format(iso_out))
log('\tiso_in {0.bEndpointAddress:#04x}, {0.wMaxPacketSize:02} B'.format(iso_in))

if verbose:
log('Testing endpoint halt at a random point of bulk transmission.')
end_ts = time.time() + 1.0
while time.time() < end_ts:
halt_ep_test(dev, bulk_out, bulk_in, bulk_out, log)
bulk_out.clear_halt()
request_endpoint_read_start(dev, bulk_out)
halt_ep_test(dev, bulk_out, bulk_in, bulk_in, log)
bulk_in.clear_halt()
halt_ep_test(dev, bulk_out, bulk_in, log)
request_endpoint_loops_restart(dev)

if verbose:
log('Testing endpoint halt at a random point of interrupt transmission.')
end_ts = time.time() + 1.0
while time.time() < end_ts:
halt_ep_test(dev, interrupt_out, interrupt_in, interrupt_out, log)
interrupt_out.clear_halt()
request_endpoint_read_start(dev, interrupt_out)
halt_ep_test(dev, interrupt_out, interrupt_in, interrupt_in, log)
interrupt_in.clear_halt()
halt_ep_test(dev, interrupt_out, interrupt_in, log)
request_endpoint_loops_restart(dev)


def ep_test_parallel_transfers(dev, log, verbose=False):
Expand Down Expand Up @@ -1466,7 +1461,10 @@ def ep_test_data_toggle(dev, log, verbose=False):
# ClearFeature(ENDPOINT_HALT) request always results in the data toggle being reinitialized to DATA0.
# "
bulk_out.clear_halt()
# request_endpoint_read_start(dev, bulk_out)
# The ClearFeature(ENDPOINT_HALT) terminates a pending read operation on the device end.
# Use a custom vendor request to restart reading on the OUT endpoint.
# This does not impact the state of the data toggle bit.
request_endpoint_loops_restart(dev)

# 2.4 verify that host and USB device are still in sync with respect to data toggle
try:
Expand Down
Loading