Skip to content

Add concurrent tests for IPC Get/Put functions #1126

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 22, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 115 additions & 44 deletions test/ipcFixtures.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
#include <umf/memory_provider.h>
#include <umf/pools/pool_proxy.h>

#include <algorithm>
#include <cstring>
#include <numeric>
#include <random>
#include <tuple>

class MemoryAccessor {
Expand Down Expand Up @@ -158,6 +160,110 @@ struct umfIpcTest : umf_test::test,
umf_memory_provider_ops_t *providerOps = nullptr;
pfnProviderParamsCreate providerParamsCreate = nullptr;
pfnProviderParamsDestroy providerParamsDestroy = nullptr;

void concurrentGetConcurrentPutHandles(bool shuffle) {
std::vector<void *> ptrs;
constexpr size_t ALLOC_SIZE = 100;
constexpr size_t NUM_POINTERS = 100;
umf::pool_unique_handle_t pool = makePool();
ASSERT_NE(pool.get(), nullptr);

for (size_t i = 0; i < NUM_POINTERS; ++i) {
void *ptr = umfPoolMalloc(pool.get(), ALLOC_SIZE);
EXPECT_NE(ptr, nullptr);
ptrs.push_back(ptr);
}

std::array<std::vector<umf_ipc_handle_t>, NTHREADS> ipcHandles;

umf_test::syncthreads_barrier syncthreads(NTHREADS);

auto getHandlesFn = [shuffle, &ipcHandles, &ptrs,
&syncthreads](size_t tid) {
// Each thread gets a copy of the pointers to shuffle them
std::vector<void *> localPtrs = ptrs;
if (shuffle) {
std::random_device rd;
std::mt19937 g(rd());
std::shuffle(localPtrs.begin(), localPtrs.end(), g);
}
syncthreads();
for (void *ptr : localPtrs) {
umf_ipc_handle_t ipcHandle;
size_t handleSize;
umf_result_t ret =
umfGetIPCHandle(ptr, &ipcHandle, &handleSize);
ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
ipcHandles[tid].push_back(ipcHandle);
}
};

umf_test::parallel_exec(NTHREADS, getHandlesFn);

auto putHandlesFn = [&ipcHandles, &syncthreads](size_t tid) {
syncthreads();
for (umf_ipc_handle_t ipcHandle : ipcHandles[tid]) {
umf_result_t ret = umfPutIPCHandle(ipcHandle);
EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
}
};

umf_test::parallel_exec(NTHREADS, putHandlesFn);

for (void *ptr : ptrs) {
umf_result_t ret = umfPoolFree(pool.get(), ptr);
EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
}

pool.reset(nullptr);
EXPECT_EQ(stat.putCount, stat.getCount);
}

void concurrentGetPutHandles(bool shuffle) {
std::vector<void *> ptrs;
constexpr size_t ALLOC_SIZE = 100;
constexpr size_t NUM_POINTERS = 100;
umf::pool_unique_handle_t pool = makePool();
ASSERT_NE(pool.get(), nullptr);

for (size_t i = 0; i < NUM_POINTERS; ++i) {
void *ptr = umfPoolMalloc(pool.get(), ALLOC_SIZE);
EXPECT_NE(ptr, nullptr);
ptrs.push_back(ptr);
}

umf_test::syncthreads_barrier syncthreads(NTHREADS);

auto getPutHandlesFn = [shuffle, &ptrs, &syncthreads](size_t) {
// Each thread gets a copy of the pointers to shuffle them
std::vector<void *> localPtrs = ptrs;
if (shuffle) {
std::random_device rd;
std::mt19937 g(rd());
std::shuffle(localPtrs.begin(), localPtrs.end(), g);
}
syncthreads();
for (void *ptr : localPtrs) {
umf_ipc_handle_t ipcHandle;
size_t handleSize;
umf_result_t ret =
umfGetIPCHandle(ptr, &ipcHandle, &handleSize);
ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
ret = umfPutIPCHandle(ipcHandle);
EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
}
};

umf_test::parallel_exec(NTHREADS, getPutHandlesFn);

for (void *ptr : ptrs) {
umf_result_t ret = umfPoolFree(pool.get(), ptr);
EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
}

pool.reset(nullptr);
EXPECT_EQ(stat.putCount, stat.getCount);
}
};

TEST_P(umfIpcTest, GetIPCHandleSize) {
Expand Down Expand Up @@ -473,53 +579,18 @@ TEST_P(umfIpcTest, openInTwoIpcHandlers) {
EXPECT_EQ(stat.closeCount, stat.openCount);
}

TEST_P(umfIpcTest, ConcurrentGetPutHandles) {
std::vector<void *> ptrs;
constexpr size_t ALLOC_SIZE = 100;
constexpr size_t NUM_POINTERS = 100;
umf::pool_unique_handle_t pool = makePool();
ASSERT_NE(pool.get(), nullptr);

for (size_t i = 0; i < NUM_POINTERS; ++i) {
void *ptr = umfPoolMalloc(pool.get(), ALLOC_SIZE);
EXPECT_NE(ptr, nullptr);
ptrs.push_back(ptr);
}

std::array<std::vector<umf_ipc_handle_t>, NTHREADS> ipcHandles;

umf_test::syncthreads_barrier syncthreads(NTHREADS);

auto getHandlesFn = [&ipcHandles, &ptrs, &syncthreads](size_t tid) {
syncthreads();
for (void *ptr : ptrs) {
umf_ipc_handle_t ipcHandle;
size_t handleSize;
umf_result_t ret = umfGetIPCHandle(ptr, &ipcHandle, &handleSize);
ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
ipcHandles[tid].push_back(ipcHandle);
}
};

umf_test::parallel_exec(NTHREADS, getHandlesFn);

auto putHandlesFn = [&ipcHandles, &syncthreads](size_t tid) {
syncthreads();
for (umf_ipc_handle_t ipcHandle : ipcHandles[tid]) {
umf_result_t ret = umfPutIPCHandle(ipcHandle);
EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
}
};
TEST_P(umfIpcTest, ConcurrentGetConcurrentPutHandles) {
concurrentGetConcurrentPutHandles(false);
}

umf_test::parallel_exec(NTHREADS, putHandlesFn);
TEST_P(umfIpcTest, ConcurrentGetConcurrentPutHandlesShuffled) {
concurrentGetConcurrentPutHandles(true);
}

for (void *ptr : ptrs) {
umf_result_t ret = umfPoolFree(pool.get(), ptr);
EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
}
TEST_P(umfIpcTest, ConcurrentGetPutHandles) { concurrentGetPutHandles(false); }

pool.reset(nullptr);
EXPECT_EQ(stat.putCount, stat.getCount);
TEST_P(umfIpcTest, ConcurrentGetPutHandlesShuffled) {
concurrentGetPutHandles(true);
}

TEST_P(umfIpcTest, ConcurrentOpenCloseHandles) {
Expand Down
Loading