Skip to content

Commit 8ed1c37

Browse files
committed
Add concurrent tests for IPC Get/Put functions
1 parent d658fae commit 8ed1c37

File tree

1 file changed

+111
-44
lines changed

1 file changed

+111
-44
lines changed

test/ipcFixtures.hpp

Lines changed: 111 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,10 @@
1515
#include <umf/memory_provider.h>
1616
#include <umf/pools/pool_proxy.h>
1717

18+
#include <algorithm>
1819
#include <cstring>
1920
#include <numeric>
21+
#include <random>
2022
#include <tuple>
2123

2224
class MemoryAccessor {
@@ -158,6 +160,106 @@ struct umfIpcTest : umf_test::test,
158160
umf_memory_provider_ops_t *providerOps = nullptr;
159161
pfnProviderParamsCreate providerParamsCreate = nullptr;
160162
pfnProviderParamsDestroy providerParamsDestroy = nullptr;
163+
164+
void concurrentGetConcurrentPutHandles(bool shuffle) {
165+
std::vector<void *> ptrs;
166+
constexpr size_t ALLOC_SIZE = 100;
167+
constexpr size_t NUM_POINTERS = 100;
168+
umf::pool_unique_handle_t pool = makePool();
169+
ASSERT_NE(pool.get(), nullptr);
170+
171+
for (size_t i = 0; i < NUM_POINTERS; ++i) {
172+
void *ptr = umfPoolMalloc(pool.get(), ALLOC_SIZE);
173+
EXPECT_NE(ptr, nullptr);
174+
ptrs.push_back(ptr);
175+
}
176+
177+
std::array<std::vector<umf_ipc_handle_t>, NTHREADS> ipcHandles;
178+
179+
umf_test::syncthreads_barrier syncthreads(NTHREADS);
180+
181+
auto getHandlesFn = [shuffle, &ipcHandles, ptrs,
182+
&syncthreads](size_t tid) mutable {
183+
if (shuffle) {
184+
std::random_device rd;
185+
std::mt19937 g(rd());
186+
std::shuffle(ptrs.begin(), ptrs.end(), g);
187+
}
188+
syncthreads();
189+
for (void *ptr : ptrs) {
190+
umf_ipc_handle_t ipcHandle;
191+
size_t handleSize;
192+
umf_result_t ret =
193+
umfGetIPCHandle(ptr, &ipcHandle, &handleSize);
194+
ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
195+
ipcHandles[tid].push_back(ipcHandle);
196+
}
197+
};
198+
199+
umf_test::parallel_exec(NTHREADS, getHandlesFn);
200+
201+
auto putHandlesFn = [&ipcHandles, &syncthreads](size_t tid) {
202+
syncthreads();
203+
for (umf_ipc_handle_t ipcHandle : ipcHandles[tid]) {
204+
umf_result_t ret = umfPutIPCHandle(ipcHandle);
205+
EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
206+
}
207+
};
208+
209+
umf_test::parallel_exec(NTHREADS, putHandlesFn);
210+
211+
for (void *ptr : ptrs) {
212+
umf_result_t ret = umfPoolFree(pool.get(), ptr);
213+
EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
214+
}
215+
216+
pool.reset(nullptr);
217+
EXPECT_EQ(stat.putCount, stat.getCount);
218+
}
219+
220+
void concurrentGetPutHandles(bool shuffle) {
221+
std::vector<void *> ptrs;
222+
constexpr size_t ALLOC_SIZE = 100;
223+
constexpr size_t NUM_POINTERS = 100;
224+
umf::pool_unique_handle_t pool = makePool();
225+
ASSERT_NE(pool.get(), nullptr);
226+
227+
for (size_t i = 0; i < NUM_POINTERS; ++i) {
228+
void *ptr = umfPoolMalloc(pool.get(), ALLOC_SIZE);
229+
EXPECT_NE(ptr, nullptr);
230+
ptrs.push_back(ptr);
231+
}
232+
233+
umf_test::syncthreads_barrier syncthreads(NTHREADS);
234+
235+
auto getPutHandlesFn = [shuffle, ptrs, &syncthreads](size_t) mutable {
236+
if (shuffle) {
237+
std::random_device rd;
238+
std::mt19937 g(rd());
239+
std::shuffle(ptrs.begin(), ptrs.end(), g);
240+
}
241+
syncthreads();
242+
for (void *ptr : ptrs) {
243+
umf_ipc_handle_t ipcHandle;
244+
size_t handleSize;
245+
umf_result_t ret =
246+
umfGetIPCHandle(ptr, &ipcHandle, &handleSize);
247+
ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
248+
ret = umfPutIPCHandle(ipcHandle);
249+
EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
250+
}
251+
};
252+
253+
umf_test::parallel_exec(NTHREADS, getPutHandlesFn);
254+
255+
for (void *ptr : ptrs) {
256+
umf_result_t ret = umfPoolFree(pool.get(), ptr);
257+
EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
258+
}
259+
260+
pool.reset(nullptr);
261+
EXPECT_EQ(stat.putCount, stat.getCount);
262+
}
161263
};
162264

163265
TEST_P(umfIpcTest, GetIPCHandleSize) {
@@ -473,53 +575,18 @@ TEST_P(umfIpcTest, openInTwoIpcHandlers) {
473575
EXPECT_EQ(stat.closeCount, stat.openCount);
474576
}
475577

476-
TEST_P(umfIpcTest, ConcurrentGetPutHandles) {
477-
std::vector<void *> ptrs;
478-
constexpr size_t ALLOC_SIZE = 100;
479-
constexpr size_t NUM_POINTERS = 100;
480-
umf::pool_unique_handle_t pool = makePool();
481-
ASSERT_NE(pool.get(), nullptr);
482-
483-
for (size_t i = 0; i < NUM_POINTERS; ++i) {
484-
void *ptr = umfPoolMalloc(pool.get(), ALLOC_SIZE);
485-
EXPECT_NE(ptr, nullptr);
486-
ptrs.push_back(ptr);
487-
}
488-
489-
std::array<std::vector<umf_ipc_handle_t>, NTHREADS> ipcHandles;
490-
491-
umf_test::syncthreads_barrier syncthreads(NTHREADS);
492-
493-
auto getHandlesFn = [&ipcHandles, &ptrs, &syncthreads](size_t tid) {
494-
syncthreads();
495-
for (void *ptr : ptrs) {
496-
umf_ipc_handle_t ipcHandle;
497-
size_t handleSize;
498-
umf_result_t ret = umfGetIPCHandle(ptr, &ipcHandle, &handleSize);
499-
ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
500-
ipcHandles[tid].push_back(ipcHandle);
501-
}
502-
};
503-
504-
umf_test::parallel_exec(NTHREADS, getHandlesFn);
505-
506-
auto putHandlesFn = [&ipcHandles, &syncthreads](size_t tid) {
507-
syncthreads();
508-
for (umf_ipc_handle_t ipcHandle : ipcHandles[tid]) {
509-
umf_result_t ret = umfPutIPCHandle(ipcHandle);
510-
EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
511-
}
512-
};
578+
TEST_P(umfIpcTest, ConcurrentGetConcurrentPutHandles) {
579+
concurrentGetConcurrentPutHandles(false);
580+
}
513581

514-
umf_test::parallel_exec(NTHREADS, putHandlesFn);
582+
TEST_P(umfIpcTest, ConcurrentGetConcurrentPutHandlesShuffled) {
583+
concurrentGetConcurrentPutHandles(true);
584+
}
515585

516-
for (void *ptr : ptrs) {
517-
umf_result_t ret = umfPoolFree(pool.get(), ptr);
518-
EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
519-
}
586+
TEST_P(umfIpcTest, ConcurrentGetPutHandles) { concurrentGetPutHandles(false); }
520587

521-
pool.reset(nullptr);
522-
EXPECT_EQ(stat.putCount, stat.getCount);
588+
TEST_P(umfIpcTest, ConcurrentGetPutHandlesShuffled) {
589+
concurrentGetPutHandles(true);
523590
}
524591

525592
TEST_P(umfIpcTest, ConcurrentOpenCloseHandles) {

0 commit comments

Comments
 (0)