Skip to content

Commit 1380620

Browse files
committed
Fix data race in the umfIpcOpenedCacheDestroy function
1 parent 6c85540 commit 1380620

File tree

2 files changed

+72
-0
lines changed

2 files changed

+72
-0
lines changed

src/ipc_cache.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,8 @@ umfIpcOpenedCacheCreate(ipc_opened_cache_eviction_cb_t eviction_cb) {
144144

145145
void umfIpcOpenedCacheDestroy(ipc_opened_cache_handle_t cache) {
146146
ipc_opened_cache_entry_t *entry, *tmp;
147+
148+
utils_mutex_lock(&(cache->global->cache_lock));
147149
HASH_ITER(hh, cache->hash_table, entry, tmp) {
148150
DL_DELETE(cache->global->lru_list, entry);
149151
HASH_DEL(cache->hash_table, entry);
@@ -153,6 +155,7 @@ void umfIpcOpenedCacheDestroy(ipc_opened_cache_handle_t cache) {
153155
umf_ba_free(cache->global->cache_allocator, entry);
154156
}
155157
HASH_CLEAR(hh, cache->hash_table);
158+
utils_mutex_unlock(&(cache->global->cache_lock));
156159

157160
umf_ba_global_free(cache);
158161
}

test/ipcFixtures.hpp

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -593,4 +593,73 @@ TEST_P(umfIpcTest, ConcurrentOpenCloseHandles) {
593593
EXPECT_EQ(stat.openCount, stat.closeCount);
594594
}
595595

596+
TEST_P(umfIpcTest, ConcurrentDestroyIpcHandlers) {
597+
constexpr size_t SIZE = 100;
598+
constexpr size_t NUM_ALLOCS = 100;
599+
constexpr size_t NUM_POOLS = 10;
600+
void *ptrs[NUM_ALLOCS];
601+
void *openedPtrs[NUM_POOLS][NUM_ALLOCS];
602+
std::vector<umf::pool_unique_handle_t> consumerPools;
603+
umf::pool_unique_handle_t producerPool = makePool();
604+
ASSERT_NE(producerPool.get(), nullptr);
605+
606+
for (size_t i = 0; i < NUM_POOLS; ++i) {
607+
consumerPools.push_back(makePool());
608+
}
609+
610+
for (size_t i = 0; i < NUM_ALLOCS; ++i) {
611+
void *ptr = umfPoolMalloc(producerPool.get(), SIZE);
612+
ASSERT_NE(ptr, nullptr);
613+
ptrs[i] = ptr;
614+
}
615+
616+
for (size_t i = 0; i < NUM_ALLOCS; ++i) {
617+
umf_ipc_handle_t ipcHandle = nullptr;
618+
size_t handleSize = 0;
619+
umf_result_t ret = umfGetIPCHandle(ptrs[i], &ipcHandle, &handleSize);
620+
ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
621+
622+
for (size_t poolId = 0; poolId < NUM_POOLS; poolId++) {
623+
void *ptr = nullptr;
624+
umf_ipc_handler_handle_t ipcHandler = nullptr;
625+
ret =
626+
umfPoolGetIPCHandler(consumerPools[poolId].get(), &ipcHandler);
627+
ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
628+
ASSERT_NE(ipcHandler, nullptr);
629+
630+
ret = umfOpenIPCHandle(ipcHandler, ipcHandle, &ptr);
631+
ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
632+
openedPtrs[poolId][i] = ptr;
633+
}
634+
635+
ret = umfPutIPCHandle(ipcHandle);
636+
ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
637+
}
638+
639+
for (size_t poolId = 0; poolId < NUM_POOLS; poolId++) {
640+
for (size_t i = 0; i < NUM_ALLOCS; ++i) {
641+
umf_result_t ret = umfCloseIPCHandle(openedPtrs[poolId][i]);
642+
EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
643+
}
644+
}
645+
646+
for (size_t i = 0; i < NUM_ALLOCS; ++i) {
647+
umf_result_t ret = umfFree(ptrs[i]);
648+
EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
649+
}
650+
651+
// Destroy pools in parallel to cause IPC cache cleanup in parallel.
652+
umf_test::syncthreads_barrier syncthreads(NUM_POOLS);
653+
auto poolDestroyFn = [&consumerPools, &syncthreads](size_t tid) {
654+
syncthreads();
655+
consumerPools[tid].reset(nullptr);
656+
};
657+
umf_test::parallel_exec(NUM_POOLS, poolDestroyFn);
658+
659+
producerPool.reset(nullptr);
660+
661+
EXPECT_EQ(stat.putCount, stat.getCount);
662+
EXPECT_EQ(stat.openCount, stat.closeCount);
663+
}
664+
596665
#endif /* UMF_TEST_IPC_FIXTURES_HPP */

0 commit comments

Comments
 (0)