Skip to content

Commit 7b64374

Browse files
Merge pull request #1753 from IntelPython/fix-for-deadlocks
Release GIL around blocking operations in libtensor
2 parents ab5b9e1 + 69aa1ed commit 7b64374

File tree

4 files changed: +105 additions, −24 deletions

dpctl/tensor/libtensor/source/accumulators.cpp

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -160,10 +160,14 @@ size_t py_mask_positions(const dpctl::tensor::usm_ndarray &mask,
160160
? mask_positions_contig_i32_dispatch_vector[mask_typeid]
161161
: mask_positions_contig_i64_dispatch_vector[mask_typeid];
162162

163-
size_t total_set = fn(exec_q, mask_size, mask_data, cumsum_data,
164-
host_task_events, depends);
163+
size_t total_set;
164+
165165
{
166166
py::gil_scoped_release release;
167+
168+
total_set = fn(exec_q, mask_size, mask_data, cumsum_data,
169+
host_task_events, depends);
170+
167171
sycl::event::wait(host_task_events);
168172
}
169173
return total_set;
@@ -198,12 +202,13 @@ size_t py_mask_positions(const dpctl::tensor::usm_ndarray &mask,
198202
sycl::event copy_shape_ev = std::get<2>(ptr_size_event_tuple);
199203

200204
if (2 * static_cast<size_t>(nd) != std::get<1>(ptr_size_event_tuple)) {
201-
copy_shape_ev.wait();
202205
{
203206
py::gil_scoped_release release;
207+
208+
copy_shape_ev.wait();
204209
sycl::event::wait(host_task_events);
210+
sycl::free(shape_strides, exec_q);
205211
}
206-
sycl::free(shape_strides, exec_q);
207212
throw std::runtime_error("Unexpected error");
208213
}
209214

@@ -213,15 +218,17 @@ size_t py_mask_positions(const dpctl::tensor::usm_ndarray &mask,
213218
dependent_events.insert(dependent_events.end(), depends.begin(),
214219
depends.end());
215220

216-
size_t total_set =
217-
strided_fn(exec_q, mask_size, mask_data, nd, shape_strides, cumsum_data,
218-
host_task_events, dependent_events);
221+
size_t total_set;
219222

220223
{
221224
py::gil_scoped_release release;
225+
226+
total_set = strided_fn(exec_q, mask_size, mask_data, nd, shape_strides,
227+
cumsum_data, host_task_events, dependent_events);
228+
222229
sycl::event::wait(host_task_events);
230+
sycl::free(shape_strides, exec_q);
223231
}
224-
sycl::free(shape_strides, exec_q);
225232

226233
return total_set;
227234
}
@@ -352,8 +359,12 @@ size_t py_cumsum_1d(const dpctl::tensor::usm_ndarray &src,
352359
sycl::event copy_shape_ev = std::get<2>(ptr_size_event_tuple);
353360

354361
if (2 * static_cast<size_t>(nd) != std::get<1>(ptr_size_event_tuple)) {
355-
copy_shape_ev.wait();
356-
sycl::event::wait(host_task_events);
362+
{
363+
py::gil_scoped_release release;
364+
365+
copy_shape_ev.wait();
366+
sycl::event::wait(host_task_events);
367+
}
357368
sycl::free(shape_strides, exec_q);
358369
throw std::runtime_error("Unexpected error");
359370
}

dpctl/tensor/libtensor/source/copy_numpy_ndarray_into_usm_ndarray.cpp

Lines changed: 51 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -116,21 +116,29 @@ void copy_numpy_ndarray_into_usm_ndarray(
116116

117117
// check for applicability of special cases:
118118
// (same type && (both C-contiguous || both F-contiguous)
119-
bool both_c_contig =
119+
const bool both_c_contig =
120120
((src_flags & py::array::c_style) && dst.is_c_contiguous());
121-
bool both_f_contig =
121+
const bool both_f_contig =
122122
((src_flags & py::array::f_style) && dst.is_f_contiguous());
123+
124+
const bool same_data_types = (src_type_id == dst_type_id);
125+
123126
if (both_c_contig || both_f_contig) {
124-
if (src_type_id == dst_type_id) {
127+
if (same_data_types) {
125128
int src_elem_size = npy_src.itemsize();
126129

127130
sycl::event copy_ev =
128131
exec_q.memcpy(static_cast<void *>(dst_data),
129132
static_cast<const void *>(src_data),
130133
src_nelems * src_elem_size, depends);
131134

132-
// wait for copy_ev to complete
133-
copy_ev.wait();
135+
{
136+
// wait for copy_ev to complete
137+
// release GIL to allow other threads (host_tasks)
138+
// a chance to acquire GIL
139+
py::gil_scoped_release lock{};
140+
copy_ev.wait();
141+
}
134142

135143
return;
136144
}
@@ -202,6 +210,30 @@ void copy_numpy_ndarray_into_usm_ndarray(
202210
simplified_dst_strides.push_back(1);
203211
}
204212

213+
const bool can_use_memcpy =
214+
(same_data_types && (nd == 1) && (src_offset == 0) &&
215+
(dst_offset == 0) && (simplified_src_strides[0] == 1) &&
216+
(simplified_dst_strides[0] == 1));
217+
218+
if (can_use_memcpy) {
219+
int src_elem_size = npy_src.itemsize();
220+
221+
sycl::event copy_ev = exec_q.memcpy(
222+
static_cast<void *>(dst_data), static_cast<const void *>(src_data),
223+
src_nelems * src_elem_size, depends);
224+
225+
{
226+
// wait for copy_ev to complete
227+
// release GIL to allow other threads (host_tasks)
228+
// a chance to acquire GIL
229+
py::gil_scoped_release lock{};
230+
231+
copy_ev.wait();
232+
}
233+
234+
return;
235+
}
236+
205237
// Minimum and maximum element offsets for source np.ndarray
206238
py::ssize_t npy_src_min_nelem_offset(src_offset);
207239
py::ssize_t npy_src_max_nelem_offset(src_offset);
@@ -230,17 +262,22 @@ void copy_numpy_ndarray_into_usm_ndarray(
230262
}
231263
const sycl::event &copy_shape_ev = std::get<2>(ptr_size_event_tuple);
232264

233-
// Get implementation function pointer
234-
auto copy_and_cast_from_host_blocking_fn =
235-
copy_and_cast_from_host_blocking_dispatch_table[dst_type_id]
236-
[src_type_id];
265+
{
266+
// release GIL for the blocking call
267+
py::gil_scoped_release lock{};
237268

238-
copy_and_cast_from_host_blocking_fn(
239-
exec_q, src_nelems, nd, shape_strides, src_data, src_offset,
240-
npy_src_min_nelem_offset, npy_src_max_nelem_offset, dst_data,
241-
dst_offset, depends, {copy_shape_ev});
269+
// Get implementation function pointer
270+
auto copy_and_cast_from_host_blocking_fn =
271+
copy_and_cast_from_host_blocking_dispatch_table[dst_type_id]
272+
[src_type_id];
242273

243-
sycl::free(shape_strides, exec_q);
274+
copy_and_cast_from_host_blocking_fn(
275+
exec_q, src_nelems, nd, shape_strides, src_data, src_offset,
276+
npy_src_min_nelem_offset, npy_src_max_nelem_offset, dst_data,
277+
dst_offset, depends, {copy_shape_ev});
278+
279+
sycl::free(shape_strides, exec_q);
280+
}
244281

245282
return;
246283
}

dpctl/tests/test_usm_ndarray_ctor.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1063,11 +1063,34 @@ def test_tofrom_numpy(shape, dtype, usm_type):
10631063
skip_if_dtype_not_supported(dtype, q)
10641064
Xusm = dpt.zeros(shape, dtype=dtype, usm_type=usm_type, sycl_queue=q)
10651065
Ynp = np.ones(shape, dtype=dtype)
1066+
Ynp[(0,) * len(shape)] = 0
10661067
ind = (slice(None, None, None),) * Ynp.ndim
10671068
Xusm[ind] = Ynp
10681069
assert np.array_equal(dpt.to_numpy(Xusm), Ynp)
10691070

10701071

1072+
@pytest.mark.parametrize(
1073+
"dtype",
1074+
_all_dtypes,
1075+
)
1076+
@pytest.mark.parametrize("usm_type", ["device", "shared", "host"])
1077+
def test_tofrom_numpy_permuted(dtype, usm_type):
1078+
shape = (3, 5, 7)
1079+
perm = (1, 2, 0)
1080+
q = get_queue_or_skip()
1081+
skip_if_dtype_not_supported(dtype, q)
1082+
Xusm = dpt.permute_dims(
1083+
dpt.zeros(shape, dtype=dtype, usm_type=usm_type, sycl_queue=q), perm
1084+
)
1085+
Ynp = np.transpose(np.ones(shape, dtype=dtype), perm)
1086+
Ynp[:, ::2, ::2] = 0
1087+
ind = (slice(None, None, None),) * Ynp.ndim
1088+
# even though Xusm and Ynp are strided, simple memcpy could be done.
1089+
# This test validates that it is being done correctly
1090+
Xusm[ind] = Ynp
1091+
assert np.array_equal(dpt.to_numpy(Xusm), Ynp)
1092+
1093+
10711094
@pytest.mark.parametrize(
10721095
"dtype",
10731096
_all_dtypes,

dpctl/utils/_order_manager.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import weakref
12
from collections import defaultdict
23
from contextvars import ContextVar
34

@@ -88,7 +89,16 @@ def __getitem__(self, q: SyclQueue) -> _SequentialOrderManager:
8889
def clear(self):
8990
"""Clear content of internal dictionary"""
9091
_local = self._map.get()
92+
for v in _local.values():
93+
v.wait()
9194
_local.clear()
9295

9396

9497
SequentialOrderManager = SyclQueueToOrderManagerMap()
98+
99+
100+
def _callback(som):
101+
som.clear()
102+
103+
104+
f = weakref.finalize(SequentialOrderManager, _callback, SequentialOrderManager)

Comments (0)