Skip to content

Add stream argument validation #1969

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 25 additions & 37 deletions dpctl/tensor/_usmarray.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,19 @@ cdef bint _is_host_cpu(object dl_device):
return (dl_type == DLDeviceType.kDLCPU) and (dl_id == 0)


cdef void _validate_and_use_stream(object stream, c_dpctl.SyclQueue self_queue) except *:
if (stream is None or stream == self_queue):
pass
else:
if not isinstance(stream, dpctl.SyclQueue):
raise TypeError(
"stream argument type was expected to be dpctl.SyclQueue,"
f" got {type(stream)} instead"
)
ev = self_queue.submit_barrier()
stream.submit_barrier(dependent_events=[ev])


cdef class usm_ndarray:
""" usm_ndarray(shape, dtype=None, strides=None, buffer="device", \
offset=0, order="C", buffer_ctor_kwargs=dict(), \
Expand Down Expand Up @@ -1025,12 +1038,7 @@ cdef class usm_ndarray:
cdef c_dpmem._Memory arr_buf
d = Device.create_device(target_device)

if (stream is None or not isinstance(stream, dpctl.SyclQueue) or
stream == self.sycl_queue):
pass
else:
ev = self.sycl_queue.submit_barrier()
stream.submit_barrier(dependent_events=[ev])
_validate_and_use_stream(stream, self.sycl_queue)

if (d.sycl_context == self.sycl_context):
arr_buf = <c_dpmem._Memory> self.usm_data
Expand Down Expand Up @@ -1203,12 +1211,7 @@ cdef class usm_ndarray:
# legacy path for DLManagedTensor
# copy kwarg ignored because copy flag can't be set
_caps = c_dlpack.to_dlpack_capsule(self)
if (stream is None or type(stream) is not dpctl.SyclQueue or
stream == self.sycl_queue):
pass
else:
ev = self.sycl_queue.submit_barrier()
stream.submit_barrier(dependent_events=[ev])
_validate_and_use_stream(stream, self.sycl_queue)
return _caps
else:
if not isinstance(max_version, tuple) or len(max_version) != 2:
Expand Down Expand Up @@ -1250,12 +1253,7 @@ cdef class usm_ndarray:
copy = False
# TODO: strategy for handling stream on different device from dl_device
if copy:
if (stream is None or type(stream) is not dpctl.SyclQueue or
stream == self.sycl_queue):
pass
else:
ev = self.sycl_queue.submit_barrier()
stream.submit_barrier(dependent_events=[ev])
_validate_and_use_stream(stream, self.sycl_queue)
nbytes = self.usm_data.nbytes
copy_buffer = type(self.usm_data)(
nbytes, queue=self.sycl_queue
Expand All @@ -1272,22 +1270,12 @@ cdef class usm_ndarray:
_caps = c_dlpack.to_dlpack_versioned_capsule(_copied_arr, copy)
else:
_caps = c_dlpack.to_dlpack_versioned_capsule(self, copy)
if (stream is None or type(stream) is not dpctl.SyclQueue or
stream == self.sycl_queue):
pass
else:
ev = self.sycl_queue.submit_barrier()
stream.submit_barrier(dependent_events=[ev])
_validate_and_use_stream(stream, self.sycl_queue)
return _caps
else:
# legacy path for DLManagedTensor
_caps = c_dlpack.to_dlpack_capsule(self)
if (stream is None or type(stream) is not dpctl.SyclQueue or
stream == self.sycl_queue):
pass
else:
ev = self.sycl_queue.submit_barrier()
stream.submit_barrier(dependent_events=[ev])
_validate_and_use_stream(stream, self.sycl_queue)
return _caps

def __dlpack_device__(self):
Expand Down Expand Up @@ -1555,17 +1543,17 @@ cdef class usm_ndarray:
def __array__(self, dtype=None, /, *, copy=None):
"""NumPy's array protocol method to disallow implicit conversion.

Without this definition, `numpy.asarray(usm_ar)` converts
usm_ndarray instance into NumPy array with data type `object`
and every element being 0d usm_ndarray.
Without this definition, `numpy.asarray(usm_ar)` converts
usm_ndarray instance into NumPy array with data type `object`
and every element being 0d usm_ndarray.

https://github.com/IntelPython/dpctl/pull/1384#issuecomment-1707212972
"""
"""
raise TypeError(
"Implicit conversion to a NumPy array is not allowed. "
"Use `dpctl.tensor.asnumpy` to copy data from this "
"`dpctl.tensor.usm_ndarray` instance to NumPy array"
)
"Use `dpctl.tensor.asnumpy` to copy data from this "
"`dpctl.tensor.usm_ndarray` instance to NumPy array"
)


cdef usm_ndarray _real_view(usm_ndarray ary):
Expand Down
24 changes: 24 additions & 0 deletions dpctl/tests/test_usm_ndarray_ctor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1380,6 +1380,30 @@ def test_to_device():
assert Y.sycl_device == dev


def test_to_device_stream_validation():
try:
X = dpt.usm_ndarray(1, "f4")
except dpctl.SyclDeviceCreationError:
pytest.skip("No SYCL devices available")
# invalid type of stream keyword
with pytest.raises(TypeError):
X.to_device(X.sycl_queue, stream=dict())
# stream is keyword-only arg
with pytest.raises(TypeError):
X.to_device(X.sycl_queue, X.sycl_queue)


def test_to_device_stream_use():
try:
X = dpt.usm_ndarray(1, "f4")
except dpctl.SyclDeviceCreationError:
pytest.skip("No SYCL devices available")
q1 = dpctl.SyclQueue(
X.sycl_context, X.sycl_device, property="enable_profiling"
)
X.to_device(q1, stream=q1)


def test_to_device_migration():
q1 = get_queue_or_skip() # two distinct copies of default-constructed queue
q2 = get_queue_or_skip()
Expand Down
Loading