Commit 1e2257d

Add multithreaded benchmark for umf
Helper functions taken from pmemstream repo.
1 parent b1068dc commit 1e2257d

3 files changed: +274 -0 lines changed


benchmark/CMakeLists.txt

Lines changed: 11 additions & 0 deletions
@@ -32,18 +32,29 @@ target_link_libraries(ubench
     pthread
     m)
 
+add_executable(multithread_bench multithread.cpp)
+target_link_libraries(multithread_bench
+    umf
+    ${LIBS_OPTIONAL}
+    pthread
+    m)
+
 if (UMF_BUILD_OS_MEMORY_PROVIDER)
     target_compile_definitions(ubench PRIVATE UMF_BUILD_OS_MEMORY_PROVIDER=1)
+    target_compile_definitions(multithread_bench PRIVATE UMF_BUILD_OS_MEMORY_PROVIDER=1)
 endif()
 
 if (UMF_BUILD_LIBUMF_POOL_DISJOINT)
     target_compile_definitions(ubench PRIVATE UMF_BUILD_LIBUMF_POOL_DISJOINT=1)
+    target_compile_definitions(multithread_bench PRIVATE UMF_BUILD_LIBUMF_POOL_DISJOINT=1)
 endif()
 
 if (UMF_BUILD_LIBUMF_POOL_JEMALLOC)
     target_compile_definitions(ubench PRIVATE UMF_BUILD_LIBUMF_POOL_JEMALLOC=1)
+    target_compile_definitions(multithread_bench PRIVATE UMF_BUILD_LIBUMF_POOL_JEMALLOC=1)
 endif()
 
 if (UMF_BUILD_LIBUMF_POOL_SCALABLE)
     target_compile_definitions(ubench PRIVATE UMF_BUILD_LIBUMF_POOL_SCALABLE=1)
+    target_compile_definitions(multithread_bench PRIVATE UMF_BUILD_LIBUMF_POOL_SCALABLE=1)
 endif()

benchmark/multithread.cpp

Lines changed: 110 additions & 0 deletions
/*
 *
 * Copyright (C) 2024 Intel Corporation
 *
 * Under the Apache License v2.0 with LLVM Exceptions. See LICENSE.TXT.
 * SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
 *
 */

#include "multithread.hpp"

#include <umf/memory_pool.h>
#include <umf/pools/pool_jemalloc.h>
#include <umf/pools/pool_scalable.h>
#include <umf/providers/provider_os_memory.h>

#include <cstdlib>
#include <iostream>
#include <memory>
#include <numeric>
#include <tuple>

static constexpr size_t N_REPEATS = 5;
static constexpr size_t N_ITERATIONS = 50000;
static constexpr size_t N_THREADS = 20;
static constexpr size_t ALLOC_SIZE = 64;

using poolCreateExtParams = std::tuple<umf_memory_pool_ops_t *, void *,
                                       umf_memory_provider_ops_t *, void *>;

static auto poolCreateExtUnique(poolCreateExtParams params) {
    umf_memory_pool_handle_t hPool;
    auto [pool_ops, pool_params, provider_ops, provider_params] = params;

    umf_memory_provider_handle_t provider = nullptr;
    auto ret =
        umfMemoryProviderCreate(provider_ops, provider_params, &provider);
    if (ret != UMF_RESULT_SUCCESS) {
        std::cerr << "provider create failed" << std::endl;
        abort();
    }

    // The pool owns the provider (OWN_PROVIDER), so destroying the pool
    // also destroys the provider.
    ret = umfPoolCreate(pool_ops, provider, pool_params,
                        UMF_POOL_CREATE_FLAG_OWN_PROVIDER, &hPool);
    if (ret != UMF_RESULT_SUCCESS) {
        std::cerr << "pool create failed" << std::endl;
        abort();
    }

    return std::shared_ptr<umf_memory_pool_t>(hPool, &umfPoolDestroy);
}

static void mt_alloc_free(poolCreateExtParams params) {
    auto pool = poolCreateExtUnique(params);

    std::vector<void *> allocs[N_THREADS];
    size_t numFailures[N_THREADS] = {};
    for (auto &v : allocs) {
        v.reserve(N_ITERATIONS);
    }

    auto values = umf_bench::measure<std::chrono::milliseconds>(
        N_REPEATS, N_THREADS, [&, pool = pool.get()](auto thread_id) {
            for (size_t i = 0; i < N_ITERATIONS; i++) {
                allocs[thread_id].push_back(umfPoolMalloc(pool, ALLOC_SIZE));
                if (!allocs[thread_id].back()) {
                    numFailures[thread_id]++;
                }
            }

            for (size_t i = 0; i < N_ITERATIONS; i++) {
                umfPoolFree(pool, allocs[thread_id][i]);
            }

            // clear the vector as this function might be called multiple times
            allocs[thread_id].clear();
        });

    std::cout << "mean: " << umf_bench::mean(values)
              << " [ms] std_dev: " << umf_bench::std_dev(values) << " [ms]"
              << std::endl;
    std::cout << "Total alloc failures: "
              << std::accumulate(numFailures, numFailures + N_THREADS, 0)
              << " out of " << N_ITERATIONS * N_REPEATS * N_THREADS
              << std::endl;
}

int main() {
#if defined(UMF_BUILD_OS_MEMORY_PROVIDER)
    auto osParams = umfOsMemoryProviderParamsDefault();
#endif

#if defined(UMF_BUILD_OS_MEMORY_PROVIDER) && \
    defined(UMF_BUILD_LIBUMF_POOL_SCALABLE)
    std::cout << "scalable_pool mt_alloc_free: " << std::endl;
    mt_alloc_free(poolCreateExtParams{&UMF_SCALABLE_POOL_OPS, nullptr,
                                      &UMF_OS_MEMORY_PROVIDER_OPS, &osParams});
#else
    std::cout << "skipping scalable_pool mt_alloc_free" << std::endl;
#endif

#if defined(UMF_BUILD_OS_MEMORY_PROVIDER) && \
    defined(UMF_BUILD_LIBUMF_POOL_JEMALLOC)
    std::cout << "jemalloc_pool mt_alloc_free: " << std::endl;
    mt_alloc_free(poolCreateExtParams{&UMF_JEMALLOC_POOL_OPS, nullptr,
                                      &UMF_OS_MEMORY_PROVIDER_OPS, &osParams});
#else
    std::cout << "skipping jemalloc_pool mt_alloc_free" << std::endl;
#endif

    return 0;
}
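
Note that the CMake changes also define UMF_BUILD_LIBUMF_POOL_DISJOINT for multithread_bench, but main() above only wires up the scalable and jemalloc pools. A disjoint-pool case could plausibly follow the same pattern; the sketch below is hypothetical, and the pool_disjoint.h header, UMF_DISJOINT_POOL_OPS, and umfDisjointPoolParamsDefault() are assumed names that do not appear anywhere in this commit:

// Hypothetical extension of main(); would require
// #include <umf/pools/pool_disjoint.h>. UMF_DISJOINT_POOL_OPS and
// umfDisjointPoolParamsDefault() are assumed names, not confirmed here.
#if defined(UMF_BUILD_OS_MEMORY_PROVIDER) && \
    defined(UMF_BUILD_LIBUMF_POOL_DISJOINT)
    auto disjointParams = umfDisjointPoolParamsDefault();
    std::cout << "disjoint_pool mt_alloc_free: " << std::endl;
    mt_alloc_free(poolCreateExtParams{&UMF_DISJOINT_POOL_OPS, &disjointParams,
                                      &UMF_OS_MEMORY_PROVIDER_OPS, &osParams});
#else
    std::cout << "skipping disjoint_pool mt_alloc_free" << std::endl;
#endif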

benchmark/multithread.hpp

Lines changed: 153 additions & 0 deletions
/*
 *
 * Copyright (C) 2024 Intel Corporation
 *
 * Under the Apache License v2.0 with LLVM Exceptions. See LICENSE.TXT.
 * SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
 *
 */

#pragma once

#include <algorithm>
#include <chrono>
#include <cmath>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <numeric>
#include <thread>
#include <vector>

namespace umf_bench {

/* Run f(thread_id) on threads_number threads and join them all. */
template <typename Function>
void parallel_exec(size_t threads_number, Function &&f) {
    std::vector<std::thread> threads;
    threads.reserve(threads_number);

    for (size_t i = 0; i < threads_number; ++i) {
        threads.emplace_back([&](size_t id) { f(id); }, i);
    }

    for (auto &t : threads) {
        t.join();
    }
}

class latch {
  public:
    latch(size_t desired) : counter(desired) {}

    /* Returns true for the last thread arriving at the latch, false for all
     * other threads. */
    bool wait(std::unique_lock<std::mutex> &lock) {
        counter--;
        if (counter > 0) {
            cv.wait(lock, [&] { return counter == 0; });
            return false;
        } else {
            /*
             * notify_all could be called outside of the lock
             * (it would perform better) but drd complains
             * in that case
             */
            cv.notify_all();
            return true;
        }
    }

  private:
    std::condition_variable cv;
    size_t counter = 0;
};

/* Implements multi-use barrier (latch). Once all threads arrive at the
 * latch, a new latch is allocated and used by all subsequent calls to
 * syncthreads. */
struct syncthreads_barrier {
    syncthreads_barrier(size_t num_threads) : num_threads(num_threads) {
        mutex = std::shared_ptr<std::mutex>(new std::mutex);
        current_latch = std::shared_ptr<latch>(new latch(num_threads));
    }

    syncthreads_barrier(const syncthreads_barrier &) = delete;
    syncthreads_barrier &operator=(const syncthreads_barrier &) = delete;
    syncthreads_barrier(syncthreads_barrier &&) = default;

    void operator()() {
        std::unique_lock<std::mutex> lock(*mutex);
        auto l = current_latch;
        if (l->wait(lock)) {
            current_latch = std::shared_ptr<latch>(new latch(num_threads));
        }
    }

  private:
    size_t num_threads;
    std::shared_ptr<std::mutex> mutex;
    std::shared_ptr<latch> current_latch;
};

/* Measure the wall-clock time of a single call to func(). */
template <typename TimeUnit, typename F>
typename TimeUnit::rep measure(F &&func) {
    auto start = std::chrono::steady_clock::now();

    func();

    auto duration = std::chrono::duration_cast<TimeUnit>(
        std::chrono::steady_clock::now() - start);
    return duration.count();
}

/* Measure the execution time of run_workload(thread_id) on each thread.
 * Threads are synchronized with a barrier before and after the workload;
 * returns one sample per thread per iteration. */
template <typename TimeUnit, typename F>
auto measure(size_t iterations, size_t concurrency, F &&run_workload) {
    using ResultsType = typename TimeUnit::rep;
    std::vector<ResultsType> results;

    for (size_t i = 0; i < iterations; i++) {
        std::vector<ResultsType> iteration_results(concurrency);
        syncthreads_barrier syncthreads(concurrency);
        parallel_exec(concurrency, [&](size_t id) {
            syncthreads();

            iteration_results[id] =
                measure<TimeUnit>([&]() { run_workload(id); });

            syncthreads();
        });
        results.insert(results.end(), iteration_results.begin(),
                       iteration_results.end());
    }

    return results;
}

template <typename T> T min(const std::vector<T> &values) {
    return *std::min_element(values.begin(), values.end());
}

template <typename T> T max(const std::vector<T> &values) {
    return *std::max_element(values.begin(), values.end());
}

template <typename T> double mean(const std::vector<T> &values) {
    return std::accumulate(values.begin(), values.end(), 0.0) / values.size();
}

template <typename T> double std_dev(const std::vector<T> &values) {
    auto m = mean(values);
    // store squared deviations as doubles to avoid truncation for integral T
    std::vector<double> diff_squares;
    diff_squares.reserve(values.size());

    for (auto &v : values) {
        diff_squares.push_back(std::pow((v - m), 2.0));
    }

    auto variance =
        std::accumulate(diff_squares.begin(), diff_squares.end(), 0.0) /
        values.size();

    return std::sqrt(variance);
}

} // namespace umf_bench
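
To see the harness in isolation, here is a minimal, self-contained usage sketch: it times a placeholder per-thread workload with umf_bench::measure and reports the same statistics the benchmark prints. Only the busy-work loop is invented; the umf_bench calls are exactly the ones defined above.

// Standalone usage sketch for the helpers above; the workload is a
// placeholder, everything else uses the umf_bench API as defined here.
#include "multithread.hpp"

#include <iostream>

int main() {
    // 3 repeats x 4 threads -> 12 samples, one per thread per repeat.
    auto samples = umf_bench::measure<std::chrono::milliseconds>(
        3, 4, [](size_t thread_id) {
            // Placeholder workload: spin on some arithmetic so there is
            // something to time.
            volatile size_t sink = 0;
            for (size_t i = 0; i < 1000000; i++) {
                sink = sink + i * thread_id;
            }
        });

    std::cout << "min: " << umf_bench::min(samples)
              << " max: " << umf_bench::max(samples)
              << " mean: " << umf_bench::mean(samples)
              << " std_dev: " << umf_bench::std_dev(samples) << " [ms]"
              << std::endl;
}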
